关于 Kafka 的相关架构及应用场景等，请见笔者的相关文章。
1. 消费 Kafka 消息
package main
import (
"fmt"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"time"
)
// Package-level consumer state and connection settings used by init.
var (
// kafkaConsumer is the consumer-group client; it is assigned in init and read in main.
kafkaConsumer *cluster.Consumer
// kafkaBrokers lists the broker addresses passed to cluster.NewConsumer.
kafkaBrokers = []string{
"127.0.0.1:9092"}
// kafkaTopic is the single topic the consumer subscribes to.
kafkaTopic = "test_topic_1"
// groupId is the consumer group ID passed to cluster.NewConsumer.
groupId = "csdn_test_1"
)
// init builds the consumer-group configuration and creates kafkaConsumer,
// subscribing to kafkaTopic on kafkaBrokers as a member of groupId.
// It panics if the consumer cannot be created.
func init() {
	config := cluster.NewConfig()
	// Deliver consumer errors and group notifications on their channels;
	// main reads both.
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	// sarama.OffsetOldest is the named constant for -2: start from the oldest
	// available offset when the group has no committed offset.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.CommitInterval = 1 * time.Second

	var err error
	kafkaConsumer, err = cluster.NewConsumer(kafkaBrokers, groupId, []string{kafkaTopic}, config)
	if err != nil {
		panic(err.Error())
	}
	if kafkaConsumer == nil {
		panic(fmt.Sprintf("consumer is nil. kafka info -> {brokers:%v, topic: %v, group: %v}", kafkaBrokers, kafkaTopic, groupId))
	}
	fmt.Printf("kafka init success, consumer -> %v, topic -> %v\n", kafkaConsumer, kafkaTopic)
}
// main consumes from kafkaConsumer until its message channel is closed.
// Each message is printed and its offset marked; consumer errors and group
// notifications are printed as they arrive. The consumer is closed on exit.
func main() {
	defer func() {
		if err := kafkaConsumer.Close(); err != nil {
			fmt.Printf("consumer close error: %v\n", err)
		}
	}()

	// Hold the auxiliary channels in locals so a closed one can be set to nil:
	// a receive from a nil channel blocks forever, which removes that case
	// from the select instead of letting it fire on every iteration.
	errs := kafkaConsumer.Errors()
	ntfs := kafkaConsumer.Notifications()

	for {
		select {
		case msg, ok := <-kafkaConsumer.Messages():
			if !ok {
				// A closed channel is always ready to receive, so looping
				// again would spin forever; stop consuming instead.
				fmt.Printf("kafka 监听服务失败\n")
				return
			}
			fmt.Printf("kafka msg: %s \n", msg.Value)
			kafkaConsumer.MarkOffset(msg, "")
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			fmt.Printf("consumer error: %v\n", err)
		case ntf, ok := <-ntfs:
			if !ok {
				ntfs = nil
				continue
			}
			fmt.Printf("consumer notification: %v\n", ntf)
		}
	}
}
2. 生产 Kafka 消息
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"time"
)
// main runs the producer loop. startProduce loops without returning, so the
// process keeps producing until it is terminated.
func main() {
startProduce()
}
// Package-level producer state and connection settings.
var (
// producer is the synchronous producer; it is assigned in init and used by produceMsg.
producer sarama.SyncProducer
// brokers lists the broker addresses passed to sarama.NewSyncProducer.
brokers = []string{
"127.0.0.1:9092"}
// topic is the topic every produced message is sent to.
topic = "test_topic_1"
)
// init creates the package-level synchronous producer connected to brokers.
// On failure it prints the error and panics.
func init() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.Retry.Max = 5
	cfg.Producer.RequiredAcks = sarama.WaitForLocal

	created, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		fmt.Printf("init producer failed -> %v \n", err)
		panic(err)
	}
	producer = created
	fmt.Println("producer init success")
}
// produceMsg sends msg as the value of a message on topic through the
// package-level producer. It prints a JSON dump of the message before
// sending, then prints either the send error or the partition and offset
// returned by SendMessage.
func produceMsg(msg string) {
	msgX := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(msg),
	}
	fmt.Printf("SendMsg -> %v\n", dumpString(msgX))

	partition, offset, err := producer.SendMessage(msgX)
	if err != nil {
		fmt.Printf("send msg error:%s \n", err)
		// Stop here so a failed send is not also reported as a success.
		return
	}
	fmt.Printf("msg send success, message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
}
// startProduce sends one timestamp message every two seconds and never
// returns. The payload is a small JSON object whose "timestamp" field is the
// current whole-second Unix time multiplied by 1000.
func startProduce() {
	for range time.Tick(2 * time.Second) {
		nowMillis := time.Now().Unix() * 1000
		produceMsg(fmt.Sprintf("{\"timestamp\":%d}", nowMillis))
	}
}
// dumpString returns the JSON encoding of v as a string.
// If v cannot be marshaled, it returns the text
// {err:"json format error.<marshal error>"} instead; note that this fallback
// has an unquoted key and is therefore not itself valid JSON.
func dumpString(v interface{}) (str string) {
	var out bytes.Buffer

	encoded, err := json.Marshal(v)
	if err == nil {
		out.Write(encoded)
		return out.String()
	}

	// Assemble the fallback from its prefix, the marshal error and its suffix.
	for _, part := range []string{"{err:\"json format error.", err.Error(), "\"}"} {
		out.WriteString(part)
	}
	return out.String()
}