go-RabbitMQ

发布时间 2023-05-31 11:58:27作者: 是心有林夕啊

erlang 安装

编译依赖:yum install make gcc gcc-c++ build-essential openssl openssl-devel unixODBC unixODBC-devel kernel-devel m4 ncurses-devel

解压:tar -zxvf

创建存放环境目录:mkdir /opt/rabbitMq/erlang

进入 erlang 解压目录执行命令:./configure --prefix=/opt/rabbitmq/erlang --without-javac

安装:make && make install

配置环境变量:vim /etc/profile

#set erlang environment
export PATH=$PATH:/路径和上面黄色部分对应/erlang/bin

刷新配置文件:source profile

测试安装是否成功:erl

RabbitMQ 安装

安装:rpm -ivh --nodeps rabbitmq-server-xxx.noarch.rpm

启动:rabbitmq-server start &

// 如果没有权限,使用 find / -name .erlang.cookie,找到后授权
find / -name .erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie

停止:rabbitmqctl stop

插件添加:rabbitmq-plugins enable {插件名}

插件卸载:rabbitmq-plugins disable {插件名}

查看插件:rabbitmq-plugins list

rabbitmq_management 为 web 管理端插件

本地的话可以使用 username:guest password:guest 进行登录,但是远程就不行了,需要新建用户(必须是 rabbitmq 启动状态下才能新建)

# 第一步:添加 admin 用户并设置密码
rabbitmqctl add_user admin 1234

# 第二步:添加 admin 用户为 administrator 角色
rabbitmqctl set_user_tags admin administrator

# 第三步:设置 admin 用户的权限,指定允许访问的 vhost 以及 write/read
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

# 第四步:查看 vhost (/) 允许哪些用户访问
rabbitmqctl list_permissions -p /

# 第五步:查看用户列表
rabbitmqctl list_users

# 第六步:重启 RabbitMQ,使用设置好的账户和密码登录。

go -> RabbitMQ

package rabbitmq

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

// 连接信息标准格式: amqp://用户名:密码@ RabbitMQ 服务器名称:RabbitMQ 所在服务器的端口/virtual hosts
const MQURL = "amqp://admin:admin@123@192.168.177.130:5672/mq1"

type RabbitMQ struct {
	conn      *amqp.Connection
	channel   *amqp.Channel
	QueueName string // 队列名称
	Exchange  string // 交换机
	Key       string // key
	MQURL     string // 连接信息
}

// NewRabbitMQ - 创建 RabbitMQ 结构体实例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{
		QueueName: queueName,
		Exchange:  exchange,
		Key:       key,
		MQURL:     MQURL,
	}
	var err error
	// 创建 RabbitMQ 连接
	rabbitmq.conn, err = amqp.Dial(rabbitmq.MQURL)
	rabbitmq.failOnErr(err, "创建连接错误")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "获取 channel 失败")
	return rabbitmq
}

// Destory - 断开 channel 和 connection
func (r *RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

// failOnErr - 错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", message, err)
		panic(fmt.Sprintf("%s:%s", message, err))
	}
}

Simple

最简单常用的工作模式。

![[Simple.png]]

一个生产者对应一个消费者

获取实例

// NewRabbitMQSimple - 创建 Simple 模式下的 RabbitMQ 实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
	return NewRabbitMQ(queueName, "", "")
}

生产者

// PublishSimple - Smiple 模式下的生产者
func (r *RabbitMQ) PublishSimple(message string) {
	// 1. 申请队列,找不到就创建,存在就跳过创建
	// 保证队列存在,消息能放到队列中
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		false, // 是否持久化
		false, // 是否自动删除
		false, // 是否具有排他性
		false, // 是否阻塞
		nil,   // 额外属性
	)

	if err != nil {
		fmt.Println(err)
		return
	}

	// 2. 发送消息到队列中
	r.channel.Publish(
		r.Exchange,  // 交换机
		r.QueueName, // 队列名称
		false,       // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
		false,       // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

消费者

// ConsumeSimple - Smiple 模式下的消费者
func (r *RabbitMQ) ConsumeSimple() {
	// 1. 申请队列,找不到就创建,存在就跳过创建
	// 保证队列存在,消息能放到队列中
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		false, // 是否持久化
		false, // 是否自动删除
		false, // 是否具有排他性
		false, // 是否阻塞
		nil,   // 额外属性
	)

	if err != nil {
		fmt.Println(err)
		return
	}

	// 2. 接收消息
	msgs, err := r.channel.Consume(
		r.QueueName, // 队列名称
		"",          // 用来区分多个消费者
		true,        // 是否自动应答
		false,       // 是否具有排他性
		false,       // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
		false,       // 队列消费是否阻塞
		nil,         // 额外属性
	)

	if err != nil {
		fmt.Println(err)
		return
	}

	forever := make(chan bool)
	// 启用协程处理消息
	go func() {
		for d := range msgs {
			// 实现我们要处理的逻辑判断
			fmt.Printf("Received a message: %s\n", d.Body)
		}
	}()

	log.Printf("[*] Waiting for messages, To exit press CTRL + C")
	<-forever
}

