Home | 簡體中文 | 繁體中文 | 雜文 | 打賞(Donations) | ITEYE 博客 | OSChina 博客 | Facebook | Linkedin | 知乎專欄 | Search | Email

5.13. Process Control Extensions

5.13.1. pcntl

編譯的時候沒有帶--enable-pcntl參數的補救辦法

		
cd php-5.2.17/ext/pcntl

/srv/php-5.2.17/bin/phpsize
./configure --with-php-config=/srv/php-5.2.17/bin/php-config
make && make install
		
		

加入到php.ini

		
cat > /srv/php-5.2.17/etc/conf.d/pcntl.ini <<EOF
extension=pcntl.so
EOF
		
		

php -m | grep pcntl
		
SIGHUP     終止進程     終端線路掛斷
SIGINT     終止進程     中斷進程
SIGQUIT    建立CORE檔案終止進程,並且生成core檔案
SIGILL   建立CORE檔案       非法指令
SIGTRAP    建立CORE檔案       跟蹤自陷
SIGBUS   建立CORE檔案       匯流排錯誤
SIGSEGV   建立CORE檔案        段非法錯誤
SIGFPE   建立CORE檔案       浮點異常
SIGIOT   建立CORE檔案        執行I/O自陷
SIGKILL   終止進程     殺死進程
SIGPIPE   終止進程      向一個沒有讀進程的管道寫數據
SIGALARM   終止進程     計時器到時
SIGTERM   終止進程      軟件終止信號
SIGSTOP   停止進程     非終端來的停止信號
SIGTSTP   停止進程      終端來的停止信號
SIGCONT   忽略信號     繼續執行一個停止的進程
SIGURG   忽略信號      I/O緊急信號
SIGIO     忽略信號     描述符上可以進行I/O
SIGCHLD   忽略信號      當子進程停止或退出時通知父進程
SIGTTOU   停止進程     後台進程寫終端
SIGTTIN   停止進程      後台進程讀終端
SIGXGPU   終止進程     CPU時限超時
SIGXFSZ   終止進程     檔案長度過長
SIGWINCH    忽略信號     窗口大小發生變化
SIGPROF   終止進程     統計分佈圖用計時器到時
SIGUSR1   終止進程      用戶定義信號1
SIGUSR2   終止進程     用戶定義信號2
SIGVTALRM 終止進程     虛擬計時器到時
 
1)  SIGHUP 本信號在用戶終端連接(正常或非正常)結束時發出, 通常是在終端的控 
制進程結束時, 通知同一session內的各個作業,  這時它們與控制終端 
不再關聯. 
2) SIGINT 程序終止(interrupt)信號, 在用戶鍵入INTR字元(通常是Ctrl-C)時發出  
3) SIGQUIT 和SIGINT類似, 但由QUIT字元(通常是Ctrl-)來控制. 進程在因收到 
SIGQUIT退出時會產生core檔案,  在這個意義上類似於一個程序錯誤信 
號. 
4) SIGILL 執行了非法指令. 通常是因為執行檔本身出現錯誤, 或者試圖執行 
數據段.  堆棧溢出時也有可能產生這個信號. 
5) SIGTRAP 由斷點指令或其它trap指令產生. 由debugger使用. 
6) SIGABRT  程序自己發現錯誤並調用abort時產生. 
6) SIGIOT 在PDP-11上由iot指令產生, 在其它機器上和SIGABRT一樣. 
7)  SIGBUS 非法地址, 包括內存地址對齊(alignment)出錯. eg: 訪問一個四個字長 
的整數, 但其地址不是4的倍數. 
8)  SIGFPE 在發生致命的算術運算錯誤時發出. 不僅包括浮點運算錯誤, 還包括溢 
出及除數為0等其它所有的算術的錯誤. 
9) SIGKILL  用來立即結束程序的運行. 本信號不能被阻塞, 處理和忽略. 
10) SIGUSR1 留給用戶使用 
11) SIGSEGV  試圖訪問未分配給自己的內存, 或試圖往沒有寫權限的內存地址寫數據. 
12) SIGUSR2 留給用戶使用 
13) SIGPIPE Broken  pipe 
14) SIGALRM 時鐘定時信號, 計算的是實際的時間或時鐘時間. alarm函數使用該 
信號. 
15) SIGTERM  程序結束(terminate)信號, 與SIGKILL不同的是該信號可以被阻塞和 
處理. 通常用來要求程序自己正常退出.  shell命令kill預設產生這 
個信號. 
17) SIGCHLD 子進程結束時, 父進程會收到這個信號. 
18) SIGCONT  讓一個停止(stopped)的進程繼續執行. 本信號不能被阻塞. 可以用 
一個handler來讓程序在由stopped狀態變為繼續執行時完成特定的  
工作. 例如, 重新顯示提示符 
19) SIGSTOP 停止(stopped)進程的執行.  注意它和terminate以及interrupt的區別: 
該進程還未結束, 只是暫停執行. 本信號不能被阻塞, 處理或忽略. 
20)  SIGTSTP 停止進程的運行, 但該信號可以被處理和忽略. 用戶鍵入SUSP字元時 
(通常是Ctrl-Z)發出這個信號 
21) SIGTTIN  當後台作業要從用戶終端讀數據時, 該作業中的所有進程會收到SIGTTIN 
信號. 預設時這些進程會停止執行. 
22) SIGTTOU  類似於SIGTTIN, 但在寫終端(或修改終端模式)時收到. 
23) SIGURG 有"緊急"數據或out-of-band數據到達socket時產生.  
24) SIGXCPU 超過CPU時間資源限制. 這個限制可以由getrlimit/setrlimit來讀取/ 
改變 
25)  SIGXFSZ 超過檔案大小資源限制. 
26) SIGVTALRM 虛擬時鐘信號. 類似於SIGALRM, 但是計算的是該進程占用的CPU時間.  
27) SIGPROF 類似於SIGALRM/SIGVTALRM, 但包括該進程用的CPU時間以及系統調用的 
時間. 
28)  SIGWINCH 窗口大小改變時發出. 
29) SIGIO 檔案描述符準備就緒, 可以開始進行輸入/輸出操作. 
30) SIGPWR Power  failure 
 
