在Ubuntu20.04上安装RabbitMQ
RabbitMQ官网: https://www.rabbitmq.com/
1.安装前准备
sudo apt-get update -y
sudo apt-get install curl gnupg -y
2.安装RabbitMQ签名密钥
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
3.安装apt HTTPS传输
sudo apt-get install apt-transport-https
4.添加提供最新的RabbitMQ和Erlang版本的Bintray存储库
在/etc/apt/sources.list.d目录下创建bintray.erlang.list文件,并在文件中输入以下内容(这里以Ubuntu20.04, Erlang版本为23.x为例子)
deb https://dl.bintray.com/rabbitmq-erlang/debian focal erlang-23.x
在本步骤中向文件中输入的内容根据Ubuntu和Erlang版本的不同而不同,以下介绍版本的关系和选择
在上述输入内容中focal代表了Ubuntu20.04,具体的对应关系如下图
其中erlang-23.x代表选择了Erlang的23.x的版本
然后保存文件并退出
5.安装Erlang包
sudo apt-get update -y
sudo apt-get install -y erlang-base
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key
erlang-runtime-tools erlang-snmp erlang-ssl
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
指定有效程序包
sudo apt-get update -y
在/etc/apt/preferences.d目录下新建erlang文件并输入以下内容
Package: erlang*
Pin: release o=Bintray
Pin-Priority: 1000
执行以下命令
sudo apt-cache policy
在/etc/apt/preferences.d目录下的erlang文件中更改成以下内容(这里erlang选择23.0.3-1版本)
Package: erlang*
Pin: version 1:23.0.3-1
Pin-Priority: 1000
Package: esl-erlang
Pin: version 1:22.3.4.1
Pin-Priority: 1000
6.在/etc/apt/preferences.d/目录下篡改剪rabbitmq文件,并添加以下内容
Package: rabbitmq-server
Pin: version 1:3.8.7
Pin-Priority: 1000
7.安装RabbitMQ
执行以下命令
sudo apt-get update -y
sudo apt-get install rabbitmq-server -y --fix-missing
8.验证RabbitMQ安装
启动
//启动管理界面和外部监控系统
sudo rabbitmq-plugins enable rabbitmq_management
// 启动RabbitMQ
sudo service rabbitmq-server start
9.访问页面查看效果
在浏览器中访问http://localhost:15672/,密码和用户名都是guest
此处是因为我已经建立了队列,所以会有消息。
9.四条RabbitMQ使用的命令总结以下
// 启动
sudo service rabbitmq-server start
// 重启
sudo service rabbitmq-server restart
// 停止
sudo service rabbitmq-server stop
// 查看状态
sudo service rabbitmq-server status
go语言整合rabbitmq(手动ack)
准备包
我们使用github.com/streadway/amqp下的包。此处我才用gomod的形式来管理包,所以按照mod的模式下载包即可。
生产者,producer.go
package main
import (
"github.com/streadway/amqp"
"log"
"reflect"
"strconv"
"time"
)
//初始化全局变量,方便复用
var connection *amqp.Connection
var ch *amqp.Channel
var queue amqp.Queue
var c chan int
var confirms chan amqp.Confirmation
/**
初始化函数,默认在main方法前执行
*/
func init() {
var err error
//建立连接
connection, err = amqp.Dial("amqp://guest:guest@ip:5672/")
if nil != err {
log.Fatalf("connect the rabbitmqProducer error: %s",err)
}
//设置通道
ch, err = connection.Channel()
if nil != err {
log.Fatalf("connect the channel error: %s",err)
}
//开启确认机制
ch.Confirm(false)
confirms = ch.NotifyPublish(make(chan amqp.Confirmation, 1)) // 处理确认逻辑
//定义队列
queue, _ = ch.QueueDeclare("guet-block-1",false,false,
false,false,nil)
if nil != err {
log.Fatalf("create the queue error: %s",err)
}
}
/**
因为channel通道隔一段时间会被关闭,需要重连,否则手动ack会报错。
*/
func reconnectRabbitmq() () {
var err error
//建立连接
connection, err = amqp.Dial("amqp://guest:guest@ip:5672/")
if nil != err {
log.Fatalf("connect the rabbitmqProducer error: %s",err)
}
//设置通道
ch, err = connection.Channel()
if nil != err {
log.Fatalf("connect the channel error: %s",err)
}
//开启确认机制
ch.Confirm(false)
// 处理确认逻辑
confirms = ch.NotifyPublish(make(chan amqp.Confirmation, 1))
//定义队列
queue, _ = ch.QueueDeclare("guet-block-1",false,false,
false,false,nil)
if nil != err {
log.Fatalf("create the queue error: %s",err)
}
}
func main(){
//模拟阻塞
c = make(chan int)
go producer("")
<- c
}
/**
msg是需要的消息内容,可以是json或其他内容。入参可以加上发送内容的类型,如:ContentType: "text/plain"
*/
func producer(msg string) {
//此处msg均写死。
for i := 0; i < 300; i++ {
msg = strconv.Itoa(i)
//通过反射拿到channel结构体的closed字段值
closed := getClosed(*ch)
if closed == 1 {
log.Printf("重连。。。。。。。。。。。。。。。。。")
//0:channel未关闭,1:channel已关闭,为了不报错,我们重新建立连接
reconnectRabbitmq()
}
//发布消息
//exchange, key string, mandatory, immediate bool, msg Publishing
err :=ch.Publish("",queue.Name,false,false,amqp.Publishing{
Body: []byte(msg),
})
if(err != nil){
log.Panic("send mesg error %v",msg)
}
// 生产者是否confirm成功
isSuccess := confirmOne(confirms)
//未成功需要重新调用
if ! isSuccess{
//log.Printf("重发======================== ",msg)
//通过反射拿到channel结构体的closed字段值
closed := getClosed(*ch)
if closed == 1 {
log.Printf("重连。。。。。。。。。。。。。。。。。")
//0:channel未关闭,1:channel已关闭,为了不报错,我们重新建立连接
reconnectRabbitmq()
}
//发布消息
//exchange, key string, mandatory, immediate bool, msg Publishing
err :=ch.Publish("",queue.Name,false,false,amqp.Publishing{
Body: []byte(msg),
})
if(err != nil){
log.Panic("send mesg error %v",msg)
}
}
log.Printf("发送的数据为:%s ",msg)
//间隔三秒发一次信息
time.Sleep(time.Duration(3)*time.Second)
}
c <- 0
}
func getClosed(ch amqp.Channel) int64 {
d :=reflect.ValueOf(ch)
// 根据名字查找字段并转换为Int64
i := d.FieldByName("closed").Int()
return i
}
// 消息确认
func confirmOne(confirms <-chan amqp.Confirmation) bool{
if confirmed := <-confirms; !confirmed.Ack {
//log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
return false
}
return true
}
消费者 consumer.go
package main
import (
"github.com/streadway/amqp"
"log"
"reflect"
)
//初始化全局变量,方便复用
var connection *amqp.Connection
var ch *amqp.Channel
var queue amqp.Queue
var consumerMsg <-chan amqp.Delivery
/**
初始化函数,默认在main方法前执行
*/
func init() {
var err error
//建立连接
connection, err = amqp.Dial("amqp://guest:guest@ip:5672/")
if nil != err {
log.Fatalf("connect the rabbitmqConsumer error: %s",err)
}
//设置通道
ch, err = connection.Channel()
if nil != err {
log.Fatalf("connect the channel error: %s",err)
}
//定义队列
queue, _ = ch.QueueDeclare("guet-block",false,false,
false,false,nil)
if nil != err {
log.Fatalf("create the queue error: %s",err)
}
consumerMsg, err = ch.Consume(queue.Name, "", false,
false, false, false, nil)
if nil != err {
log.Fatalf("receive error %v", err)
}
}
//因为channel通道隔一段时间会被关闭,需要重连,否则手动ack会报错。
func reconnectRabbitmq() () {
var err error
//建立连接
connection, err = amqp.Dial("amqp://guest:guest@ip:5672/")
if nil != err {
log.Fatalf("connect the rabbit error: %v",err)
}
//defer connection.Close()
//设置通道
ch, err = connection.Channel()
if nil != err {
log.Fatalf("get connect fail: %v",err)
}
//defer ch.Close()
//定义队列
queue, _ = ch.QueueDeclare("guet-block", false, false,
false, false, nil)
//获取消息
consumerMsg, err = ch.Consume(queue.Name, "", false,
false, false, false, nil)
if nil != err {
log.Fatalf("receive error %v", err)
}
}
func main() {
// 创建一个channel
c := make(chan int)
log.Println("正在异步请求 consumer")
go consumer()
<- c
}
/**
消费者
*/
func consumer() {
//无限监听
for {
select {
case msg := <-consumerMsg:
//channel被强制关闭,需要重新建立连接
//通过反射拿到channel结构体的closed字段值
closed := getClosed(*ch)
if closed == 1 {
//0:channel未关闭,1:channel已关闭,为了不报错,我们重新建立连接
reconnectRabbitmq()
}else {
log.Printf("接受到的数据为 :%s\n",msg.Body)
//手动ack,false:代表只对当前消息ack,ture,对这条消息前的所有的消息都ack
if err := msg.Ack(false); err != nil {
log.Fatalf("hand ack fail msg: ",err)
}
}
}
}
}
/**
反射获取channel关闭位标识位
*/
func getClosed(ch amqp.Channel) int64 {
d :=reflect.ValueOf(ch)
// 根据名字查找字段并转换为Int64
i := d.FieldByName("closed").Int()
return i
}
模拟效果: