【4.0】RabbitMQ使用之消息安全

发布时间 2023-09-10 19:41:41作者: Chimengmeng

【一】消息安全之ack

  • ACK是一种确认机制,用于确保消息在消费者接收后被正确处理。
    • 当消费者接收到消息并成功处理时,它发送一个ACK(Acknowledgement)给生产者,表示消息已经处理完毕。
    • 只有在收到ACK之后,生产者才会从队列中删除该消息。
  • 我们使用RabbitMQ作为消息中间件,并通过pika库进行消息的生产和消费。
    • 在消费者端,我们可以通过调用channel.basic_ack()方法来发送ACK,确保消息的可靠处理。

【1】生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密码
# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))


# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列)
channel.queue_declare(queue='lqz')

# 【4】发布消息
channel.basic_publish(exchange='',
                      routing_key='dream', # 消息队列名称
                      body='hello world')  # 消息信息

# 【5】关闭连接
connection.close()

【2】消费者

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列)
channel.queue_declare(queue='dream')

# 【4】定义回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 【5】消费消息
channel.basic_consume(queue='dream',on_message_callback=callback,auto_ack=False)

# 【6】开始消费
channel.start_consuming()
  • channel.basic_consume()中设置auto_ack=False
    • 这表示我们关闭了自动ACK确认机制。
  • 当消费者成功处理一个消息后
    • 会调用ch.basic_ack(delivery_tag=method.delivery_tag)来发送ACK确认。
  • 关闭自动ACK确认后,如果消费者没有发送ACK确认,该消息将一直存在于队列中,直到被消费者确认并删除。
  • 这可以通过保证消息的可靠性和避免消息丢失。

【3】演示

(1)消费者不加ACK自动回复

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列)
channel.queue_declare(queue='dream')

# 【4】定义回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    # ch.basic_ack(delivery_tag=method.delivery_tag)

# 【5】消费消息
channel.basic_consume(queue='dream',on_message_callback=callback,auto_ack=False)

# 【6】开始消费
channel.start_consuming()
  • 放入消息

image-20230909180650660

  • 取出消息

image-20230909180726101

  • 消息队列中的消息仍然存在

image-20230909180650660

(2)消费者加ACK自动回复

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列)
channel.queue_declare(queue='dream')

# 【4】定义回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 【5】消费消息
channel.basic_consume(queue='dream',on_message_callback=callback,auto_ack=False)

# 【6】开始消费
channel.start_consuming()
  • 取出消息

image-20230909180846141

  • 消息队列清空

image-20230909180910086

【二】消息安全之durable持久化

  • 持久化是指在RabbitMQ的消息中间件中,将消息存储到磁盘上以确保消息的安全性和可靠性。
  • 即使在RabbitMQ服务器重启或宕机后,消息也能够得到保留,以便再次进行消费

【1】生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列),设置 durable=True 来支持持久化,队列必须是新的才可以
channel.queue_declare(queue='dream_q',durable=True)

# 【4】发布消息,并在properties中设置 delivery_mode=2 以使消息持久化
channel.basic_publish(exchange='',
                      routing_key='dream_q', # 消息队列名称
                      body='111',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent,消息也持久化
                      )
                      )

# 【5】关闭连接
connection.close()
  • 我们在channel.queue_declare()中添加了durable=True参数,以支持队列的持久化。
    • 这意味着即使RabbitMQ服务器重启或宕机,队列也会得到保留。
  • 我们在channel.basic_publish()中使用了properties=pika.BasicProperties(delivery_mode=2)来为消息设置持久化属性。
    • 这样一来,即使在RabbitMQ服务器重启或宕机后,消息仍然能够从磁盘中恢复,以便再次进行消费。

【2】消费者

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列)
channel.queue_declare(queue='dream_q')

# 【4】定义回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    # ch.basic_ack(delivery_tag=method.delivery_tag)

# 【5】消费消息
channel.basic_consume(queue='dream_q',on_message_callback=callback,auto_ack=False)

channel.start_consuming()
  • 在消费者端即使设置了auto_ack=False,但由于消息已经被持久化,因此即使消费者没有发送ACK确认,消息也会得到保存,直到消费者确认并删除消息。