环境
kafka的安装
zookeeper的安装
第三方库准备
- 安装依赖库sarama
go get github.com/Shopify/sarama
- sarama-cluster依赖库(如果你需要的是集群模式)
go get github.com/bsm/sarama-cluster
同步生产模式(同步发送消息)
// Address is the list of Kafka broker addresses the examples connect to.
var Address = []string{"127.0.0.1:9092"}

// main runs the synchronous producer example against Address.
func main() {
	syncProducer(Address)
}
// syncProducer sends ten test messages to the "test" topic through a
// synchronous sarama producer. For every acknowledged message it prints the
// partition and offset returned by SendMessage, then pauses two seconds
// before the next send.
//
// address is the broker list passed to sarama.NewSyncProducer. Failures to
// create the producer, to send a message, or to close the producer are
// logged; a failed send does not stop the loop.
func syncProducer(address []string) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	p, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
		return
	}
	defer func() {
		// Surface close failures instead of silently dropping them.
		if cerr := p.Close(); cerr != nil {
			log.Printf("close sync producer err=%s \n", cerr)
		}
	}()

	topic := "test"
	srcValue := "sync: this is a message. index=%d"
	for i := 0; i < 10; i++ {
		value := fmt.Sprintf(srcValue, i)
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(value),
		}
		part, offset, err := p.SendMessage(msg)
		if err != nil {
			log.Printf("send message(%s) err=%s \n", value, err)
		} else {
			// The payload is passed as an argument rather than concatenated
			// into the format string, so a '%' in the message text cannot be
			// misread as a formatting verb.
			fmt.Fprintf(os.Stdout, "%s发送成功,partition=%d, offset=%d \n", value, part, offset)
		}
		time.Sleep(2 * time.Second)
	}
}
异步生产模式(通过goroutine处理Successes和Errors)
// asyncProducer1 publishes an endless stream of test messages to the "test"
// topic through an asynchronous sarama producer, one message every two
// seconds, until the process receives os.Interrupt.
//
// Two helper goroutines read the producer's Successes and Errors channels
// and count the results. On interrupt the producer is closed with
// AsyncClose, the function waits for both goroutines to finish (their
// range loops end when the channels are closed), and then prints the
// enqueued / succeeded / failed totals.
//
// address is the broker list passed to sarama.NewAsyncProducer.
func asyncProducer1(address []string) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	p, err := sarama.NewAsyncProducer(address, config)
	if err != nil {
		// The log names the constructor that actually failed.
		log.Printf("sarama.NewAsyncProducer err, message=%s \n", err)
		return
	}

	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	var wg sync.WaitGroup
	var enqueued, successes, errors int
	wg.Add(2)

	// Count delivery acknowledgements.
	go func() {
		defer wg.Done()
		for range p.Successes() {
			successes++
		}
	}()

	// Log and count delivery failures.
	go func() {
		defer wg.Done()
		for err := range p.Errors() {
			log.Printf("%s 发送失败,err:%s\n", err.Msg, err.Err)
			errors++
		}
	}()

	asrcValue := "async-goroutine: this is a message. index=%d"
	var i int
