Home | 簡體中文 | 繁體中文 | 雜文 | Search | ITEYE 博客 | OSChina 博客 | Facebook | Linkedin | Email

3.15. pthreads

3.15.1. Thread

		
<?php

/**
 * Demo thread that prints its name and a running counter once per second.
 *
 * The loop in run() keeps going for as long as the public $runing flag is true.
 */
class test extends Thread {

    /** Label printed on every output line. */
    public $name = '';

    /** Loop flag read by run(); the constructor switches it on. */
    public $runing = false;

    /**
     * @param mixed $name Label for this thread's output (the demo passes short strings).
     */
    public function __construct($name) {
        $this->runing = true;
        $this->name   = $name;
    }

    /**
     * Thread body: print "name: <name> <counter>", wait one second, and repeat
     * while $runing stays true.
     */
    public function run() {
        for ($tick = 0; $this->runing; $tick++) {
            printf("name: %s %s\n", $this->name, $tick);
            sleep(1);
        }
    }

}

// Build the three demo threads first ...
foreach (array('a', 'b', 'c') as $label) {
    $pool[] = new test($label);
}

// ... then launch each of them in creation order.
foreach ($pool as $thread) {
    $thread->start();
}
		
		

綫程池實現方法

		
	// Upper bound on the number of Update threads tracked at the same time.
	$maxThreads = 2000;
	$pool = array();
	while ($member = $row->fetch(PDO::FETCH_ASSOC))
	{
		// While the pool is full, reap finished threads until a slot frees up.
		while (count($pool) >= $maxThreads) {
			foreach ($pool as $name => $worker) {
				// A thread that is no longer running is joined and dropped so
				// its slot can be reused for a new task.
				if (! $worker->isRunning()) {
					$worker->join();
					unset($pool[$name]);
				}
			}
			// Still full: back off briefly instead of spinning at 100% CPU.
			if (count($pool) >= $maxThreads) {
				usleep(10000);
			}
		}

		// Append with [] so every thread gets its own key. The previous code
		// indexed by an undefined $id, which made each new thread overwrite
		// the last one and kept count($pool) at 1.
		$thread = new Update($member);
		$thread->start();
		$pool[] = $thread;
	}

	// Wait for the threads that are still in flight once all rows are dispatched.
	foreach ($pool as $worker) {
		$worker->join();
	}
		
		

3.15.2. Pool

		
<?php
/**
 * Unit of work for the pool demo: prints the payload it was built with,
 * then sleeps for one second.
 */
class ExampleWork extends Stackable {
        /**
         * @param mixed $data Payload stored in $this->local and echoed by run().
         */
        public function __construct($data) {
                $this->local = $data;
        }

        /**
         * Print one banner line containing the payload, then pause for a second.
         */
        public function run() {
                // Build the whole line first so it is emitted with a single echo.
                $banner = '------------------- ' . $this->local . " -----------------\r\n";
                echo $banner;
                sleep(1);
        }
}
/**
 * Worker thread used by the pool demo.
 */
class ExampleWorker extends Worker {

        /**
         * @param mixed $name Initial label for the worker; run() replaces it later.
         */
        public function __construct($name) {
                $this->data = array();
                $this->name = $name;
        }

        /**
         * Worker body: overwrite the label with this thread's id in parentheses.
         */
        public function run(){
                $threadId   = $this->getThreadId();
                $this->name = sprintf("(%lu)", $threadId);
        }
}
/**
 * Dead simple pthreads pool.
 *
 * Workers are created lazily: each submit() call starts a new ExampleWorker
 * until $size workers exist; after that, work is handed to the first worker
 * that reports it is not working.
 *
 * NOTE(review): later pthreads releases appear to ship their own Pool class;
 * confirm this name does not clash with the installed extension version.
 */
class Pool {
        /* to hold worker threads, indexed 0..n-1 in creation order */
        public $workers = array();
        /* to hold exit statuses, keyed by thread id */
        public $status = array();
        /* maximum number of workers this pool will start */
        public $size;