有 兩個信號可以停止進程:SIGTERM和SIGKILL。  SIGTERM比較友好,進程能捕捉這個信號,根據您的需要來關閉程序。在關閉程序之前,您 可以結束打開的記錄檔案和完成正在做的任務。在某些情況下,假  如進程正在進行作業而且不能中斷,那麼進程可以忽略這個SIGTERM信號。
 
對於SIGKILL信號,進程是不能忽略的。這是一個  “我不管您在做什麼,立刻停止”的信號。假如您發送SIGKILL信號給進程,Linux就將進程停止在那裡。		
		

5.13.1.1. 

			
<?php
declare(ticks = 1);

pcntl_signal(SIGHUP, function ($signal) {
	echo 'HANDLE SIGNAL ' . $signal . PHP_EOL;
});
pcntl_signal(SIGTERM, function ($signal) {
	echo 'HANDLE SIGNAL ' . $signal . PHP_EOL;
	exit(1);
});

posix_kill(posix_getpid(), SIGHUP);			
			
			

在class中使用pcntl_signal

			
<?php
declare(ticks = 1);

class SignalHandler
{
   function __construct() {
       $this->_init();
   }
   function _init() {
       pcntl_signal(SIGTERM, array(&$this,"handleSignals"));
   }
   function handleSignals($signal)
   {
       echo "$signal\n";
   }

}
$o = new SignalHandler();
posix_kill(getmypid(),SIGTERM);
// prints 15
?>			
			
			

5.13.2. pthreads

編譯PHP時需要加入 --enable-maintainer-zts 選項才能安裝pthreads

# pecl install pthread
	

配置檔案

	
cat > /srv/php-5.5.7/etc/conf.d/pthreads.ini <<EOF
extension=pthreads.so
EOF
	
	
$ php -m |grep pthreads
pthreads
	

5.13.2.1. pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16

pthreads 3.0.0 之後不再支持5.6.x,PHP 版本大於等於 7.0.0

			
# pecl install pthreads
pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16
No valid packages found
install failed
			
			

解決方法是安裝較低版本的pthreads-2.0.10

# pecl install pthreads-2.0.10
			

5.13.2.2. Thread

		
<?php

class test extends Thread {

    public $name   = '';
    public $runing = false;

    public function __construct($name) {

        $this->name   = $name;
        $this->runing = true;
    }

    public function run() {
	$n = 0;
        while ($this->runing) {
		printf("name: %s %s\n",$this->name, $n);
		$n++;
                sleep(1);
        }
    }

}

$pool[] = new test('a');
$pool[] = new test('b');
$pool[] = new test('c');

foreach ($pool as $w) {
    $w->start();
}
		
		

綫程池實現方法

		
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while ( true ){
			if(count($pool) < 2000){ //定義綫程池數量,小於綫程池數量則開啟新的綫程直到小於2000為止
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					//如果綫程已經運行結束,銷毀綫程,給新的任務使用
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}

		}

	}
		
		

5.13.2.3. Pool

		
<?php
class ExampleWork extends Stackable {
        public function __construct($data) {
                $this->local = $data;
        }
        public function run() {
	//	print_r($this->local);echo "\r\n";
	echo '------------------- '. $this->local . " -----------------\r\n";
	sleep(1);
        }
}
class ExampleWorker extends Worker {