Loop:
	for {
		i++
		value := fmt.Sprintf(asrcValue, i)
		msg := &sarama.ProducerMessage{
			Topic: "test",
			Value: sarama.ByteEncoder(value),
		}
		select {
		case p.Input() <- msg:
			enqueued++
			fmt.Fprintln(os.Stdout, value)
		case <-signals:
			p.AsyncClose()
			break Loop
		}
		time.Sleep(2 * time.Second)
	}

	// The counters are only read after both reader goroutines have exited.
	wg.Wait()
	fmt.Fprintf(os.Stdout, "发送数=%d,发送成功数=%d,发送失败数=%d \n", enqueued, successes, errors)
}
异步消息之Select
// asyncProducer2 is the select-based asynchronous producer: a single
// goroutine uses one select statement both to enqueue messages and to
// count the results reported by the producer.
//
// This approach is less efficient than dedicated reader goroutines: with a
// large volume of messages the Successes/Errors cases may not get to run
// for a while, which can stall the producer. Leaving
// config.Producer.Return.Successes disabled (as this function does; the
// sarama docs list false as the default) means only the Errors channel has
// to be serviced.
//
// A message is only replaced by the next one after it has been enqueued,
// so handling an error or success in a loop pass never drops a pending
// message. After AsyncClose the Errors channel is drained until the
// producer closes it, so late failures are still counted before the totals
// are printed.
//
// address is the broker list passed to sarama.NewAsyncProducer.
func asyncProducer2(address []string) {
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	p, err := sarama.NewAsyncProducer(address, config)
	if err != nil {
		// The log names the constructor that actually failed.
		log.Printf("sarama.NewAsyncProducer err, message=%s \n", err)
		return
	}

	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	var enqueued, successes, errors int
	asrcValue := "async-select: this is a message. index=%d"
	var (
		i     int
		value string
		msg   *sarama.ProducerMessage // pending message; nil once enqueued
	)
Loop:
	for {
		if msg == nil {
			i++
			value = fmt.Sprintf(asrcValue, i)
			msg = &sarama.ProducerMessage{
				Topic: "test",
				Value: sarama.ByteEncoder(value),
			}
		}
		select {
		case p.Input() <- msg:
			fmt.Fprintln(os.Stdout, value)
			enqueued++
			msg = nil
		case <-p.Successes():
			successes++
		case err := <-p.Errors():
			log.Printf("%s 发送失败,err:%s\n", err.Msg, err.Err)
			errors++
		case <-signals:
			p.AsyncClose()
			break Loop
		}
		time.Sleep(2 * time.Second)
	}

	// Keep reading Errors until the producer closes the channel, as the
	// sarama AsyncClose documentation requires.
	for err := range p.Errors() {
		log.Printf("%s 发送失败,err:%s\n", err.Msg, err.Err)
		errors++
	}
	fmt.Fprintf(os.Stdout, "发送数=%d,发送失败数=%d \n", enqueued, errors)
}
消费者
import (
"sync"
"log"
"fmt"
"os"
"github.com/Shopify/sarama"
"os/signal"
"github.com/bsm/sarama-cluster"
)
// Address is the list of Kafka broker addresses the consumers connect to.
var Address = []string{"127.0.0.1:9092"}

// main starts one cluster consumer per consumer group on the "nginx_log"
// topic and blocks until every consumer has returned.
func main() {
	topic := []string{"nginx_log"}
	groups := []string{"group-1", "group-2"}

	var wg sync.WaitGroup
	wg.Add(len(groups))
	for _, group := range groups {
		go clusterConsumer(&wg, Address, topic, group)
	}
	wg.Wait()
}
// clusterConsumer joins the consumer group groupId on the given topics via
// sarama-cluster and prints every message it receives, marking each
// message's offset after printing it. It runs until the process receives
// os.Interrupt or the consumer's Messages channel is closed, then prints
// how many messages it consumed.
//
// wg.Done is called when the function returns, so the caller must have
// added this consumer to wg beforehand. brokers is the broker list passed
// to cluster.NewConsumer. Consumer errors and rebalance notifications are
// logged by two background goroutines that range over the corresponding
// channels.
func clusterConsumer(wg *sync.WaitGroup, brokers, topics []string, groupId string) {
	defer wg.Done()

	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
	if err != nil {
		// The log names the constructor that actually failed.
		log.Printf("%s: cluster.NewConsumer err, message=%s \n", groupId, err)
		return
	}
	defer func() {
		// Surface close failures instead of silently dropping them.
		if cerr := consumer.Close(); cerr != nil {
			log.Printf("%s: consumer close err=%s \n", groupId, cerr)
		}
	}()

	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	// Log consumer errors.
	go func() {
		for err := range consumer.Errors() {
			log.Printf("%s:Error: %s\n", groupId, err.Error())
		}
	}()

	// Log rebalance notifications.
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
		}
	}()

	var successes int
Loop:
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				// A closed channel is always ready to receive; leave the
				// loop instead of spinning on it.
				break Loop
			}
			fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
			consumer.MarkOffset(msg, "")
			successes++
		case <-signals:
			break Loop
		}
	}
	fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}
参考资料:https://blog.csdn.net/kdpujie/article/details/79093595