前言
本文对nsq基础设施简单带过,主要集中在,生产中如何部署健康,高可用的分布式消息队列。
详细介绍可以参考官方文档: https://nsq.io
1.简介
- nsq 是一款高可用的,可单点可分布式的消息队列。
- nsq是生产者消费者经典模式,包含nsqd和lookupd两个broker角色,但各自的数量可以拓展。
- 生产者通过直接向nsqd投递话题消息,消息直接存放进nsqd所在服务器。
- 消费者不直接向nsqd索要对应话题,而是通过lookupd来索取话题消息,索取时可以通过channel来标记索取者渠道。
2.示例代码
import "github.com/nsqio/go-nsq"
生产者-生产消息
var producer *nsq.Producer
var nsqdAddrTCP = "localhost:4150"
var conf *nsq.Config
conf = nsq.NewConfig()
producer, er = nsq.NewProducer(nsqdAddrTCP, conf)
if er != nil {
log.Println(er.Error())
return
}
// 并发发布3000个消息
for i := 0; i < 3000; i++ {
go func(i int) {
er := producer.Publish("go-nsq_testcase", []byte(fmt.Sprintf("hello,everyone_%d", i)))
if er != nil {
log.Println(er.Error())
return
}
}(i)
消费者-消费消息
type ConsumerT struct{
}
//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func main() {
var nsqlookupdAddr = "localhost:4161" // help consumer find topics
var channel = "channel_1" // 消费者渠道标记
var consumers []*nsq.Consumer
// 创建100个消费者
consumers = make([]*nsq.Consumer, 100)
for i, _ := range consumers {
consumers[i], e = nsq.NewConsumer("go-nsq_testcase", channel, conf)
if e != nil {
log.Println(e.Error())
return
}
consumers[i].SetLogger(nil, 0)
consumers[i].AddHandler(&ConsumerT{
}) // 添加消费者接口
if e = consumers[i].ConnectToNSQLookupd(nsqlookupdAddr);e!=nil {
log.Println(e)
return
}
}
}
3.nsq架构
3-7个lookupd - 数百个nsqd - 数万消费者
这个比例是健康的
3.1消费者如何通过lookupd找到话题
nsqd在发起时,必须绑定到一个lookupd:
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
也就说,消费者必须提前知道,他要的话题,被投递到了哪个nsqd上,可以通过对应的lookupd获取到该消息。
3.2 如果生产者投递的nsqd服务器负载过高,如何增加节点均衡。
假定订单数据被投递到了nsqd1-lookupd1,消费者通过lookupd1来消费订单数据。当订单数据积压生产超过了500万,nsqd1服务器太累了,那么此时,就应该开一个新的服务器,并运行一个nsqd2.
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
那么,只要nsqd2和nsqd1一样,绑定到同一个lookupd,那么消费者就可以直接消费到新增的nsqd2上的数据。
总之,保证以下一点,即可动态拓展节点:
- 直接新增nsqd节点,并将该节点,绑定到同一个lookupd上
3.3 nsq怎么解决重复投递的问题
很遗憾,nsq不能解决重复投递问题,需要客户端自己去处理。可以参考数据库唯一键,redis SETNX。不过前提是,消息本身具备一个唯一性的id。
3.4 nsq怎么解决执行顺序
很遗憾,nsq同样不保证消息顺序。那么消费者如果有顺序需求,怎么办呢?
解决方案: 标记消息权重,使用redis排行榜机制(zadd-添加,zrange-列表,zcard-长度,复杂度logN)
- 在消息中,增加标记头,头部包含以下信息:
{
"total_number": 10,
"number":1
}
- 消费者在消费时
// 自带去重,不怕重复消费
conn.Do("ZADD", "话题名", number, "消息")
// 插入后检查长度
length ,_ := redis.Int64(conn.Do("ZCARD", "话题名"))
if length == total_number {
messagss, _ := redis.Bytes(conn.Do("ZRANGE", "话题名",0, -1))
// 按照messages顺序消费完。
for i,v:=range message{
handle(v)
}
}
// 最后,设置定期销毁该定序队列
// 该步不可忽视,因为可能出现单个消息重复投递,如果在消费完10个数据后,重复数据到达消费者这里,
// 那么该话题就会永久保留一个多余的消息,不会失效。
conn.Do("Expire", "话题名", 60 *60 * 24)
3.5 nsqd挂了,消息会丢失吗?
默认配置下,会丢失,但是nsqd是可以配置成持久化的。
nsqd --lookupd-tcp-address=127.0.0.1:4160 --mem-queue-size 0
mem-queue-size设置成0后,则都会持久化,重启不会造成丢失。
3.6 nsqd和lookupd有运行顺序吗?
有,先lookupd,再nsqd