前一篇文章中,我们讨论了在micro中如何发布/订阅消息。这种方式的优点是简单,缺点是缺乏灵活性。如果想在更底层发送与接收消息,需要使用接口:github.com/micro/go-micro/v2/broker.Broker
这个接口是micro异步消息处理的核心,事实上,发布/订阅也依赖它
下面通过一个例子来演示Broker的用法
package main
import (
"fmt"
"github.com/micro/go-micro/v2"
"github.com/micro/go-micro/v2/broker"
"log"
"time"
)
// topic is the broker topic that the example publishes to and subscribes on.
var topic = "com.foo.topic"
// pub publishes one message per second to topic through brk and never
// returns. Each message carries a running sequence number in its "id" header
// and a body of the form "<seq>:<current time>". The outcome of every publish
// attempt is logged, and the sequence number advances whether or not the
// publish succeeded.
func pub(brk broker.Broker) {
	ticks := time.Tick(time.Second)
	for seq := 0; ; seq++ {
		<-ticks

		id := fmt.Sprintf("%d", seq)
		payload := fmt.Sprintf("%d:%s", seq, time.Now().String())
		msg := &broker.Message{
			Header: map[string]string{"id": id},
			Body:   []byte(payload),
		}

		err := brk.Publish(topic, msg)
		if err != nil {
			log.Printf("[pub] failed: %v\n", err)
			continue
		}
		log.Printf("[pub] pubbed message:%s\n", string(msg.Body))
	}
}
// sub subscribes to topic on brk, using the topic name as the queue name.
// The handler prints the body and header of every event it receives and
// always reports success. A subscription error is printed to standard output;
// the returned subscriber is discarded.
func sub(brk broker.Broker) {
	onEvent := func(event broker.Event) error {
		body := string(event.Message().Body)
		header := event.Message().Header
		fmt.Println("[sub] received message:", body, ", header:", header)
		return nil
	}

	if _, err := brk.Subscribe(topic, onEvent, broker.Queue(topic)); err != nil {
		fmt.Println(err)
	}
}
// main creates the "com.foo.broker.example" service and registers an
// AfterStart hook that connects the service's broker and then starts the
// subscriber and the publisher in their own goroutines. It then runs the
// service until it stops.
func main() {
	service := micro.NewService(
		micro.Name("com.foo.broker.example"),
	)

	service.Init(micro.AfterStart(func() error {
		brk := service.Options().Broker
		if err := brk.Connect(); err != nil {
			log.Fatalf("Broker connect error:%v", err)
		}
		go sub(brk)
		go pub(brk)
		return nil
	}))

	// Run reports start-up and shutdown failures through its return value;
	// surface them instead of exiting silently with status 0.
	if err := service.Run(); err != nil {
		log.Fatal(err)
	}
}
结果:
2020-05-25 10:23:47.846793 I | [pub] pubbed message:0:2020-05-25 10:23:47.845600764 +0800 CST m=+1.747833554
[sub] received message: 0:2020-05-25 10:23:47.845600764 +0800 CST m=+1.747833554 , header: map[Micro-Topic:com.foo.topic id:0]
2020-05-25 10:23:48.845661 I | [pub] pubbed message:1:2020-05-25 10:23:48.845603507 +0800 CST m=+2.747836227
[sub] received message: 1:2020-05-25 10:23:48.845603507 +0800 CST m=+2.747836227 , header: map[Micro-Topic:com.foo.topic id:1]
2020-05-25 10:23:49.845608 I | [pub] pubbed message:2:2020-05-25 10:23:49.845576914 +0800 CST m=+3.747809559
[sub] received message: 2:2020-05-25 10:23:49.845576914 +0800 CST m=+3.747809559 , header: map[Micro-Topic:com.foo.topic id:2]
2020-05-25 10:23:50.845687 I | [pub] pubbed message:3:2020-05-25 10:23:50.845612017 +0800 CST m=+4.747844777
[sub] received message: 3:2020-05-25 10:23:50.845612017 +0800 CST m=+4.747844777 , header: map[Micro-Topic:com.foo.topic id:3]
2020-05-25 10:23:51.845613 I | [pub] pubbed message:4:2020-05-25 10:23:51.845573602 +0800 CST m=+5.747806265
[sub] received message: 4:2020-05-25 10:23:51.845573602 +0800 CST m=+5.747806265 , header: map[Micro-Topic:com.foo.topic id:4]
Broker
// Broker is an interface used for asynchronous messaging.
type Broker interface {
// Init takes a variadic list of Option values and reports an error.
// NOTE(review): presumably applies the options to the broker — Option is
// declared outside this excerpt.
Init(...Option) error
// Options returns the broker's Options value (type declared outside this excerpt).
Options() Options
// Address returns a string.
// NOTE(review): presumably the address the broker uses — confirm against the implementation.
Address() string
// Connect reports an error on failure. The example program in this article
// calls it before publishing or subscribing.
Connect() error
// Disconnect reports an error on failure.
// NOTE(review): presumably the counterpart of Connect — confirm.
Disconnect() error
// Publish takes a topic name, a message and optional publish options, and
// reports an error on failure.
Publish(topic string, m *Message, opts ...PublishOption) error
// Subscribe takes a topic name, a handler and optional subscribe options,
// and returns a Subscriber or an error.
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
// String returns a string.
// NOTE(review): presumably the broker implementation's name — confirm.
String() string
}
// Publish forwards topic, msg and any publish options to DefaultBroker.Publish
// and returns that call's error unchanged.
func Publish(topic string, msg *Message, opts ...PublishOption) error {
	err := DefaultBroker.Publish(topic, msg, opts...)
	return err
}
// Subscribe forwards topic, handler and any subscribe options to
// DefaultBroker.Subscribe and returns that call's results unchanged.
func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
	sub, err := DefaultBroker.Subscribe(topic, handler, opts...)
	return sub, err
}
以broker.Subscribe方法为例,第三个参数为SubscribeOption,它的含义与server.SubscribeOption相同(见前一篇文章)。内置的设置有三种:broker.DisableAutoAck、broker.Queue、broker.SubscribeContext,用法与server.SubscribeOption中对应的设置相同。不同的是,其他broker插件还可以提供额外的broker.SubscribeOption,来实现插件特有的功能
例如:
在go.mod中加入
github.com/micro/go-plugins v1.5.1
...
import "github.com/micro/go-plugins/broker/rabbitmq"
...
_, err := broker.Subscribe(topic, func(p broker.Event) error {
...
p.Ack()
return nil
}, broker.Queue(topic), broker.DisableAutoAck(), rabbitmq.DurableQueue(),)
注意:如果使用了DisableAutoAck,此时可以在处理方法中执行p.Ack()来手动确认消息
使用kafka作为消息队列
首先,go.mod:
module micro-hello-pub
go 1.13
require (
github.com/golang/protobuf v1.4.0
github.com/micro/go-micro/v2 v2.7.0
github.com/micro/go-plugins/broker/kafka/v2 v2.5.0
)
main.go
package main
import (
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"github.com/micro/go-micro/v2"
	"github.com/micro/go-micro/v2/broker"
	_ "github.com/micro/go-plugins/broker/kafka/v2"
)
var (
	// topic is the broker topic that the example publishes to and subscribes on.
	topic = "com.foo.topic"

	// sendCount and recvCount count publish attempts and received events.
	// They are written by the publisher goroutine and the subscription
	// handler and read by the BeforeStop hook, so every access goes through
	// sync/atomic to avoid a data race.
	sendCount int64
	recvCount int64
)

