編譯PHP時需要加入 --enable-maintainer-zts 選項才能安裝pthreads
# pecl install pthread
cat > /srv/php-5.5.7/etc/conf.d/pthreads.ini <<EOF extension=pthreads.so EOF
$ php -m |grep pthreads pthreads
pthreads 3.0.0 之後不再支持5.6.x,PHP 版本大於等於 7.0.0
# pecl install pthreads pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16 No valid packages found install failed
# pecl install pthreads-2.0.10
<?php class test extends Thread { public $name = ''; public $runing = false; public function __construct($name) { $this->name = $name; $this->runing = true; } public function run() { $n = 0; while ($this->runing) { printf("name: %s %s\n",$this->name, $n); $n++; sleep(1); } } } $pool[] = new test('a'); $pool[] = new test('b'); $pool[] = new test('c'); foreach ($pool as $w) { $w->start(); }
$pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while ( true ){ if(count($pool) < 2000){ //定義綫程池數量,小於綫程池數量則開啟新的綫程直到小於2000為止 $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ //如果綫程已經運行結束,銷毀綫程,給新的任務使用 if(! $worker->isRunning()){ unset($pool[$name]); } } } } }
<?php class ExampleWork extends Stackable { public function __construct($data) { $this->local = $data; } public function run() { // print_r($this->local);echo "\r\n"; echo '------------------- '. $this->local . " -----------------\r\n"; sleep(1); } } class ExampleWorker extends Worker { public function __construct($name) { $this->name = $name; $this->data = array(); } public function run(){ $this->name = sprintf("(%lu)", $this->getThreadId()); } } /* Dead simple pthreads pool */ class Pool { /* to hold worker threads */ public $workers; /* to hold exit statuses */ public $status; /* prepare $size workers */ public function __construct($size = 10) { $this->size = $size; } /* submit Stackable to Worker */ public function submit(Stackable $stackable) { if (count($this->workers)<$this->size) { $id = count($this->workers); $this->workers[$id] = new ExampleWorker(sprintf("Worker [%d]", $id)); $this->workers[$id]->start(PTHREADS_INHERIT_NONE); if ($this->workers[$id]->stack($stackable)) { return $stackable; } else trigger_error(sprintf("failed to push Stackable onto %s", $this->workers[$id]->getName()), E_USER_WARNING); }else{ for ($i=0;$i<count($this->workers);$i++){ if( ! $this->workers[$i]->isWorking()){ $this->workers[$i]->stack($stackable); return $stackable; } } } return false; } public function status(){ for ($i=0;$i<count($this->workers);$i++){ printf("(%s:%s)\r\n",$i, $this->workers[$i]->isWorking()); } printf("\r\n"); } /* Shutdown the pool of threads cleanly, retaining exit status locally */ public function shutdown() { foreach($this->workers as $worker) { $this->status[$worker->getThreadId()]=$worker->shutdown(); } } } /* Create a pool of ten threads */ $pool = new Pool(100); /* Create and submit an array of Stackables */ $work = array(); for ($target = 0; $target < 1000; $target++){ $work[$target]=$pool->submit(new ExampleWork($target)); if($work[$target] == false){ $target--; sleep(1); continue; } for ($i=0;$i<count($work);$i++){ if($work[$i]->isRunning()){ printf("cell: %s, status: %s\r\n",$i, $work[$i]->isRunning()); } } printf("\r\n"); } $pool->shutdown(); exit();
pthreads 自帶 Pool
<?php class ExampleWorker extends Worker { public function __construct(Logging $logger) { $this->logger = $logger; } protected $logger; } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($number) { $this->number = $number; } public function run() { $this->worker ->logger ->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId()); sleep(1); printf("runtime: %s, %d\n", date('Y-m-d H:i:s'), $this->number); $this->status = "OK"; } } class Logging extends Stackable { protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf("{$message}\n", $args); } } } $pool = new Pool(5, \ExampleWorker::class, [new Logging()]); foreach (range(0, 100) as $number) { $pool->submit(new Work($number)); } $pool->shutdown(); var_dump($pool); ?>
例 5.7. Threads - Pool
# cat pool.php <?php class MyWork extends Stackable { public $name; public function __construct($name) { echo "Stackable executed $name\n"; $this->name = $name; } public function run() { echo "Stackable $this->name start running\n"; for ($i = 1; $i <= 5; $i++) { echo "Run $this->name : $i\n"; sleep(1); } } } class MyWorker extends Worker { public function __construct($name) { $this->name = $name; } public function run() { echo "Worker started $this->name\n"; } } $pool = new Pool(3, \MyWorker::class, array("pthreads")); $pool->submit(new MyWork("A")); $pool->submit(new MyWork("B")); $pool->submit(new MyWork("C")); $pool->shutdown();
PHP Fatal error: Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38 Stack trace: #0 /home/www/threads.php(38): PDO->__sleep() #1 [internal function]: SQLWorker->run() #2 {main} thrown in /home/www/threads.php on line 38 not ready
<?php class MyWorker extends Worker{ public static $pdo; function __construct($conf){ $this->conf = $conf; } function run(){ self::$pdo = new PDO( 'mysql:host=localhost;dbname=test'); } function get_connection(){ return self::$pdo; } } ?>
<?php $counter = 0; //$handle=fopen("php://memory", "rw"); //$handle=fopen("php://temp", "rw"); $handle=fopen("/tmp/counter.txt", "w"); fwrite($handle, $counter ); fclose($handle); class CounterThread extends Thread { public function __construct($mutex = null){ $this->mutex = $mutex; $this->handle = fopen("/tmp/counter.txt", "w+"); } public function __destruct(){ fclose($this->handle); } public function run() { if($this->mutex) $locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex) Mutex::unlock($this->mutex); } } //沒有互斥鎖 for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread(); $threads[$i]->start(); } //加入互斥鎖 $mutex = Mutex::create(true); for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread($mutex); $threads[$i]->start(); } Mutex::unlock($mutex); for ($i=0;$i<50;$i++){ $threads[$i]->join(); } Mutex::destroy($mutex); ?>
重新編譯加入 --enable-maintainer-zts
[root@localhost src]# pecl search pthreads Retrieving data...0% Matched packages, channel pecl.php.net: ======================================= Package Stable/(Latest) Local pthreads 3.0.7 (stable) Threading API
[root@localhost src]# pecl install pthreads pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13 No valid packages found install failed
[root@localhost src]# wget https://pecl.php.net/get/pthreads-3.0.6.tgz [root@localhost src]# tar zxvf pthreads-3.0.6.tgz [root@localhost src]# cd pthreads-3.0.6 [root@localhost pthreads-3.0.6]# phpize [root@localhost pthreads-3.0.6]# ./configure --enable-pthreads --with-php-config=/srv/php/bin/php-config [root@localhost pthreads-3.0.6]# make && make install
故障出現在PHP 7.x,pecl 已經升級至 3.1.6
Stackable 是 Threaded 的一個別名,這個類使用直到 pthreads v.2.0.0,之後便取消Stackable。
$ pecl search pthread Retrieving data...0% Matched packages, channel pecl.php.net: ======================================= Package Stable/(Latest) Local pthreads 3.1.6 (stable) Threading API
$stackable = new Stackable; var_dump(get_class($stackable)); Outputs: string(8) "Threaded"
/** * Stackable is an alias of Threaded. This class name was used in pthreads until * version 2.0.0 * @link http://www.php.net/manual/en/class.threaded.php */ class Stackable extends Threaded implements Traversable, Countable, ArrayAccess { }
解決方案,將 Stackable 改為 Threaded