RabbitMq

发布时间 2023-07-07 15:26:58作者: primaryC

1,RabbitMq 简介

是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

官网
安装

2,RabbitMq 几个术语

1. Exchange - 交换机

生产者将消息发送给交换机,交换机按照一定规则分发消息给指定队列。消息根据交换机类型和 binding 可以投递到多个队列中。

常用的交换机有四种。

  1. 直连交换机

directExchange: 根据 routeKey 匹配队列

@Bean
public DirectExchange directExchangeDemo(){
    /*
    * 直连交换机
    * 一共四个参数:String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
    *       name: 名称
    *       durable: 持久化
    *       autoDelete:自动删除
    *       arguments:参数
    *
    * */
    return new DirectExchange("directExchangeTest",true,false);
}
  1. 扇形交换机

FanoutExchange:不用匹配 routekey,所有队列都能获取扇形交换机分发的消息

@Bean
public FanoutExchange fanoutExchangeDemo(){
    /* 扇形交换机 */
    return new FanoutExchange("fanoutExchangeTest",true,false);
}
  1. 主题交换机

TopicExchange: 增强版的直连交换机,路由键 routekey 中,* 代表匹配任意一个单词,# 代表匹配任意一个或多个单侧, . 代表一个部分(www.# 可以匹配 www.aaa)

@Bean
public TopicExchange topicExchangeDemo(){
    return new TopicExchange("topicExchangeTest1",true,false);
}
  1. 头部交换机

HeadersExchange : 通过头部键值对匹配队列的交换机

@Bean
public HeadersExchange headersExchangeDemo(){
    /* 头部交换机 */
    return new HeadersExchange("headersExchangeTest",true,false);
}

2. Broker

接收和分发消息的应用,就是 mq 的服务端。

3. Virtual host

虚拟分组,类似于 nameSpace。

4. Connection

publisher/customer 和 broker 直接的连接。

5. Channel

信道,复用 Connection。

6. Exchange

交换机,message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中。

7. Queue

最终消息被送到这里等待被 customer 取走。

8. Binding

exchange 和 queue 之间的虚拟连接, binding 中可以包含 routing key, Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

3. 消息队列大致使用过程

  1. 启动一个消息队列服务器
  2. 客户端连接到消息队列服务器,打开一个 channel
  3. 客户端声明一个 exchange,并设置相关属性
  4. 客户端声明一个 queue,并设置相关属性
  5. 客户端使用 routing key,在 exchage 和 queue 中建立绑定关系
  6. 生产者投递消息到 exchange,exchange 接收到消息后,就根据消息的 key 和已经设置的 binding,进行消息路由,将消息投递到对应的队列中。
  7. 消费者消费队列中的消息。

4,消息应答

创建消费者:

/**
 *  消费者消费消息
 *    1,消费哪个队列
 *    2,消费成功之后是否要自动应答 "true" 代表自动应答 "false" 手动应答
 *    3,消费者未成功消费的回调
 * */
channel.basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback);

确认消费:

/**
 * 参数 1,消息标记
 *      2,false 表示只应答接收到那个传递的消息
 * 用于肯定确认:RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
 * 
 * multiple 的 true 和 false 代表不同意思:
 *        true 表示批量应答 channel 上未应答的消息,false 表示只应答当前 channel 上的消息。
 * */
Channel.basicAck(long deliveryTag, boolean multiple)

拒绝消费

/**
 *    参数 1,消息标记
 *         2,是否应答 channel 上所有未应答的消息
 *         3,是否重新入列
 *    用于否定消息
 * */ 
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

拒绝消费

/**    
 *    参数 1,消息标记
 *         3,是否重新入列
 *    用于否定消息,相比 basicNack 缺少 multiple 参数,不能批量确认
 * */
Channel.basicReject(long deliveryTag, boolean requeue)
手动确认 demo
public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("yanqi");
        factory.setPassword("5211314");
        factory.setVirtualHost("love");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println("Custom1 等待接收消息....");

        //消费消息
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody());
                System.out.println(message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        //取消消息
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag);
            System.out.println("消息消费被中断");
        };

        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3,消费成功
         * 4.消费者未成功消费的回调
         */
        channel.basicConsume("中华艺术宫", false, deliverCallback, cancelCallback);
    }
}

5,队列持久化

//durable:true 表示队列持久化,false 表示不持久化,重启 rabbitmq 队列就没了

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
/**
 * 生成一个队列
 * 1.队列名称
 * 2.队列里面的消息是否持久化 默认消息存储在内存中
 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
 * 5.其他参数
 */
channel.queueDeclare("中华艺术宫", false, false, false, null);

6,消息持久化

//props 中添加 MessageProperties.PERSISTENT_TEXT_PLAIN
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);
/**
 * 发送一个消息
 * 1.发送到那个交换机
 * 2.路由的 key 是哪个
 * 3.其他的参数信息,比如 MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
 * 4.发送消息的消息体
 */
