1. 消息队列MQ
1.1. 什么是消息队列
MQ(Message Queue)消息队列(消息中间件),是基础数据结构中“先进先出”的一种数据 结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消 息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息,来实现进程之间的通信
消息生产者把消息发送给消息队列,消息队列存储转发消息,消息消费者接收消息、处理消 息。
1.2. 消息队列的作用
解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主 业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

以上的问题:一个功能产生问题,整个系统也会跟着出问题。
所以使用消息队列进行解耦

- 异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体 验。

每次操作都是单独操作,必须花费对应的时间才可以。

- 削峰填谷(限流):高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫 痪。


削峰填谷产生的问题:
1.对MQ的压力比较大
2.会让我们的系统长时间的处于最高性能消耗

1.3. 消息队列的缺点
-
系统可用性降低。依赖服务也多,服务越容易挂掉。需要考虑MQ瘫痪的情况。
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保 证MQ的高可用?(集群)
-
系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行 异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺 序性?
-
业务一致性。主业务和从属业务一致性的处理。
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成 功,D 系统处理失败。如何保证消息数据处理的一致性?
2. AMQP和JMS
2.1. MQ的两种实现方式
JMS:即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于 面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
AMQP:即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
2.2. AMQP和JMS的区别和联系
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型(点对点,发布订阅);而AMQP的消息模型更加丰富(常用的有5 种)
2.3. 常见的MQ产品有哪些

3.RabbitMQ的安装
3.1. 安装erlang环境
rabbitmq的使用erlang语言开发的,所以要想安装rabbitmq必须先安装erlang语言环境。
1.执行指令
cd /opt
mkdir erlang
cd erlang
2.将资料中的erlang的原码包上传到Linux的/opt/erlang

3.执行指令
tar -zxvf otp_src_24.0.tar.gz
cd otp_src_24.0
yum -y install gcc
yum -y install vim make libtool libtool-ltdl-devel libevent-devel lua-devel
openssl-devel flex mysql-devel gcc.x86_64 gcc-c++.x86_64 ncurses-devel wget
lrzsz
yum install ncurses-devel.x86_64
yum -y install wxWidgets
./configure --prefix=/usr/local/erlang
make && make intall
4.查看erlang是否安装成功
cd /usr/local/erlang/bin
erl

5.配置erlang环境变量
vim /etc/profile
在其中添加:export PATH=$PATH:/usr/local/erlang/bin
source /etc/profile

3.2. 安装Rabbitmq
1.执行指令
cd /opt
mkdir rabbitmq
cd rabbitmq
2.将资料中的安装包上传的到Linux的/opt/rabbitmq目录中

3.执行指令
rpm -ivh rabbitmq-server-3.9.0-1.el7.noarch.rpm --force --nodeps
3.3. 启动Rabbitmq相关配置
3.3.1. 设置配置文件
将资料中的rabbitmq.config文件拷贝到Linux的/etc/rabbitmq目录中
3.3.2. 开启用户远程访问
vi /etc/rabbitmq/rabbitmq.config

如果已经配置过了就不用管了。
3.3.3. 设置rabbitmq识别erlang环境
cd /usr/lib/rabbitmq/bin
vim rabbitmq-server
在其中添加:export PATH=$PATH:/usr/local/erlang/bin

3.3.4. rabbitmq的启动和停止
systemctl start rabbitmq-server.service
systemctl stop rabbitmq-server.service
systemctl restart rabbitmq-server.service
systemctl status rabbitmq-server.service #查看rabbitmq的启动状态
3.3.5. 开启web界面管理工具
rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server.service
- 5672: rabbitMq的编程代码语言客户端连接端口(rabbitmq服务访问端口)
- 15672:rabbitMq管理界面端口(rabbitmq的web管理工具访问端口)
- 25672:rabbitMq集群的端口
Linux防火墙放开端口
systemctl restart firewalld
firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp
systemctl restart firewalld
3.3.6. 访问web界面
- 默认用户名:guest
- 默认密码:guest

3.3.7. 页面说明
- connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况。
- channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
- Exchanges:交换机,用来实现消息的路由。
- Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
3.3.8. 添加用户

1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用 情况等)。
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部 分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
3.3.9. 添加虚拟主机
- 虚拟主机:类似于mysql中的database。他们都是以“/”开头

3.3.10.设置权限


3.3.11. RabbitMQ结构及执行流程

4. Rabbitmq的五种消息模型


1.简单模式:一个生产者发送消息,一个消费者接收消息
2.工作队列模式:一个生产者发送消息,允许有多个消费者,同一个消息,只能被一个消费者消 费,多个消费者之间是顺序竞争关系。
3.发布订阅模式:一个生产者发送消息,多个消息者接收消息。
4.路由绑定模式:交换机和队列在绑定的时候,需要指定路由规则,生产者发送消息时,也会指 定路由规则,消息会存放到路由规则匹配的队列中,消费者监听不同的队列则获取不同的消息。
5.通配符路由绑定模式:交换机和队列进行绑定的时候,通过通配符的方式设置路由绑定规则, 生产者发送消息时,会指定路由规则,发送消息的路由规则会和绑定的路由规则进行匹配,匹配 上就保存到对应的队列中。
4.1. 测试第一种simple消息模型

- P(producer/ publisher):生产者,一个发送消息的用户应用程序。
- C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
- 队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大 的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
- RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中, RabbitMQ是邮政信箱,邮局和邮递员。
1.引入依赖
创建springboot项目,并引入依赖
<!--rabbitmq起步依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.编写连接rabbitmq的工具类
public class ConnectionUtil {
public static Connection getConnection() throws IOException,TimeoutException {
//定义连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置服务器地址
connectionFactory.setHost("192.168.200.129");
//端口
connectionFactory.setPort(5672);
//设置账号信息,用户名,密码,虚拟主机名称
connectionFactory.setUsername("song");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/song");
//获取连接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
3.编写生产者发送消息
public class SimpleProducer {
public static void main(String[] args) throws IOException,TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//创建消息队列,方便存放消息
/*
参数:
1. queue:队列名称,如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
2. durable:是否持久化,当mq重启之后,消息还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
channel.queueDeclare("simple_queue",false,false,false,null);
//创建消息
String message = "Hello World";
//发送消息
/*
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
channel.basicPublish("","simple_queue",null,message.getBytes());
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}
4.编写消费者接受消息
public class SimpleConsumer {
public static void main(String[] args) throws IOException,TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//创建消息队列,方便存放消息
channel.queueDeclare("simple_queue",false,false,false,null);
//创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*
参数:
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
//接收消息
String message = new String(body);
System.out.println(message);
}
};
//监听消息队列
/*
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调监听对象
*/
channel.basicConsume("simple_queue",true,consumer);
}
}
- 我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消 息。一旦有新的消息进入队列,就会立即打印.

4.2. 消息确认机制(ACK)
- 自动ACK:消息一旦被接收,消费者自动发送ACK
- 手动ACK:消息接收后,不会发送ACK,需要手动调用
- 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
- 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动 ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
public class SimpleConsumer {
public static void main(String[] args) throws IOException,TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//创建消息队列,方便存放消息
channel.queueDeclare("simple_queue",false,false,false,null);
//创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*
参数:
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
//接收消息
String message = new String(body);
System.out.println(message);
//手动进行ACK
//参数1:收到消息的标签
//参数2:true:确认<=DeliveryTag接收,false:只确认标签消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听消息队列
/*
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调监听对象
*/
channel.basicConsume("simple_queue",false,consumer);
}
}