Home | 簡體中文 | 繁體中文 | 雜文 | 打賞(Donations) | ITEYE 博客 | OSChina 博客 | Facebook | Linkedin | 知乎專欄 | Search | Email

5.14. pthreads

編譯PHP時需要加入 --enable-maintainer-zts 選項才能安裝pthreads

# pecl install pthreads
	

配置檔案

	
cat > /srv/php-5.5.7/etc/conf.d/pthreads.ini <<EOF
extension=pthreads.so
EOF
	
	
$ php -m |grep pthreads
pthreads
	

5.14.1. pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16

pthreads 3.0.0 之後不再支持 5.6.x,要求 PHP 版本大於等於 7.0.0

			
# pecl install pthreads
pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16
No valid packages found
install failed
			
			

解決方法是安裝較低版本的pthreads-2.0.10

# pecl install pthreads-2.0.10
			

5.14.2. Thread

		
<?php

/**
 * Demo thread: prints its name together with a running counter once per second
 * for as long as $runing stays true.
 */
class test extends Thread {

    /** @var string Label printed on every tick. */
    public $name   = '';
    /** @var bool Loop flag checked by run() before each tick. */
    public $runing = false;

    /**
     * @param string $name Label for this thread.
     */
    public function __construct($name) {
        $this->runing = true;
        $this->name   = $name;
    }

    /**
     * Thread body: emit "name: <name> <tick>" and then sleep one second,
     * repeating while $runing is true.
     */
    public function run() {
        for ($tick = 0; $this->runing; $tick++) {
            printf("name: %s %s\n",$this->name, $tick);
            sleep(1);
        }
    }

}

// Build all three threads first ...
foreach (array('a', 'b', 'c') as $label) {
    $pool[] = new test($label);
}

// ... then start them one after another.
foreach ($pool as $thread) {
    $thread->start();
}
		
		

綫程池實現方法

		
	// Bounded pool: one Update thread per fetched row, with fewer than 2000
	// threads tracked in $pool at any time.
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		// Keep trying until this row's thread has been started.
		while ( true ){
			if(count($pool) < 2000){
				$worker = new Update($member);
				$worker->start();
				// Append under a fresh key. The original indexed with an
				// undefined $id, so every worker landed in the same slot and
				// the pool size never grew past one.
				$pool[] = $worker;
				break;
			}

			// Pool is full: drop threads that have finished so their slots
			// can be reused for new work.
			foreach ( $pool as $name => $worker){
				if(! $worker->isRunning()){
					unset($pool[$name]);
				}
			}

			// Nothing finished yet: pause briefly instead of spinning at
			// full CPU while waiting for a slot.
			if(count($pool) >= 2000){
				usleep(10000);
			}

		}

	}
		
		

5.14.3. Pool

		
<?php
/**
 * Unit of work for the hand-rolled pool: prints the payload it was built with,
 * then idles for one second.
 */
class ExampleWork extends Stackable {
        /**
         * @param mixed $data Payload echoed by run(); kept on the object as $local.
         */
        public function __construct($data) {
                $this->local = $data;
        }

        /**
         * Print a banner line containing the payload, then sleep one second.
         */
        public function run() {
                echo '------------------- ', $this->local, " -----------------\r\n";
                sleep(1);
        }
}
/**
 * Worker thread used by the hand-rolled Pool.
 */
class ExampleWorker extends Worker {

        /**
         * @param string $name Initial label; run() replaces it once the thread starts.
         */
        public function __construct($name) {
                $this->data = array();
                $this->name = $name;
        }

        /**
         * Executed when the worker thread starts: rename the worker to its
         * thread id wrapped in parentheses.
         */
        public function run(){
                $threadId   = $this->getThreadId();
                $this->name = sprintf("(%lu)", $threadId);
        }
}
/* Dead simple pthreads pool */
/**
 * Minimal worker pool: starts up to $size ExampleWorker threads lazily and
 * hands each submitted Stackable to one of them.
 *
 * NOTE(review): pthreads releases that bundle their own Pool class would
 * presumably clash with this declaration — confirm against the installed version.
 */