        public function __construct($name) {
                $this->name = $name;
                $this->data = array();
        }
        public function run(){
                $this->name = sprintf("(%lu)", $this->getThreadId());
        }
}
/* Dead simple pthreads pool */
class Pool {
        /* to hold worker threads */
        public $workers;
        /* to hold exit statuses */
        public $status;
        /* prepare $size workers */
        public function __construct($size = 10) {
                $this->size = $size;
        }
        /* submit Stackable to Worker */
        public function submit(Stackable $stackable) {
            if (count($this->workers)<$this->size) {
                    $id = count($this->workers);
                    $this->workers[$id] = new ExampleWorker(sprintf("Worker [%d]", $id));
                    $this->workers[$id]->start(PTHREADS_INHERIT_NONE);

                    if ($this->workers[$id]->stack($stackable)) {
                           return $stackable;
                    } else trigger_error(sprintf("failed to push Stackable onto %s", $this->workers[$id]->getName()), E_USER_WARNING);
            }else{
				for ($i=0;$i<count($this->workers);$i++){
                	if( ! $this->workers[$i]->isWorking()){
						$this->workers[$i]->stack($stackable);
						return $stackable;
				}
        	}
		}

                return false;
        }
	public function status(){
		for ($i=0;$i<count($this->workers);$i++){
		printf("(%s:%s)\r\n",$i, $this->workers[$i]->isWorking());
		}
		printf("\r\n");

	}
        /* Shutdown the pool of threads cleanly, retaining exit status locally */
        public function shutdown() {
                foreach($this->workers as $worker) {
                        $this->status[$worker->getThreadId()]=$worker->shutdown();
                }
        }
}
/* Create a pool of ten threads */
$pool = new Pool(100);
/* Create and submit an array of Stackables */
$work = array();
for ($target = 0; $target < 1000; $target++){

    $work[$target]=$pool->submit(new ExampleWork($target));

	if($work[$target] == false){
		$target--;
		sleep(1);
		continue;
	}
	for ($i=0;$i<count($work);$i++){
		if($work[$i]->isRunning()){
			printf("cell: %s, status: %s\r\n",$i, $work[$i]->isRunning());
		}
	}
	printf("\r\n");
}
$pool->shutdown();
exit();
		
		

pthreads 自帶 Pool

		
<?php
class ExampleWorker extends Worker {
 
	public function __construct(Logging $logger) {
		$this->logger = $logger;
	}
 
	protected $logger;	
}
 
/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($number) {
		$this->number = $number;
	}
	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
				  __CLASS__, $this->worker->getThreadId());
		sleep(1);
		printf("runtime: %s, %d\n", date('Y-m-d H:i:s'), $this->number);
		$this->status = "OK";
	}
}
 
class Logging extends Stackable {
 
	protected function log($message, $args = []) {
		$args = func_get_args();	
 
		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);
		}
	}
}
 
$pool = new Pool(5, \ExampleWorker::class, [new Logging()]);
 
foreach (range(0, 100) as $number) {
	$pool->submit(new Work($number));
}
 
 
$pool->shutdown();
 
var_dump($pool);
?>
		
		

例 5.7. Threads - Pool

			
# cat pool.php 
<?php

class MyWork extends Stackable {

    public $name;

    public function __construct($name) {
        echo "Stackable executed  $name\n";
        $this->name = $name;
    }

    public function run() {
        echo "Stackable $this->name start running\n";
        for ($i = 1; $i <= 5; $i++) {
            echo "Run $this->name : $i\n";
            sleep(1);
        }
    }

}

class MyWorker extends Worker {
    public function __construct($name) {
	$this->name = $name;
    }
    public function run() {
        echo "Worker started $this->name\n";
    }
}

$pool = new Pool(3, \MyWorker::class, array("pthreads"));
$pool->submit(new MyWork("A"));
$pool->submit(new MyWork("B"));
$pool->submit(new MyWork("C"));
$pool->shutdown();
			
			

5.13.2.4. FAQ

5.13.2.4.1. You cannot serialize or unserialize PDO instances
			
PHP Fatal error:  Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38
Stack trace:
#0 /home/www/threads.php(38): PDO->__sleep()
#1 [internal function]: SQLWorker->run()
#2 {main}
  thrown in /home/www/threads.php on line 38
 not ready
 			
			
			
<?php
class MyWorker extends Worker{
    public static $pdo;

    function __construct($conf){
        $this->conf = $conf;
    }

    function run(){
        self::$pdo = new PDO(
            'mysql:host=localhost;dbname=test');
    }

    function get_connection(){
        return self::$pdo;
    }
}
?>
			
			

5.13.2.5. 互斥鎖

