
Chapter 142. Message Queuing & RPC

Table of Contents

142.1. RabbitMQ
142.1.1. Installing RabbitMQ
142.1.1.1. Ubuntu
142.1.1.2. CentOS
142.1.1.3. OSCM one-click installation
142.1.1.4. Checking the listening ports
142.1.2. rabbitmqctl - command line tool for managing a RabbitMQ broker
142.1.2.1. change_password
142.1.2.2. list_users
142.1.2.3. Virtual host management
142.1.2.4. list_queues
142.1.2.5. list_exchanges
142.1.3. rabbitmq-plugins - command line tool for managing RabbitMQ broker plugins
142.1.3.1. rabbitmq_management
142.1.4. Python - Pika
142.1.5. Ruby amqp
142.2. ZeroMQ
142.2.1. python-zeromq
142.2.1.1. pyzmq
142.2.1.2. example
142.2.2. ruby zmq
142.3. nanomsg
142.4. Gearman
142.4.1. Getting Started with Gearman
142.4.1.1. CentOS
142.4.1.2. Ubuntu
142.4.1.3. Firewall settings
142.4.2. gearman
142.4.3. Gearman PHP Extension
142.5. Apache Kafka is a distributed publish-subscribe messaging system
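142.5.1. Installing Kafka
142.5.1.1. Installing Kafka for development and test environments
142.5.1.2. Installing Kafka for IDC (data center) deployment
142.5.1.3. Kafka logs
142.5.1.4. Checking Kafka threads
142.5.2. Testing Kafka
142.5.3. Configuring Kafka
142.5.3.1. External network access
142.5.3.2. group.id
142.5.4. Managing Kafka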
142.5.5. FAQ
142.5.5.1. WARN Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
142.5.5.2. Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
142.5.5.3. WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
142.6. Celery
142.7. ActiveMQ
142.8. http://kr.github.io/beanstalkd/
142.9. gRPC

142.1. RabbitMQ

RabbitMQ is an open-source message broker that implements AMQP (Advanced Message Queuing Protocol).

142.1.1. Installing RabbitMQ

The examples in this section assume a RabbitMQ broker running on 127.0.0.1 (localhost) on port 5672 (the standard AMQP port).

142.1.1.1. Ubuntu

$ sudo apt-get install rabbitmq-server
			

142.1.1.2. CentOS

# yum install -y rabbitmq-server
# chkconfig rabbitmq-server on
# service rabbitmq-server start
			

Add a user, grant it permissions, and delete the guest user:

# rabbitmqctl add_user rabbit password
# rabbitmqctl set_permissions -p "/" rabbit ".*" ".*" ".*"
# rabbitmqctl delete_user guest
			

142.1.1.3. OSCM one-click installation

curl -s https://raw.githubusercontent.com/oscm/shell/master/mq/rabbitmq/rabbitmq-server-3.6.10.sh | bash

rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
			

142.1.1.4. Checking the listening ports

[root@netkiller ~]# ss -lnt | grep 5672
LISTEN     0      128          *:25672                    *:*                  
LISTEN     0      128         :::5672                    :::*
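
The same check can be scripted when ss is not at hand. The following is a minimal sketch, assuming the two ports shown above (5672 for AMQP, 25672 for clustering); the helper names port_open and check_rabbitmq are made up for illustration and are not part of RabbitMQ.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and tries each address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # connection refused, host unreachable, or timed out
        return False


def check_rabbitmq(host="127.0.0.1", ports=(5672, 25672)):
    """Map each port to whether it currently accepts connections."""
    return {port: port_open(host, port) for port in ports}
```

Calling check_rabbitmq() on the broker host returns a dict such as {5672: True, 25672: True} when both listeners are up. It only proves that something accepts TCP connections on those ports, not that it speaks AMQP.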
			

142.1.2. rabbitmqctl - command line tool for managing a RabbitMQ broker

rabbitmqctl status
		

142.1.2.1. change_password

			
rabbitmqctl change_password admin <new_password>
			
			

142.1.2.2. list_users

# rabbitmqctl list_users
Listing users ...
guest	[administrator]
...done.
			

142.1.2.3. Virtual host management

$ rabbitmqctl add_vhost test
$ rabbitmqctl add_user testuser password
$ rabbitmqctl set_permissions -p test testuser ".*" ".*" ".*"			
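
When several vhosts have to be created, the three commands above can be generated rather than typed. This is only a sketch: vhost_commands and provision are hypothetical helper names, and actually running the commands assumes rabbitmqctl is on the PATH and the caller has sufficient privileges.

```python
import subprocess


def vhost_commands(vhost, user, password, pattern=".*"):
    """Build the rabbitmqctl argument lists for one vhost and one user."""
    return [
        ["rabbitmqctl", "add_vhost", vhost],
        ["rabbitmqctl", "add_user", user, password],
        # the three patterns are configure, write, read, in that order
        ["rabbitmqctl", "set_permissions", "-p", vhost, user, pattern, pattern, pattern],
    ]


def provision(vhost, user, password, dry_run=True):
    """Return the commands, and execute them when dry_run is False."""
    commands = vhost_commands(vhost, user, password)
    if not dry_run:
        for argv in commands:
            # check=True stops at the first command that fails
            subprocess.run(argv, check=True)
    return commands
```

provision("test", "testuser", "password") returns the same three commands as listed above without executing anything; pass dry_run=False to run them.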
			

142.1.2.4. list_queues

# rabbitmqctl list_queues
Listing queues ...
amq.gen-RhBwbb9EdZ8Fgk_heGZQ2w	0
bb	0
customer	276930
demo	0
email	0
example	0
hello	1
members_id	282
new_members_id	0
q_linvo	0
real	0
...done.
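
For monitoring, the listing above is easy to parse: each data row is a queue name, a tab, and a message count. The sketch below assumes that tab-separated layout; parse_list_queues, backlog, and the threshold of 1000 are illustrative choices, not part of rabbitmqctl.

```python
def parse_list_queues(output):
    """Parse `rabbitmqctl list_queues` text into a {queue_name: message_count} dict."""
    queues = {}
    for line in output.splitlines():
        # data rows are "<name><TAB><count>"; banner lines contain no tab
        name, sep, count = line.rpartition("\t")
        if sep and count.strip().isdigit():
            queues[name] = int(count)
    return queues


def backlog(queues, threshold=1000):
    """Return the queues holding more than `threshold` messages, largest first."""
    busy = [(name, count) for name, count in queues.items() if count > threshold]
    return sorted(busy, key=lambda item: item[1], reverse=True)


# A few rows copied from the listing above
SAMPLE = "Listing queues ...\ncustomer\t276930\nhello\t1\nmembers_id\t282\n...done.\n"
print(backlog(parse_list_queues(SAMPLE)))  # [('customer', 276930)]
```

In practice the input would come from something like subprocess.run(["rabbitmqctl", "list_queues"], capture_output=True, text=True).stdout.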
			

			
			

142.1.2.5. list_exchanges

# rabbitmqctl list_exchanges
Listing exchanges ...
	direct
amq.direct	direct
amq.fanout	fanout
amq.headers	headers
amq.match	headers
amq.rabbitmq.log	topic
amq.rabbitmq.trace	topic
amq.topic	topic
email	direct
...done.
			

142.1.3. rabbitmq-plugins - command line tool for managing RabbitMQ broker plugins

Enable a plugin:

rabbitmq-plugins enable rabbitmq_management		
		

142.1.3.1. rabbitmq_management

RabbitMQ Management HTTP API (https://cdn.rawgit.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_0/priv/www/api/index.html)

Enable the Management and Monitoring plugin:

rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server			
			
# curl -u guest:guest http://localhost:15672/api/overview
{"management_version":"3.3.5","statistics_level":"fine","exchange_types":[{"name":"topic","description":"AMQP topic exchange, as per the AMQP specification","enabled":true},{"name":"fanout","description":"AMQP fanout exchange, as per the AMQP specification","enabled":true},{"name":"direct","description":"AMQP direct exchange, as per the AMQP specification","enabled":true},{"name":"headers","description":"AMQP headers exchange, as per the AMQP specification","enabled":true}],"rabbitmq_version":"3.3.5","cluster_name":"rabbit@iZ623qr3xctZ","erlang_version":"R16B03-1","erlang_full_version":"Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [hipe] [kernel-poll:true]","message_stats":{},"queue_totals":{"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0}},"object_totals":{"consumers":1,"queues":3,"exchanges":10,"connections":1,"channels":1},"node":"rabbit@iZ623qr3xctZ","statistics_db_node":"rabbit@iZ623qr3xctZ","listeners":[{"node":"rabbit@iZ623qr3xctZ","protocol":"amqp","ip_address":"::","port":5672},{"node":"rabbit@iZ623qr3xctZ","protocol":"clustering","ip_address":"::","port":25672}],"contexts":[{"node":"rabbit@iZ623qr3xctZ","description":"RabbitMQ Management","path":"/","port":15672}]}			
			

vhosts

# curl -u guest:guest http://localhost:15672/api/vhosts
[{"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"recv_oct":617,"recv_oct_details":{"rate":0.0},"send_oct":625,"send_oct_details":{"rate":0.0},"name":"/","tracing":false}]			
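
The curl calls above use HTTP Basic authentication, so they are straightforward to reproduce from a script. Here is a minimal sketch using only the Python standard library; it assumes the same host, port 15672, and guest:guest credentials as the curl examples, and the function names api_request and list_vhost_names are hypothetical.

```python
import base64
import json
import urllib.request


def api_request(path, host="localhost", port=15672, user="guest", password="guest"):
    """Build an authenticated GET request for the management API."""
    # Basic auth is "user:password", base64-encoded, in the Authorization header
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(f"http://{host}:{port}/api/{path}")
    request.add_header("Authorization", f"Basic {token}")
    return request


def list_vhost_names(**kwargs):
    """Fetch /api/vhosts and return the vhost names (needs a running broker)."""
    with urllib.request.urlopen(api_request("vhosts", **kwargs), timeout=5) as response:
        return [vhost["name"] for vhost in json.load(response)]
```

Against the broker shown above, list_vhost_names() would return ["/"], since the response contains a single vhost named "/".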
			

queues

			
# curl -s -u guest:guest http://localhost:15672/api/queues/%2f/example | sed 's/,/,\n/g'
{"message_stats":{"ack":817,
"ack_details":{"rate":0.8},
"deliver":829,
"deliver_details":{"rate":0.8},
"deliver_get":829,
"deliver_get_details":{"rate":0.8},
"publish":33700,
"publish_details":{"rate":22.4},
"redeliver":9,
"redeliver_details":{"rate":0.0}},
"messages":32884,
"messages_details":{"rate":39.2},
"messages_ready":32881,
"messages_ready_details":{"rate":39.2},
"messages_unacknowledged":3,
"messages_unacknowledged_details":{"rate":0.0},
"policy":"",
"exclusive_consumer_tag":"",
"consumers":1,
"consumer_utilisation":0.00005551817727208515,
"memory":34387224,
"backing_queue_status":{"q1":0,
"q2":0,
"delta":["delta",
0,
0,
0],
"q3":0,
"q4":32881,
"len":32881,
"pending_acks":3,
"target_ram_count":"infinity",
"ram_msg_count":32881,
"ram_ack_count":3,
"next_seq_id":33700,
"persistent_count":0,
"avg_ingress_rate":31.071205055112543,
"avg_egress_rate":0.7083061832348867,
"avg_ack_ingress_rate":0.7083061832348867,
"avg_ack_egress_rate":0.7083061832348867},
"state":"running",
"incoming":[{"stats":{"publish":33700,
"publish_details":{"rate":22.4}},
"exchange":{"name":"email",
"vhost":"/"}}],
"deliveries":[{"stats":{"redeliver":3,
"redeliver_details":{"rate":0.0},
"deliver_get":348,
"deliver_get_details":{"rate":0.8},
"deliver":348,
"deliver_details":{"rate":0.8},
"ack":345,
"ack_details":{"rate":0.8}},
"channel_details":{"name":"127.0.0.1:41033 -> 127.0.0.1:5672 (1)",
"number":1,
"connection_name":"127.0.0.1:41033 -> 127.0.0.1:5672",
"peer_port":41033,
"peer_host":"127.0.0.1"}}],
"consumer_details":[{"channel_details":{"name":"127.0.0.1:41033 -> 127.0.0.1:5672 (1)",
"number":1,
"connection_name":"127.0.0.1:41033 -> 127.0.0.1:5672",
"peer_port":41033,
"peer_host":"127.0.0.1"},
"queue":{"name":"example",
"vhost":"/"},
"consumer_tag":"amq.ctag-6BSkZzt3eWgBG5Jn2nl4QA",
"exclusive":false,
"ack_required":true,
"prefetch_count":3,
"arguments":{}}],
"name":"example",
"vhost":"/",
"durable":true,
"auto_delete":false,
"arguments":{},
"node":"rabbit@iZ623qr3xctZ"}		
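
Two details of the request above are worth spelling out: the default vhost "/" must be percent-encoded as %2f in the URL, and only a handful of the returned fields are needed to tell whether consumers keep up with publishers. The sketch below illustrates both; queue_path and summarize are hypothetical helpers, and the choice of fields is an assumption about what is worth watching.

```python
import json
from urllib.parse import quote


def queue_path(vhost, queue):
    """Return the API path for one queue, percent-encoding the vhost name."""
    # safe="" makes quote() encode "/" as %2F, as the default vhost requires
    return f"queues/{quote(vhost, safe='')}/{quote(queue, safe='')}"


def summarize(queue):
    """Pick the fields that show whether consumers keep up with publishers."""
    stats = queue.get("message_stats", {})
    return {
        "name": queue["name"],
        "ready": queue["messages_ready"],
        "unacked": queue["messages_unacknowledged"],
        "publish_rate": stats.get("publish_details", {}).get("rate", 0.0),
        "ack_rate": stats.get("ack_details", {}).get("rate", 0.0),
    }


# A few fields copied from the response above
sample = json.loads(
    '{"name":"example","messages_ready":32881,"messages_unacknowledged":3,'
    '"message_stats":{"publish_details":{"rate":22.4},"ack_details":{"rate":0.8}}}'
)
print(queue_path("/", "example"))  # queues/%2F/example
print(summarize(sample))
```

In the sample, messages are published at 22.4 per second but acknowledged at only 0.8 per second, which is why 32881 messages are waiting in the queue.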
				
			

142.1.4. Python - Pika

http://pika.github.com/

sudo apt-get install python-setuptools python-pip git-core
sudo pip install pika

Alternatively, install it with easy_install:

sudo easy_install pika
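
Once Pika is installed, a publisher and a consumer take only a few lines. The following is a sketch, not a definitive implementation: it assumes a broker on 127.0.0.1:5672, a queue named "logins" chosen for illustration, and the pika 1.x call signatures (older releases pass the consumer callback differently).

```python
QUEUE = "logins"  # assumed queue name for this sketch


def connect(host="127.0.0.1", port=5672):
    """Open a blocking connection to the broker."""
    import pika  # imported here so the file still loads where pika is absent

    return pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port))


def publish(logins, host="127.0.0.1"):
    """Send each login name to the queue through the default exchange."""
    connection = connect(host)
    try:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE)
        for login in logins:
            # the default exchange ("") routes on the queue name
            channel.basic_publish(exchange="", routing_key=QUEUE, body=login.encode("utf-8"))
    finally:
        connection.close()


def on_message(channel, method, properties, body):
    """Consumer callback: decode and print one message body."""
    text = body.decode("utf-8")
    print(text)
    return text


def consume(host="127.0.0.1"):
    """Block and print messages until interrupted (pika 1.x signature)."""
    connection = connect(host)
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE)
    channel.basic_consume(queue=QUEUE, on_message_callback=on_message, auto_ack=True)
    channel.start_consuming()
```

Run consume() in one process and publish(["scott", "nic", "robi"]) in another; the consumer should print the three names. auto_ack=True keeps the sketch short, at the cost of losing messages if the consumer dies while handling them.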
		

142.1.5. Ruby amqp

$ sudo gem install amqp
		

Example 142.1. Ruby on RabbitMQ

subscriber.rb

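$ cat subscriber.rb
require 'rubygems'
require 'amqp'

# MQ.new is the channel API of older amqp gem releases;
# newer releases use AMQP::Channel.new instead.
EM.run {
  amq = MQ.new                               # open a channel on the default local connection
  amq.queue("logins").subscribe do |login|
    puts login                               # print each message body as it arrives
  end
}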
			

producer.rb

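$ cat producer.rb
require 'rubygems'
require 'amqp'

# Uses the same legacy MQ API as subscriber.rb.
EM.run {
  amq = MQ.new
  queue = amq.queue("logins")
  # publish three sample login names to the "logins" queue
  %w[scott nic robi].each { |login|
      queue.publish(login)
  }
  # the event loop keeps running; stop it with Ctrl-C
}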
			

Test: start the subscriber first, then run the producer in a second terminal.

$ ruby subscriber.rb

$ ruby producer.rb