Golang 使用 sarama 生产和消费 Kafka 消息

关于 Kafka 的相关架构及应用场景等，请参见笔者的相关文章。

1.消费kafka消息

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"time"
)


// Package-level state for the consumer example.
var (
	// kafkaConsumer is the consumer-group client; it is assigned in init
	// and read by main.
	kafkaConsumer *cluster.Consumer

	// kafkaBrokers is the list of broker addresses handed to
	// cluster.NewConsumer.
	kafkaBrokers = []string{"127.0.0.1:9092"}

	// kafkaTopic is the single topic this example subscribes to.
	kafkaTopic = "test_topic_1"

	// groupId is the consumer group identifier handed to cluster.NewConsumer.
	groupId = "csdn_test_1"
)

// init builds the consumer-group configuration and connects kafkaConsumer
// to kafkaBrokers, subscribing to kafkaTopic as a member of groupId.
//
// It panics if the consumer cannot be created, so main never runs with an
// unusable consumer.
func init() {
	config := cluster.NewConfig()
	// Surface consumer errors and rebalance notifications on their channels
	// so main can log them.
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	// Start from the oldest available offset when the group has no committed
	// offset yet (sarama.OffsetOldest is the named constant for -2).
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.CommitInterval = 1 * time.Second

	var err error
	kafkaConsumer, err = cluster.NewConsumer(kafkaBrokers, groupId, []string{kafkaTopic}, config)
	if err != nil {
		panic(err.Error())
	}
	// Defensive check: refuse to continue with a nil consumer even if no
	// error was reported.
	if kafkaConsumer == nil {
		panic(fmt.Sprintf("consumer is nil. kafka info -> {brokers:%v, topic: %v, group: %v}", kafkaBrokers, kafkaTopic, groupId))
	}
	fmt.Printf("kafka init success, consumer -> %v, topic -> %v\n", kafkaConsumer, kafkaTopic)
}

// main drains the consumer's message, error and notification channels,
// printing each event and marking every received message as processed.
//
// It returns once the message channel is closed: a closed channel is always
// ready to receive, so continuing to select on it would spin forever.
func main() {
	// Keep local copies of the auxiliary channels so that a closed one can be
	// set to nil; receiving from a nil channel blocks forever, which removes
	// that case from the select instead of letting it busy-loop.
	errs := kafkaConsumer.Errors()
	ntfs := kafkaConsumer.Notifications()

	for {
		select {
		case msg, ok := <-kafkaConsumer.Messages():
			if !ok {
				fmt.Printf("kafka 监听服务失败\n")
				return
			}
			fmt.Printf("kafka msg: %s \n", msg.Value)
			// Mark the message as processed so its offset gets committed.
			kafkaConsumer.MarkOffset(msg, "")
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			fmt.Printf("consumer error: %v\n", err)
		case ntf, ok := <-ntfs:
			if !ok {
				ntfs = nil
				continue
			}
			fmt.Printf("consumer notification: %v\n", ntf)
		}
	}
}

2.生产kafka消息

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	"time"
)

// main is the producer example's entry point; all work is delegated to
// startProduce.
func main() {
	startProduce()
}

// Package-level state for the producer example.
var (
	// producer is the synchronous producer; it is assigned in init and used
	// by produceMsg.
	producer sarama.SyncProducer

	// brokers is the list of broker addresses handed to
	// sarama.NewSyncProducer.
	brokers = []string{"127.0.0.1:9092"}

	// topic is the topic every produced message is sent to.
	topic = "test_topic_1"
)

// init configures and creates the package-level synchronous producer.
//
// On failure it prints the error and panics, so the program never starts
// producing without a usable producer.
func init() {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Retry.Max = 5
	// NOTE(review): sarama's SyncProducer is documented to require
	// Return.Successes to be true; confirm against the sarama version in use.
	cfg.Producer.Return.Successes = true

	var err error
	if producer, err = sarama.NewSyncProducer(brokers, cfg); err != nil {
		fmt.Printf("init producer failed -> %v \n", err)
		panic(err)
	}
	fmt.Println("producer init success")
}

// produceMsg sends msg as the value of a single message to topic through
// the package-level synchronous producer and prints the outcome.
//
// The message is logged (as JSON via dumpString) before sending. If the send
// fails, the error is printed and the function returns without reporting
// success; otherwise the partition and offset returned by SendMessage are
// printed.
func produceMsg(msg string) {
	msgX := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(msg),
	}
	fmt.Printf("SendMsg -> %v\n", dumpString(msgX))

	partition, offset, err := producer.SendMessage(msgX)
	if err != nil {
		fmt.Printf("send msg error:%s \n", err)
		// Do not fall through: partition and offset are meaningless when the
		// send failed, and the success line below would be misleading.
		return
	}
	fmt.Printf("msg send success, message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
}

// startProduce sends one message every two seconds, forever.
//
// Each payload is a small JSON object whose "timestamp" field is the current
// Unix time in seconds multiplied by 1000. This function never returns.
func startProduce() {
	for range time.Tick(2 * time.Second) {
		stamp := time.Now().Unix() * 1000
		produceMsg(fmt.Sprintf(`{"timestamp":%d}`, stamp))
	}
}

// dumpString renders v as a JSON string.
//
// When json.Marshal succeeds, the marshalled bytes are returned as a string.
// When it fails, a diagnostic string of the form
// {err:"json format error.<marshal error>"} is returned instead; note that
// this fallback is meant for logging and is not itself valid JSON.
func dumpString(v interface{}) (str string) {
	var out bytes.Buffer

	encoded, marshalErr := json.Marshal(v)
	if marshalErr == nil {
		out.Write(encoded)
		return out.String()
	}

	// Assemble the diagnostic text piece by piece around the error message.
	for _, piece := range []string{"{err:\"json format error.", marshalErr.Error(), "\"}"} {
		out.WriteString(piece)
	}
	return out.String()
}

猜你喜欢

转载自blog.csdn.net/m0_38075425/article/details/109118809