RabbitMQ消息的生存时间TTL
TTL(Time To Live)表示消息的生存时间,通常用于设置消息的延迟。TTL是一个时间值,以毫秒为单位,用于指定消息在被发送后多久将被认为是过期的。
在消息队列系统(例如RabbitMQ)中,通过设置消息的TTL,可以控制消息在队列中存活的时间。一旦消息的存活时间超过TTL,消息将被标记为过期并被丢弃或移动到死信队列(Dead Letter Queue)。
TTL可以用于实现延迟消息传递,其中消息在发送后不会立即被消费,而是等待一段时间后再被消费。这对于一些应用场景,比如实现任务调度或延迟处理等,非常有用。
在RabbitMQ中,可以通过设置消息属性 expiration 或通过队列的 x-message-ttl 参数来设置消息的TTL。
MQ环境测试准备
-
xExchange
: 接收生成消息的交换机。 -
QA
: 具有10s消息过期时间的队列,消息过期后通过yExchange
交换机存入死信队列。 -
QB
: 具有10s消息过期时间的队列,消息过期后通过yExchange
交换机存入死信队列。 -
yExchange
: 接收过期消息的交换机。 -
QD
: 死信队列。
代码实现
生产者 8080
定义队列和交换机
package com.zjw.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* TTL队列 配置文件类代码
*/
@Configuration
public class TtlQueueConfig {
/**
* 普通交换机名称
*/
public static final String EXCHANGE_X = "X";
/**
* 死信交换机名称
*/
public static final String EXCHANGE_DEAD_LETTER_Y = "Y";
/**
*普通队列名称
*/
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String QUEUE_C = "QC";
/**
* 死信队列名称
*/
public static final String DEAD_LETTER_QUEUE = "QD";
/**
* routing_key
*/
public static final String ROUTING_KEY_XA = "XA";
public static final String ROUTING_KEY_XB = "XB";
public static final String ROUTING_KEY_XC = "XC";
public static final String ROUTING_KEY_YD = "YD";
/**
* 直接交换机
* @return 交换机
*/
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(EXCHANGE_X);
}
/**
* 死信交换机
* @return 交换机
*/
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(EXCHANGE_DEAD_LETTER_Y);
}
/**
* 队列A
* @return 队列
*/
@Bean("queueA")
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER_Y);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", ROUTING_KEY_YD);
//设置TTL 单位是ms
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
/**
* 队列B
* @return 队列
*/
@Bean("queueB")
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER_Y);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", ROUTING_KEY_YD);
//设置TTL 单位是ms
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
/**
* 队列C
* @return 队列
*/
@Bean("queueC")
public Queue queueC(){
Map<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER_Y);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", ROUTING_KEY_YD);
//没有设置TTL
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
/**
* 死信队列
* @return 队列
*/
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
/**
* 队列和交换机绑定
* @param queueA 队列
* @param xExchange 交换机
* @return 绑定关系
*/
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with(ROUTING_KEY_XA);
}
/**
* 队列和交换机绑定
* @param queueB 队列
* @param xExchange 交换机
* @return 绑定关系
*/
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with(ROUTING_KEY_XB);
}
/**
* 队列和交换机绑定
* @param queueC 队列
* @param xExchange 交换机
* @return 绑定关系
*/
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with(ROUTING_KEY_XC);
}
/**
* 队列和交换机绑定
* @param queueD 队列
* @param yExchange 交换机
* @return 绑定关系
*/
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with(ROUTING_KEY_YD);
}
}
用来通过接口发送消息的controller
package com.zjw.controller;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import static com.zjw.config.TtlQueueConfig.*;
@AllArgsConstructor
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
private RabbitTemplate rabbitTemplate;
/**
* 生产者发送消息
* @param message 消息
*/
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), message);
rabbitTemplate.convertAndSend(EXCHANGE_X, ROUTING_KEY_XA, "消息来自ttl为10S的队列" + message);
rabbitTemplate.convertAndSend(EXCHANGE_X, ROUTING_KEY_XB, "消息来自ttl为40S的队列" + message);
}
/**
* 生产者发送消息和过期时间
* @param message 消息
* @param ttlTime 过期时间,单位是毫秒
*/
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,
@PathVariable String ttlTime) {
log.info("当前时间:{},发送一条时长{}毫秒信息给队列QC:{}", new Date(), ttlTime, message);
rabbitTemplate.convertAndSend(EXCHANGE_X, ROUTING_KEY_XC, message, correlationData -> {
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
}
}
消费者 8081
负责监听消费死信队列QD
package com.zjw.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = {"QD"})
public void receiveD(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg);
}
}
测试
死信队列
访问:http://localhost:8080/ttl/sendMsg/hello 向QA
QB
队列中发送一个“hello”的消息
观察日志
// 生产者端
2024-01-10T20:12:28.735+08:00 INFO 11744 --- [nio-8080-exec-8] com.zjw.controller.SendMsgController : 当前时间:Wed Jan 10 20:12:28 CST 2024,发送一条信息给两个TTL队列:hello
// 消费者端
2024-01-10T20:12:38.742+08:00 INFO 11548 --- [ntContainer#0-1] c.zjw.consumer.DeadLetterQueueConsumer : 当前时间:Wed Jan 10 20:12:38 CST 2024,收到死信队列的消息:消息来自ttl为10S的队列hello
2024-01-10T20:13:08.741+08:00 INFO 11548 --- [ntContainer#0-1] c.zjw.consumer.DeadLetterQueueConsumer : 当前时间:Wed Jan 10 20:13:08 CST 2024,收到死信队列的消息:消息来自ttl为40S的队列hello
消息在到达QA
队列后10s后消息过期,存入消息队列QD
后被消费者消费。
消息在到达QB
队列后40s后消息过期,存入消息队列QD
后被消费者消费。
死信消息
访问:http://localhost:8080/ttl/sendExpirationMsg/hello/20000 向QC
队列中发送一个“hello”的消息,设置过期时间为20s.
访问:http://localhost:8080/ttl/sendExpirationMsg/world/1000 向QC
队列中发送一个“world”的消息,设置过期时间为1s.
观察日志
// 生产者端
2024-01-10T20:21:35.795+08:00 INFO 5524 --- [nio-8080-exec-2] com.zjw.controller.SendMsgController : 当前时间:Wed Jan 10 20:21:35 CST 2024,发送一条时长20000毫秒信息给队列QC:hello
2024-01-10T20:21:37.921+08:00 INFO 5524 --- [nio-8080-exec-3] com.zjw.controller.SendMsgController : 当前时间:Wed Jan 10 20:21:37 CST 2024,发送一条时长1000毫秒信息给队列QC:world
// 消费者端
2024-01-10T20:21:55.800+08:00 INFO 11548 --- [ntContainer#0-1] c.zjw.consumer.DeadLetterQueueConsumer : 当前时间:Wed Jan 10 20:21:55 CST 2024,收到死信队列的消息:hello
2024-01-10T20:21:55.801+08:00 INFO 11548 --- [ntContainer#0-1] c.zjw.consumer.DeadLetterQueueConsumer : 当前时间:Wed Jan 10 20:21:55 CST 2024,收到死信队列的消息:world
注意观察时间,我们虽然设置了“hello”的过期时间为20s,设置了“world”的过期时间为10s,但是在消费者端接受到的消息几乎是同时的。
这是由于RabbitMQ会先检查先到的第一个消息“hello”的是否过期,并不会管“world”有没有到期,当“hello”过期后将它存到了QD
死信队列,这时才会看后到的“world”有没有过期,发现它也过期了,就将“wrold”也存入了QD
死信队列。
总结
将过期时间设置在队列上和消息上各有优劣。
队列设置过期时间:
优点:消息不用在单独设置过期时间了,队列里的消息过期时间是一致的。
缺点:不能控制单个消息的过期时间。
消息设置过期时间:
优点:可以控制单个消息的过期时间。
缺点:如果先到的消息过期时间太长,会造成后到的消息可能已经过期了,但是还在等待的情况。
所以在开发中如果给消息设置了过期时间而不是而队列设置过期时间的话,上面的处理机制需要注意。