版權聲明
轉載請與作者聯繫,轉載時請務必標明文章原始出處和作者信息及本聲明。
|
|
|
微信掃瞄二維碼進入 Netkiller 微信訂閲號 QQ群:128659835 請註明“讀者” |
2017-06-16
2014-03-12 第一版
2014-05-15 第二版
2014-06-13 第三版
2014-07-24 第四版
2015-10-26 第五版
安裝PHP 5.5.9
https://github.com/oscm/shell/blob/master/php/5.5.9.sh
./configure --prefix=/srv/php-5.5.9 \
    --with-config-file-path=/srv/php-5.5.9/etc \
    --with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \
    --enable-fpm \
    --with-fpm-user=www \
    --with-fpm-group=www \
    --with-pear \
    --with-curl \
    --with-gd \
    --with-jpeg-dir \
    --with-png-dir \
    --with-freetype-dir \
    --with-zlib-dir \
    --with-iconv \
    --with-mcrypt \
    --with-mhash \
    --with-pdo-mysql \
    --with-mysql-sock=/var/lib/mysql/mysql.sock \
    --with-openssl \
    --with-xsl \
    --with-recode \
    --enable-sockets \
    --enable-soap \
    --enable-mbstring \
    --enable-gd-native-ttf \
    --enable-zip \
    --enable-xml \
    --enable-bcmath \
    --enable-calendar \
    --enable-shmop \
    --enable-dba \
    --enable-wddx \
    --enable-sysvsem \
    --enable-sysvshm \
    --enable-sysvmsg \
    --enable-opcache \
    --enable-pcntl \
    --enable-maintainer-zts \
    --disable-debug
編譯必須啟用zts支持否則無法安裝 pthreads(--enable-maintainer-zts)
安裝https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh
# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash
查看pthreads是否已經安裝
# php -m | grep pthreads
<?php
/**
 * Minimal pthreads example: a thread that greets the value it was given.
 */
class HelloWorld extends Thread {
    /**
     * @param mixed $world value interpolated into the greeting
     */
    public function __construct($world) {
        $this->world = $world;
    }

    /**
     * Builds the greeting from the stored value and prints it.
     */
    public function run() {
        $greeting = sprintf("Hello %s\n", $this->world);
        print_r($greeting);
    }
}
// Launch the greeter; only when start() reports success, wait for it and
// print its thread id together with the value join() returns.
$thread = new HelloWorld("World");
$started = $thread->start();
if ($started) {
    printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());
}
?>
<?php
/**
 * Work item that runs one SQL statement on the owning worker's connection
 * and dumps every returned row.
 */
class SQLQuery extends Stackable {
    /**
     * @param string $sql statement to run when the item is executed
     */
    public function __construct($sql) {
        $this->sql = $sql;
    }

    /**
     * Obtains the connection from the worker, executes the stored statement
     * and prints each row as an associative array.
     */
    public function run() {
        $connection = $this->worker->getConnection();
        $statement = $connection->query($this->sql);
        while (($record = $statement->fetch(PDO::FETCH_ASSOC))) {
            print_r($record);
        }
    }
}
/**
 * Worker that opens a PDO connection in run() and exposes it to the work
 * items stacked on it through getConnection().
 */
class ExampleWorker extends Worker {
    /** @var PDO|null connection created by run() */
    public static $dbh;

    /**
     * @param string $name label supplied by the caller; it is not stored or used
     */
    public function __construct($name) {
    }

    /**
     * Prepares the environment for the work that is coming: connects to MySQL.
     */
    public function run() {
        $dsn = 'mysql:host=192.168.2.1;dbname=example';
        self::$dbh = new PDO($dsn, 'www', '123456');
    }

    /**
     * @return PDO|null whatever run() stored in the static slot
     */
    public function getConnection() {
        return self::$dbh;
    }
}
// Create one worker and queue two SQLQuery items on it before it is started.
$worker = new ExampleWorker("My Worker Thread");
$work=new SQLQuery('select * from members order by id desc limit 5');
$worker->stack($work);
$table1 = new SQLQuery('select * from demousers limit 2');
$worker->stack($table1);
// NOTE(review): presumably start() launches the worker thread (whose run() opens
// the connection) and shutdown() waits for the stacked items — confirm against
// the pthreads Worker documentation.
$worker->start();
$worker->shutdown();
?>
什麼情況下會用到互斥鎖?在你需要控制多個綫程同一時刻只能有一個綫程工作的情況下可以使用。
下面我們舉一個例子,一個簡單的計數器程序,說明有無互斥鎖情況下的不同。
<?php
// Seed the counter file with 0 before any thread is created.
// Stream targets that were tried as alternatives: "php://memory", "php://temp".
$counter = 0;
file_put_contents("/tmp/counter.txt", $counter);
/**
 * Thread that increments the integer stored in /tmp/counter.txt by one.
 *
 * When a mutex handle is supplied, the whole read-modify-write cycle runs
 * while that mutex is held; without one, nothing serialises the threads.
 */
class CounterThread extends Thread {
    /**
     * @param mixed $mutex handle returned by Mutex::create(), or null to run unlocked
     */
    public function __construct($mutex = null){
        $this->mutex = $mutex;
    }

    /**
     * Reads the current count, adds one, writes it back and prints it.
     */
    public function run() {
        if ($this->mutex) {
            Mutex::lock($this->mutex);
        }
        try {
            // Opened here rather than in the constructor: mode "c+" keeps the
            // existing contents (a "w+" open truncates the file and discards
            // the stored count), and the stream is created by the thread that
            // actually uses it.
            $handle = fopen("/tmp/counter.txt", "c+");
            if ($handle === false) {
                printf("Thread #%lu could not open the counter file\n", $this->getThreadId());
                return;
            }
            $counter = intval(fgets($handle));
            $counter++;
            rewind($handle);
            fputs($handle, (string) $counter);
            // Close before the lock is released so the next thread reads the new value.
            fclose($handle);
            printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);
        } finally {
            // Always release the mutex, even on the early return above.
            if ($this->mutex) {
                Mutex::unlock($this->mutex);
            }
        }
    }
}
// Round 1: no mutex.
$unlocked = array();
for ($i = 0; $i < 50; $i++) {
    $unlocked[$i] = new CounterThread();
    $unlocked[$i]->start();
}
// Wait for the unlocked round to finish; otherwise its threads would still be
// running (and unreachable) while the locked round below is being measured.
foreach ($unlocked as $thread) {
    $thread->join();
}

