Home | 簡體中文 | 繁體中文 | 雜文 | 知乎專欄 | Github | OSChina 博客 | 雲社區 | 雲棲社區 | Facebook | Linkedin | 視頻教程 | 打賞(Donations) | About
知乎專欄多維度架構 微信號 netkiller-ebook | QQ群:128659835 請註明“讀者”

第 139 章 Message Queuing & RPC

目錄

139.1. RabbitMQ
139.1.1. 安裝 RabbitMQ
139.1.1.1. Ubuntu
139.1.1.2. CentOS
139.1.1.3. OSCM 一鍵安裝
139.1.1.4. 檢查連接埠
139.1.2. 配置 RabbitMQ
139.1.2.1. 監聽所有適配器地址
139.1.3. rabbitmqctl - command line tool for managing a RabbitMQ broker
139.1.3.1. change_password
139.1.3.2. list_users
139.1.3.3. 虛擬機管理
139.1.3.4. list_queues
139.1.3.5. list_exchanges
139.1.4. rabbitmq-plugins - command line tool for managing RabbitMQ broker plugins
139.1.4.1. rabbitmq_management
139.1.5. Python - Pika
139.1.6. Ruby amqp
139.2. ZeroMQ
139.2.1. python-zeromq
139.2.1.1. pyzmq
139.2.1.2. example
139.2.2. ruby zmq
139.3. nanomsg
139.4. Gearman
139.4.1. Getting Started with Gearman
139.4.1.1. CentOS
139.4.1.2. Ubuntu
139.4.1.3. 防火牆設置
139.4.2. gearman
139.4.3. Gearman PHP Extension
139.5. Apache Kafka is a distributed publish-subscribe messaging system
139.5.1. 安裝 Kafka
139.5.1.1. 安裝 Kafka用於開發與測試環境
139.5.1.2. 安裝 Kafka 適用於 IDC
139.5.1.3. Kafka 日誌
139.5.1.4. 檢查 Kafka 綫程
139.5.2. 測試 Kafka
139.5.3. 配置 Kafka
139.5.3.1. server.properties
139.5.3.1.1. 外網訪問
139.5.3.2. consumer.properties
139.5.3.2.1. group.id
139.5.3.3. producer.properties
139.5.4. 管理 Kafka
139.5.5. FAQ
139.5.5.1. WARN Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
139.5.5.2. Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
139.5.5.3. WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
139.6. Celery
139.7. ActiveMQ
139.8. http://kr.github.io/beanstalkd/
139.9. gRPC

139.1. RabbitMQ

RabbitMQ

139.1.1. 安裝 RabbitMQ

running on 127.0.0.1 (localhost) on port 5672 (standard AMQP port).

139.1.1.1. Ubuntu

			
$ sudo apt-get install rabbitmq-server			
			

			

139.1.1.2. CentOS

			
# yum install -y rabbitmq-server
# chkconfig rabbitmq-server on
# service rabbitmq-server start			
			

			

添加用戶, 添加權限, 刪除guest用戶

			
# rabbitmqctl add_user rabbit password
# rabbitmqctl set_permissions -p "/" rabbit ".*" ".*" ".*"
# rabbitmqctl delete_user guest			
			

			

139.1.1.3. OSCM 一鍵安裝

			
curl -s https://raw.githubusercontent.com/oscm/shell/master/mq/rabbitmq/rabbitmq-server-3.6.10.sh | bash

rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"			
			
			

139.1.1.4. 檢查連接埠

			
[root@netkiller ~]# ss -lnt | grep 5672
LISTEN 0 128 *:25672 *:*
LISTEN 0 128 :::5672 :::*			
						
			

139.1.2. 配置 RabbitMQ

創建配置檔案,預設情況/etc/rabbitmq/下面什麼都沒有。你需要從共享文檔中複製一份配置檔案過去。

		
cp /usr/share/doc/rabbitmq-server-3.6.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
		
		

139.1.2.1. 監聽所有適配器地址

預設 RabbitMQ 監聽 localhost 如果你需要讓外部機器連接進來,需要配置 tcp_listeners 0.0.0.0

			
 {tcp_listeners, [{"0.0.0.0",5672}]}		
			
			

139.1.3. rabbitmqctl - command line tool for managing a RabbitMQ broker

			rabbitmqctl status
		

139.1.3.1. change_password

			
rabbitmqctl change_password admin <new_password>
			
			

139.1.3.2. list_users

			
# rabbitmqctl list_users
Listing users ...
guest [administrator]
...done.			
			
			

139.1.3.3. 虛擬機管理

			
$ rabbitmqctl add_vhost test
$ rabbitmqctl add_user testuser password
$ rabbitmqctl set_permissions -p test testuser ".*" ".*" ".*"			
						
			

139.1.3.4. list_queues

			
# rabbitmqctl list_queues
Listing queues ...
amq.gen-RhBwbb9EdZ8Fgk_heGZQ2w 0
bb 0
customer 276930
demo 0
email 0
example 0
hello 1
members_id 282
new_members_id 0
q_linvo 0
real 0
...done.
				
			

			
			

139.1.3.5. list_exchanges

			
# rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
email direct
...done.
			
			

139.1.4. rabbitmq-plugins - command line tool for managing RabbitMQ broker plugins

啟用插件

		
rabbitmq-plugins enable rabbitmq_management		
		
		

139.1.4.1. rabbitmq_management

