rabbitMq主要是操作rabbitmq的Channel
1. docker部署rabbitmq
docker pull rabbitmq:3.8-management-alpine
2. 启动容器
docker run -d --name rmq -e RABBITMQ_DEFAULT_USER=[username] -e RABBITMQ_DEFAULT_PASS=[password] -p 15672:15672 -p 5672:5672 rabbitmq:3.8-management-alpine
3. 下载gorabbitmq第三方库
go get github.com/streadway/amqp
4. 连接初始化rabbitmq
dns := fmt.Sprintf("amqp://%s:%s@%s:%d/","shenyi","123","127.0.0.1",5672)
conn, err := amqp.Dial(dsn)
if err != nil {
log.Fatal("amqp.Dial ",err)
}
5. 连接mq后,获取conn,然后获取channel
channel, err := conn.Channel()
if err != nil {
log.Println(err)
return nil
}
6. 申请交换机
//CONST EXCHANGE_USER = "UserExchange"
err := channel.ExchangeDeclare(EXCHANGE_USER, "direct", false, false, false, false, nil)
if err != nil {
return fmt.Errorf("exchage error ", err)
}
7. 绑定队列到交换机
queue, err := this.Channel.QueueDeclare(queue, false, false, false, false, nil)
if err != nil {
return err
}
//CONST QUEUE_NEWUSER = "newuser"
//CONST ROUTER_KEY_USERREG = "userreg" //注册用户的路由key
//CONST EXCHANGE_USER = "UserExchange"
err = this.Channel.QueueBind(QUEUE_NEWUSER, ROUTER_KEY_USERREG, EXCHANGE_USER, false, nil)
if err != nil {
return err
}
8. 发送消息
//CONST EXCHANGE_USER = "UserExchange"
//CONST ROUTER_KEY_USERREG = "userreg" //注册用户的路由key
this.Channel.Publish(
EXCHANGE_USER,
ROUTER_KEY_USERREG,
//这个参数设置为true如果无法投递给queue,MQ会把消息返还生产者
true,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("消息内容"),
},
)
9. 消费消息
1. 首先,连接mq,然后获取conn,再获取channel,
限流, 同时只能处理2个请求
err := channel.Qos(2, 0, false)
if err != nil {
log.Fatal("mq.Channel.Qos ", err)
}
消费消息
msgs, err := channel.Consume(
queue,
key,
//是否自动确认消息
false,
false,
false,
false,
nil)
if err != nil {
return err
}
for msg := range msgs {
fmt.Println("msg:", string(msg.Body))
//这里进行处理流程,如果处理成功
msg.ACck(false) //Ack后消息会从rabbitmq中删掉,如果不ack,下次连接还会收到消息
//如果处理失败
msg.Reject(false) //丢弃消息
//msg.Nack() //标记这条消息未确认
}