channel.basicPublish("", "中华艺术宫", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

7,预取值

  Channel 上未确认的缓冲区,通过 basicQos(int prefetchCount) 设置值,避免缓冲区无限制未确认大小。通过设置预取值,还可以根据不同消费者性能问题实现不公平分发。

8,发布确认

生成者将 Channel 设置成 confirm 模式,一旦消息被投递到所有匹配的队列之后, broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。

发布确认
public class PushlierConfirm {
    public static void main(String[] args) {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("yanqi");
        factory.setPassword("5211314");
        factory.setVirtualHost("love");

        try (
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {

            //发布确认
            channel.confirmSelect();

            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.queueDeclare("中华艺术宫", false, false, false, null);

            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息,比如 MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
             * 4.发送消息的消息体
             */
            int i = 0;
            while(true){
                String message = "hello world--" + (++i);
                channel.basicPublish("", "中华艺术宫", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                boolean b = channel.waitForConfirms();
                if(b){
                    System.out.println("消息 " + i + " 发布成功!");
                }else{
                    System.out.println("消息 " + i + " 发布失败!");
                }

                Thread.sleep(3_000);
            }

            //System.out.println("消息发送完毕");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

9,死信队列

无法被消费的消息。

来源:
1)消息 TTL 过期
2)队列达到最大长度,无法再添加数据到 mq 中
3)被拒绝的消息,并且 requeue = false

声明死信队列 demo
public static void rejectCustom() throws IOException, TimeoutException {
    Channel channel = ChannelUtil.getChannel();

    /**
     * 声明死信队列
     * queueDeclare(String queue,
     *              boolean durable,
     *              boolean exclusive,
     *              boolean autoDelete,
     *              Map<String, Object> arguments)
     *   queue:队列名
     *   durable:是否持久化
     *   exclusive:该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
     *   autoDelete:是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
     *   arguments:其他参数
     */
    String dead_queue = "dead_queue";
    channel.queueDeclare(dead_queue, false, false, false, null);

    /**
     * 私信队列绑定交换机
     * */
    String dead_exchange = "dead_exchange";
    channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
    channel.queueBind(dead_queue, dead_exchange, "dead_routing");

    //声明正常队列
    String normal_queue = "normal_queue";
    Map<String, Object> params = new HashMap<>();
    params.put("x-dead-letter-exchange", dead_exchange);
    params.put("x-dead-letter-routing-key", "dead_routing");
    channel.queueDeclare(normal_queue, false, false, false, params);


    //等待接收消息
    System.out.println("等待接收消息----");
    channel.basicConsume(normal_queue, false, (consumerTag, message) -> {
        System.out.println(new String(message.getBody(), "UTF-8"));
        channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
    }, consumerTag -> {
        System.out.println("消费失败");
    });
}

10,延时队列

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间

设置超时时间
1)队列的 TTl,队列中的消息一旦过了 TTL 时间未被消费,就会丢弃(有死信队列就放到死信队列中)
2)消息的 TTL,即使消息过期,也不一定被马上丢弃

Rabbitmq 插件实现延迟队列

延时队列
//消息队列设置延时,投送消息到普通队列,ttl 时间内未被消费,投送到死信队列
public static void delay_queue() throws IOException, TimeoutException {
    Channel channel = ChannelUtil.getChannel();

    //死信队列
    String dead_queue = "delay_dead_queue";
    channel.queueDeclare(dead_queue, false, false, false, null);

    /**
     * 死信队列绑定交换机
     * */
    String dead_exchange = "delay_dead_exchange";
    String delay_routing_key = "delay_routing";
    channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
    channel.queueBind(dead_queue, dead_exchange, delay_routing_key);

    //声明带有 ttl 的队列
    String queue = "delay_queue";
    Map<String, Object> params = new HashMap<>();

    //设置队列的 ttl 时间
    params.put("x-message-ttl", 5000);
    params.put("x-dead-letter-exchange", dead_exchange);
    params.put("x-dead-letter-routing-key", delay_routing_key);
    channel.queueDeclare(queue, false, false, false, params);

    channel.basicPublish("", queue, null, "延时队列数据:1".getBytes());
    channel.basicPublish("", queue, null, "延时队列数据:2".getBytes());
    channel.basicPublish("", queue, null, "延时队列数据:3".getBytes());
}

/**
 * 消息延时
 * 消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
 * 可以用 java DelayQueue
 */
public static void delay_message() throws IOException, TimeoutException {
    Channel channel = ChannelUtil.getChannel();

    //死信队列
    String dead_queue = "delay_dead_queue";
    channel.queueDeclare(dead_queue, false, false, false, null);

    /**
     * 死信队列绑定交换机
     * */
    String dead_exchange = "delay_dead_exchange";
    String delay_routing_key = "delay_routing";
    channel.exchangeDeclare(dead_exchange, BuiltinExchangeType.DIRECT);
    channel.queueBind(dead_queue, dead_exchange, delay_routing_key);

    //声明带有 ttl 的队列
    String queue = "delay_queue";
    Map<String, Object> params = new HashMap<>();

    //设置队列的 ttl 时间
    params.put("x-dead-letter-exchange", dead_exchange);
    params.put("x-dead-letter-routing-key", delay_routing_key);
    channel.queueDeclare(queue, false, false, false, params);

    channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("10000").build(), "延时队列数据:1".getBytes());
    channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("3000").build(), "延时队列数据:2".getBytes());
    channel.basicPublish("", queue, new AMQP.BasicProperties().builder().expiration("300").build(), "延时队列数据:3".getBytes());
}