class Pool {
        /* to hold worker threads */
        public $workers = array();
        /* to hold exit statuses, keyed by thread id */
        public $status = array();
        /* maximum number of workers this pool will start */
        public $size;

        /**
         * Prepare a pool of at most $size workers; none are started here.
         *
         * @param int $size Upper bound on the number of worker threads.
         */
        public function __construct($size = 10) {
                $this->size = $size;
        }

        /**
         * Submit a Stackable to a worker.
         *
         * While fewer than $size workers exist, a new worker is started and
         * receives the job. Once the pool is full, the job goes to the first
         * worker that reports it is not working.
         *
         * @param Stackable $stackable Job to run.
         * @return Stackable|false The job on success; false when stacking on a
         *                         new worker failed or every worker is busy.
         */
        public function submit(Stackable $stackable) {
                $id = count($this->workers);

                if ($id < $this->size) {
                        $worker = new ExampleWorker(sprintf("Worker [%d]", $id));
                        $this->workers[$id] = $worker;
                        $worker->start(PTHREADS_INHERIT_NONE);

                        if ($worker->stack($stackable)) {
                                return $stackable;
                        }

                        // ExampleWorker defines no getName(); report its name
                        // property so the warning path cannot itself fail.
                        trigger_error(sprintf("failed to push Stackable onto %s", $worker->name), E_USER_WARNING);
                        return false;
                }

                // Pool is full: reuse the first idle worker, if any.
                foreach ($this->workers as $worker) {
                        if (!$worker->isWorking()) {
                                $worker->stack($stackable);
                                return $stackable;
                        }
                }

                return false;
        }

        /**
         * Print one "(index:isWorking)" line per worker, followed by a blank line.
         */
        public function status(){
                foreach ($this->workers as $i => $worker) {
                        printf("(%s:%s)\r\n",$i, $worker->isWorking());
                }
                printf("\r\n");
        }

        /* Shutdown the pool of threads cleanly, retaining exit status locally */
        public function shutdown() {
                // $workers is always an array, so this is safe even when
                // nothing was ever submitted.
                foreach($this->workers as $worker) {
                        $this->status[$worker->getThreadId()]=$worker->shutdown();
                }
        }
}
/* Create a pool of up to 100 worker threads */
$pool = new Pool(100);
/* Create and submit 1000 Stackables, keeping a handle to each one */
$work = array();
for ($target = 0; $target < 1000; $target++){

	// submit() returns false when the job could not be placed; wait a second
	// and retry the same slot until it is accepted.
	while (($work[$target] = $pool->submit(new ExampleWork($target))) === false) {
		sleep(1);
	}

	// Report every job submitted so far that is currently running.
	foreach ($work as $cell => $job) {
		if ($job->isRunning()) {
			printf("cell: %s, status: %s\r\n", $cell, $job->isRunning());
		}
	}
	printf("\r\n");
}
$pool->shutdown();
exit();
		
		

pthreads 自帶 Pool

		
<?php
/**
 * Worker that carries a Logging instance for the jobs it runs.
 */
class ExampleWorker extends Worker {

	/** @var Logging Logger handed in at construction time. */
	protected $logger;

	/**
	 * @param Logging $logger Logger stored on the worker.
	 */
	public function __construct(Logging $logger) {
		$this->logger = $logger;
	}
}
 
/* A pooled job: logs which thread runs it, waits a second, prints a timestamp with its number */
class Work extends Stackable {
	/**
	 * @param int $number Job number printed by run().
	 */
	public function __construct($number) {
		$this->number = $number;
	}

	/**
	 * Log the executing thread through the worker's logger, sleep one second,
	 * print the current time with the job number, and mark the job "OK".
	 */
	public function run() {
		// NOTE(review): $this->worker is never assigned in this class;
		// presumably pthreads sets it when the job is stacked — confirm.
		$worker = $this->worker;
		$worker->logger->log("%s executing in Thread #%lu", __CLASS__, $worker->getThreadId());

		sleep(1);
		printf("runtime: %s, %d\n", date('Y-m-d H:i:s'), $this->number);
		$this->status = "OK";
	}
}
 
/**
 * printf-style logger shared between workers.
 */
class Logging extends Stackable {

