@Service
public class OrderItemServiceImpl {
/**
 * Push-style (listener) consumer for {@code MqConstant.SEARCH_DATA_QUEUE}: logs the delivered
 * message and then acknowledges it manually.
 *
 * <p>Background notes (from https://blog.csdn.net/aetawt/article/details/128957417):
 * <ul>
 *   <li>In listener mode a message is consumed only once; after another consumer has consumed
 *       it, this consumer will not receive it again.</li>
 *   <li>{@code @RabbitHandler} is placed on methods.</li>
 *   <li>{@code @RabbitListener} can be placed on classes or on methods.</li>
 *   <li>Combine {@code @RabbitHandler} with {@code @RabbitListener} to accept different
 *       message payload types.</li>
 * </ul>
 *
 * @param message the delivered AMQP message; its body is decoded as UTF-8 for logging
 * @param channel the channel the message arrived on, used to send the acknowledgement
 * @throws IOException propagated from {@code channel.basicAck} if the ack cannot be sent
 */
@RabbitListener(queues = {MqConstant.SEARCH_DATA_QUEUE})
public void recieveOrderMessage(Message message, Channel channel) throws IOException {
    // Decode with an explicit charset instead of the platform default so non-ASCII payloads
    // are not garbled. UTF-8 matches Spring AMQP's default String encoding
    // (assumes the producer uses the default message converter).
    String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    System.out.println("收到了消息了--->" + message + " ====》内容:" + body);
    System.out.println("渠道数量:" + channel.getChannelNumber());
    MessageProperties messageProperties = message.getMessageProperties();
    System.out.println("消息处理完成---------------------------------");
    // Delivery tag: the message's sequence number on this channel (auto-incrementing).
    long deliveryTag = messageProperties.getDeliveryTag();
    System.out.println(deliveryTag);
    // Acknowledge the message. multiple=false acks only this delivery;
    // true would ack every delivery up to and including this tag.
    channel.basicAck(deliveryTag, false);
}
/**
* RabbitTemplate 拉取的方式获取消息
*/
@Component
public class RabbitListenerConfig {
// Template injected by Spring; receive() uses it to pull messages from the queue.
@Autowired
private RabbitTemplate rabbitTemplate;
/**
 * Starts a background thread that repeatedly pulls messages from
 * {@code MqConstant.SEARCH_DATA_QUEUE} with {@code RabbitTemplate.receiveAndConvert},
 * waiting up to two seconds per attempt, and prints whatever was received
 * ({@code null} when nothing arrived within the timeout).
 *
 * <p>Invoked once after dependency injection because of {@code @PostConstruct}.
 * The polling thread is a named daemon thread, so it does not keep the JVM alive,
 * and the loop exits when the thread is interrupted.
 */
@PostConstruct
public void receive() {
    final long receiveTimeoutMillis = 2000L;
    Thread poller = new Thread(() -> {
        // Poll until interrupted rather than forever, so the thread can be stopped.
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("*************ready");
            // Keep the converted payload as Object: a blind (String) cast would throw
            // ClassCastException for non-String payloads and kill the polling thread.
            Object message = rabbitTemplate.receiveAndConvert(
                    MqConstant.SEARCH_DATA_QUEUE, receiveTimeoutMillis);
            System.out.println("*************receive:" + message);
        }
    }, "search-data-queue-poller");
    // Daemon so this endless poller never blocks JVM shutdown.
    poller.setDaemon(true);
    poller.start();
}
// Template injected by Spring; produce() uses it to publish messages.
// NOTE(review): an identical field is declared earlier in this file — presumably this
// declaration belongs to a separate producer class; confirm before compiling together.
@Autowired
private RabbitTemplate rabbitTemplate;
/**
 * Logs the payload and publishes it through the injected {@code RabbitTemplate}.
 *
 * <p>The message is sent to {@code MqConstant.SEARCH_DATA_EXCHANGE} using
 * {@code MqConstant.SEARCH_DATA_QUEUE} as the routing key.
 *
 * @param searchData the payload to publish
 */
public void produce(String searchData) {
    log.info("消息生产:{}", searchData);

    final String targetExchange = MqConstant.SEARCH_DATA_EXCHANGE;
    final String routingKey = MqConstant.SEARCH_DATA_QUEUE;
    rabbitTemplate.convertAndSend(targetExchange, routingKey, searchData);
}
spring:
  rabbitmq:
    port: 5672
    host: 192.168.xxx
    username: admin
    password: xxx
    virtual-host: sxxx
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.8</version>
</dependency>