RocketMQ获取消息的方式有两种:
-
PULL(消费者主动去Broker拉取):拉取消息需要编写代码去Broker获取。通过DefaultMQPullConsumer,关联namesrv后,通过topic获取到关联的所有Message Queue。遍历所有的Message Queue,批量获取消息。并消费。直到处理完所有的Message Queue。用户需要自己保存消费进度,也就是Message Queue下一次的OffSet。
-
PUSH(服务端推送):当消息到达时,服务端主动推送消息给消费者。RocketMQ的实现方式还是使用PULL,但是封装了遍历Message Queue的过程,并注册MessageListener,取到消息后唤醒MessageListener消费消息。
优缺点比较
-
PUSH实时性高,但是增加了服务端负载,而且可能会造成消费者消息堆积,消费者消费能力不同,如果服务端推送消息过快,消费较慢就会造成消息堆积。消费消息逻辑简单,只需添加MessageListener,用户不用自己维护消费进度。
-
PULL是由消费者自动从服务端拉取,较灵活,但是需要自己编写代码拉取消息,而且拉取消息的时间间隔不好控制,间隔太短,空请求太多,间隔太长,消息不能处理。还需要用户维护消费进度。
上个例子的消费者OldVersionConsumer 就是PUSH方式:
@Slf4j
@Component
public class OldVersionConsumer implements InitializingBean, DisposableBean {
private DefaultMQPushConsumer consumer;
@Value("${rocketmq.namesrv}")
private String namesrv;
String topic = "MyTopic";
@Override
public void destroy() throws Exception {
if (consumer != null) {
consumer.shutdown();
}
}
@Override
public void afterPropertiesSet() throws Exception {
consumer = new DefaultMQPushConsumer("my-consumer");
consumer.setNamesrvAddr(namesrv);
// 设置从之前的消费位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("消费消息:{}", list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
新增OldVersionPullConsumer pull方式的消费者:
@Slf4j
@Component
public class OldVersionPullConsumer implements InitializingBean, DisposableBean {
private DefaultMQPullConsumer pullConsumer;
@Value("${rocketmq.namesrv}")
private String namesrv;
String topic = "MyTopic";
Map<MessageQueue, Long> OFFSET_MAP = new HashMap<>();
@Override
public void destroy() throws Exception {
if (pullConsumer != null) {
pullConsumer.shutdown();
}
}
@Override
public void afterPropertiesSet() throws Exception {
pullConsumer = new DefaultMQPullConsumer("my-pull-consumer");
pullConsumer.setNamesrvAddr(namesrv);
pullConsumer.start();
handleMassage();
}
@Scheduled(cron = "0/30 * * * * ?")
private void handleMassage() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(topic);
log.info("message queue:{}", messageQueues);
for (MessageQueue messageQueue : messageQueues) {
PullResult pullResult = pullConsumer.pull(messageQueue, (String) null,getOffSet(messageQueue) , 32);
// 保存消费进度
OFFSET_MAP.put(messageQueue, pullResult.getNextBeginOffset());
List<MessageExt> list = null;
switch (pullResult.getPullStatus()) {
case FOUND:
list = pullResult.getMsgFoundList();
break;
case NO_NEW_MSG:
break;
case NO_MATCHED_MSG:
break;
case OFFSET_ILLEGAL:
break;
default:
break;
}
if (!CollectionUtils.isEmpty(list)) {
log.info("消费消息:{}", list);
}
}
}
private Long getOffSet(MessageQueue queue) {
Long aLong = OFFSET_MAP.get(queue);
if (Objects.isNull(aLong)) {
return 0L;
} else {
return aLong;
}
}
}
OFFSET_MAP 保存了每个Message Queue的消费进度。