        /**
         * Prepare a pool that will start at most $size workers.
         *
         * @param int $size Upper bound on the number of workers (default 10).
         */
        public function __construct($size = 10) {
                $this->size = $size;
        }

        /**
         * Submit a Stackable to a Worker.
         *
         * @param Stackable $stackable Work item to run.
         * @return Stackable|false The submitted item, or false when the pool is
         *                         full and no worker is idle, or when stacking
         *                         onto a freshly started worker failed.
         */
        public function submit(Stackable $stackable) {
                $id = count($this->workers);

                if ($id < $this->size) {
                        // Room left in the pool: start a new worker for this item.
                        $worker = new ExampleWorker(sprintf("Worker [%d]", $id));
                        $this->workers[$id] = $worker;
                        $worker->start(PTHREADS_INHERIT_NONE);

                        if ($worker->stack($stackable)) {
                                return $stackable;
                        }

                        // ExampleWorker keeps its label in the name property and
                        // defines no getName() method, so read the property.
                        trigger_error(sprintf("failed to push Stackable onto %s", $worker->name), E_USER_WARNING);
                        return false;
                }

                // Pool is at capacity: reuse the first worker that is idle.
                foreach ($this->workers as $worker) {
                        if (! $worker->isWorking()) {
                                $worker->stack($stackable);
                                return $stackable;
                        }
                }

                return false;
        }

        /**
         * Print one "(index:isWorking)" line per worker, followed by a blank line.
         */
        public function status(){
                foreach ($this->workers as $i => $worker) {
                        printf("(%s:%s)\r\n", $i, $worker->isWorking());
                }
                printf("\r\n");
        }

        /**
         * Shutdown the pool of threads cleanly, retaining exit status locally.
         */
        public function shutdown() {
                foreach ($this->workers as $worker) {
                        $this->status[$worker->getThreadId()] = $worker->shutdown();
                }
        }
}
/* Create a pool that may start up to 100 worker threads */
$pool = new Pool(100);
/* Submit 1000 work items, keeping each accepted Stackable under its index */
$work = array();
for ($target = 0; $target < 1000; $target++){

	/* Retry this index once per second until the pool accepts the item */
	while (($work[$target] = $pool->submit(new ExampleWork($target))) == false) {
		sleep(1);
	}

	/* Report every item submitted so far that is running right now */
	foreach ($work as $cell => $job) {
		if ($job->isRunning()) {
			printf("cell: %s, status: %s\r\n", $cell, $job->isRunning());
		}
	}
	printf("\r\n");
}
$pool->shutdown();
exit();
		
		

3.15.3. FAQ

3.15.3.1. You cannot serialize or unserialize PDO instances

			
PHP Fatal error:  Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38
Stack trace:
#0 /home/www/threads.php(38): PDO->__sleep()
#1 [internal function]: SQLWorker->run()
#2 {main}
  thrown in /home/www/threads.php on line 38
 not ready
 			
			
			
<?php
/**
 * Worker that opens its PDO connection inside run() and keeps it in a static
 * property instead of on the instance.
 *
 * NOTE(review): presumably this avoids the "cannot serialize PDO" error because
 * static members are not copied between threads; confirm against pthreads docs.
 */
class MyWorker extends Worker{
    /** Connection created by run(). */
    public static $pdo;

    /**
     * @param mixed $conf Configuration stored on the worker; not read in this class.
     */
    public function __construct($conf){
        $this->conf = $conf;
    }

    /**
     * Worker body: open the MySQL connection for this worker.
     */
    public function run(){
        $dsn = 'mysql:host=localhost;dbname=test';
        self::$pdo = new PDO($dsn);
    }

    /**
     * @return PDO|null The connection opened by run(), or null if run() has not set it.
     */
    public function get_connection(){
        return self::$pdo;
    }
}
?>
			
			
comments powered by Disqus