发送端代码
package main
import (
"fmt"
"log"
"time"
"github.com/streadway/amqp"
)
// failOnError terminates the program via log.Fatalf when err is non-nil,
// logging msg followed by the error. A nil err is a no-op.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s : %s", msg, err)
}
// connectMQ dials the RabbitMQ broker on localhost, declares the "hello"
// queue and publishes a numbered "hello world" message to it every 100 ms
// from a background goroutine.
//
// It never returns normally: the calling goroutine parks on a channel that is
// never signalled. Any setup or publish error ends the process through
// failOnError (log.Fatalf), in which case the deferred Close calls do not run.
func connectMQ() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	// This step dials the broker, so report a connection failure rather than
	// a channel failure.
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Fail open a chennel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // autoDelete
		false,   // exclusive
		false,   // noWait
		nil,     // args
	)
	failOnError(err, "failed to declare a quene")

	// forever is never written to or closed; receiving from it keeps this
	// function (and therefore the connection) alive while the publisher runs.
	forever := make(chan struct{})
	go func() {
		for count := 1; ; count++ {
			body := fmt.Sprintf("%s , %d", "hello world", count)
			// Goroutine-local error so the closure does not write to the
			// err variable owned by the enclosing function.
			err := ch.Publish(
				"",     // exchange: default exchange
				q.Name, // routing key: the queue name
				false,  // mandatory
				false,  // immediate
				amqp.Publishing{
					ContentType: "text/plain",
					Body:        []byte(body),
				})
			failOnError(err, "Failed to publish a message")
			time.Sleep(100 * time.Millisecond)
		}
	}()
	<-forever
}
// main is the sender program's entry point; it hands control to connectMQ.
func main() { connectMQ() }
自动应答 接收端代码
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
// failOnError is the receiver's fatal-error helper: when err is non-nil it
// logs msg and the error through log.Fatalf, which ends the process.
// When err is nil it returns immediately.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s - %s ", msg, err)
}
func reviceMQ() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "failed to connect to rabbitmq")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, " Failede to open achannel")
defer ch.Close()
ch.Qos(1, 0, true)
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
failOnError(err, "failed to declare a queue ")
自动应答, 这样会造成 工人 死亡时 详细丢失
msgs, err := ch.Consume(
q.Name,
"",
true, // 自动应答 标识
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for {
d := <-msgs
log.Printf("Receied a message : %s \n len msgs : %d \n", d.Body, len(msgs))
time.Sleep(400 * time.Millisecond)
d.Ack(true)
}
}()
log.Printf(" waiting for messages , to exit press ctrl+c")
<-forever
}
// main is the receiver program's entry point; it hands control to reviceMQ.
func main() { reviceMQ() }
使用说明
sender 与 receiver 都可以同时启动多份,以提高队列的处理效率。
什么是自动应答?
自动应答就是:消费者一接到任务就认为任务已完成,无需再手动确认;如果消费者在处理过程中崩溃,这条消息就会丢失。
PrefetchCount:在收到确认之前,服务器最多向消费者投递的未确认消息条数。
PrefetchSize:在收到确认之前,服务器最多向消费者投递的未确认消息总字节数(0 表示不限制)。