// Round 2: with a mutex. Start again from 0 so the two rounds are comparable.
file_put_contents("/tmp/counter.txt", "0");
// Created in the locked state: the threads queue up on the mutex until it is
// released below.
$mutex = Mutex::create(true);
$threads = array();
for ($i = 0; $i < 50; $i++) {
    $threads[$i] = new CounterThread($mutex);
    $threads[$i]->start();
}
Mutex::unlock($mutex);
for ($i = 0; $i < 50; $i++) {
    $threads[$i]->join();
}
Mutex::destroy($mutex);
?>
我們使用檔案/tmp/counter.txt保存計數器值,每次打開該檔案將數值加一,然後寫回檔案。當多個綫程同時操作一個檔案的時候,就會因綫程運行先後不同而取到不同的數值,寫回的數值也不同,最終計數器的數值會混亂。
沒有加入鎖的結果是計數始終被覆蓋,最終結果是2
而加入互斥鎖後,只有其中的一個綫程完成加一工作並釋放鎖,其他綫程才能得到解鎖信號,最終順利完成計數器累加操作
上面例子也可以通過對檔案加鎖實現,這裡主要講的是多綫程鎖,後面會涉及檔案鎖。
在共享內存的例子中,沒有使用任何鎖,仍然可能正常工作,可能是共享內存操作本身具備鎖的功能。
<?php
// Create a scratch file to derive a System V IPC key from. tempnam() expects a
// directory as its first argument, so the system temp directory is passed
// (the script's own path is a file, not a directory).
$tmp = tempnam(sys_get_temp_dir(), 'PHP');
if ($tmp === false) {
    exit("Unable to create a temporary file for ftok()\n");
}
$key = ftok($tmp, 'a');
// The key has been computed; the scratch file itself is no longer needed.
unlink($tmp);
if ($key === -1) {
    exit("ftok() failed\n");
}
$shmid = shm_attach($key);
if ($shmid === false) {
    exit("shm_attach() failed\n");
}
// Store the initial counter value under variable key 1.
$counter = 0;
shm_put_var( $shmid, 1, $counter );
/**
 * Thread that increments the value stored under variable key 1 of a shared
 * memory segment. No lock is taken around the read-modify-write.
 */
class CounterThread extends Thread {
    /**
     * @param mixed $shmid segment handle obtained from shm_attach()
     */
    public function __construct($shmid){
        $this->shmid = $shmid;
    }

    /**
     * Reads the counter, adds one, stores it back and prints the new value.
     */
    public function run() {
        $segment = $this->shmid;
        $value = shm_get_var($segment, 1);
        ++$value;
        shm_put_var($segment, 1, $value);
        printf("Thread #%lu says: %s\n", $this->getThreadId(), $value);
    }
}
// Build all 100 threads first, then start every one, then wait for every one.
$total = 100;
for ($n = 0; $n < $total; $n++) {
    $threads[] = new CounterThread($shmid);
}
for ($n = 0; $n < $total; $n++) {
    $current = $threads[$n];
    $current->start();
}
for ($n = 0; $n < $total; $n++) {
    $current = $threads[$n];
    $current->join();
}
// Remove the segment, then detach from it.
shm_remove($shmid);
shm_detach($shmid);
?>
有些場景我們不希望 thread->start() 就開始運行程序,而是希望綫程等待我們的命令。
$thread->wait(); 的作用是 thread->start() 後綫程並不會立即運行,只有收到 $thread->notify(); 發出的信號後才運行
<?php
// Create a scratch file to derive a System V IPC key from. tempnam() expects a
// directory as its first argument, so the system temp directory is passed
// (the script's own path is a file, not a directory).
$tmp = tempnam(sys_get_temp_dir(), 'PHP');
if ($tmp === false) {
    exit("Unable to create a temporary file for ftok()\n");
}
$key = ftok($tmp, 'a');
// The key has been computed; the scratch file itself is no longer needed.
unlink($tmp);
if ($key === -1) {
    exit("ftok() failed\n");
}
$shmid = shm_attach($key);
if ($shmid === false) {
    exit("shm_attach() failed\n");
}
// Store the initial counter value under variable key 1.
$counter = 0;
shm_put_var( $shmid, 1, $counter );
/**
 * Thread that waits for an explicit go-ahead and then increments the value
 * stored under variable key 1 of a shared memory segment.
 */
class CounterThread extends Thread {
    /**
     * @param mixed $shmid segment handle obtained from shm_attach()
     */
    public function __construct($shmid){
        $this->shmid = $shmid;
        // Go-ahead flag; set by the code that calls notify().
        $this->released = false;
    }

    /**
     * Blocks until released, then reads, increments, stores and prints the counter.
     */
    public function run() {
        $this->synchronized(function($thread){
            // Wait on a predicate instead of waiting unconditionally: if the
            // notification was sent before this point, a bare wait() would
            // never return. The loop also copes with a wakeup that arrives
            // while the flag is still false.
            while (!$thread->released) {
                $thread->wait();
            }
        }, $this);
        $counter = shm_get_var( $this->shmid, 1 );
        $counter++;
        shm_put_var( $this->shmid, 1, $counter );
        printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    }
}