// pub publishes one message per second to topic through brk and never
// returns. Each message carries the current send counter in its "id" header
// and a body of the form "<seq>:<current time>". The outcome of every publish
// attempt is logged.
func pub(brk broker.Broker) {
	for range time.Tick(time.Second) {
		seq := atomic.LoadInt64(&sendCount)
		msg := &broker.Message{
			Header: map[string]string{"id": fmt.Sprintf("%d", seq)},
			Body:   []byte(fmt.Sprintf("%d:%s", seq, time.Now().String())),
		}
		if err := brk.Publish(topic, msg); err != nil {
			log.Printf("[pub] failed: %v\n", err)
		} else {
			log.Printf("[pub] pubbed message:%s\n", string(msg.Body))
		}
		// The counter advances on every attempt, successful or not, so it
		// doubles as the sequence number of the next message.
		atomic.AddInt64(&sendCount, 1)
	}
}

// sub subscribes to topic on brk, using the topic name as the queue name.
// The handler counts every event it receives, prints the event's body and
// header, and always reports success. A subscription error is printed to
// standard output; the returned subscriber is discarded.
func sub(brk broker.Broker) {
	_, err := brk.Subscribe(topic, func(event broker.Event) error {
		atomic.AddInt64(&recvCount, 1)
		fmt.Println("[sub] received message:", string(event.Message().Body), ", header:", event.Message().Header)
		return nil
	}, broker.Queue(topic))
	if err != nil {
		fmt.Println(err)
	}
}

// main creates the "com.foo.broker.example" service. Its AfterStart hook
// connects the service's broker and starts the subscriber and the publisher
// in their own goroutines; its BeforeStop hook logs the send and receive
// counters. It then runs the service until it stops.
func main() {
	service := micro.NewService(
		micro.Name("com.foo.broker.example"),
	)

	service.Init(
		micro.AfterStart(func() error {
			brk := service.Options().Broker
			if err := brk.Connect(); err != nil {
				log.Fatalf("Broker connect error:%v", err)
			}
			go sub(brk)
			go pub(brk)
			return nil
		}),
		micro.BeforeStop(func() error {
			log.Printf("send:%d,recv:%d\n", atomic.LoadInt64(&sendCount), atomic.LoadInt64(&recvCount))
			return nil
		}),
	)

	// Run reports start-up and shutdown failures through its return value;
	// surface them instead of exiting silently with status 0.
	if err := service.Run(); err != nil {
		log.Fatal(err)
	}
}
启动时,在environment中加入:
MICRO_BROKER=kafka;MICRO_BROKER_ADDRESS=127.0.0.1:9192,127.0.0.1:9292,127.0.0.1:9392
结果(注意:输出中id为3的那条消息时间戳是14:22:33,早于本次启动时间,应该是上一次运行时留在kafka中的消息,所以最后统计的recv比send多1):
luslin@local:~/go/workspace/tools/micro-hello-pub$ MICRO_REGISTRY=etcd MICRO_BROKER=kafka MICRO_BROKER_ADDRESS=127.0.0.1:9192,127.0.0.1:9292,127.0.0.1:9392 ./micro_hello_pub
2020-05-25 14:25:04 file=auth.go:31 level=info Auth [noop] Authenticated as com.foo.broker.example-72df0e38-5791-467d-af6f-1019fcb17662 in the go.micro namespace
2020-05-25 14:25:04 file=service.go:205 level=info Starting [service] com.foo.broker.example
2020-05-25 14:25:04 file=grpc.go:845 level=info Server [grpc] Listening on [::]:45751
2020-05-25 14:25:04 file=grpc.go:676 level=info Registry [etcd] Registering node: com.foo.broker.example-72df0e38-5791-467d-af6f-1019fcb17662
[sub] received message: 0:2020-05-25 14:25:05.978230301 +0800 CST m=+1.882055700 , header: map[id:0]
2020-05-25 14:25:05.985487 I | [pub] pubbed message:0:2020-05-25 14:25:05.978230301 +0800 CST m=+1.882055700
[sub] received message: 3:2020-05-25 14:22:33.897258745 +0800 CST m=+4.857683803 , header: map[id:3]
2020-05-25 14:25:06.987850 I | [pub] pubbed message:1:2020-05-25 14:25:06.978188043 +0800 CST m=+2.882013423
[sub] received message: 1:2020-05-25 14:25:06.978188043 +0800 CST m=+2.882013423 , header: map[id:1]
^C2020-05-25 14:25:07.468866 I | send:2,recv:3
2020-05-25 14:25:07 file=grpc.go:772 level=info Deregistering node: com.foo.broker.example-72df0e38-5791-467d-af6f-1019fcb17662
2020-05-25 14:25:07 file=grpc.go:940 level=info Broker [kafka] Disconnected from 127.0.0.1:9192