示例

mainSimplePublish

package main

import (
	"fmt"
	"rabbitmq-base/rabbitmq"
)

func main() {
	r := rabbitmq.NewRabbitMQSimple("simple")
	r.PublishSimple("aaaa")
	fmt.Println("发送成功")
}

mainSimpleRecieve

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQSimple("simple")
	r.ConsumeSimple()
}

Work

工作模式

一个消息只能被一个消费者获取,也就是消息只能被消费一次。

一个生产者对应多个消费者。

![[Work.png]]

与 Simple 模式最大的区别就是多个消费者

Publish/Subscribe

订阅模式

消息被路由投递给多个队列,一个消息被多个消费者获取。

![[Publish&Subscribe.png]]

获取实例

// NewRabbitMQPubSub - 创建订阅模式下的 RabbitMQ 实例
func NewRabbitMQPubSub(exchange string) *RabbitMQ {
	return NewRabbitMQ("", exchange, "")
}

生产者

// PublishPub - 订阅模式下的生产者
func (r *RabbitMQ) PublishPub(message string) {
	// 1. 尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"fanout",   // 交换机类型,fanout 是广播类型
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 发送消息
	err = r.channel.Publish(
		r.Exchange,
		"",
		false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
		false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

消费者

// RecieveSub - 订阅模式下的消费者
func (r *RabbitMQ) RecieveSub() {
	// 1. 试探性创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"fanout",   // 交换机类型,fanout 是广播类型
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
	q, err := r.channel.QueueDeclare(
		"",    // 随机生产队列名称
		false, // 是否持久化
		false, // 自动删除
		true,  // 是否排他
		false, // 是否阻塞
		nil,   // 额外属性
	)

	r.failOnErr(err, "Failed to declare a queue")

	// 绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		"", // 在 Pub/Sub 模式下,这里的 key 要为空
		r.Exchange,
		false, // 是否阻塞
		nil,   // 额外属性
	)

	// 3. 消费消息
	message, err := r.channel.Consume(
		q.Name, // 队列名称
		"",     // 用来区分多个消费者
		true,   // 是否自动应答
		false,  // 是否具有排他性
		false,  // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
		false,  // 队列消费是否阻塞
		nil,    // 额外属性
	)

	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Println("退出请按 CTRL + C")
	<-forever
}

示例

mainPub

package main

import (
	"fmt"
	"rabbitmq-base/rabbitmq"
	"strconv"
	"time"
)

func main() {
	r := rabbitmq.NewRabbitMQPubSub("exchange1")
	for i := 0; i < 100; i++ {
		r.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据")
		fmt.Println("第" + strconv.Itoa(i) + "条消息")
		time.Sleep(1 * time.Second)
	}
}

mainSub1

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQPubSub("exchange1")
	r.RecieveSub()
}

mainSub2

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQPubSub("exchange1")
	r.RecieveSub()
}

Routing

路由模式

一个消息被多个消费者获取。并且消息的目标队列可被生产者指定。

![[Routing.png]]

获取实例

// NewRabbitMQRouting - 创建路由模式下的 RabbitMQ 实例
func NewRabbitMQRouting(exchange string, routingKey string) *RabbitMQ {
	return NewRabbitMQ("", exchange, routingKey)
}

生产者