for ($i=0;$i<100;$i++){
    $threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
    $threads[$i]->start();
}
// Release every thread: set the flag and notify inside the same synchronized
// block so the waiting side cannot miss the change.
for ($i=0;$i<100;$i++){
    $threads[$i]->synchronized(function($thread){
        $thread->released = true;
        $thread->notify();
    }, $threads[$i]);
}
for ($i=0;$i<100;$i++){
    $threads[$i]->join();
}
// Remove the segment, then detach from it.
shm_remove( $shmid );
shm_detach( $shmid );
?>
pthreads 提供的 Pool class 例子
<?php
/**
 * Pool worker that carries the logger used by the work items it executes.
 */
class WebWorker extends Worker {
    /**
     * Logger read by work items as $this->worker->logger.
     *
     * The declaration previously spelled the name "$loger", so the property
     * actually used was an undeclared dynamic (public) one. It is declared
     * public here to keep that outside access working.
     *
     * @var SafeLog
     */
    public $logger;

    /**
     * @param SafeLog $logger logger shared with the submitted work items
     */
    public function __construct(SafeLog $logger) {
        $this->logger = $logger;
    }
}
/**
 * Work item that logs which thread executed it and then marks itself complete.
 */
class WebWork extends Stackable {
    /** @var bool|null set to true at the end of run() */
    protected $complete;

    /**
     * @return bool|null true once run() has finished, otherwise the initial null
     */
    public function isComplete() {
        return $this->complete;
    }

    /**
     * Writes one log line through the owning worker's logger and sets the
     * completion flag.
     */
    public function run() {
        $owner = $this->worker;
        $owner->logger->log("%s executing in Thread #%lu", __CLASS__, $owner->getThreadId());
        $this->complete = true;
    }
}
/**
 * printf-style logger that echoes one formatted line per call.
 */
class SafeLog extends Stackable {
    /**
     * Formats and echoes a message followed by a newline.
     *
     * The first call argument is the format string and every further argument
     * is a value for it; the declared $args parameter is overwritten by the
     * full argument list. Nothing is printed when the format is empty.
     *
     * NOTE(review): presumably declared protected to rely on pthreads' special
     * handling of protected methods — confirm against the pthreads docs.
     */
    protected function log($message, $args = []) {
        $arguments = func_get_args();
        $format = array_shift($arguments);
        if (!$format) {
            return;
        }
        echo vsprintf("{$format}\n", $arguments);
    }
}
// Pool of up to 8 WebWorker threads; each worker is constructed with the logger.
$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

// Submit 14 work items in total; the first one is also kept in $w.
$pool->submit($w = new WebWork());
for ($n = 0; $n < 13; $n++) {
    $pool->submit(new WebWork());
}

$pool->shutdown();

// Ask the pool to drop every item that reports completion.
$pool->collect(function ($item) {
    return $item->isComplete();
});

var_dump($pool);
現在我來詳細講解綫程池,官方文檔比較少,很多經驗是筆者工作中摸索出來的。
Pool 構造方法第一個參數 size, 手冊解釋是 Pool 對象可容納的 Worker 對象的最大數量,但我實際使用發現 size 並不是控制pool壓入任務的數量,而是同時並發的綫程數。
$pool->submit()是可以無限提交任務的,只要內存允許(參考php.ini配置),但同時執行的綫程數由size控制。
我們自行實現一個類來解釋Pool工作原理
<?php
/**
 * Thread that decrypts one member's bank number and builds the matching
 * UPDATE statement.
 */
class Update extends Thread {
    public $running = false;
    public $row = array();

    /**
     * @param array $row member record with the keys 'id' and 'bankno'
     */
    public function __construct($row) {
        $this->row = $row;
        $this->sql = null;
    }

    /**
     * Decrypts the stored value when it is longer than 100 characters;
     * otherwise records the row in bankno_error.log. A statement is built
     * only when the decrypted number is longer than 7 characters. The
     * statement (or an empty line) is printed either way.
     */
    public function run() {
        // Defined up front so the length check below never reads an
        // undefined variable when the row took the error path.
        $bankno = '';
        if (strlen($this->row['bankno']) > 100) {
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s, %s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }
        if (strlen($bankno) > 7) {
            // NOTE(review): the statement is only printed here; if it is ever
            // executed, bind the values instead of formatting them in.
            $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);
            $this->sql = $sql;
        }
        printf("%s\n", $this->sql);
    }
}
/**
 * Hand-rolled batch runner: collects up to $count Update threads, starts them
 * together, waits for them and discards the finished ones.
 *
 * NOTE(review): pthreads ships its own class named Pool (used earlier in this
 * document) — confirm this declaration does not collide with it.
 */
class Pool {
    /** @var array threads currently held, keyed by insertion index */
    public $pool = array();

    /**
     * @param int $count maximum number of threads held at once
     */
    public function __construct($count) {
        $this->count = $count;
    }

    /**
     * Wraps the row in a new Update thread if there is room.
     *
     * @param array $row record handed to the Update constructor
     * @return bool true when the thread was added, false when the pool is full
     */
    public function push($row){
        if (count($this->pool) >= $this->count) {
            return false;
        }
        $this->pool[] = new Update($row);
        return true;
    }

    /** Calls start() on every held thread. */
    public function start(){
        foreach ($this->pool as $worker) {
            $worker->start();
        }
    }

    /** Calls join() on every held thread. */
    public function join(){
        foreach ($this->pool as $worker) {
            $worker->join();
        }
    }

    /** Drops every held thread that reports it is not running. */
    public function clean(){
        foreach ($this->pool as $id => $worker) {
            if ($worker->isRunning()) {
                continue;
            }
            unset($this->pool[$id]);
        }
    }
}
try {
    // A "host:port" value in $dbhost is turned into "host;port=port" for the DSN.
    $dsn = "mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname";
    $options = array(
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
        PDO::MYSQL_ATTR_COMPRESS => true
    );
    $dbh = new PDO($dsn, $dbuser, $dbpw, $options);

    $sql = "select id,bankno from members order by id desc";
    $row = $dbh->query($sql);

    $pool = new Pool(5);
    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        // Keep retrying the push; whenever the pool is full, run the whole
        // batch to completion and empty it first.
        while (!$pool->push($member)) {
            $pool->start();
            $pool->join();
            $pool->clean();
        }
    }

    // Run whatever is left in the last, possibly partial, batch.
    $pool->start();
    $pool->join();

    $dbh = null;
} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系統錯誤', $e->getMessage(), "\n";
}
?>
上面的例子是當綫程池滿後執行start統一啟動,下面的例子是隻要綫程池中有空閒便立即創建新綫程。
<?php
/**
 * Thread that decrypts one member's bank number and builds the matching
 * UPDATE statement.
 */
