NSQ怎么架构好?

前言

本文对nsq基础设施简单带过,主要集中在,生产中如何部署健康,高可用的分布式消息队列。
详细介绍可以参考官方文档: https://nsq.io

1.简介

  • nsq 是一款高可用的,可单点可分布式的消息队列。
  • nsq是生产者消费者经典模式,包含nsqd和lookupd两个broker角色,但各自的数量可以拓展。
  • 生产者通过直接向nsqd投递话题消息,消息直接存放进nsqd所在服务器。
  • 消费者不直接向nsqd索要对应话题,而是通过lookupd来索取话题消息,索取时可以通过channel来标记索取者渠道。

2.示例代码

import "github.com/nsqio/go-nsq"

生产者-生产消息

var producer *nsq.Producer
var nsqdAddrTCP = "localhost:4150"
var conf *nsq.Config
conf = nsq.NewConfig()

producer, er = nsq.NewProducer(nsqdAddrTCP, conf)
if er != nil {
    
    
	log.Println(er.Error())
	return
}

	// 并发发布3000个消息
for i := 0; i < 3000; i++ {
    
    
	go func(i int) {
    
    
		er := producer.Publish("go-nsq_testcase", []byte(fmt.Sprintf("hello,everyone_%d", i)))
		if er != nil {
    
    
			log.Println(er.Error())
			return
		}
}(i)

消费者-消费消息

type ConsumerT struct{
    
    }
//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
    
    
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}
func main() {
    
    
	var nsqlookupdAddr = "localhost:4161" // help consumer find topics
	var channel = "channel_1"  // 消费者渠道标记
	var consumers []*nsq.Consumer
	// 创建100个消费者
	consumers = make([]*nsq.Consumer, 100)
	for i, _ := range consumers {
    
    
		consumers[i], e = nsq.NewConsumer("go-nsq_testcase", channel, conf)
		if e != nil {
    
    
			log.Println(e.Error())
			return
		}
		consumers[i].SetLogger(nil, 0)
		consumers[i].AddHandler(&ConsumerT{
    
    }) // 添加消费者接口
	
		if e = consumers[i].ConnectToNSQLookupd(nsqlookupdAddr);e!=nil {
    
    
			log.Println(e)
			return
		}
	}
}

3.nsq架构

3-7个lookupd - 数百个nsqd - 数万消费者
这个比例是健康的

3.1消费者如何通过lookupd找到话题

nsqd在发起时,必须绑定到一个lookupd:

$ nsqd --lookupd-tcp-address=127.0.0.1:4160

也就说,消费者必须提前知道,他要的话题,被投递到了哪个nsqd上,可以通过对应的lookupd获取到该消息。

3.2 如果生产者投递的nsqd服务器负载过高,如何增加节点均衡。

假定订单数据被投递到了nsqd1-lookupd1,消费者通过lookupd1来消费订单数据。当订单数据积压生产超过了500万,nsqd1服务器太累了,那么此时,就应该开一个新的服务器,并运行一个nsqd2.

$ nsqd --lookupd-tcp-address=127.0.0.1:4160

那么,只要nsqd2和nsqd1一样,绑定到同一个lookupd,那么消费者就可以直接消费到新增的nsqd2上的数据。
总之,保证以下一点,即可动态拓展节点:

  • 直接新增nsqd节点,并将该节点,绑定到同一个lookupd上

3.3 nsq怎么解决重复投递的问题

很遗憾,nsq不能解决重复投递问题,需要客户端自己去处理。可以参考数据库唯一键,redis SETNX。不过前提是,消息本身具备一个唯一性的id。

3.4 nsq怎么解决执行顺序

很遗憾,nsq同样不保证消息顺序。那么消费者如果有顺序需求,怎么办呢?
解决方案: 标记消息权重,使用redis排行榜机制(zadd-添加,zrange-列表,zcard-长度,复杂度logN)

  • 在消息中,增加标记头,头部包含以下信息:
{
    
    
    "total_number": 10,
    "number":1
}
  • 消费者在消费时
// 自带去重,不怕重复消费
conn.Do("ZADD", "话题名", number, "消息")

// 插入后检查长度
length ,_ := redis.Int64(conn.Do("ZCARD", "话题名"))

if length == total_number {
    
    
    messagss, _ := redis.Bytes(conn.Do("ZRANGE", "话题名",0, -1))
    // 按照messages顺序消费完。
    for i,v:=range message{
    
    
        handle(v)        
    }
}
// 最后,设置定期销毁该定序队列
// 该步不可忽视,因为可能出现单个消息重复投递,如果在消费完10个数据后,重复数据到达消费者这里,
// 那么该话题就会永久保留一个多余的消息,不会失效。
conn.Do("Expire", "话题名", 60 *60 * 24)

3.5 nsqd挂了,消息会丢失吗?

默认配置下,会丢失,但是nsqd是可以配置成持久化的。

nsqd --lookupd-tcp-address=127.0.0.1:4160 --mem-queue-size 0

mem-queue-size设置成0后,则都会持久化,重启不会造成丢失。

3.6 nsqd和lookupd有运行顺序吗?

有,先lookupd,再nsqd

猜你喜欢

转载自blog.csdn.net/fwhezfwhez/article/details/105411482