RabbitMQ

发布时间 2023-08-24 17:31:07 作者: XUMT111

 

@Service
public class OrderItemServiceImpl  {

    /**
     * Listener-based consumer for {@code MqConstant.SEARCH_DATA_QUEUE}.
     *
     * <p>Reference: https://blog.csdn.net/aetawt/article/details/128957417
     *
     * <p>Notes carried over from the original comment: with listener-based consumption a
     * message is consumed only once; after another consumer has consumed it, this consumer
     * does not receive it. {@code @RabbitHandler} is placed on methods,
     * {@code @RabbitListener} on classes or methods, and the two can be combined to receive
     * different message types.
     *
     * <p>The method prints the message, the channel number and the delivery tag, then
     * explicitly acknowledges that single delivery.
     *
     * @param message the delivered message; its body is decoded as UTF-8 for printing
     * @param channel the channel used to query the channel number and send the acknowledgement
     * @throws IOException propagated from {@code channel.basicAck}
     */
    @RabbitListener(queues = {MqConstant.SEARCH_DATA_QUEUE})
    public void recieveOrderMessage(Message message,  Channel channel) throws IOException {
        // Decode with an explicit charset; new String(byte[]) alone uses the platform default,
        // which can garble non-ASCII payloads on hosts whose default is not UTF-8.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到了消息了--->" + message + " ====》内容:" + body);
        System.out.println("渠道数量:" + channel.getChannelNumber());

        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("消息处理完成---------------------------------");
        // Delivery tag of this message (described in the original notes as an
        // auto-incrementing sequence number).
        long deliveryTag = messageProperties.getDeliveryTag();
        System.out.println(deliveryTag);
        // Acknowledge the message: false acknowledges only this delivery,
        // true would acknowledge all deliveries up to and including this tag.
        channel.basicAck(deliveryTag, false);
    }


/**
 * RabbitTemplate 拉取的方式获取消息
 */
@Component
public class RabbitListenerConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Starts a background thread that repeatedly pulls messages from
     * {@code MqConstant.SEARCH_DATA_QUEUE} through the injected {@code RabbitTemplate}
     * and prints each result.
     *
     * <p>Runs once after dependency injection because of {@code @PostConstruct}. Each
     * iteration calls {@code receiveAndConvert} with a timeout argument of {@code 1000 * 2}
     * and prints whatever it returns.
     */
    @PostConstruct
    public void receive(){
        Runnable poller = () -> {
            // Exit the loop when the thread is interrupted so the poller can be stopped.
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("*************ready");
                // Keep the payload as Object: receiveAndConvert is declared to return Object,
                // and an unchecked cast to String would throw ClassCastException for any
                // other payload type and terminate this polling thread.
                Object message = rabbitTemplate.receiveAndConvert(MqConstant.SEARCH_DATA_QUEUE, 1000 * 2);
                System.out.println("*************receive:" + message);
            }
        };
        // A named thread is easier to identify in thread dumps.
        Thread thread = new Thread(poller, "rabbit-search-data-poller");
        thread.start();
    }


    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Logs the payload and sends it through the {@code RabbitTemplate} to
     * {@code MqConstant.SEARCH_DATA_EXCHANGE}, using {@code MqConstant.SEARCH_DATA_QUEUE}
     * as the routing key.
     *
     * @param searchData the payload to publish
     */
    public void produce(String searchData) {
        log.info("消息生产:{}", searchData);

        final String exchange = MqConstant.SEARCH_DATA_EXCHANGE;
        final String routingKey = MqConstant.SEARCH_DATA_QUEUE;
        rabbitTemplate.convertAndSend(exchange, routingKey, searchData);
    }

# Spring Boot RabbitMQ settings: broker connection, publisher options and listener options.
# NOTE: host, password and virtual-host below appear to be redacted placeholders.
spring:
  rabbitmq:
    # Broker connection.
    port: 5672
    host: 192.168.xxx
    username: admin
    password: xxx
    virtual-host: sxxx
    # Publisher-side confirm and return options.
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      # Acknowledge mode is set to manual in both the direct and simple listener sections,
      # so listener code must acknowledge explicitly (recieveOrderMessage does this via
      # channel.basicAck).
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
        # Retry is switched on for the simple listener section only.
        retry:
          enabled: true


        <!-- Spring Boot AMQP starter, pinned explicitly to version 2.6.8.
             NOTE(review): if the build inherits Spring Boot dependency management,
             confirm this explicit version agrees with the managed one. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.6.8</version>
        </dependency>