class Update extends Thread {
    public $running = false;
    public $row = array();

    /**
     * @param array $row member record with the keys 'id' and 'bankno'
     */
    public function __construct($row) {
        $this->row = $row;
        $this->sql = null;
    }

    /**
     * Decrypts the stored value when it is longer than 100 characters;
     * otherwise records the row in bankno_error.log. A statement is built
     * only when the decrypted number is longer than 7 characters. The
     * statement (or an empty line) is printed either way.
     */
    public function run() {
        // Defined up front so the length check below never reads an
        // undefined variable when the row took the error path.
        $bankno = '';
        if (strlen($this->row['bankno']) > 100) {
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s, %s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }
        if (strlen($bankno) > 7) {
            // NOTE(review): the statement is only printed here; if it is ever
            // executed, bind the values instead of formatting them in.
            $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);
            $this->sql = $sql;
        }
        printf("%s\n", $this->sql);
    }
}
try {
    $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
        PDO::MYSQL_ATTR_COMPRESS => true
        )
    );
    $sql = "select id,bankno from members order by id desc limit 50";
    $row = $dbh->query($sql);

    // Threads currently in flight, keyed by member id; at most 5 at a time.
    $pool = array();
    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        $id = $member['id'];

        // Wait for a free slot.
        while (count($pool) >= 5) {
            foreach ($pool as $name => $worker) {
                if (!$worker->isRunning()) {
                    // Join before dropping the reference so the thread is
                    // fully finished and its resources are reclaimed.
                    $worker->join();
                    unset($pool[$name]);
                }
            }
            if (count($pool) >= 5) {
                // Nothing finished yet: pause briefly instead of spinning.
                usleep(10000);
            }
        }

        $pool[$id] = new Update($member);
        $pool[$id]->start();
    }

    // Wait for the threads that are still in flight.
    foreach ($pool as $worker) {
        $worker->join();
    }

    $dbh = null;
} catch (Exception $e) {
    echo '【' , date('H:i:s') , '】', '【系統錯誤】', $e->getMessage(), "\n";
}
?>
$pool->submit 是非阻塞,提交到綫程池中,就會運行下面代碼,有時我們希望等待綫程執行完畢,收集綫程的工作狀況。
// Fragment from inside a method: imports every task's CSV file through a pool
// of ImportWorker threads, one Import work item per CSV row.
$mutex = Mutex::create();
$pool = new Pool ( self::MAXCONN , \ImportWorker::class, array($this->config, $mutex) );

foreach ($tasks as $task) {
    $this->logger ( __CLASS__, sprintf("Task %s %s", $task->file, 'Processing') );

    // Stop taking new tasks once a SIGHUP has been recorded.
    pcntl_signal_dispatch();
    if (Signal::get() == SIGHUP) {
        Signal::reset();
        break;
    }

    // A missing or unreadable file fails the task.
    $handle = file_exists($task->file) ? fopen($task->file, 'r') : false;
    if ($handle === false) {
        $this->failedTask($task);
        continue;
    }

    // Start each task with an empty list so items of a previous task are
    // never counted again.
    $work = array();
    while (($row = fgetcsv($handle, 100000, ',')) !== false) {
        $item = new Import ( $task, $row );
        $work[] = $item;
        $pool->submit ( $item );
    }
    fclose($handle);

    // Poll until every item of this task reports completion. The count is
    // recomputed from scratch on each pass, so an item is never counted
    // twice, and a file without rows finishes immediately.
    $waiting = true;
    while ($waiting) {
        $completed = 0;
        foreach ($work as $item) {
            if ($item->isComplete()) {
                $completed++;
            }
        }
        Counter::$completed = $completed;
        if ($completed == count($work)) {
            $waiting = false;
        } else {
            sleep(1);
        }
    }

    // Let the pool drop the finished items now that there is something to collect.
    $pool->collect(function ($item) {
        return $item->isComplete();
    });

    $this->completedTask($task);
}

$pool->shutdown ();
Mutex::destroy($mutex);
while($waiting) 會持續運行,直到所有線程都完成後才會退出。
在多綫程中讀寫檔案與單進程是有區別的,讀取內容比較容易實現,但寫入數據就需要保證同一時刻只能有一個進程操作,雖然通過互斥鎖可以解決,但從安全的角度檔案必須上鎖。
檔案鎖種類。
LOCK_SH 取得共享鎖定(讀取的程序)。 LOCK_EX 取得獨占鎖定(寫入的程序)。 LOCK_UN 釋放鎖定(無論共享或獨占)。 LOCK_NB 如果不希望 flock() 在鎖定時堵塞,可與 LOCK_SH 或 LOCK_EX 以「|」組合使用。
獨占鎖(排它鎖)例子
<?php
// "c+" opens for reading and writing without truncating and creates the file
// when it does not exist yet ("r+" fails on a missing file).
$fp = fopen("/tmp/lock.txt", "c+");
if ($fp === false) {
    echo "Couldn't open the lock file!";
} else {
    if (flock($fp, LOCK_EX)) {  // acquire an exclusive lock
        ftruncate($fp, 0);      // truncate file
        fwrite($fp, "Write something here\n");
        fflush($fp);            // flush output before releasing the lock
        flock($fp, LOCK_UN);    // release the lock
    } else {
        echo "Couldn't get the lock!";
    }
    fclose($fp);
}
?>
獨占鎖例子2(非阻塞)
<?php
// "c+" creates the file when it is missing and never truncates it.
$fp = fopen('/tmp/lock.txt', 'c+');
if ($fp === false) {
    echo 'Unable to open lock file';
    exit(-1);
}
/* Activate the LOCK_NB option on an LOCK_EX operation */
if (!flock($fp, LOCK_EX | LOCK_NB)) {
    echo 'Unable to obtain lock';
    // Close the handle before leaving.
    fclose($fp);
    exit(-1);
}
/* ... */
flock($fp, LOCK_UN);
fclose($fp);
?>
多綫程中操作資料庫總結與注意事項 pthreads 與 pdo 同時使用時,需要注意一點,需要靜態聲明public static $dbh;並且通過單例模式訪問資料庫連接。
<?php
/**
 * Work item that lists the 50 newest members through the owning worker's
 * connection.
 */