	/**
	 * Format and print one log line.
	 *
	 * The first argument is the format string; every further argument is
	 * passed to vsprintf() as a value. Nothing is printed when the format
	 * is empty.
	 *
	 * @param string $message Format string.
	 * @param array  $args    Ignored as declared; the real values are read
	 *                        from the full argument list.
	 */
	protected function log($message, $args = []) {
		$params = func_get_args();
		$format = array_shift($params);

		if ($format) {
			echo vsprintf("{$format}\n", $params);
		}
	}
}
 
// Pool of five ExampleWorker threads, each constructed with a Logging instance.
$pool = new Pool(5, \ExampleWorker::class, [new Logging()]);

// Queue jobs numbered 0 through 100 inclusive.
for ($number = 0; $number <= 100; $number++) {
	$pool->submit(new Work($number));
}

// Shut the pool down, then dump its final state.
$pool->shutdown();

var_dump($pool);
?>
		
		

例 5.7. Threads - Pool

			
# cat pool.php 
<?php

/**
 * Job that announces its construction and then counts from 1 to 5,
 * one line per second.
 */
class MyWork extends Stackable {

    /** @var string Label used in every line this job prints. */
    public $name;

    /**
     * @param string $name Label for this job.
     */
    public function __construct($name) {
        echo "Stackable executed  $name\n";
        $this->name = $name;
    }

    /**
     * Print a start line, then five numbered lines with a one-second pause
     * after each.
     */
    public function run() {
        echo "Stackable $this->name start running\n";
        $i = 1;
        while ($i <= 5) {
            echo "Run $this->name : $i\n";
            sleep(1);
            $i++;
        }
    }

}

/**
 * Named worker that announces itself when its thread starts.
 */
class MyWorker extends Worker {
    /**
     * @param string $name Label printed by run().
     */
    public function __construct($name) {
        $this->name = $name;
    }

    /**
     * Executed when the worker thread starts: print the worker's name.
     */
    public function run() {
        echo "Worker started $this->name\n";
    }
}

// Three MyWorker threads, each constructed with the name "pthreads".
$pool = new Pool(3, \MyWorker::class, array("pthreads"));

// Submit one job per label, in order.
foreach (array("A", "B", "C") as $label) {
    $pool->submit(new MyWork($label));
}

$pool->shutdown();
			
			

5.14.4. FAQ

5.14.4.1. You cannot serialize or unserialize PDO instances

			
PHP Fatal error:  Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38
Stack trace:
#0 /home/www/threads.php(38): PDO->__sleep()
#1 [internal function]: SQLWorker->run()
#2 {main}
  thrown in /home/www/threads.php on line 38
 not ready
 			
			
			
<?php
/**
 * Worker that opens its database connection inside the thread and keeps it in
 * a static property instead of on the object.
 *
 * NOTE(review): presumably this is the workaround for the "You cannot serialize
 * or unserialize PDO instances" error shown above — confirm.
 */
class MyWorker extends Worker{
    /** @var PDO Connection created by run(). */
    public static $pdo;

    /**
     * @param mixed $conf Configuration stored on the worker; run() does not read it.
     */
    public function __construct($conf){
        $this->conf = $conf;
    }

    /**
     * Executed when the worker thread starts: open the connection.
     */
    public function run(){
        $dsn = 'mysql:host=localhost;dbname=test';
        self::$pdo = new PDO($dsn);
    }

    /**
     * @return PDO The connection created by run().
     */
    public function get_connection(){
        return self::$pdo;
    }
}
?>
			
			

5.14.5. 互斥鎖

什麼情況下會用到互斥鎖?在你需要控制多個綫程同一時刻只能有一個綫程工作的情況下可以使用。

下面我們舉一個例子,一個簡單的計數器程序,說明有無互斥鎖情況下的不同。

		
<?php
// Seed the shared counter file with the starting value. /tmp/counter.txt is
// created or truncated and then holds just "0".
$counter = 0;
file_put_contents("/tmp/counter.txt", $counter);

/**
 * Thread that increments the number stored in /tmp/counter.txt by one,
 * optionally holding a mutex for the whole read-modify-write cycle.
 */