RabbitMQ Management HTTP API (https://cdn.rawgit.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_0/priv/www/api/index.html)

啟用插件 Management and Monitoring 插件

						
rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server
				
			
			
# curl -u guest:guest http://localhost:15672/api/overview
{"management_version":"3.3.5","statistics_level":"fine","exchange_types":[{"name":"topic","description":"AMQP topic exchange, as per the AMQP specification","enabled":true},{"name":"fanout","description":"AMQP fanout exchange, as per the AMQP specification","enabled":true},{"name":"direct","description":"AMQP direct exchange, as per the AMQP specification","enabled":true},{"name":"headers","description":"AMQP headers exchange, as per the
AMQP specification","enabled":true}],"rabbitmq_version":"3.3.5","cluster_name":"rabbit@iZ623qr3xctZ","erlang_version":"R16B03-1","erlang_full_version":"Erlang R16B03-1
(erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [hipe]
[kernel-poll:true]","message_stats":{},"queue_totals":{"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0}},"object_totals":{"consumers":1,"queues":3,"exchanges":10,"connections":1,"channels":1},"node":"rabbit@iZ623qr3xctZ","statistics_db_node":"rabbit@iZ623qr3xctZ","listeners":[{"node":"rabbit@iZ623qr3xctZ","protocol":"amqp","ip_address":"::","port":5672},{"node":"rabbit@iZ623qr3xctZ","protocol":"clustering","ip_address":"::","port":25672}],"contexts":[{"node":"rabbit@iZ623qr3xctZ","description":"RabbitMQ
Management","path":"/","port":15672}
				
			

vhosts

			
	# curl -u guest:guest http://localhost:15672/api/vhosts
				[{"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"recv_oct":617,"recv_oct_details":{"rate":0.0},"send_oct":625,"send_oct_details":{"rate":0.0},"name":"/","tracing":false}]			
			
			

queues

			
# curl -s -u guest:guest http://localhost:15672/api/queues/%2f/example | sed 's/,/,\n/g'
{"message_stats":{"ack":817,
"ack_details":{"rate":0.8},
"deliver":829,
"deliver_details":{"rate":0.8},
"deliver_get":829,
"deliver_get_details":{"rate":0.8},
"publish":33700,
"publish_details":{"rate":22.4},
"redeliver":9,
"redeliver_details":{"rate":0.0}},
"messages":32884,
"messages_details":{"rate":39.2},
"messages_ready":32881,
"messages_ready_details":{"rate":39.2},
"messages_unacknowledged":3,
"messages_unacknowledged_details":{"rate":0.0},
"policy":"",
"exclusive_consumer_tag":"",
"consumers":1,
"consumer_utilisation":0.00005551817727208515,
"memory":34387224,
"backing_queue_status":{"q1":0,
"q2":0,
"delta":["delta",
0,
0,
0],
"q3":0,
"q4":32881,
"len":32881,
"pending_acks":3,
"target_ram_count":"infinity",
"ram_msg_count":32881,
"ram_ack_count":3,
"next_seq_id":33700,
"persistent_count":0,
"avg_ingress_rate":31.071205055112543,
"avg_egress_rate":0.7083061832348867,
"avg_ack_ingress_rate":0.7083061832348867,
"avg_ack_egress_rate":0.7083061832348867},
"state":"running",
"incoming":[{"stats":{"publish":33700,
"publish_details":{"rate":22.4}},
"exchange":{"name":"email",
"vhost":"/"}}],
"deliveries":[{"stats":{"redeliver":3,
"redeliver_details":{"rate":0.0},
"deliver_get":348,
"deliver_get_details":{"rate":0.8},
"deliver":348,
"deliver_details":{"rate":0.8},
"ack":345,
"ack_details":{"rate":0.8}},
"channel_details":{"name":"127.0.0.1:41033 -> 127.0.0.1:5672 (1)",
"number":1,
"connection_name":"127.0.0.1:41033 -> 127.0.0.1:5672",
"peer_port":41033,
"peer_host":"127.0.0.1"}}],
"consumer_details":[{"channel_details":{"name":"127.0.0.1:41033 -> 127.0.0.1:5672 (1)",
"number":1,
"connection_name":"127.0.0.1:41033 -> 127.0.0.1:5672",
"peer_port":41033,
"peer_host":"127.0.0.1"},
"queue":{"name":"example",
"vhost":"/"},
"consumer_tag":"amq.ctag-6BSkZzt3eWgBG5Jn2nl4QA",
"exclusive":false,
"ack_required":true,
"prefetch_count":3,
"arguments":{}}],
"name":"example",
"vhost":"/",
"durable":true,
"auto_delete":false,
"arguments":{},
"node":"rabbit@iZ623qr3xctZ"}		
			
			

139.1.5. Python - Pika

http://pika.github.com/

		
sudo apt-get install python-setuptools python-pip git-core
sudo pip install pika

sudo easy_install pika		
		
			
		

139.1.6. Ruby amqp

		
$ sudo gem install amqp		
		
		

例 139.1. Ruby on RabbitMQ

subscriber.rb

			
$ cat subscriber.rb
require 'rubygems'
require 'amqp'

EM.run {
amq = MQ.new
amq.queue("logins").subscribe do |login|
puts login
end
}			
			
			

producer.rb

			
$ cat producer.rb
require 'rubygems'
require 'amqp'

EM.run {
amq = MQ.new
queue = amq.queue("logins")
%w[scott nic robi].each { |login|
queue.publish(login)
}
}			
			
			

test

			
$ ruby subscriber.rb
$ ruby producer.rb