class Work extends Stackable {
    public function __construct() {
    }

    /**
     * Runs the query on the worker's connection and prints each row as an
     * associative array.
     */
    public function run() {
        $connection = $this->worker->getConnection();
        $statement = $connection->query("select id,name from members order by id desc limit 50");
        while (($record = $statement->fetch(PDO::FETCH_ASSOC))) {
            print_r($record);
        }
    }
}
/**
 * Worker that opens a PDO connection in run() and exposes it to the work
 * items stacked on it through getConnection().
 */
class ExampleWorker extends Worker {
    /** @var PDO|null connection created by run() */
    public static $dbh;

    /**
     * @param string $name label supplied by the caller; it is not stored or used
     */
    public function __construct($name) {
    }

    /**
     * Prepares the environment for the work that is coming: connects to MySQL.
     */
    public function run() {
        $dsn = 'mysql:host=192.168.2.1;dbname=example';
        self::$dbh = new PDO($dsn, 'www', '123456');
    }

    /**
     * @return PDO|null whatever run() stored in the static slot
     */
    public function getConnection() {
        return self::$dbh;
    }
}
// Create one worker, queue a single Work item on it, then start the worker
// and shut it down.
$worker = new ExampleWorker("My Worker Thread");
$work=new Work();
$worker->stack($work);
// NOTE(review): presumably shutdown() waits for the stacked item to finish —
// confirm against the pthreads Worker documentation.
$worker->start();
$worker->shutdown();
?>
在綫程池中連結資料庫
# cat pool.php
<?php
/**
 * Pool worker that holds the logger it was constructed with.
 */
class ExampleWorker extends Worker {
    /** @var Logging logger supplied at construction */
    protected $logger;

    /**
     * @param Logging $logger logger instance passed in by the pool
     */
    public function __construct(Logging $logger) {
        $this->logger = $logger;
    }
}
/* the collectable class implements machinery for Pool::collect */
/**
 * Work item that looks up the deposit trade for one account order and, when
 * it exists, marks the order as paid.
 */
class Work extends Stackable {
    /**
     * @param array $number account row with the keys 'order' and 'name'
     */
    public function __construct($number) {
        $this->number = $number;
    }

    /**
     * Opens its own connection, searches MT4_TRADES for the deposit that
     * matches the order and, if one is found, updates the account row.
     * Prints one progress line in every case.
     */
    public function run() {
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw = 'password';         // database password
        $dbname = 'example_real';
        $dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true
            )
        );

        // Values are bound rather than concatenated into the statement text.
        $select = $dbh->prepare("select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN = :login and CMD = '6' and `COMMENT` = :comment");
        $mt4_trades = false;
        if ($select !== false) {
            $select->bindValue(':login', $this->number['name']);
            $select->bindValue(':comment', $this->number['order'] . ':DEPOSIT');
            if ($select->execute()) {
                $mt4_trades = $select->fetch(PDO::FETCH_ASSOC);
            }
            // Release the result set before issuing the next statement.
            $select = null;
        }

        if ($mt4_trades) {
            $update = $dbh->prepare("UPDATE db_example.accounts SET paystatus = :paystatus, deposit_time = :deposit_time where `order` = :order");
            if ($update !== false) {
                $update->bindValue(':paystatus', '成功');
                $update->bindValue(':deposit_time', $mt4_trades['OPEN_TIME']);
                $update->bindValue(':order', $this->number['order']);
                $update->execute();
            }
        }

        $dbh = null;
        printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);
    }
}
/**
 * Logger handed to the pool workers; it also opens a database connection
 * when it is constructed.
 */
class Logging extends Stackable {
    /** @var PDO connection created in the constructor */
    protected static $dbh;

    /**
     * Connects to the database and keeps the handle in the static slot.
     */
    public function __construct() {
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw = 'password';         // database password
        $dbname = 'example_real';   // database name
        $dsn = "mysql:host=$dbhost;port=3306;dbname=$dbname";
        $options = array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true
        );
        self::$dbh = new PDO($dsn, $dbuser, $dbpw, $options);
    }

    /**
     * Formats and echoes a message followed by a newline. The first call
     * argument is the format string, the rest are its values; nothing is
     * printed when the format is empty.
     */
    protected function log($message, $args = []) {
        $arguments = func_get_args();
        $format = array_shift($arguments);
        if (!$format) {
            return;
        }
        echo vsprintf("{$format}\n", $arguments);
    }

    /**
     * @return PDO|null the handle stored by the constructor
     */
    protected function getConnection(){
        return self::$dbh;
    }
}
// Pool of up to 200 ExampleWorker threads, each constructed with the logger.
$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = 'db.example.com'; // database server
$dbuser = 'example.com';    // database user
$dbpw = 'password';         // database password
$dbname = 'db_example';

$dsn = "mysql:host=$dbhost;port=3306;dbname=$dbname";
$options = array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true
);
$dbh = new PDO($dsn, $dbuser, $dbpw, $options);

// Submit one Work item per account that has no deposit time yet.
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";
$row = $dbh->query($sql);
while (($account = $row->fetch(PDO::FETCH_ASSOC))) {
    $pool->submit(new Work($account));
}

