在项目实战中就会发现,其实不管是微服务也好,DDD也好,都是为了履行设计原则里的低耦合、高内聚而无论是RabbitMQ还是Kafka,都是通过消息队列的方式对系统进行解耦,在从入门到放弃系列里我详细介绍过其使用背景以及模型等,由于最近的项目用到了RabbitMQ和Kafka,所以索性搭建一个简单的生产者消费者模型。
生产者端
在生产者端,生产者不停的生产消息并发送到kafka的服务器集群上,依据自己的topic和partition:
我们发送Kafka消息的时候,外层的封装方法如下,需要传递一个Kafka的topic、一个用来计算Partition的标识tenantId以及需要传递的消息。
public static bool SendKafkaExportData(
string appName,
int tenantId,
int userId,
string metaObjName,
string viewName,
string exportFileName,
SearchCondition condition,
string version = null,
int total = -1,
ExportFileType fileType = ExportFileType.Xlsx,
string applicationContext = null,
string msgTemplate = null)
{
Common.HelperObjects.ArgumentHelper.AssertNotEmpty(appName, nameof (appName));
Common.HelperObjects.ArgumentHelper.AssertNotEmpty(metaObjName, nameof (metaObjName));
Common.HelperObjects.ArgumentHelper.AssertNotEmpty(viewName, nameof (viewName));
Common.HelperObjects.ArgumentHelper.AssertNotEmpty(exportFileName, nameof (exportFileName));
Common.HelperObjects.ArgumentHelper.AssertPositive(tenantId, nameof (tenantId));
Common.HelperObjects.ArgumentHelper.AssertPositive(userId, nameof (userId));
Common.HelperObjects.ArgumentHelper.AssertNotNull<SearchCondition>(condition, nameof (condition));
bool flag = true;
try
{
ExportRequestDataModel exportRequestData = ExportRequestDataModel.GetExportRequestData(appName, tenantId, userId, metaObjName, viewName, exportFileName, condition, version, total, fileType, applicationContext, msgTemplate);
long num = KafkaProducer.Send<ExportRequestDataModel>("TMLSent", tenantId, exportRequestData);
ExportRequestDataModel.logger.Debug((object) string.Format("{0}-{1}-{2}发送Kafka消息{3}成功", (object) appName, (object) tenantId, (object) userId, (object) num));
}
catch (Exception ex)
{
ExportRequestDataModel.logger.Error((object) string.Format("{0}-{1}-{2}发送Kafka消息异常", (object) appName, (object) tenantId, (object) userId), ex);
flag = false;
}
return flag;
}
而其中的核心方法: long num = KafkaProducer.Send<ExportRequestDataModel>("TMLSent", tenantId, exportRequestData);
的实现逻辑如下,将kafka携带的消息序列化为二进制数组:
/// <summary>Send a message to a topic.</summary>
/// <param name="topic">The name of the topic to send the message to.</param>
/// <param name="tenant">The id of the tenant the message belongs to.</param>
/// <param name="value">The message content.</param>
/// <returns>The offset of the message.</returns>
public static long Send<T>(string topic, int tenant, T value) where T : IBinarySerializable
{
ArgumentHelper.AssertNotEmpty(topic, nameof (topic));
ArgumentHelper.AssertPositive(tenant, nameof (tenant));
return KafkaProducer.Send(topic, tenant, (object) value == null ? (byte[]) null : BigEndianEncoder.Encode<T>(value));
}
消息发送机制如下,获取到需要的topic,用于计算Partition的标识tenantId以及序列化后可以直接发送的二进制字符串消息:
/// <summary>Send a message to a topic.</summary>
/// <param name="topic">The name of the topic to send the message to.</param>
/// <param name="tenant">The id of the tenant the message belongs to.</param>
/// <param name="value">The message content.</param>
/// <returns>The offset of the message.</returns>
public static long Send(string topic, int tenant, byte[] value)
{
ArgumentHelper.AssertNotEmpty(topic, nameof (topic));
ArgumentHelper.AssertPositive(tenant, nameof (tenant));
try
{
return KafkaProtocol.Produce(topic, tenant, value);
}
catch (ConnectionPoolException ex)
{
return KafkaProtocol.Produce(topic, tenant, value);
}
catch (KafkaException ex)
{
if (ex.Error == ErrorCode.NotLeaderForPartition || ex.Error == ErrorCode.LeaderNotAvailable)
return KafkaProtocol.Produce(topic, tenant, value);
throw;
}
}
核心的发送方法为:
public static long Produce(string topic, int tenant, byte[] value)
{
TopicConfig topicConfig = BaseConfig<KafkaMapping>.Instance.GetTopicConfig(topic);
int num = tenant % KafkaProtocol.GetTopicPartitionCount(topic); //计算
int partitionLeader = KafkaProtocol.GetPartitionLeader(topic, num); //设置leader
try
{
using (KafkaSession kafkaSession = new KafkaSession(topicConfig.Cluster, partitionLeader)) //创建一个kafka消息发送实例
{
Message message = new Message(value, TimeUtil.CurrentTimestamp);
ProduceRequest request = new ProduceRequest((IDictionary<TopicAndPartition, MessageSet>) new Dictionary<TopicAndPartition, MessageSet>()
{
{
new TopicAndPartition(topic, num), //设置topic和partition
new MessageSet(topicConfig.Codecs, (IList<Message>) new List<Message>()
{
message
})
}
}); //设置要发送的消息
ProduceResponse produceResponse = kafkaSession.Issue<ProduceRequest, ProduceResponse>(request); //发送Kafka消息并
KafkaProtocol.CheckErrorCode(produceResponse.Error, topic, new int?(num), new int?(tenant));
return produceResponse.Offset;
}
}
catch (Exception ex)
{
KafkaProtocol.RefreshPartitionMetadata(topic);
throw;
}
}
这样一个我们需要传递的消息就发送到对应的topic和对应的partition上了(不同的partition可以存放在不同 的机器上,这样取同样余数的租户的数据会被放置到相同分区),无需再自己封装消息分发。
消费者端
在消费者端,提供消费者集群进行分区消费,需要注意的是:对于一个group而言,消费者的数量不应该多于分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费(保证了一个分区里的消息在一个Group里不会被消费者争夺执行),因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息
在消费者端,机器需要预热并开启消息消费服务,当然也要有关闭消息服务的方法,开启消费服务意味着开启消息接收和开启消息处理线程,关闭消息服务同理表示关闭消息接收和关闭消息处理线程。
/// <summary>
/// 接收导出消息的服务
/// </summary>
public class ReceiveMsgProvider : IReceiveMsgProvider
{
#region 日志、构造方法以及单例
protected static readonly LogWrapper Logger = new LogWrapper();
private ReceiveMsgProvider()
{
}
public static ReceiveMsgProvider Instance { get; } = new ReceiveMsgProvider();
#endregion 日志、构造方法以及单例
#region 开启消息接收服务
public bool _ActivateService()
{
// 预热
Cloud.Plugins.Helper.ESBProxy.WarmUp();
//开启消息接收服务
StartMessageService();
//开始处理ExportQueue队列中的消息
ExportConsumer.Instance.BeginImportData();
Logger.Debug("_ActivateService was called.");
return true;
}
protected void StartMessageService()
{
try
{
//开始消费消息
ExportConsumer.Instance.Start();
}
catch (Exception ex)
{
Logger.Error(ex);
}
}
#endregion 开启消息接收服务
#region 关闭消息接收服务
public bool _UnActivateService()
{
//关闭消息接收服务
StopMessageService();
//关闭处理queue的线程
ExportConsumer.CloseQueueThreads();
Logger.Debug("_UnActivateService was called.");
return true;
}
protected void StopMessageService()
{
try
{
//停止消费消息
ExportConsumer.Instance.Stop();
}
catch (Exception ex)
{
Logger.Error(ex);
}
}
}
其中,开启和关闭消息接收服务的核心方法如下:
/// <summary>
/// ESB服务调用入口:启动
/// </summary>
public void Start()
{
_loggging.Debug("ESB服务调用入口:启动");
_consumer = new KafkaGroupConsumer(ExportKafkaConst.ExportKafkaConsumerGroup, ExportKafkaConst.ExportKafkaTopic, OnMessage); //OnMessage即是处理消费逻辑的方法
_consumer .Start();
}
/// <summary>
/// ESB服务调用入口:停止
/// </summary>
public void Stop()
{
_loggging.Debug("ESB服务调用入口:停止");
if (_consumer != null && _consumer .IsRunning)
{
_consumer .Stop();
}
}
当然这一套生产消费体系搭建起来后最重要的就是接收和消费消息啦:
/// <summary>
/// 接收导出消息并放置到缓存队列里
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public bool OnMessage(Message context)
{
logger.Debug(string.Format("接收到消息:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(context.Value)));
ExportRequestDataModel data = null;
try
{
//读取消息内容
data = BigEndianEncoder.Decode<ExportRequestDataModel>(context.Value);
if (data == null || data.MsgType != ExportMessageType.Export)
{
//无消息需要处理
return true;
}
else
{
logger.Debug(string.Format("成功取到数据:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(data)));
}
bool enQueueResult = ExportQueue.En_Queue(data, ApplicationContext.Current.TenantId);
if (!enQueueResult)
{
logger.Error("导出数据 In_Queue失败:userId:" + ApplicationContext.Current.UserId + " tenantId:" + ApplicationContext.Current.TenantId);
}
}
catch (Exception ex)
{
var contextInfo = JsonConvert.SerializeObject(context);
logger.Error($"导出写入队列失败,接收到的消息为:{contextInfo} ,信息异常信息:" + ex);
}
return true;
}
消息一个个入队后当然接下来就要使用我们启动的一批线程去消费啦:
/// <summary>
/// 开始对ExportQueue队列中的数据进行梳理
/// </summary>
public void BeginImportData()
{
///初始化线程List
ExportQueue.InstanceExportThreadsList();
int count = ExportQueue.ExportThreadsList.Count;
for (int i = 0; i < count; i++)
{
logger.Debug("开启线程th_" + i + "");
ExportQueue.ExportThreadsList[i] = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(DealExportDataInQueue));
ExportQueue.ExportThreadsList[i].Start(i);
}
// 处理小队列
ExportQueue.ExportSmallThread = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(DealExportDataInQueue));
ExportQueue.ExportSmallThread.Start(ExportQueue.SmallIndex);
}
使用Kafka实现生产者消费者系统的整体流程就是这样,说实话我还是对某些实现细节一知半解,但总体而言知道了全套流程是怎么跑通的,至于一些更高阶的认知,期望从之后更深入的学习中来了解到。