Home | 簡體中文 | 繁體中文 | 雜文 | 知乎專欄 | Github | OSChina 博客 | 雲社區 | 雲棲社區 | Facebook | Linkedin | 視頻教程 | 打賞(Donations) | About
知乎專欄多維度架構 微信號 netkiller-ebook | QQ群:128659835 請註明“讀者”

16.2. 消息隊列

這裡選擇使用ZeroMQ的原因主要考慮的是性能問題,其他MQ方案可能會阻塞資料庫。

16.2.1. 背景

之前我發表過一篇文章 http://netkiller.github.io/journal/mysql.plugin.fifo.html

該文章中提出了通過fifo 管道,實現資料庫與其他進程的通信。屬於 IPC 機制(同一個OS/伺服器內),後我有採用ZeroMQ重新實現了一個 RPC 機制的方案,同時兼容IPC(跨越OS/伺服器)

各種縮寫的全稱 IPC(IPC :Inter-Process Communication 進程間通信),ITC(ITC : Inter Thread Communication 綫程間通信)與RPC(RPC: Remote Procedure Calls遠程過程調用)。

支持協議

inproc://my_publisher
tcp://server001:5555
ipc:///tmp/feeds/0
		

16.2.2. 應用場景

如果你想處理數據,由於各種原因你不能在程序中實現,你可以使用這個插件。當資料庫中的數據發生變化的時候出發某種操作,你可以使用這個插件。

有時候你的項目可能是外包的,項目結束後外包方不會在管你,你有無法改動現有代碼,或者根本不敢改。你可以使用這個插件

採用MQ技術對資料庫無任何壓力,與採用程序處理並無不同,省卻了寫代碼

處理方法,可以採用同步或者非同步方式

例 16.1. 發送短信

發送短信、郵件,只需要查詢出相應手機號碼,發送到MQ的服務端,服務端接收到手機號碼後,放入隊列中,多綫程程序從隊列中領取任務,發送短信。

select zmq_client('tcp://localhost:5555',mobile) from demo where subscribed='Y' ...;
			

傳遞多個參數,可以使用符號分隔

select zmq_client('tcp://localhost:5555',concat(name,',',mobile,', news')) from demo;
select zmq_client('tcp://localhost:5555',concat(name,'|',mobile,'|news')) from demo;
			

json格式

select zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,', template:news}')) from demo;
			

建議採用非同步方式,MQ端接收到任務立即反饋 “成功”信息,因為我們不太關心是否能發送成功,本身就是盲目性的發送,手機號碼是否可用我們無從得知,短信或者郵件的發送到達率不是100%,所以當進入隊列後,讓程序自行處理,將成功或者失敗信息記錄到日誌中即可。


例 16.2. 處理圖片

首先查詢出需要處理圖片,然後將路徑與分辨率傳遞給MQ另一端的處理程序

select zmq_client('tcp://localhost:5555',concat(image,',800x600}')) from demo;
			

建議採用非同步方式,MQ端接收到任務立即反饋 “成功”信息


例 16.3. 身份證號碼校驗

select zmq_client('tcp://localhost:5555',id_number) from demo;
			

可以採用同步方案,因為MQ款處理几乎不會延遲,直接將處理結構反饋


例 16.4. 靜態化案例

情景模擬,你的項目是你個電商項目,採用外包模式開發,項目已經開發完成。外包放不再負責維護,你現在要做靜態化。增加該功能,你要檢查多處與商品表相關的造作。

于其改代碼,不如程序從外部處理,這樣更保險。我們只要寫一個程序將動態 URL 下載保存成靜態即可,當數據發生變化的時候重新下載覆蓋即可

CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_insert` AFTER INSERT ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_update` AFTER UPDATE ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_delete` AFTER DELETE ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
			

MQ 另一端的服務會下載http://www.example.com/goods.php?cid=111&id=100, 然後生成html頁面,http://www.example.com/111/100.html

插入會新建頁面,更新會覆蓋頁面,刪除會刪除頁面

這樣無論商品的價格,屬性改變,靜態化程序都會做出相應的處理。


例 16.5. 數據同步案例

我們有多個資料庫,A 庫裡面的數據發生變化後,要同步書庫到B庫,或者處理結果,或者數據轉換後寫入其他資料庫中

方法也是採用觸發器或者EVENT處理


16.2.3. Mysql plugin

我開發了幾個 UDF, 共4個 function

UDF

zmq_client(sockt,message)

sockt .成功返回true,失敗返回flase.

有了上面的function後你就可以在begin,commit,rollback 直接穿插使用,實現在事物處理期間做你愛做的事。也可以用在觸發器與EVENT定時任務中。

16.2.4. plugin 的開發與使用

編譯UDF你需要安裝下面的軟件包

sudo apt-get install pkg-config
sudo apt-get install libmysqlclient-dev

sudo apt-get install gcc gcc-c++ make cmake
		

https://github.com/netkiller/mysql-zmq-plugin

編譯udf,最後將so檔案複製到 /usr/lib/mysql/plugin/

		
git clone https://github.com/netkiller/mysql-zmq-plugin.git
cd mysql-zmq-plugin

cmake .
make && make install
		
			

裝載

create function zmq_client returns string soname 'libzeromq.so';
create function zmq_publish returns string soname 'libzeromq.so';
		

卸載

drop function zmq_client;
drop function zmq_publish;
		

確認安裝成功

		
mysql> SELECT * FROM `mysql`.`func` where name like 'zmq%';
+-------------+-----+--------------+----------+
| name        | ret | dl           | type     |
+-------------+-----+--------------+----------+
| zmq_client  |   0 | libzeromq.so | function |
| zmq_publish |   0 | libzeromq.so | function |
+-------------+-----+--------------+----------+
2 rows in set (0.00 sec)
		
			

16.2.5. 插件如何使用

插件有很多種用法,這裡僅僅一個例

編譯zeromq server 測試程序

cd test
cmake .
make
		

啟動服務進程

./server
		

發送Hello world!

		
mysql> select zmq_client('tcp://localhost:5555','Hello world!');
+---------------------------------------------------+
| zmq_client('tcp://localhost:5555','Hello world!') |
+---------------------------------------------------+
| Hello world! OK                                   |
+---------------------------------------------------+
1 row in set (0.01 sec)
		
			

查看伺服器端是否接收到信息。

$ ./server
Received: Hello world!
		

我們再將上面的例子使用觸發器進一步優化

		
mysql> select zmq_client('tcp://localhost:5555',mobile) from demo;
+-------------------------------------------+
| zmq_client('tcp://localhost:5555',mobile) |
+-------------------------------------------+
| 13113668891 OK                            |
| 13113668892 OK                            |
| 13113668893 OK                            |
| 13322993040 OK                            |
| 13588997745 OK                            |
+-------------------------------------------+
5 rows in set (0.03 sec)
		
			

伺服器端已經接收到資料庫發過來的信息

$ ./server
Received: Hello world!
Received: 13113668891
Received: 13113668892
Received: 13113668893
Received: 13322993040
Received: 13588997745
		

我們可以拼裝json或者序列化數據,發送給遠端

		
mysql> select zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,'}')) from demo;
+------------------------------------------------------------------------------+
| zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,'}')) |
+------------------------------------------------------------------------------+
| {name:neo, tel:13113668891} OK                                               |
| {name:jam, tel:13113668892} OK                                               |
| {name:leo, tel:13113668893} OK                                               |
| {name:jerry, tel:13322993040} OK                                             |
| {name:tom, tel:13588997745} OK                                               |
+------------------------------------------------------------------------------+
5 rows in set (0.03 sec)
		
			

返回數據取決於你服務端怎麼編寫處理程序,你可以返回true/false等等。

觸發器以及事務處理,這裡就不演示了