// PublishRouting - 路由模式下的生产者
func (r *RabbitMQ) PublishRouting(message string) {
	// 1. 尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"direct",   // 交换机类型,direct
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 发送消息
	err = r.channel.Publish(
		r.Exchange,
		r.Key, // 设置 routingKey
		false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
		false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

消费者

// RecieveRouting - 路由模式下的消费者
func (r *RabbitMQ) RecieveRouting() {
	// 1. 试探性创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"direct",   // 交换机类型,direct
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
	q, err := r.channel.QueueDeclare(
		"",    // 随机生产队列名称
		false, // 是否持久化
		false, // 自动删除
		true,  // 是否排他
		false, // 是否阻塞
		nil,   // 额外属性
	)

	r.failOnErr(err, "Failed to declare a queue")

	// 绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		r.Key, // 需要绑定 routingKey
		r.Exchange,
		false, // 是否阻塞
		nil,   // 额外属性
	)

	// 3. 消费消息
	message, err := r.channel.Consume(
		q.Name, // 队列名称
		"",     // 用来区分多个消费者
		true,   // 是否自动应答
		false,  // 是否具有排他性
		false,  // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
		false,  // 队列消费是否阻塞
		nil,    // 额外属性
	)

	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Println("退出请按 CTRL + C")
	<-forever
}

示例

mainRoutingPublish

package main

import (
	"rabbitmq-base/rabbitmq"
	"strconv"
	"time"
)

func main() {
	r1 := rabbitmq.NewRabbitMQRouting("exchange2", "error")
	r2 := rabbitmq.NewRabbitMQRouting("exchange2", "info")

	for i := 0; i < 5; i++ {
		r1.PublishRouting("Hello exchange1 error" + strconv.Itoa(i))
		r2.PublishRouting("Hello exchange1 info" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
	}
}

mainRoutingRecieve1

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQRouting("exchange2", "error")
	r.RecieveRouting()
}

mainRoutingRecieve2

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQRouting("exchange2", "info")
	r.RecieveRouting()
}

Topic

话题模式

一个消息被多个消费者获取。消息的目标 queue 可用 BindingKey 以通配符,(#:一个或多个词,*:一个词)的方式指定。

![[Topic.png]]

获取实例

// NewRabbitMQTopic - 创建话题模式下的 RabbitMQ 实例
func NewRabbitMQTopic(exchange string, routingKey string) *RabbitMQ {
	return NewRabbitMQ("", exchange, routingKey)
}

生产者

// PublishTopic - 话题模式下的生产者
func (r *RabbitMQ) PublishTopic(message string) {
	// 1. 尝试创建交换机,不存在创建
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"topic",    // 交换机类型,topic
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 发送消息
	err = r.channel.Publish(
		r.Exchange,
		r.Key, // 设置 routingKey
		false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
		false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

消费者

// RecieveTopic - 话题模式下的消费者
func (r *RabbitMQ) RecieveTopic() {
	// 1. 试探性创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange, // 交换机名称
		"topic",    // 交换机类型,direct
		true,       // 是否持久化
		false,      // 自动删除
		false,      // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
		false,      // 是否阻塞
		nil,        // 额外属性
	)

	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
	q, err := r.channel.QueueDeclare(
		"",    // 随机生产队列名称
		false, // 是否持久化
		false, // 自动删除
		true,  // 是否排他
		false, // 是否阻塞
		nil,   // 额外属性
	)

	r.failOnErr(err, "Failed to declare a queue")

	// 绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		r.Key, // 需要绑定 routingKey
		r.Exchange,
		false, // 是否阻塞
		nil,   // 额外属性
	)

	// 3. 消费消息
	message, err := r.channel.Consume(
		q.Name, // 队列名称
		"",     // 用来区分多个消费者
		true,   // 是否自动应答
		false,  // 是否具有排他性
		false,  // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
		false,  // 队列消费是否阻塞
		nil,    // 额外属性
	)

	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Println("退出请按 CTRL + C")
	<-forever
}

示例

mainTopicPublish

package main

import (
	"rabbitmq-base/rabbitmq"
	"strconv"
	"time"
)

func main() {
	r1 := rabbitmq.NewRabbitMQTopic("exchange3", "xylx.topic1")
	r2 := rabbitmq.NewRabbitMQTopic("exchange3", "xylx.topic2")

	for i := 0; i <= 100; i++ {
		r1.PublishTopic("Hello exchange3 xylx.topic1 -> " + strconv.Itoa(i))
		r2.PublishTopic("Hello exchange3 xylx.topic2 -> " + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
	}
}

mainTopicRecieve1

. 用来分隔词

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQTopic("exchange3", "#.topic1")
	r.RecieveTopic()
}

mainTopicRecieve2

package main

import "rabbitmq-base/rabbitmq"

func main() {
	r := rabbitmq.NewRabbitMQTopic("exchange3", "#.topic2")
	r.RecieveTopic()
}