# Web组件-rabbitmq
# 消息队列介绍
- 消息队列 Message Queue 是一种应用程序对应用程序的通信方法;
- 消息队列是生产者-消费者模型的典型的代表;
- 消息队列是分布式系统中重要的组件,主要解决
应用解耦
,异步消息
,流量削锋
等问题
目前使用较多的消息队列有ActiveMQ、RabbitMQ、Kafka等
# RabbitMQ介绍
- RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现;
- RabbitMQ 是一款基于 AMQP 协议的消息中间件,它能够在应用之间提供可靠的消息传输;
# RabbitMQ安装
# Mac M2 芯片推荐 Docker 安装
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
# 管理端端口15672、客户端端口5672
http://127.0.0.1:15672
guest/guest
# 安装Python驱动
pip install pika
# 简单模式
- 生产者
# producer.py
import pika
# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()
# 声明队列,没有时自动创建
channel.queue_declare(queue='queue-learning-rabbitmq-main')
# 发布消息
channel.basic_publish(exchange='', # 简单模式 交换机为'', routing_key为队列名
routing_key='queue-learning-rabbitmq-main',
body=b'Hello World!')
print("[x] Sent 'Hello World!'")
- 消费者
# consumer.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()
# 声明队列,没有时自动创建,兼容生产者消费者谁先启动问题
channel.queue_declare('queue-learning-rabbitmq-main')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 设置消费消息的方式
channel.basic_consume(queue='queue-learning-rabbitmq-main',
auto_ack=True, # True表示从队列中取出消息时,队列中就直接删除该消息
on_message_callback=callback)
print('[*] Waiting for messages. To exit press CTRL+C')
# 真正开始消费
channel.start_consuming()
- 管理者
# 相关参数
# 应答参数
消费者端 auto_ack=False
然后结合 ch.basic_ack(delivery_tag=method.delivery_tag)
,解决因任务异常而导致不能正常消费的问题
- 注重效率 auto_ack=True
- 注重安全 auto_ack=False
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='queue-learning-rabbitmq-main',
auto_ack=False,
on_message_callback=callback)
# 持久化参数
解决 RabbitMQ Server 崩溃导致数据丢失问题
channel.queue_declare(queue='queue-learning-rabbitmq-main2', durable=True) # 若声明过,则换一个名字
channel.basic_publish(exchange='',
routing_key='queue-learning-rabbitmq-main2',
body=b'Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
# 分发参数
- 多个消费者时默认轮询,即依次分发一个任务,而不考虑是否能及时完成;
- 消费者端配置实现公平分发,即完成之后就可以接收新任务,需要结合
auto_ack=False
;
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 根据消费情况分发任务
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='queue-learning-rabbitmq-main',
auto_ack=False,
on_message_callback=callback)
# 交换机模式之发布订阅
- 生产者发布消息到交换机
- 消费者创建队列,绑定到交换机
生产者声明交换机exchange='logs'
,并执行交换机类型exchange_type='fanout'
,发送消息到该交换机即可
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
# 交换机模式下,需要先创建交换机
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 消息直接发送到交换机就可以
message = b"Info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
生产者同样声明交换机,并且需要声明队列,把队列绑定到对应交换机来接收消息
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
# 交换机模式下,需要先创建交换机
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 然后创建队列,空字符串表示由broker创建队列名
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 最后把队列绑定到交换机上去接收消息
channel.queue_bind(exchange='logs',
queue=queue_name)
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
# 交换机模式之关键字
生产者声明关键字类型交换机exchange_type='direct'
,且发布消息时设置关键字routing_key='info'
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
# 交换机模式下,需要先创建交换机
channel.exchange_declare(exchange='logs2',
exchange_type='direct')
# 消息直接发送到交换机就可以
message = b"info: Hello World!"
channel.basic_publish(exchange='logs2',
routing_key='info', # 关键字key info
body=message)
print(" [x] Sent %r" % message)
connection.close()
消费者同样声明关键字类型交换机exchange_type='direct'
,不过绑定的时候指定关键字 channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='info')
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
# 交换机模式下,需要先创建交换机
channel.exchange_declare(exchange='logs2',
exchange_type='direct')
# 然后创建队列,空字符串表示由broker创建队列名
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 最后把队列绑定到交换机上去接收消息
channel.queue_bind(exchange='logs2',
queue=queue_name,
routing_key='info')
channel.queue_bind(exchange='logs2',
queue=queue_name,
routing_key='error')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
# 交换机模式之通配符
"通配符交换机"(Topic Exchange)将路由键和某模式进行匹配
- 符号"#"匹配一个或多个词,如
audit.#
能够匹配到"audit.irs.corporate" - 符号"*"仅匹配一个词,如
audit.*
只会匹配到"audit.irs"
# 生产者
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 注意 routing_key
channel.basic_publish(exchange='logs3', routing_key='europe.weather', body=message)
# 消费者
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 通配符匹配
channel.queue_bind(exchange='logs3', queue=queue_name, routing_key="#.news")