$pool->shutdown();
?>
進一步改進上面程序,我們使用單例模式 $this->worker->getInstance(); 全局僅僅做一次資料庫連接,綫程使用共享的資料庫連接
<?php
/**
 * Pool worker that opens one persistent PDO connection in run() and hands it
 * to its work items through getInstance().
 */
class ExampleWorker extends Worker {
    /** @var PDO|null connection created by run() */
    protected static $dbh;

    public function __construct() {
    }

    /**
     * Connects to MySQL and stores the handle in the static slot.
     */
    public function run(){
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw = 'password';         // database password
        $dbname = 'example';        // database name
        $dsn = "mysql:host=$dbhost;port=3306;dbname=$dbname";
        $options = array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true
        );
        self::$dbh = new PDO($dsn, $dbuser, $dbpw, $options);
    }

    /**
     * @return PDO|null the handle stored by run()
     */
    protected function getInstance(){
        return self::$dbh;
    }
}
/* the collectable class implements machinery for Pool::collect */
/**
 * Work item that decrypts one member's mobile number and stores its digest
 * in members_digest.
 */
class Work extends Stackable {
    /**
     * @param array $data member row with the keys 'id' and 'mobile'
     */
    public function __construct($data) {
        $this->data = $data;
    }

    /**
     * Decrypts the mobile number, keeps at most its last 11 characters, and
     * writes either an empty string (when the value is 'null') or the md5 of
     * the number. A PDOException is recorded in mobile_error.log. One
     * progress line is printed in every case.
     */
    public function run() {
        try {
            $dbh = $this->worker->getInstance();
            $id = $this->data['id'];
            $mobile = safenet_decrypt($this->data['mobile']);
            if (strlen($mobile) > 11) {
                $mobile = substr($mobile, -11);
            }

            // A literal 'null' is stored as an empty string; anything else is hashed.
            $isNullMarker = ($mobile == 'null');
            if ($isNullMarker) {
                $mobile = '';
            }
            $sql = $isNullMarker
                ? "UPDATE members_digest SET mobile = :mobile where id = :id"
                : "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";

            $sth = $dbh->prepare($sql);
            $sth->bindValue(':mobile', $mobile);
            $sth->bindValue(':id', $id);
            $sth->execute();
        } catch (PDOException $e) {
            $error = sprintf("%s,%s\n", $mobile, $id );
            file_put_contents("mobile_error.log", $error, FILE_APPEND);
        }
        printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);
    }
}
// Pool of up to 100 ExampleWorker threads; the workers take no constructor arguments.
$pool = new Pool(100, \ExampleWorker::class, []);

$dbhost = 'db.example.com'; // database server
$dbuser = 'example.com';    // database user
$dbpw = 'password';         // database password
$dbname = 'example';

// The listing query below runs on this connection (port 3307).
$dsn = "mysql:host=$dbhost;port=3307;dbname=$dbname";
$options = array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true
);
$dbh = new PDO($dsn, $dbuser, $dbpw, $options);

// Submit one Work item per member row.
$sql = "select id, mobile from members order by id asc";
$row = $dbh->query($sql);
while (($members = $row->fetch(PDO::FETCH_ASSOC))) {
    $pool->submit(new Work($members));
}

$pool->shutdown();
?>
總的來說 pthreads 仍然處在發展中,仍有一些不足的地方,我們也可以看到pthreads的git在不斷改進這個項目
資料庫持久連結很重要,否則每個綫程都會開啟一次資料庫連接,然後關閉,會導致很多連結超時。
<?php
// Open the connection with the persistent-connection attribute switched on.
$options = array(PDO::ATTR_PERSISTENT => true);
$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, $options);
?>
但有些場景資料庫持久連結適得其反,所以根據你的場景選擇連結方式
<?php
// Open the connection with the persistent-connection attribute switched off.
$options = array(PDO::ATTR_PERSISTENT => false);
$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, $options);
?>
由於綫程持續連結資料庫,有時可能因為資料庫或者網絡原因導致資料庫無法連接,程序拋出異常或終止,所以使用單例並不保險。
/**
 * Returns the handle held in the static $dbh slot as-is; nothing here checks
 * whether the connection is still usable.
 */
protected function getInstance(){
return self::$dbh;
}
為單例增加重新連接功能
/**
 * Worker that lazily creates a database connection and an AMQP connection
 * object for its work items, re-creating the database connection when it is
 * missing or no longer answers.
 */
class SenderWorker extends Worker {
    protected $config;
    protected static $dbh;
    protected static $amqp;

    /**
     * @param array $config settings with the sections 'database'
     *                      (host, port, user, password, dbname) and 'amqp'
     *                      (host, port, vhost, login, password)
     */
    public function __construct($config) {
        $this->config = $config;
        $this->logger = new Logger();
    }

    public function run() {
    }

