# Web组件-rabbitmq

# 消息队列介绍

  • 消息队列 Message Queue 是一种应用程序对应用程序的通信方法;
  • 消息队列是生产者-消费者模型的典型的代表;
  • 消息队列是分布式系统中重要的组件,主要解决应用解耦异步消息流量削锋等问题

目前使用较多的消息队列有ActiveMQ、RabbitMQ、Kafka等

# RabbitMQ介绍

  • RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现;
  • RabbitMQ 是一款基于 AMQP 协议的消息中间件,它能够在应用之间提供可靠的消息传输;

Python官网文档 (opens new window)

# RabbitMQ安装

# Mac M2 芯片推荐 Docker 安装
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management

# 管理端端口15672、客户端端口5672
http://127.0.0.1:15672
guest/guest

# 安装Python驱动
pip install pika

# 简单模式

  • 生产者
# producer.py
import pika

# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()

# 声明队列,没有时自动创建
channel.queue_declare(queue='queue-learning-rabbitmq-main')

# 发布消息
channel.basic_publish(exchange='',          # 简单模式 交换机为'', routing_key为队列名
                      routing_key='queue-learning-rabbitmq-main',
                      body=b'Hello World!')

print("[x] Sent 'Hello World!'")
  • 消费者
# consumer.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()

# 声明队列,没有时自动创建,兼容生产者消费者谁先启动问题
channel.queue_declare('queue-learning-rabbitmq-main')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 设置消费消息的方式
channel.basic_consume(queue='queue-learning-rabbitmq-main',
                      auto_ack=True,    # True表示从队列中取出消息时,队列中就直接删除该消息
                      on_message_callback=callback)

print('[*] Waiting for messages. To exit press CTRL+C')
# 真正开始消费
channel.start_consuming()
  • 管理者

# 相关参数

# 应答参数

消费者端 auto_ack=False 然后结合 ch.basic_ack(delivery_tag=method.delivery_tag),解决因任务异常而导致不能正常消费的问题

  • 注重效率 auto_ack=True
  • 注重安全 auto_ack=False
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(queue='queue-learning-rabbitmq-main',
                      auto_ack=False,
                      on_message_callback=callback)

# 持久化参数

解决 RabbitMQ Server 崩溃导致数据丢失问题

channel.queue_declare(queue='queue-learning-rabbitmq-main2', durable=True)  # 若声明过,则换一个名字
 
channel.basic_publish(exchange='',
                      routing_key='queue-learning-rabbitmq-main2',
                      body=b'Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )

# 分发参数

  • 多个消费者时默认轮询,即依次分发一个任务,而不考虑是否能及时完成;
  • 消费者端配置实现公平分发,即完成之后就可以接收新任务,需要结合 auto_ack=False;
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 根据消费情况分发任务
channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='queue-learning-rabbitmq-main',
                      auto_ack=False,
                      on_message_callback=callback)

# 交换机模式之发布订阅

  • 生产者发布消息到交换机
  • 消费者创建队列,绑定到交换机

生产者声明交换机exchange='logs',并执行交换机类型exchange_type='fanout',发送消息到该交换机即可

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

# 交换机模式下,需要先创建交换机
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# 消息直接发送到交换机就可以
message = b"Info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)

connection.close()

生产者同样声明交换机,并且需要声明队列,把队列绑定到对应交换机来接收消息

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

# 交换机模式下,需要先创建交换机
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# 然后创建队列,空字符串表示由broker创建队列名
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 最后把队列绑定到交换机上去接收消息
channel.queue_bind(exchange='logs',
                   queue=queue_name)


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

# 交换机模式之关键字

生产者声明关键字类型交换机exchange_type='direct',且发布消息时设置关键字routing_key='info'

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

# 交换机模式下,需要先创建交换机
channel.exchange_declare(exchange='logs2',
                         exchange_type='direct')

# 消息直接发送到交换机就可以
message = b"info: Hello World!"
channel.basic_publish(exchange='logs2',
                      routing_key='info',   # 关键字key info
                      body=message)
print(" [x] Sent %r" % message)

connection.close()

消费者同样声明关键字类型交换机exchange_type='direct',不过绑定的时候指定关键字 channel.queue_bind(exchange='logs2', queue=queue_name, routing_key='info')

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

# 交换机模式下,需要先创建交换机
channel.exchange_declare(exchange='logs2',
                         exchange_type='direct')

# 然后创建队列,空字符串表示由broker创建队列名
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 最后把队列绑定到交换机上去接收消息
channel.queue_bind(exchange='logs2',
                   queue=queue_name,
                   routing_key='info')

channel.queue_bind(exchange='logs2',
                   queue=queue_name,
                   routing_key='error')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

# 交换机模式之通配符

"通配符交换机"(Topic Exchange)将路由键和某模式进行匹配

  • 符号"#"匹配一个或多个词,如audit.#能够匹配到"audit.irs.corporate"
  • 符号"*"仅匹配一个词,如audit.*只会匹配到"audit.irs"

# 生产者
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 注意 routing_key
channel.basic_publish(exchange='logs3', routing_key='europe.weather', body=message)


# 消费者
channel.exchange_declare(exchange='logs3', exchange_type='topic')
# 通配符匹配
channel.queue_bind(exchange='logs3', queue=queue_name, routing_key="#.news")
上次更新: 11/14/2022, 8:54:42 PM