十年河东,十年河西,莫欺少年穷
学无止境,精益求精
为了实现RabbitMQ的延迟队列,我做了如下代码
也就是如下代码,将我的电脑跑死了好几次
入口函数如下:
public static void Main(string[] args) { for(int i = 0; i < 10; i++) { RabbitProducer.ProductDelayMessage($"test{i}", $"hell1o{i}", 1000 * (i+1)); } RabbitConsumer.ConsumerMessage("QnameDLX"); Console.Read(); }
生产者代码:
public class RabbitProducer { private static IConnection connection; private static object o = new object(); public static IConnection GetConnection() { lock (o) { if (connection == null) { //创建连接工厂【设置相关属性】 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest", AutomaticRecoveryEnabled = true, //断开后五秒自动连接 Port = AmqpTcpEndpoint.UseDefaultPort, VirtualHost = "/", }; //通过工厂创建连接对象 connection = factory.CreateConnection(); } return connection; } } public static void ProductMessage(string queueName, string messages) { //创建管道 using (var channel = GetConnection().CreateModel()) { Dictionary<string, object> arguments = new Dictionary<string, object>(); arguments.Add("x-max-priority", 10); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; //deliveryMode: 1(nopersistent)非持久化,2(persistent)持久化 properties.Priority = 9;//消息的优先级 值越大 优先级越高 properties.Expiration = (60 * 1000 * 3).ToString(); //消息的过期时间为3分钟 3分钟内没被消费 则会被丢弃 properties.ContentType = "text/plain";//消息的内输出格式 channel.BasicPublish("", queueName, properties, Encoding.UTF8.GetBytes(messages)); //生产消息 } } /// <summary> /// /// </summary> /// <param name="Ename">正常队列交换机名称</param> /// <param name="EnameDLX">死信交换机</param> /// <param name="QnameName">队列名称</param> /// <param name="QnameNameDLX">死信队列名称</param> /// <param name="routingKey">路由器</param> /// <param name="routingKey">死信队列路由器</param> /// <param name="messageDDL">声明周期 单位毫秒</param> /// <param name="messages"></param> public static void ProductDelayMessage(string QnameName, string messages,long messageDDL) { string QnameNameDLX = "QnameDLX"; string Ename = "Exchange"; string EnameDLX= "ExchangeDLX"; string routingKey="Route"; string routingKeyDLX= "RouteDLX"; //创建管道 using (var channel = GetConnection().CreateModel()) { //durable 是否持久化 //void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments); channel.ExchangeDeclare(Ename, ExchangeType.Direct, true, false, null); channel.ExchangeDeclare(EnameDLX, ExchangeType.Direct, true, false, null); Dictionary<string, object> arguments = new Dictionary<string, object>(); ////队列优先级最高为10,不加x-max-priority的话,计算发布时设置了消息的优先级也不会生效 arguments.Add("x-max-priority", 10); arguments.Add("x-message-ttl", messageDDL);//10秒消息过期 arguments.Add("x-dead-letter-exchange", EnameDLX);// arguments.Add("x-dead-letter-routing-key", routingKeyDLX);// channel.QueueDeclare(QnameName, true, false, false, arguments); // channel.QueueDeclare(QnameNameDLX, true, false, false, null);//死信队列不需要设置其他属性 因此arguments为NULL //正常队列和正常交换机绑定 channel.QueueBind(QnameName, Ename, routingKey); //死信队列和死信交换机绑定 channel.QueueBind(QnameNameDLX, EnameDLX, routingKeyDLX); //正常队列和死信交换机绑定 channel.QueueBind(QnameName, EnameDLX, routingKeyDLX); var properties = channel.CreateBasicProperties(); properties.Priority = 9;//消息的优先级 值越大 优先级越高 0~9 注意,必须要开启队列的优先级,否则此处消息优先级的声明无效 properties.ContentType = "text/plain";//消息的内输出格式 channel.BasicPublish(Ename, routingKey, properties, Encoding.UTF8.GetBytes(messages)); //发送消息 } } }
消费者:
public class RabbitConsumer { private static IConnection connection; private static object o = new object(); public static IConnection GetConnection() { lock (o) { if (connection == null) { //创建连接工厂【设置相关属性】 var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest", AutomaticRecoveryEnabled = true, //断开后五秒自动连接 Port = AmqpTcpEndpoint.UseDefaultPort, VirtualHost = "/", }; //通过工厂创建连接对象 connection = factory.CreateConnection(); } return connection; } } public static void ConsumerMessage(string queueName) { var connection = GetConnection(); //创建通道 using (var channel = connection.CreateModel()) { while (true) { if (!channel.IsClosed) { try { //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); }; //启动消费者 channel.BasicConsume(queueName, false, consumer); } catch { channel.Close(); Console.WriteLine("释放信道1"); channel.Dispose(); Console.WriteLine("关闭连接1"); connection.Close(); connection.Dispose(); } } else { channel.Close(); Console.WriteLine("释放信道1"); channel.Dispose(); Console.WriteLine("关闭连接1"); connection.Close(); connection.Dispose(); } } } } }
我的预期是生成 10个队列,每个队列一条消息且设置不同的消息过期时间,test0 过期时间1秒 test1 过期时间2秒 以此类推。
过期的消息转至死信队列,然后我侦听死信队列,达到延迟消费的目的。
但
代码调试后,确实生成了10个队列,但同时也生成了几十万甚至上百万条消息。这令我很费解。
不知道什么原因,欢迎评论。