RabbitMQ01

发布时间 2023-06-28 10:03:18作者: 加包辣条多放辣椒

1. 消息队列MQ

1.1. 什么是消息队列

MQ(Message Queue)消息队列(消息中间件),是基础数据结构中“先进先出”的一种数据 结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消 息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息,来实现进程之间的通信

消息生产者把消息发送给消息队列,消息队列存储转发消息,消息消费者接收消息、处理消 息。

1.2. 消息队列的作用

解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主 业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

image

以上的问题:一个功能产生问题,整个系统也会跟着出问题。

所以使用消息队列进行解耦

image

  • 异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体 验。

image

每次操作都是单独操作,必须花费对应的时间才可以。

image

  • 削峰填谷(限流):高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫 痪。

image

image

削峰填谷产生的问题:

1.对MQ的压力比较大

2.会让我们的系统长时间的处于最高性能消耗
image

1.3. 消息队列的缺点

  • 系统可用性降低。依赖服务也多,服务越容易挂掉。需要考虑MQ瘫痪的情况。

    系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保 证MQ的高可用?(集群)

  • 系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。

    MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行 异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺 序性?

  • 业务一致性。主业务和从属业务一致性的处理。

    A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成 功,D 系统处理失败。如何保证消息数据处理的一致性?

2. AMQP和JMS

2.1. MQ的两种实现方式

JMS:即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于 面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

AMQP:即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

2.2. AMQP和JMS的区别和联系

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模型(点对点,发布订阅);而AMQP的消息模型更加丰富(常用的有5 种)

2.3. 常见的MQ产品有哪些

image

3.RabbitMQ的安装

3.1. 安装erlang环境

rabbitmq的使用erlang语言开发的,所以要想安装rabbitmq必须先安装erlang语言环境。

1.执行指令

cd /opt
mkdir erlang
cd erlang

2.将资料中的erlang的原码包上传到Linux的/opt/erlang

image

3.执行指令

tar -zxvf otp_src_24.0.tar.gz

cd otp_src_24.0

yum -y install gcc

yum -y install vim make libtool libtool-ltdl-devel libevent-devel lua-devel
openssl-devel flex mysql-devel gcc.x86_64 gcc-c++.x86_64 ncurses-devel wget
lrzsz

yum install ncurses-devel.x86_64

yum -y install wxWidgets

./configure --prefix=/usr/local/erlang

make && make intall

4.查看erlang是否安装成功

cd /usr/local/erlang/bin
erl

image

5.配置erlang环境变量

vim /etc/profile

在其中添加:export PATH=$PATH:/usr/local/erlang/bin

source /etc/profile

image

3.2. 安装Rabbitmq

1.执行指令

cd /opt

mkdir rabbitmq

cd rabbitmq

2.将资料中的安装包上传的到Linux的/opt/rabbitmq目录中

image

3.执行指令

rpm -ivh rabbitmq-server-3.9.0-1.el7.noarch.rpm --force --nodeps

3.3. 启动Rabbitmq相关配置

3.3.1. 设置配置文件

将资料中的rabbitmq.config文件拷贝到Linux的/etc/rabbitmq目录中

3.3.2. 开启用户远程访问

vi /etc/rabbitmq/rabbitmq.config

image

如果已经配置过了就不用管了。

3.3.3. 设置rabbitmq识别erlang环境

cd /usr/lib/rabbitmq/bin

vim rabbitmq-server

在其中添加:export PATH=$PATH:/usr/local/erlang/bin

image

3.3.4. rabbitmq的启动和停止

systemctl start rabbitmq-server.service

systemctl stop rabbitmq-server.service

systemctl restart rabbitmq-server.service

systemctl status rabbitmq-server.service #查看rabbitmq的启动状态

3.3.5. 开启web界面管理工具

rabbitmq-plugins enable rabbitmq_management

systemctl restart rabbitmq-server.service
  • 5672: rabbitMq的编程代码语言客户端连接端口(rabbitmq服务访问端口)
  • 15672:rabbitMq管理界面端口(rabbitmq的web管理工具访问端口)
  • 25672:rabbitMq集群的端口

Linux防火墙放开端口

systemctl restart firewalld

firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp

systemctl restart firewalld

3.3.6. 访问web界面

  • 默认用户名:guest
  • 默认密码:guest

image

3.3.7. 页面说明

  • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况。
  • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
  • Exchanges:交换机,用来实现消息的路由。
  • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

3.3.8. 添加用户

image

1、 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用 情况等)。

3、 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部 分)。

4、 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

3.3.9. 添加虚拟主机

  • 虚拟主机:类似于mysql中的database。他们都是以“/”开头

image

3.3.10.设置权限

image

image

