BlockingConnection
类声明的全路径为pika.BlockingConnection
,它抽象了pika客户端工具和rabbitmq server的连接,封装了连接客户端的方法(__init__
函数)以及通过客户端建立通信通道的方法(channel
函数)。BlockingConnection
的初始化函数用来与rabbitmq server建立连接,传入pika.connection.Parameters
类的实例作为参数,下面的代码用于连接本机上的rabbitmq server,返回一个BlockingConnection
类的实例。connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
rabbitmqctl list_connections
可以看到连接到rabbitmq server的连接数,如下所示,pika客户端工具和rabbitmq server监听的5672端口建立了连接,客户端的端口为57428BlockingConnection
的channel
函数用于和rabbitmq server建立一个消息通信通道,这个方法返回pika.adapters.blocking_connection.BlockingChannel
类的实例。客户端和rabbitmq server的所有通信都通过此通道展开,因此客户端利用rabbitmq通信的编程模型都基于这个类提供的方法展开。下面的代码创建了一个通道:channel = connection.channel()
rabbitmqctl list_channels
可以看到连接到serverde channel信息:BlockingChannel
类声明的全路径为pika.adapters.blocking_connection.BlockingChannel
,是对客户端与rabbitmq server通道的抽象,无论客户端作为生产者还是消费者,向rabbitmq server提供的队列发送消息基本都通过操作该对象的实例完成。这是rabbitmq 消息通信的核心类。queue_declare
是向rabbitmq server声明一个队列,如果这个队列不存在,rabbitmq server会创建,因此queue_declare
可以说是创建rabbitmq的方法。下面的代码如果第一次调用,在rabbitmq server中会创建一个名为hello的消息队列:channel.queue_declare(queue='hello')
rabbitmqctl list_queues
可以看到rabbitmq server中创建的队列信息:queue_delete
可以删除队列:channel.queue_delete(queue='hello')
channel.exchange_declare(exchange='logs', exchange_type='fanout')
basic_publish
方法发送。basic_publish
函数根据AMQP规范实现了消息发送,其具体用法参考AMQP手册。basic_publish
函数有两个重要的参数,分别是exchange和routing_key,exchange指定将本条消息发送到哪个交换机,routing_key设置该交换机在路由匹配时使用的关键字。channel.basic_publish(exchange='', routing_key='hello', body=message)
扇出
类型的交换机,命令logs,然后将message消息发送到logs交换机,channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body=message)
basic_consume
方法是消费者接收rabbitmq消息的函数,它的核心参数是指定消费者感兴趣的队列名,以及消息到达时的处理函数,下面是客户端为消费者的代码,所有绑定到logs交换机上的队列,都会收到此消息,消息到达后通过callback函数处理,将其打印出来channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
wget https://github.com/rabbitmq/erlang-rpm/archive/refs/heads/master.zip
unzip master.zip
cd erlang-rpm-master
make prepare
make erlang
cd RPMS/x86_64/ && rpm -ivh erlang-xxx-x.el7.x86_64.rpm
python3.6 -m pip install pika --upgrade
rm -f /usr/bin/python && ln -s /usr/bin/python3.6 /usr/bin/python /* 将python软链接到python3.6 */
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.6/rabbitmq-server-generic-unix-3.10.6.tar.xz
xz -d rabbitmq-server-generic-unix-3.10.6.tar.xz
tar -xf rabbitmq-server-generic-unix-3.10.6.tar
echo "export PATH=${PATH}:/path/to/rabbitmq_server-3.10.6/sbin" >> /etc/profile && source /etc/profile
rabbitmq-server -detached
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
#!/usr/bin/env python3.6
import pika
# 连接rabbitmq server,这里我们的rabbitmq server部署在本地,pika工具通过5672端口连接
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
# 建立和rabbitmq server的通道
channel = connection.channel()
# 声明一个名为rpc_queue的队列,如果rabbit sever没有会创建
channel.queue_declare(queue='rpc_queue')
# 实现客户端要调用的例程函数
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
# 实现rabbitmq server消息到达时的回调函数,处理客户端发送来的消息
def on_request(ch, method, props, body):
# 取出消息体并转化为整数
n = int(body)
print(" [.] fib(%s)" % n)
# 执行例程函数,获取函数返回值
response = fib(n)
# 将函数的返回值封装成消息体,发送到RPC的客户端
#
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
# 设置通道的qos,规定客户端最多预取1条消息,防止客户端在处理完当前消息之前又接收到新的消息,造成消息的积压
channel.basic_qos(prefetch_count=1)
# 设置当rpc_queue队列有消息到达时,通过on_request函数进行处理
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
# 开始监听rpc_queue队列
channel.start_consuming()
#!/usr/bin/env python3.6
import pika
import uuid
# 实现一个远程调用的类
class FibonacciRpcClient(object):
# 实现类的初始化函数
def __init__(self):
# 连接到rabbitmq server
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
# 创建到rabbitmq server的消息通道
self.channel = self.connection.channel()
# 声明队列,队列名由rabbitmq server自动生成
# 由于这个队列用于传输参数和返回结果,只需要RPC的调用者知道这个队列的存在
# 因此exclusive设置为true表示不允许其它客户端消费此队列的消息
result = self.channel.queue_declare(queue='', exclusive=True)
# 通过返回结果中的method属性,可以获取rabbitmq server自动生成的队列名
self.callback_queue = result.method.queue
# 将上面创建的队列设置为我们感兴趣的队列,将on_response函数作为消息到达时的处理函数
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
self.response = None
self.corr_id = None
# RPC Server执行完例程后通过rabbitmq发送返回结果,消息到达时的处理函数
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
# 远程过程调用函数,接受n作为斐波那契数列索引
def call(self, n):
self.response = None
# 产生随机的uuid,用来标识RPC的每条消息
self.corr_id = str(uuid.uuid4())
# 将消息发送到rabbitmq server的默认交换机
# 交换机使用RPC服务端监听的队列名rpc_queue作为路由关键字
# 消息从交换机出来后会分发到rpc_queue队列,所有监听rpc_queue队列的客户端都会收到此消息
# 设置本次消息的属性:一是设置客户端在收到本次消息后的回调队列,二是标记本条消息的uuid
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
# 阻塞等待直到发送的消息被处理
self.connection.process_data_events(time_limit=None)
# 返回RPC Server处理后的消息
return int(self.response)
# 初始化一个远程调用实例
fibonacci_rpc = FibonacciRpcClient()
# 调用RPC
print(" [x] Requesting fib(6)")
response = fibonacci_rpc.call(6)
print(" [.] Got %r" % response)