    /**
     * Opens a fresh database connection. On failure the error is logged and
     * the static handle is left null so callers can tell it did not work.
     */
    private function connect(){
        // Drop any previous handle first; a failed attempt must not leave a
        // stale connection behind.
        self::$dbh = null;
        try {
            $dbhost = $this->config['database']['host'];
            $dbport = $this->config['database']['port'];
            $dbuser = $this->config['database']['user'];
            $dbpass = $this->config['database']['password'];
            $dbname = $this->config['database']['dbname'];
            self::$dbh = new PDO ( "mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array (
                PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                PDO::MYSQL_ATTR_COMPRESS => true
            ) );
            self::$dbh->setAttribute ( PDO::ATTR_ERRMODE, PDO::ERRMODE_WARNING );
        } catch ( Exception $e ) {
            // PDOException is a subclass of Exception, so one handler covers both.
            self::$dbh = null;
            $this->logger ( 'Exception worker', $e->getMessage( ) );
        }
    }

    /**
     * Tells whether the stored handle exists and still answers a trivial query.
     *
     * @return bool
     */
    private function isAlive() {
        if (!self::$dbh) {
            return false;
        }
        try {
            // In warning mode a dead connection makes query() return false.
            return self::$dbh->query('SELECT 1') !== false;
        } catch ( Exception $e ) {
            return false;
        }
    }

    /**
     * Returns a usable database handle, reconnecting when the current one is
     * missing or dead. If no connection can be established, the worker is
     * shut down and null is returned.
     *
     * Each call issues one lightweight probe query.
     *
     * @return PDO|null
     */
    protected function getInstance() {
        $dbname = $this->config['database']['dbname'];

        if ($this->isAlive()) {
            $this->logger ( 'Database', sprintf("Get instance database %s, %s", $dbname, $this->getThreadId ()) );
            return self::$dbh;
        }

        $this->connect();
        if (self::$dbh) {
            $this->logger ( 'Database', sprintf("Connect database %s, %s", $dbname, $this->getThreadId ()) );
            return self::$dbh;
        }

        $this->logger ( 'Database', sprintf("Connect database is error %s, %s", $dbname, $this->getThreadId ()) );
        $this->logger ( 'Error', sprintf("Worker is shutdown %s", $this->getThreadId ()) );
        $this->shutdown();
        return null;
    }

    /**
     * Forwards a message to the Logger instance created in the constructor.
     *
     * @param string $type    category label
     * @param string $message text to record
     */
    public function logger($type, $message) {
        $this->logger->logger($type, $message);
    }

    /**
     * Returns the AMQP connection object, creating it on first use.
     *
     * NOTE(review): the object is constructed here but connect() is not
     * called on it — confirm that callers do so.
     *
     * @return AMQPConnection
     */
    public function getAmqpInstance(){
        if(!self::$amqp){
            self::$amqp = new AMQPConnection(array(
                'host' => $this->config['amqp']['host'],
                'port' => $this->config['amqp']['port'],
                'vhost' => $this->config['amqp']['vhost'],
                'login' => $this->config['amqp']['login'],
                'password' => $this->config['amqp']['password']
            ));
            $this->logger ( 'AMQP', sprintf("Connect amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );
        }else{
            $this->logger ( 'AMQP', sprintf("Get instance amqp %s, %s", $this->config['amqp']['host'], $this->getThreadId ()) );
        }
        return self::$amqp;
    }
}
每次調用 getInstance() 會判斷當前資料庫是否已經連結,如果連結丟失,將重新連結資料庫。
多綫程編程中對資料庫更新操作需要注意的是,有些場景,你需要控制同一時刻只能有一個綫程對資料庫做Update, Delete, Insert,否則數據容易出錯。
例如下面的操作,你會發現程序運行完成後數據欄位沒有任何變化。這是因為綫程間相互覆蓋對方之前更新的數據。
$sql = "update import set succeed = succeed+1 where status = :status and id = :id";
解決方法有兩種,一種是外部實現排他鎖,一種是在資料庫內部實現,通過事務處理,解決綫程資源爭奪,相互覆蓋的問題。
/**
 * Increments the "succeed" counter of an import task inside a transaction.
 *
 * The transaction is committed only when the statement ran successfully;
 * otherwise it is rolled back so it is never left open on the connection.
 *
 * @param object $task task whose id selects the row (status must be 'Processing')
 * @return bool result of executing the statement
 */
private function updateSucceed($task){
    $dbh = $this->worker->getInstance();
    $dbh->beginTransaction();
    $status = false;
    try {
        $sql = "update import set succeed = succeed+1 where status = :status and id = :id";
        $sth = $dbh->prepare ( $sql );
        // prepare() returns false instead of throwing in the non-exception error modes.
        if ($sth !== false) {
            $sth->bindValue ( ':id', $task->id );
            $sth->bindValue ( ':status', 'Processing' );
            $status = $sth->execute ();
        }
    } catch (Exception $e) {
        $dbh->rollBack();
        throw $e;
    }
    if ($status) {
        $dbh->commit();
    } else {
        $dbh->rollBack();
    }
    return $status;
}
應用場景,我使用觸發器監控資料庫某個表,一旦發現有改變就通知程序處理數據
首先安裝ZeroMQ 與 ZeroMQ for MySQL UDF 然後創建觸發器。 https://github.com/netkiller/mysql-zmq-plugin
-- Forwards a trade to the ZeroMQ listener when CMD is 0 or 1, VOLUME is
-- between 10 and 90 inclusive, and the member whose username equals LOGIN has
-- coding = 'Y'. The message sent is "TICKET,LOGIN,VOLUME".
--
-- The UDF is invoked with DO instead of a bare SELECT: a bare SELECT returns
-- a result set, and a procedure that returns a result set cannot be called
-- from a trigger.
CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)
    LANGUAGE SQL
    NOT DETERMINISTIC
    READS SQL DATA
    SQL SECURITY DEFINER
    COMMENT '交易監控'
BEGIN
    DECLARE Example CHAR(1) DEFAULT 'N';
    -- CMD is an INT parameter, so it is compared with integers.
    IF CMD IN (0, 1) THEN
        IF VOLUME >= 10 AND VOLUME <= 90 THEN
            -- LIMIT 1 keeps SELECT ... INTO from failing when several rows match.
            SELECT coding INTO Example FROM example.members WHERE username = LOGIN AND coding = 'Y' LIMIT 1;
            IF Example = 'Y' THEN
                DO zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));
            END IF;
        END IF;
    END IF;
END
-- After every row inserted into MT4_TRADES, pass the new row's TICKET, LOGIN,
-- CMD and VOLUME to the Table_Example procedure.
CREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW BEGIN
call Table_Example(NEW.TICKET,NEW.LOGIN,NEW.CMD,NEW.VOLUME);
END
<?php
/**
 * Pool worker that opens one persistent PDO connection in run() and hands it
 * to its work items through getInstance().
 */
class ExampleWorker extends Worker {
    /** @var PDO|null connection created by run() */
    protected static $dbh;

    public function __construct() {
    }