3.3.11. RabbitMQ结构及执行流程

image

4. Rabbitmq的五种消息模型

image

image

1.简单模式:一个生产者发送消息,一个消费者接收消息

2.工作队列模式:一个生产者发送消息,允许有多个消费者,同一个消息,只能被一个消费者消 费,多个消费者之间是顺序竞争关系。

3.发布订阅模式:一个生产者发送消息,多个消息者接收消息。

4.路由绑定模式:交换机和队列在绑定的时候,需要指定路由规则,生产者发送消息时,也会指 定路由规则,消息会存放到路由规则匹配的队列中,消费者监听不同的队列则获取不同的消息。

5.通配符路由绑定模式:交换机和队列进行绑定的时候,通过通配符的方式设置路由绑定规则, 生产者发送消息时,会指定路由规则,发送消息的路由规则会和绑定的路由规则进行匹配,匹配 上就保存到对应的队列中。

4.1. 测试第一种simple消息模型

image

  • P(producer/ publisher):生产者,一个发送消息的用户应用程序。
  • C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
  • 队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大 的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
  • RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中, RabbitMQ是邮政信箱,邮局和邮递员。

1.引入依赖

创建springboot项目,并引入依赖

<!--rabbitmq起步依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.编写连接rabbitmq的工具类

public class ConnectionUtil {
    public static Connection getConnection() throws IOException,TimeoutException {
        //定义连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置服务器地址
        connectionFactory.setHost("192.168.200.129");
        //端口
        connectionFactory.setPort(5672);
        //设置账号信息,用户名,密码,虚拟主机名称
        connectionFactory.setUsername("song");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/song");
        //获取连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

3.编写生产者发送消息

public class SimpleProducer {
    public static void main(String[] args) throws IOException,TimeoutException {
    //获取连接
    Connection connection = ConnectionUtil.getConnection();
    //创建通道
    Channel channel = connection.createChannel();
    //创建消息队列,方便存放消息
    /*
    参数:
        1. queue:队列名称,如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
        2. durable:是否持久化,当mq重启之后,消息还在
        3. exclusive:
            * 是否独占。只能有一个消费者监听这队列
            * 当Connection关闭时,是否删除队列
        4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
        5. arguments:参数。
    */
    channel.queueDeclare("simple_queue",false,false,false,null);
    //创建消息
    String message = "Hello World";
    //发送消息
    /*
    参数:
        1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
        2. routingKey:路由名称
        3. props:配置信息
        4. body:发送消息数据
    */
    channel.basicPublish("","simple_queue",null,message.getBytes());
    //关闭通道
    channel.close();
    //关闭连接
    connection.close();
    }
}

4.编写消费者接受消息

public class SimpleConsumer {
	public static void main(String[] args) throws IOException,TimeoutException {
    //获取连接
    Connection connection = ConnectionUtil.getConnection();
    //创建通道
    Channel channel = connection.createChannel();
    //创建消息队列,方便存放消息
    channel.queueDeclare("simple_queue",false,false,false,null);
    //创建消费者
    DefaultConsumer consumer = new DefaultConsumer(channel) {
    /*
    参数:
        1. consumerTag:标识
        2. envelope:获取一些信息,交换机,路由key...
        3. properties:配置信息
        4. body:数据
    */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
            //接收消息
            String message = new String(body);
            System.out.println(message);
        }
    };
        //监听消息队列
        /*
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调监听对象
        */
    	channel.basicConsume("simple_queue",true,consumer);
    }
}
  • 我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消 息。一旦有新的消息进入队列,就会立即打印.

image

4.2. 消息确认机制(ACK)

  • 自动ACK:消息一旦被接收,消费者自动发送ACK
  • 手动ACK:消息接收后,不会发送ACK,需要手动调用
  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动 ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
public class SimpleConsumer {
	public static void main(String[] args) throws IOException,TimeoutException {
    //获取连接
    Connection connection = ConnectionUtil.getConnection();
    //创建通道
    Channel channel = connection.createChannel();
    //创建消息队列,方便存放消息
    channel.queueDeclare("simple_queue",false,false,false,null);
    //创建消费者
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        /*
        参数:
        1. consumerTag:标识
        2. envelope:获取一些信息,交换机,路由key...
        3. properties:配置信息
        4. body:数据
        */
        @Override
        public void handleDelivery(String consumerTag, Envelope
        envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                //接收消息
                String message = new String(body);
                System.out.println(message);
                //手动进行ACK
                //参数1:收到消息的标签
                //参数2:true:确认<=DeliveryTag接收,false:只确认标签消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //监听消息队列
        /*
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认
            3. callback:回调监听对象
        */
        channel.basicConsume("simple_queue",false,consumer);
    }
}