class CounterThread extends Thread {
	/**
	 * @param mixed $mutex Mutex handle from Mutex::create(), or null to run
	 *                     without any locking.
	 */
	public function __construct($mutex = null){
		$this->mutex = $mutex;
	}

	/**
	 * Read the counter, add one, write it back and print the new value.
	 *
	 * The file is opened here rather than in the constructor, so the handle is
	 * created and closed by the thread that uses it. It is opened with "r+":
	 * the original "w+" truncated the file on every construction, wiping the
	 * counter before any thread could read it.
	 */
	public function run() {
		if($this->mutex)
			Mutex::lock($this->mutex);

		$handle = fopen("/tmp/counter.txt", "r+");
		if($handle !== false){
			$counter = intval(fgets($handle));
			$counter++;
			// Overwrite from the start. The value only grows, so the new
			// text is never shorter than the old one.
			rewind($handle);
			fputs($handle, $counter );
			fclose($handle);
			printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
		}

		if($this->mutex)
			Mutex::unlock($this->mutex);
	}
}

// Without a mutex: 50 threads update the counter file with no coordination.
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

// Wait for the unlocked batch before reusing $threads. Otherwise its threads
// are overwritten while possibly still running, and their writes mix with the
// locked batch below.
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}

// With a mutex: it is created already locked, so the new threads wait in
// run() until the main script releases it.
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

// Release the mutex so the threads can proceed one at a time, wait for all of
// them, then free the mutex.
Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>
		
		

我們使用檔案/tmp/counter.txt保存計數器值,每次打開該檔案將數值加一,然後寫回檔案。當多個綫程同時操作一個檔案的時候,各綫程會因運行先後不同而讀到不同的數值,寫回的數值也不同,最終計數器的數值會混亂。

沒有加入鎖的結果是計數始終被覆蓋,最終結果是2

而加入互斥鎖後,只有其中的一個綫程完成加一工作並釋放鎖,其他綫程才能得到解鎖信號,最終順利完成計數器累加操作

上面例子也可以通過對檔案加鎖實現,這裡主要講的是多綫程鎖,後面會涉及檔案鎖。

5.14.6. FAQ

5.14.6.1. configure: error: pthreads requires ZTS, please re-compile PHP with ZTS enabled

重新編譯加入 --enable-maintainer-zts

5.14.6.2. pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13

[root@localhost src]# pecl search pthreads
Retrieving data...0%
Matched packages, channel pecl.php.net:
=======================================
Package  Stable/(Latest) Local
pthreads 3.0.7 (stable)        Threading API
			
			
[root@localhost src]# pecl install pthreads
pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13
No valid packages found
install failed
			

解決方法,手工編譯舊的安裝包

			
[root@localhost src]# wget https://pecl.php.net/get/pthreads-3.0.6.tgz
[root@localhost src]# tar zxvf pthreads-3.0.6.tgz 		
[root@localhost src]# cd pthreads-3.0.6
[root@localhost pthreads-3.0.6]# phpize
[root@localhost pthreads-3.0.6]# ./configure --enable-pthreads --with-php-config=/srv/php/bin/php-config
[root@localhost pthreads-3.0.6]# make && make install
			
			

5.14.6.3. Class 'Stackable' not found in /path/to/file.php

故障出現在PHP 7.x,pthreads 已經升級至 3.1.6

Stackable 是 Threaded 的一個別名,這個類使用直到 pthreads v.2.0.0,之後便取消Stackable。

$ pecl search pthread
Retrieving data...0%
Matched packages, channel pecl.php.net:
=======================================
Package  Stable/(Latest) Local
pthreads 3.1.6 (stable)        Threading API			
			

測試

// Instantiate through the legacy name and print the class name PHP reports for it.
var_dump(get_class(new Stackable));

Outputs: string(8) "Threaded"			
			

源碼

/**
 * Stackable is an alias of Threaded. This class name was used in pthreads until
 * version 2.0.0
 * @link http://www.php.net/manual/en/class.threaded.php
 */
class Stackable extends Threaded implements Traversable, Countable, ArrayAccess {

    // Intentionally empty: this declaration adds no members of its own.
}
			

解決方案,將 Stackable 改為 Threaded