什麼情況下會用到互斥鎖?在你需要控制多個綫程同一時刻只能有一個綫程工作的情況下可以使用。

下面我們舉一個例子,一個簡單的計數器程序,說明有無互斥鎖情況下的不同。

		
<?php
$counter = 0;
//$handle=fopen("php://memory", "rw"); 
//$handle=fopen("php://temp", "rw"); 
$handle=fopen("/tmp/counter.txt", "w"); 
fwrite($handle, $counter ); 
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
        $this->handle = fopen("/tmp/counter.txt", "w+");
    }
	public function __destruct(){
		fclose($this->handle);
	}
    public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter ); 
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
		
		if($this->mutex)
			Mutex::unlock($this->mutex);
    }
}

//沒有互斥鎖
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥鎖
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>
		
		

我們使用檔案/tmp/counter.txt保存計數器值,每次打開該檔案將數值加一,然後寫回檔案。當多個綫程同時操作一個檔案的時候,就會綫程運行先後取到的數值不同,寫回的數值也不同,最終計數器的數值會混亂。

沒有加入鎖的結果是計數始終被覆蓋,最終結果是2

而加入互斥鎖後,只有其中的一個進程完成加一工作並釋放鎖,其他綫程才能得到解鎖信號,最終順利完成計數器累加操作

上面例子也可以通過對檔案加鎖實現,這裡主要講的是多綫程鎖,後面會涉及檔案鎖。

5.13.2.6. FAQ

5.13.2.6.1. configure: error: pthreads requires ZTS, please re-compile PHP with ZTS enabled

重新編譯加入 --enable-maintainer-zts

5.13.2.6.2. pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13
[root@localhost src]# pecl search pthreads
Retrieving data...0%
Matched packages, channel pecl.php.net:
=======================================
Package  Stable/(Latest) Local
pthreads 3.0.7 (stable)        Threading API
			
			
[root@localhost src]# pecl install pthreads
pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13
No valid packages found
install failed
			

解決方法,手工編譯舊的安裝包

			
[root@localhost src]# wget https://pecl.php.net/get/pthreads-3.0.6.tgz
[root@localhost src]# tar zxvf pthreads-3.0.6.tgz 		
[root@localhost src]# cd pthreads-3.0.6
[root@localhost pthreads-3.0.6]# phpize
[root@localhost pthreads-3.0.6]# ./configure --enable-pthreads --with-php-config=/srv/php/bin/php-config
[root@localhost pthreads-3.0.6]# make && make install
			
			
5.13.2.6.3. Class 'Stackable' not found in /path/to/file.php

故障出現在PHP 7.x,pecl 已經升級至 3.1.6

Stackable 是 Threaded 的一個別名,這個類使用直到 pthreads v.2.0.0,之後便取消Stackable。

$ pecl search pthread
Retrieving data...0%
Matched packages, channel pecl.php.net:
=======================================
Package  Stable/(Latest) Local
pthreads 3.1.6 (stable)        Threading API			
			

源碼

/**
 * Stackable is an alias of Threaded. This class name was used in pthreads until
 * version 2.0.0
 * @link http://www.php.net/manual/en/class.threaded.php
 */
class Stackable extends Threaded implements Traversable, Countable, ArrayAccess {

}
			

解決方案,將 Stackable 改為 Threaded

5.13.3. libevent

5.13.3.1. event_base_loop

永遠循環

		
<?php
$timeouts = 10000000;
//callback function
function func($fd, $event,$arg) {
$time = time();
for($i=0;$i<2;$i++) {
    echo "Timer-$arg: $time : out-$i\n";
    sleep(3);
}
}
//create base and event 
$base = event_base_new();
for($i=0;$i<2;$i++) {
$event[$i] = event_new();
//set event flags
event_set($event[$i], $i , EV_PERSIST, 'func', "$i");
//set event base
event_base_set($event[$i], $base);
//enable event
event_add($event[$i], $timeouts);
}
//start event loop
event_base_loop($base);
		
		

運行一次然後退出

event_base_loop($base, EVLOOP_ONCE );
		
		
<?php
$timeouts = 10;
//callback function
function func($fd, $event,$arg) {
$time = time();
    echo "Timer-$arg: $time\n";
}
//create base and event 
$base = event_base_new();
for($i=0;$i<10;$i++) {
$event[$i] = event_new();
//set event flags
event_set($event[$i], $i , EV_PERSIST, 'func', "$i");
//set event base
event_base_set($event[$i], $base);
//enable event
event_add($event[$i], $timeouts);
}
//start event loop
event_base_loop($base, EVLOOP_ONCE );
event_base_loop($base, EVLOOP_ONCE );
event_base_loop($base, EVLOOP_ONCE );
event_base_loop($base, EVLOOP_ONCE );		
		
		

5.13.3.2.