    /**
     * Connects to MySQL and stores the handle in the static slot. The
     * SET NAMES init command is commented out in this variant.
     */
    public function run(){
        $dbhost = '192.168.2.1'; // database server
        $dbport = 3306;
        $dbuser = 'www';         // database user
        $dbpass = 'password';    // database password
        $dbname = 'example';     // database name
        $dsn = "mysql:host=$dbhost;port=$dbport;dbname=$dbname";
        $options = array(
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true
        );
        self::$dbh = new PDO($dsn, $dbuser, $dbpass, $options);
    }

    /**
     * @return PDO|null the handle stored by run()
     */
    protected function getInstance(){
        return self::$dbh;
    }
}
/* the collectable class implements machinery for Pool::collect */
/**
 * Work item built from one "ticket,login,volume" message: records the trade
 * in coding_fee and then flags it as processed.
 */
class Fee extends Stackable {
    /**
     * @param string $msg comma-separated message: ticket, login, volume
     */
    public function __construct($msg) {
        $trades = explode(",", $msg);
        $this->data = $trades;
        print_r($trades);
    }

    /**
     * Inserts the trade with status 'N', then sets it to 'Y'. Malformed
     * messages and database failures are appended to the error log instead
     * of being ignored.
     */
    public function run() {
        $data = (array) $this->data;
        // A usable message has at least ticket, login and volume.
        if (count($data) < 3) {
            $this->logFailure('malformed message');
            return;
        }
        try {
            $dbh = $this->worker->getInstance();

            $insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";
            $sth = $dbh->prepare($insert);
            $sth->bindValue(':ticket', $data[0]);
            $sth->bindValue(':login', $data[1]);
            $sth->bindValue(':volume', $data[2]);
            // Outside exception mode execute() reports failure through its return value.
            if (!$sth->execute()) {
                $this->logFailure('insert failed');
                return;
            }

            /* business logic goes here */

            $update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";
            $sth = $dbh->prepare($update);
            $sth->bindValue(':ticket', $data[0]);
            if (!$sth->execute()) {
                $this->logFailure('update failed');
            }
        } catch (PDOException $e) {
            $this->logFailure($e->getMessage());
        }
    }

    /**
     * Appends the message fields and a reason to the error log. The log file
     * name is the one this class has always written to.
     *
     * @param string $reason short description of what went wrong
     */
    private function logFailure($reason) {
        $error = sprintf("%s,%s\n", implode(',', (array) $this->data), $reason);
        file_put_contents("mobile_error.log", $error, FILE_APPEND);
    }
}
/**
 * Daemon that listens on a ZeroMQ REP socket and hands every received
 * message to a pool of ExampleWorker threads as a Fee work item.
 *
 * Usage: php example.php start | stop | help
 */
class Example {
    /* config */
    const LISTEN = "tcp://192.168.2.15:5555";
    const MAXCONN = 100;
    const pidfile = __CLASS__;
    const uid = 80;
    const gid = 80;

    protected $pool = NULL;
    protected $zmq = NULL;
    /** @var string absolute path of the pid file */
    public $pidfile;

    public function __construct() {
        $this->pidfile = '/var/run/'.self::pidfile.'.pid';
    }

    /**
     * Forks into the background. The parent exits; the child writes the pid
     * file, drops privileges and returns its own pid. Refuses to start when
     * the pid file already exists.
     *
     * @return int pid of the daemonised child
     */
    private function daemon(){
        if (file_exists($this->pidfile)) {
            echo "The file $this->pidfile exists.\n";
            exit();
        }
        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            // Parent: the child carries on. Exit with status 0; passing the
            // pid would turn it into an arbitrary exit status.
            exit(0);
        }
        // Child.
        file_put_contents($this->pidfile, getmypid());
        // Drop the group first: once the user id has been given up the
        // process is no longer allowed to change its group.
        posix_setgid(self::gid);
        posix_setuid(self::uid);
        return(getmypid());
    }

    /**
     * Daemonises, creates the pool, binds the socket and serves requests.
     * Every request gets exactly one reply: 'TRUE' when it was queued,
     * 'FALSE' when the message was empty.
     */
    private function start(){
        $pid = $this->daemon();
        $this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);
        $this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
        $this->zmq->bind(self::LISTEN);
        /* Loop receiving and echoing back */
        while (true) {
            $message = $this->zmq->recv();
            if ($message === false) {
                break;
            }
            // Compare against the empty string so a payload such as "0" is
            // still treated as a message.
            if ($message !== '') {
                $this->pool->submit(new Fee($message));
                $this->zmq->send('TRUE');
            } else {
                $this->zmq->send('FALSE');
            }
        }
        $this->pool->shutdown();
    }

    /**
     * Kills the process recorded in the pid file and removes the file.
     */
    private function stop(){
        if (file_exists($this->pidfile)) {
            $pid = (int) trim(file_get_contents($this->pidfile));
            // Never signal pid 0 (an empty or corrupt pid file): that would
            // target the caller's whole process group.
            if ($pid > 0) {
                posix_kill($pid, 9);
            }
            unlink($this->pidfile);
        }
    }

    /**
     * Prints the usage line.
     *
     * @param string $proc script name shown in the usage line
     */
    private function help($proc){
        printf("%s start | stop | help \n", $proc);
    }

    /**
     * Dispatches on the first command-line argument.
     *
     * @param array $argv command-line arguments, script name first
     */
    public function main($argv){
        if(count($argv) < 2){
            printf("please input help parameter\n");
            exit();
        }
        if($argv[1] === 'stop'){
            $this->stop();
        }else if($argv[1] === 'start'){
            $this->start();
        }else{
            $this->help($argv[0]);
        }
    }
}
// Entry point: build the daemon controller and dispatch on the command line.
(new Example())->main($argv);
使用方法
# php example.php start
# php example.php stop
# php example.php help
此程序涉及守護進程實現,$this->daemon()運行後轉到後台運行,進程ID保存,進程的互斥(不允許同時啟動兩個進程),綫程池連接數以及綫程任務等等