RabbitMQ Queues

Published 2023-12-06 17:40:08 Author: 木屐呀

RabbitMQ

Message middleware: a message queue

Asynchronous

Written in Erlang, a language developed at Ericsson

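1. Install the Python RabbitMQ module

pip3 install pika

Note: the examples below use the pika 0.x call style, basic_consume(callback, queue=..., no_ack=...). pika 1.0 and later changed the signature to basic_consume(queue=..., on_message_callback=..., auto_ack=...) and requires a queue argument in queue_declare (pass queue='' for a server-generated name).

Stop the firewall on the server (or open the AMQP port, 5672 by default):

service iptables stop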

2. The simplest queue communication

Sender:

# sender
import pika

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()  # open a channel over the connection

# Declare the queue
channel.queue_declare(queue='hello')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# exchange='' selects the default exchange, which routes by queue name.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
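
The empty exchange name in basic_publish refers to the default exchange, which delivers a message to the queue whose name equals routing_key. The following is a minimal in-memory sketch of that rule, with no RabbitMQ involved. ToyBroker and its methods are hypothetical names that only imitate the shape of the pika calls above.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for the default exchange (hypothetical, not pika)."""

    def __init__(self):
        self.queues = {}

    def queue_declare(self, queue):
        # Declaring is idempotent: an existing queue is left untouched.
        self.queues.setdefault(queue, deque())

    def basic_publish(self, exchange, routing_key, body):
        if exchange != '':
            raise ValueError('this sketch only models the default exchange')
        # Default exchange: routing_key is treated as the queue name.
        if routing_key in self.queues:
            self.queues[routing_key].append(body)
            return True
        return False  # unroutable: no queue with that name


broker = ToyBroker()
broker.queue_declare('hello')
broker.queue_declare('hello')  # declaring again is harmless
delivered = broker.basic_publish('', 'hello', 'Hello World!')
dropped = broker.basic_publish('', 'missing', 'lost')
print(delivered, dropped, list(broker.queues['hello']))
```

This is also why both the sender and the receiver declare the queue: whichever starts first creates it, and the second declaration changes nothing.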

Receiver:

import pika
import time

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

# You may ask why we declare the queue again, since we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("received msg....start processing...", body)
    time.sleep(20)  # simulate a slow task
    # print(" [x] Received %r" % ch, method, properties, body)
    print("[x] msg process done...", body)


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)  # no acknowledgment is sent back

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

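To connect to a remote RabbitMQ server, permissions must be configured.

First create a user on the RabbitMQ server:

./rabbitmqctl add_user alex alex3714

Then grant the user permissions so that it can connect from outside:

rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"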

set_permissions [-p vhost] {user} {conf} {write} {read}

vhost

The name of the virtual host to which to grant the user access, defaulting to /.

user

The name of the user to grant access to the specified virtual host.

conf

A regular expression matching resource names for which the user is granted configure permissions.

write

A regular expression matching resource names for which the user is granted write permissions.

read

A regular expression matching resource names for which the user is granted read permissions.
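
To make the three regular expressions concrete, here is a small sketch of how a permission check of this kind could work. It is not RabbitMQ's implementation. The pattern "^alex-.*$" and the helper allowed() are made up for illustration, and the sketch assumes that a pattern carries explicit ^...$ anchors whenever a whole-name match is intended.

```python
import re

# Hypothetical record, as if set by:
#   rabbitmqctl set_permissions -p / alex "^alex-.*$" ".*" ".*"
permissions = {'conf': r'^alex-.*$', 'write': r'.*', 'read': r'.*'}


def allowed(operation, resource_name):
    """Return True if the regex for this operation matches the resource name.

    Assumption: patterns are anchored explicitly when a full-name match
    is wanted, so plain search semantics are sufficient here.
    """
    return re.search(permissions[operation], resource_name) is not None


print(allowed('conf', 'alex-tasks'))    # may declare or delete this queue
print(allowed('conf', 'other-queue'))   # may not configure this one
print(allowed('write', 'other-queue'))  # may still publish to it
```

With ".*" in all three positions, as in the command above, every check passes for every resource in the virtual host.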

The client must supply credentials when it connects:

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

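3. Reliable message delivery

Reliable delivery depends on the consumer sending an acknowledgment back:
1. no_ack=True turns acknowledgments off. The default is no_ack=False, which means an acknowledgment is required.
2. ch.basic_ack(delivery_tag=method.delivery_tag) acknowledges a message manually.

In this mode RabbitMQ by default hands the messages sent by the producer (P) to the consumers (C) one after another in round-robin order, much like load balancing.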

Producer code:

import pika
import sys
import time

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

# Declare the queue
channel.queue_declare(queue='task_queue')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent %r" % message)
connection.close()

Consumer code:

import pika, time

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.queue_declare(queue='task_queue')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)  # simulate a slow task
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    # Acknowledge only after the work is finished
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='task_queue',
                      # no_ack=True
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Now start the producer first, then start 3 consumers. Send several messages from the producer and you will see them handed to the consumers one after another.
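
The dispatch order described here can be sketched without a broker. The function round_robin below is a hypothetical helper that simply deals messages out in turn. Real RabbitMQ dispatch also depends on prefetch settings and on which consumers are connected at the time.

```python
from itertools import cycle


def round_robin(messages, consumers):
    """Assign each message to the next consumer in turn."""
    assignment = {name: [] for name in consumers}
    turn = cycle(consumers)
    for message in messages:
        assignment[next(turn)].append(message)
    return assignment


result = round_robin(['m1', 'm2', 'm3', 'm4', 'm5'], ['c1', 'c2', 'c3'])
print(result)
```

With five messages and three consumers, c1 and c2 each receive two messages and c3 receives one, regardless of how long each message takes to process.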

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(body.count(b'.'))  # body is bytes under Python 3
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      # no_ack=True
                      )

Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
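
The redelivery behaviour can be modelled with a toy queue that remembers which deliveries are still unacknowledged. AckQueue and its methods are hypothetical and keep a single global tag counter, whereas real delivery tags are scoped to a channel. The sketch only illustrates the bookkeeping idea.

```python
from collections import deque


class AckQueue:
    """Toy queue that tracks unacknowledged deliveries (hypothetical)."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}        # delivery_tag -> (consumer, body)
        self.next_tag = 1

    def deliver(self, consumer):
        body = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = (consumer, body)
        return tag, body

    def basic_ack(self, delivery_tag):
        # Only now is the message really removed.
        del self.unacked[delivery_tag]

    def consumer_died(self, consumer):
        # Requeue everything the dead consumer had not acknowledged.
        lost = [t for t, (c, _) in self.unacked.items() if c == consumer]
        for tag in sorted(lost, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag)[1])


q = AckQueue(['task-1', 'task-2'])
tag1, _ = q.deliver('worker-a')
q.basic_ack(tag1)                 # worker-a finished task-1
tag2, _ = q.deliver('worker-a')   # worker-a crashes before acking task-2
q.consumer_died('worker-a')
tag3, body3 = q.deliver('worker-b')
print(body3)
```

The message that worker-a never acknowledged goes back to the queue and is handed to worker-b, which is the effect the paragraph above describes.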

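4. Message persistence

Queue durability:

channel.queue_declare(queue='hello', durable=True)  # durable queue

RabbitMQ refuses to redeclare an existing queue with different parameters, so a queue that was already declared without durable=True has to be deleted first or given a new name.

Message persistence:

properties=pika.BasicProperties(
    delivery_mode=2,  # make message persistent
)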

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

channel.queue_declare(queue='task_queue', durable=True)

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
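
Both settings are needed together. A rough model of what survives a broker restart is sketched below, using a hypothetical restart() helper and made-up queue contents. It ignores details such as messages that were still being written to disk when the broker stopped.

```python
def restart(queues):
    """Return what a toy broker would still hold after a restart.

    queues maps name -> {'durable': bool, 'messages': [(body, delivery_mode)]}.
    """
    survivors = {}
    for name, q in queues.items():
        if not q['durable']:
            continue  # a non-durable queue disappears with all its messages
        # Inside a durable queue only persistent messages (mode 2) are kept.
        kept = [(body, mode) for body, mode in q['messages'] if mode == 2]
        survivors[name] = {'durable': True, 'messages': kept}
    return survivors


before = {
    'hello': {'durable': False, 'messages': [('a', 2)]},
    'task_queue': {'durable': True, 'messages': [('b', 2), ('c', 1)]},
}
after = restart(before)
print(after)
```

Message 'a' is lost even though it was marked persistent, because its queue was not durable. Message 'c' is lost because it was transient.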

send:

import pika
import sys
import time

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

# Declare the queue
channel.queue_declare(queue='task_queue', durable=True)  # durable queue

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent %r" % message)
connection.close()

receive:

import pika, time

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)  # durable queue

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)  # simulate a slow task
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    # Acknowledge only after the work is finished
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback,
                      queue='task_queue',
                      # no_ack=True
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

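5. Fair dispatch

If RabbitMQ simply hands messages to consumers in order without considering their load, a consumer on a weak machine can pile up messages it cannot finish while a consumer on a strong machine stays mostly idle. To avoid this, set prefetch_count=1 on each consumer. This tells RabbitMQ not to send this consumer a new message until it has processed and acknowledged the current one.

channel.basic_qos(prefetch_count=1)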
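
The effect of prefetch_count=1 can be estimated with a small simulation. Both functions below are hypothetical helpers, and the processing costs (1 and 4 seconds per message) are made-up numbers. The sketch compares dealing messages out in turn with always giving the next message to the first consumer that becomes idle.

```python
import heapq


def round_robin_makespan(n_messages, costs):
    """Time until all work is done when messages are dealt out in turn."""
    names = list(costs)
    counts = {name: 0 for name in names}
    for i in range(n_messages):
        counts[names[i % len(names)]] += 1
    return counts, max(counts[n] * costs[n] for n in names)


def prefetch_one_makespan(n_messages, costs):
    """Each consumer holds at most one unacknowledged message at a time."""
    free_at = [(0, name) for name in costs]  # (time the consumer is idle, name)
    heapq.heapify(free_at)
    counts = {name: 0 for name in costs}
    for _ in range(n_messages):
        now, name = heapq.heappop(free_at)   # first consumer to become idle
        counts[name] += 1
        heapq.heappush(free_at, (now + costs[name], name))
    return counts, max(t for t, _ in free_at)


costs = {'fast': 1, 'slow': 4}   # seconds per message, made-up numbers
rr = round_robin_makespan(10, costs)
fair = prefetch_one_makespan(10, costs)
print(rr)
print(fair)
```

In this model round-robin gives each consumer 5 messages and finishes after 20 seconds, while the prefetch-style dispatch gives the fast consumer 8 messages and finishes after 8 seconds.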

Complete code with message persistence and fair dispatch

Producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

Consumer

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # one second of work per dot in the message
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

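6. Publish/Subscribe

The earlier examples were all essentially one-to-one: a message was sent to one named queue. Sometimes you want a message to reach every queue, like a broadcast. That is what exchanges are for.

An exchange is declared with a type, which determines which queues qualify to receive a message:

fanout: every queue bound to this exchange receives the message
direct: queues whose binding key exactly equals the message's routing key receive the message
topic: queues whose binding key (which may be a pattern here) matches the message's routing key receive the message

   Pattern symbols: a routing key is a list of words separated by dots; # matches zero or more words, * matches exactly one word
      Example: #.a matches a.a, b.a, b.c.a, and so on
               *.a matches a.a, b.a, c.a, and so on, but not b.c.a
     Note: a topic exchange with binding key # behaves like fanout

headers: the message headers decide which queues receive the message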
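
The pattern rules for topic exchanges can be written down as a short matcher. This is an illustrative sketch rather than RabbitMQ's own algorithm, and topic_matches is a hypothetical helper name. It treats a key as dot-separated words, with * standing for exactly one word and # for zero or more words.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""
    def match(p, k):
        if not p:
            return not k                 # pattern used up: key must be too
        head, rest = p[0], p[1:]
        if head == '#':
            # '#' may swallow anywhere from 0 to len(k) words.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (head == '*' or head == k[0]) and match(rest, k[1:])

    return match(pattern.split('.'), routing_key.split('.'))


print(topic_matches('*.a', 'b.a'))
print(topic_matches('*.a', 'b.c.a'))
print(topic_matches('#.a', 'b.c.a'))
print(topic_matches('#', 'kern.critical'))
```

The second call is the interesting one: *.a does not match b.c.a because * covers a single word, whereas #.a does.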

Publisher

import pika
import sys

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',  # ignored by a fanout exchange
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

Subscriber

import pika

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# No queue name is given, so RabbitMQ generates a random one.
# exclusive=True deletes the queue once the consumer using it disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind the queue to the exchange
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
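
The broadcast behaviour of the fanout example can be imitated in memory. FanoutExchange below is a hypothetical toy class, not part of pika. It shows that each bound queue gets its own copy, that a message published while no queue is bound is simply dropped, and that a deleted queue stops receiving.

```python
from collections import deque


class FanoutExchange:
    """Toy fanout exchange: every bound queue gets its own copy."""

    def __init__(self):
        self.bound = {}

    def queue_bind(self, queue_name):
        self.bound[queue_name] = deque()

    def queue_delete(self, queue_name):
        # Roughly what happens to an exclusive queue when its consumer leaves.
        self.bound.pop(queue_name, None)

    def publish(self, body, routing_key=''):
        # routing_key is ignored by a fanout exchange.
        for queue in self.bound.values():
            queue.append(body)
        return len(self.bound)


logs = FanoutExchange()
early = logs.publish('nobody is listening yet')   # no queue bound: dropped
logs.queue_bind('sub-1')
logs.queue_bind('sub-2')
copies = logs.publish('info: Hello World!')
logs.queue_delete('sub-2')
after = logs.publish('second message')
print(early, copies, after, list(logs.bound['sub-1']))
```

This is why subscribers should be started before the publisher in the example above: a subscriber only sees messages published after its queue was bound.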

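Receiving messages selectively (exchange_type=direct)

RabbitMQ also supports routing by keyword: each queue is bound with one or more keywords, the sender publishes a message to the exchange together with a keyword, and the exchange uses that keyword to decide which queues receive the message.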

publisher

import pika
import sys

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  # severity level, used as the routing key
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

subscriber 

import pika
import sys

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per severity the subscriber wants to receive
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
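
The keyword routing used by the direct exchange amounts to a lookup from routing key to bound queues. The sketch below uses a hypothetical DirectExchange class and made-up queue names (console, error_file) to show one queue bound to several severities and another bound to a single severity.

```python
from collections import defaultdict


class DirectExchange:
    """Toy direct exchange keyed by exact routing key."""

    def __init__(self):
        self.bindings = defaultdict(set)   # routing key -> queue names

    def queue_bind(self, queue, routing_key):
        self.bindings[routing_key].add(queue)

    def route(self, routing_key):
        # Exact string comparison; a direct exchange has no wildcards.
        return sorted(self.bindings.get(routing_key, ()))


ex = DirectExchange()
for severity in ('info', 'warning', 'error'):
    ex.queue_bind('console', severity)     # like running: subscriber info warning error
ex.queue_bind('error_file', 'error')       # like running: subscriber error

print(ex.route('error'))
print(ex.route('info'))
print(ex.route('debug'))
```

An error message reaches both queues, an info message reaches only console, and a debug message reaches no queue at all.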

Finer-grained message filtering (exchange_type=topic)

publisher

import pika
import sys

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

subscriber

import pika
import sys

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

# One binding per pattern given on the command line
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

To receive all the logs run:

python receive_logs_topic.py "#"

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical"

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"






7. Remote procedure call (RPC)

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received.

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

RPC server

import pika
import time

credentials = pika.PlainCredentials('root', 'huang123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.98.131', credentials=credentials))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    # Send the result to the client's reply queue, echoing its correlation_id
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()  # without this the server exits immediately

RPC client

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('root', 'huang123')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            '192.168.98.131', credentials=credentials))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,  # get ready to receive the result
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Callback for replies; keeps only the reply to the current request."""
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())  # unique identifier for this request
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))

        count = 0
        while self.response is None:
            self.connection.process_data_events()  # check for new messages without blocking
            count += 1
            print('check count..', count)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
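
The roles of reply_to and correlation_id in the client and server above can be traced with plain in-memory queues. Everything in this sketch is hypothetical (the queue name amq.gen-demo, the stale reply, the helper names). The server is called inline instead of running in another process, so the sketch shows only the matching logic, not the networking.

```python
import uuid
from collections import deque

rpc_queue = deque()        # requests: (reply_to, correlation_id, body)
reply_queues = {}          # reply queue name -> deque of (correlation_id, body)


def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)


def serve_one():
    """Server side: take one request and publish the answer to reply_to."""
    reply_to, correlation_id, body = rpc_queue.popleft()
    reply_queues[reply_to].append((correlation_id, str(fib(int(body)))))


def call(n, callback_queue='amq.gen-demo'):
    """Client side: send a request, then wait for the matching reply."""
    replies = reply_queues.setdefault(callback_queue, deque())
    replies.append(('stale-id', '-1'))      # leftover reply from an older call
    corr_id = str(uuid.uuid4())
    rpc_queue.append((callback_queue, corr_id, str(n)))
    serve_one()                             # stands in for the remote server
    while replies:
        correlation_id, body = replies.popleft()
        if correlation_id == corr_id:       # ignore replies that are not ours
            return int(body)
    return None


print(call(10))
```

The stale reply is skipped because its correlation_id does not match, which is the same check on_response performs in the client class.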