消息队列中间件是分布式系统中的重要组件,主要解决应用耦合、异步消息、流量削锋等问题。可帮助实现高性能,高可用,可伸缩和最终一致性的架构
在消息队列方面,除了 ActiveMQ、RabbitMQ、RocketMQ、ZeroMQ,Kafka等,还有很多其他的竞争者。这篇文章我们不会去讲解它们之间的区别,仅只详细的介绍一下 ActiveMQ,以及它在 .NET 中的使用
消息队列应用场景
异步任务
比如有以下场景:现在很多网站或App注册时都采用了验证码的机制,因此,当服务器收到客户端发起获取验证码的请求,有以下处理方式
- 在当前线程中立即发送短信(会阻塞当前线程一小会儿)
- 新建立一个线程发送短信(在 .NET 中建立一个 Task 就行)
- 交由其他的服务来处理这个任务(转发给消息队列,让消息队列处理)
那么,以上几种方式哪种更好呢?
- 第一种:实时性肯定更好,收到请求立即处理,但它阻塞了当前线程,会造成其他客户端的请求被阻塞(请求少的时候我们可能根本感觉不到);
- 第二种:在当前进程中建立一个线程来处理,实时性不如第一种,但它不会阻塞其他客户端的请求。不过一个进程中能创建的线程数量有限,因此也有瓶颈
- 第三种:使用其他特定场景的服务,这种实时性最差(但如果服务器配置好,我们也不一定能感觉到差异),但其是使用的最多的,并且其上线后效果是最好的(稳定性、可伸缩性)
因此,如果是正式上线的版本(比如项目初期用于验证市场的版本,往往会为了速度而不考虑架构,这时可能会选择第一种或第二种方案),且峰值较高的服务,选用第三种方案无疑是最好的。因为对于上线的服务,稳定性是非常重要的
对于发送短信这样的任务(对实时性要求不是那么高),使用消息队列是非常合适的。将任务交由消息队列之后,发送短信具体要做的事情主服务就不需要干涉了。如果需要,主服务订阅任务的处理结果即可(发送成功或者失败)。这样,主服务就可以继续处理其他客户端的请求,并且,有消息队列的参与,主服务的压力就没有那么重了
当然,实际项目中,这样的场景还有很多,比如记录日志,我们都知道,写文件(磁盘I/O)很耗时。因此现在很多大型的服务,都有专门的日志服务器来处理其他服务器发送过来的日志,这时候我们可以使用 Kafka 来做这样的事情(因为它就是为了处理日志而生的)
消息服务
比如现如今的微服务、分布式集群等,各个节点之间的通信,就可以使用消息队列来处理。具体使用什么方式,可更具场景从以下两种选择
- P2P(Point to Point)点对点模式
- Publish/Subscribe(Pub/Sub) 发布订阅模式
后面在给出案例时会具体讲解这两种模式
ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。另外,在很多大型的网站或服务中,也都会使用到它
它具有以下特性
- 多种语言和协议编写客户端
语言:Java、C、C++、C#、Ruby、Perl、Python、PHP;
应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP - 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring 的最新特性
- 通过了常见J2EE服务器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
- 支持多种传输协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
- 支持通过JDBC和journal提供高速的消息持久化
- 从设计上保证了高性能的集群,客户端-服务器以及点对点的通信
- 支持Ajax
- 支持与Axis的整合
- 可以很容易调用内嵌 JMS provider 进行测试
它的优势
- 稳定性:失败重连机制,持久化服务, 容错机制, 多种恢复机制
- 高效性:支持多种传送协议如TCP, SSL, NIO, UDP等,集群消息在多个代理之间转发防止消息丢失,支持超快的JDBC消息持久化和高效的日志系统
- 可扩展:ActiveMQ 的高级特性都可以配置的形式来表现,很好的实现例如游标,容错机制,消息group及监控服务,同时扩展了很多成熟的框架
- 高级特性:消息群组(Message Groups)、虚拟端点(Virtual Destinations)、通配符(Wildcards)、复合端点(Composite Destinations)
ActiveMQ在Windows上的安装配置
这方面的教程在网上有很多,我们在这就不提供了,只提供一些移动端友好的链接以帮助朋友安装配置
- www.cnblogs.com/yangw/p/591…
- www.cnblogs.com/chy123/p/87…
- www.cnblogs.com/donsenChen/…
- blog.csdn.net/j080624/art…
ActiveMQ在C#中的使用
首先,需要在 Apache官网 上下载 .NET 的驱动,也可以通过以下链接下载
mirrors.hust.edu.cn/apache/acti…
要在项目中使用 ActiveMQ,需要引入上面下载的包中的两个 dll 文件:Apache.NMS.ActiveMQ.dll
和 Apache.NMS.dll
P2P模式案例
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。
每条消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时
P2P的特点:
- 每条消息只有一个消费者(即一旦被消费,消息就会被移除消息队列):在运行了多个消费者之后,一条消息只会有一个消费者收到,其他的消费者是不可以收到的
- 接收者在成功接收消息之后需向队列应答成功:我们可以通过指定应答模式来更改,默认是自动应答模式
因此,如果希望发送的每个消息都会被成功处理的话,则应该P2P模式
示例代码的基类如下
public abstract class ActiveMQBase {
protected IConnectionFactory factory;
protected IConnection connection;
protected ISession session;
public virtual void Init() {
try {
//初始化工厂, 端口默认为61616,指定其他会抛异常
factory = new ConnectionFactory("tcp://localhost:61616");
connection = factory.CreateConnection();
connection.Start();
session = connection.CreateSession();
} catch (Exception e) {
Console.WriteLine($"Error: {e.Message}");
}
}
public abstract void Run();
// 释放相关资源
public virtual void Release() {
try {
if (session != null) session.Close();
if (connection != null) connection.Close();
} finally {
session = null;
connection = null;
factory = null;
}
}
}
复制代码
生产者(Producer)如下
public class ActiveMQP2PDemoProducer : ActiveMQBase {
private IMessageProducer messageProducer;
private ActiveMQQueue demoQueue;
public override void Init() {
base.Init();
try {
// 指定队列,以实现点对点的通信
demoQueue = new ActiveMQQueue("DEMO_QUEUE");
// 创建生产者对象
messageProducer = session.CreateProducer(demoQueue);
} catch (Exception e) {
Console.WriteLine($"Error: {e.Message}");
}
}
public override void Run() {
while (true) {
Console.WriteLine("请输入消息,exit 退出");
string line = Console.ReadLine();
if (line.Equals("exit", StringComparison.InvariantCultureIgnoreCase)) {
break;
}
// 创建一条文本消息,在 MessageProvider 中存在多个创建消息的方法
// 在实际项目中灵活选择即可
ITextMessage message = messageProducer.CreateTextMessage(line);
// 发送消息,可调用其他的重载,以设置是否持久化、优先级等特性
messageProducer.Send(message);
}
}
public override void Release() {
base.Release();
try {
if (demoQueue != null) demoQueue.Dispose();
if (messageProducer != null) messageProducer.Close();
} finally {
demoQueue = null;
messageProducer = null;
}
}
}
复制代码
消费者(Consumer)如下
public class ActiveMQP2PDemoComsumer : ActiveMQBase {
private IMessageConsumer messageConsumer;
private ActiveMQQueue demoQueue;
public override void Init() {
base.Init();
try {
demoQueue = new ActiveMQQueue("DEMO_QUEUE");
// 创建消息的消费者
messageConsumer = session.CreateConsumer(demoQueue);
// 添加监听,当消息来临时,会触发此事件
messageConsumer.Listener += this.MessageConsumer_Listener;
} catch (Exception e) {
Console.WriteLine($"Error: {e.Message}");
}
}
private void MessageConsumer_Listener(IMessage message) {
// 解析接收到的消息
if (message is ITextMessage msg) {
Console.WriteLine($"Received Message: {msg.Text}");
}
}
public override void Run() {
// 此处用于阻止控制台结束,以保证消息可被正确处理
Console.WriteLine("请输入消息,exit 退出");
string line = Console.ReadLine();
}
public override void Release() {
base.Release();
try {
if (demoQueue != null) demoQueue.Dispose();
if (messageConsumer != null){
messageConsumer.Listener -= this.MessageConsumer_Listener;
messageConsumer.Close();
}
} finally {
demoQueue = null;
messageConsumer = null;
}
}
}
复制代码
使用方式如下
// 生产者初始化
ActiveMQP2PBase demo = new ActiveMQP2PDemoProducer();
// 消费者初始化代码则为: ActiveMQP2PBase demo = new ActiveMQP2PDemoComsumer();
demo.Init();
demo.Run();
demo.Release();
复制代码
在 ActiveMQ 管理界面可以看到如下,表示生产者发送的消息,都已经被消费者消费了
Pub/Sub模式
Pub/Sub模式:包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber)。多个发布者将消息发送到Topic, 系统将这些消息传递给多个订阅者,可以认为生产者与消费者之间是多对多的关系
Pub/Sub的特点
- 每条消息可以有多个消费者
- 为了消费消息,订阅者必须保持运行的状态
- 为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样即使订阅者没有运行,在运行之后它也能接收到发布者的消息
因此,如果允许发送的消息可以被一个或多个消费者消费、或者可以不被消费,那么可以采用 Pub/Sub 模型
在 C# 中,它与 P2P 的使用区别不大,只需要将上述代码生产者和消费者初始化代码中
demoQueue = new ActiveMQQueue("DEMO_QUEUE");
复制代码
这部分换成
demoTopic = new ActiveMQTopic("DEMO_TOPIC");
复制代码
在管理员界面可以看到如下数据
通过示例可以看出,P2P 是基于 Queue 的,而 Pub/Sub 模式则是基于 Topic 的。
在 Pub/Sub 模式下,可以实现多对多的通信,即可以有多个生产者,也可以有多个消费者,一旦有消息到来,它们会都会收到消息。
而P2P模式下,它可以允许有多个生产者,也可以有多个消费者。与 Pub/Sub 不同的是,如果有多个消费者,如果有消息到来,这些消费者会轮流着去消费该消息,而不是每个消费者都收到消息。即一条消息只会有一个消费者
由于在 C# 中,这两种模式的使用方式差别很小,而运行之后产生的行为却差别较大。因此,在实际项目中,我们需要注意这两者之间的区别,以免带来不必要的困惑
实际项目中的一些问题
ActiveMQ服务器宕机怎么办 如果我们想要在服务器宕机之后恢复数据,则需要对消息进行持久化
在通常的情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。它们的最大限制在配置文件的<systemUsage>
节点中配置
但是,在非持久化消息堆积到一定程度,内存告急的时候,ActiveMQ 会将内存中的非持久化消息写入临时文件中,以腾出内存。虽然都保存到了文件里,但它和持久化消息的区别是,重启后持久化消息会从文件中恢复,非持久化的临时文件会直接删除(即重启之后不会从临时文件中恢复消息)
因此,为了保证数据的可靠性
- 尽量使用持久化消息(消息不重要也可以不用持久化)
- 可以将持久化与非持久化文件的限制调大一点,以保证服务最大可用
丢消息
这同样是持久化消息的问题。对于这种情况,我们可以
- 尽量将消息持久化
- 如果不想持久化,那么我们应该尽可能的及时处理非持久化的消息
- 使用事务,它可以保证消息不会因为连接关闭而丢失
持久化消息比较慢
默认的情况下,非持久化消息是异步发送的;而持久化消息是同步发送的。遇到慢一点的硬盘,发送消息的速度也会很慢
但如果开启事务的情况下,消息都会异步发送,效率会有非常大的提升。所以在发送持久化消息时,我们应该务必开启事务。并且我们也建议发送非持久化消息时也开启事务
自定义 ActiveMQ 的重发策略(Redelivery Policy)
可通过 ConnectionFactory.RedeliveryPolicy
属性设置
CollisionAvoidancePercent
:默认值 0.15, 设置防止冲突范围的正负百分比,只有启用UseCollisionAvoidance
参数时才生效MaximumRedeliveries
:默认值 6, 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传InitialRedeliveryDelay
:默认值 1000, 初始重发延迟时间UseCollisionAvoidance
:默认值false
, 启用防止冲突功能UseExponentialBackOff
:默认值false
, 启用指数倍数递增的方式增加延迟时间BackOffMultiplier
:默认值 5, 重连时间间隔递增倍数,只有值大于1和启用UseExponentialBackOff
参数时才生效。
多消费者并发处理
在有多个消费者,ActiveMQ 中累积了大量的数据的情况下,有可能会出现只有一个消费者消费、其他消费者不“工作”的情况
这种情况下,我们只需要将 ActiveMQ 的 prefetch
值设置得小一点即可。在 Queue模式时,其默认值为 1000;Topic 下为 32766。可通过 ConnectionFactory.PrefetchPolicy
设置
这篇文章就先讲到这里,后面我们会讲解 ActiveMQ 的一些其他场景,如分布式集群。欢迎持续关注公众号【嘿嘿的学习日记】,Thank you~