編譯PHP時需要加入 --enable-maintainer-zts 選項才能安裝pthreads
# pecl install pthread
配置檔案
cat > /srv/php-5.5.7/etc/conf.d/pthreads.ini <<EOF extension=pthreads.so EOF
$ php -m |grep pthreads pthreads
pthreads 3.0.0 之後不再支持5.6.x,PHP 版本大於等於 7.0.0
# pecl install pthreads pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16 No valid packages found install failed
解決方法是安裝較低版本的pthreads-2.0.10
# pecl install pthreads-2.0.10
<?php class test extends Thread { public $name = ''; public $runing = false; public function __construct($name) { $this->name = $name; $this->runing = true; } public function run() { $n = 0; while ($this->runing) { printf("name: %s %s\n",$this->name, $n); $n++; sleep(1); } } } $pool[] = new test('a'); $pool[] = new test('b'); $pool[] = new test('c'); foreach ($pool as $w) { $w->start(); }
綫程池實現方法
$pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while ( true ){ if(count($pool) < 2000){ //定義綫程池數量,小於綫程池數量則開啟新的綫程直到小於2000為止 $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ //如果綫程已經運行結束,銷毀綫程,給新的任務使用 if(! $worker->isRunning()){ unset($pool[$name]); } } } } }
<?php class ExampleWork extends Stackable { public function __construct($data) { $this->local = $data; } public function run() { // print_r($this->local);echo "\r\n"; echo '------------------- '. $this->local . " -----------------\r\n"; sleep(1); } } class ExampleWorker extends Worker { public function __construct($name) { $this->name = $name; $this->data = array(); } public function run(){ $this->name = sprintf("(%lu)", $this->getThreadId()); } } /* Dead simple pthreads pool */ class Pool { /* to hold worker threads */ public $workers; /* to hold exit statuses */ public $status; /* prepare $size workers */ public function __construct($size = 10) { $this->size = $size; } /* submit Stackable to Worker */ public function submit(Stackable $stackable) { if (count($this->workers)<$this->size) { $id = count($this->workers); $this->workers[$id] = new ExampleWorker(sprintf("Worker [%d]", $id)); $this->workers[$id]->start(PTHREADS_INHERIT_NONE); if ($this->workers[$id]->stack($stackable)) { return $stackable; } else trigger_error(sprintf("failed to push Stackable onto %s", $this->workers[$id]->getName()), E_USER_WARNING); }else{ for ($i=0;$i<count($this->workers);$i++){ if( ! $this->workers[$i]->isWorking()){ $this->workers[$i]->stack($stackable); return $stackable; } } } return false; } public function status(){ for ($i=0;$i<count($this->workers);$i++){ printf("(%s:%s)\r\n",$i, $this->workers[$i]->isWorking()); } printf("\r\n"); } /* Shutdown the pool of threads cleanly, retaining exit status locally */ public function shutdown() { foreach($this->workers as $worker) { $this->status[$worker->getThreadId()]=$worker->shutdown(); } } } /* Create a pool of ten threads */ $pool = new Pool(100); /* Create and submit an array of Stackables */ $work = array(); for ($target = 0; $target < 1000; $target++){ $work[$target]=$pool->submit(new ExampleWork($target)); if($work[$target] == false){ $target--; sleep(1); continue; } for ($i=0;$i<count($work);$i++){ if($work[$i]->isRunning()){ printf("cell: %s, status: %s\r\n",$i, $work[$i]->isRunning()); } } printf("\r\n"); } $pool->shutdown(); exit();
pthreads 自帶 Pool
<?php class ExampleWorker extends Worker { public function __construct(Logging $logger) { $this->logger = $logger; } protected $logger; } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($number) { $this->number = $number; } public function run() { $this->worker ->logger ->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId()); sleep(1); printf("runtime: %s, %d\n", date('Y-m-d H:i:s'), $this->number); $this->status = "OK"; } } class Logging extends Stackable { protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf("{$message}\n", $args); } } } $pool = new Pool(5, \ExampleWorker::class, [new Logging()]); foreach (range(0, 100) as $number) { $pool->submit(new Work($number)); } $pool->shutdown(); var_dump($pool); ?>
例 5.7. Threads - Pool
# cat pool.php <?php class MyWork extends Stackable { public $name; public function __construct($name) { echo "Stackable executed $name\n"; $this->name = $name; } public function run() { echo "Stackable $this->name start running\n"; for ($i = 1; $i <= 5; $i++) { echo "Run $this->name : $i\n"; sleep(1); } } } class MyWorker extends Worker { public function __construct($name) { $this->name = $name; } public function run() { echo "Worker started $this->name\n"; } } $pool = new Pool(3, \MyWorker::class, array("pthreads")); $pool->submit(new MyWork("A")); $pool->submit(new MyWork("B")); $pool->submit(new MyWork("C")); $pool->shutdown();
PHP Fatal error: Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38 Stack trace: #0 /home/www/threads.php(38): PDO->__sleep() #1 [internal function]: SQLWorker->run() #2 {main} thrown in /home/www/threads.php on line 38 not ready
<?php class MyWorker extends Worker{ public static $pdo; function __construct($conf){ $this->conf = $conf; } function run(){ self::$pdo = new PDO( 'mysql:host=localhost;dbname=test'); } function get_connection(){ return self::$pdo; } } ?>
什麼情況下會用到互斥鎖?在你需要控制多個綫程同一時刻只能有一個綫程工作的情況下可以使用。
下面我們舉一個例子,一個簡單的計數器程序,說明有無互斥鎖情況下的不同。
<?php $counter = 0; //$handle=fopen("php://memory", "rw"); //$handle=fopen("php://temp", "rw"); $handle=fopen("/tmp/counter.txt", "w"); fwrite($handle, $counter ); fclose($handle); class CounterThread extends Thread { public function __construct($mutex = null){ $this->mutex = $mutex; $this->handle = fopen("/tmp/counter.txt", "w+"); } public function __destruct(){ fclose($this->handle); } public function run() { if($this->mutex) $locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex) Mutex::unlock($this->mutex); } } //沒有互斥鎖 for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread(); $threads[$i]->start(); } //加入互斥鎖 $mutex = Mutex::create(true); for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread($mutex); $threads[$i]->start(); } Mutex::unlock($mutex); for ($i=0;$i<50;$i++){ $threads[$i]->join(); } Mutex::destroy($mutex); ?>
我們使用檔案/tmp/counter.txt保存計數器值,每次打開該檔案將數值加一,然後寫回檔案。當多個綫程同時操作一個檔案的時候,就會綫程運行先後取到的數值不同,寫回的數值也不同,最終計數器的數值會混亂。
沒有加入鎖的結果是計數始終被覆蓋,最終結果是2
而加入互斥鎖後,只有其中的一個進程完成加一工作並釋放鎖,其他綫程才能得到解鎖信號,最終順利完成計數器累加操作
上面例子也可以通過對檔案加鎖實現,這裡主要講的是多綫程鎖,後面會涉及檔案鎖。
重新編譯加入 --enable-maintainer-zts
[root@localhost src]# pecl search pthreads Retrieving data...0% Matched packages, channel pecl.php.net: ======================================= Package Stable/(Latest) Local pthreads 3.0.7 (stable) Threading API
[root@localhost src]# pecl install pthreads pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13 No valid packages found install failed
解決方法,手工編譯舊的安裝包
[root@localhost src]# wget https://pecl.php.net/get/pthreads-3.0.6.tgz [root@localhost src]# tar zxvf pthreads-3.0.6.tgz [root@localhost src]# cd pthreads-3.0.6 [root@localhost pthreads-3.0.6]# phpize [root@localhost pthreads-3.0.6]# ./configure --enable-pthreads --with-php-config=/srv/php/bin/php-config [root@localhost pthreads-3.0.6]# make && make install
故障出現在PHP 7.x,pecl 已經升級至 3.1.6
Stackable 是 Threaded 的一個別名,這個類使用直到 pthreads v.2.0.0,之後便取消Stackable。
$ pecl search pthread Retrieving data...0% Matched packages, channel pecl.php.net: ======================================= Package Stable/(Latest) Local pthreads 3.1.6 (stable) Threading API
測試
$stackable = new Stackable; var_dump(get_class($stackable)); Outputs: string(8) "Threaded"
源碼
/** * Stackable is an alias of Threaded. This class name was used in pthreads until * version 2.0.0 * @link http://www.php.net/manual/en/class.threaded.php */ class Stackable extends Threaded implements Traversable, Countable, ArrayAccess { }
解決方案,將 Stackable 改為 Threaded