AcitveMQ是什么?
AcitveMQ是Apach的消息中间件,是完全支持JMS规范的一个实现,主要应用与分布式系统架构中,帮助构建高可用、高性能、可伸缩的面向消息服务的系统。
AcitveMQ能做什么?
1、消息队列 - 异步消息处理
2、消息队列 - 流量削峰(秒杀场景,用户请求放到消息队列里面排队)
3、消息队列 - 应用解耦
安装AcitveMQ
在官网下载解压,就可以启动直接用了,测试访问:http://localhost:8161 账号密码都是admin
JMS基本概念
概念:java消息服务(java message service)是java平台中关于面向消息中间件的API(只提供了规范,自己没提供实现,类似于JDBC),用于两个应用程序之间,或者分布式系统中发送消息,进行异步通信。JMS是一个与平台无关的API,绝大多数MOM(面向消息中间件)提供商都对JMS提供了支持(例如AcitveMQ)。
什么是MON
面向消息的中间件,使用消息传送提供者来协调消息传输操作。MOM需要提供API和管理工具,客户端调用API,把消息发送到消息传送提供者指定的目的地,在消息发送之后,客户端会执行其它的工作,并且在接受方收到这个消息确认之前,提供者一直保留改消息。
JSM Provider就是现在要学习的AcitveMQ。
创建producer(生产者)服务(springboot):
引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) throws JMSException {
SpringApplication.run(ProducerApplication.class, args);
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//创建连接
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//创建队列(如果队列已经存在则不会创建,first queue是队列名称)
//destination表示目的地
Destination destination = session.createQueue("first queue");
//创建消息的发送者
MessageProducer producer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage("你好,我是发送者小明");
producer.send(textMessage);
//session.commit()消费端无数次启动就能无数次消费这条消息,消息会一直存在
session.commit();
session.close();
connection.close();
}
}
启动服务:访问activemq管理界面可以看到有一条消息发送上去了,"你好,我是发送者小明"。
创建consumer(消费者)服务(springboot):
引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.properties
server.port=8081
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) throws JMSException {
SpringApplication.run(ConsumerApplication.class, args);
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//创建连接
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//destination表示目的地
Destination destination = session.createQueue("first queue");
//创建消息的消费者
MessageConsumer consumer = session.createConsumer(destination);
TextMessage textMessage = (TextMessage) consumer.receive();
System.out.println(textMessage.getText());
session.commit();
session.close();
connection.close();
}
}
启动后消费者服务控制台接受到消息"你好,我是发送者小明"
消息模型(消息传递域)
点对点(P2P):上面已经演示过了
1、每个消息只有一个消费者
2、消息的产生者和消费者之间没有时间上的相关性,无论消费者在生产者发送消息的时候是否处于运行状态,都可以提取消息。
3、如果session关闭时,有一些消息已经收到,但还没有被签收,那么消费者下次连接到相同的队列时,消息还会被签收
4、如果用户仔receive方法中设定了消息选择条件,那么不符合条件的消息会留在队列中不会被接受
5、队列可以长久保存消息直到消息被消费者签收,消费者不需要担心因为消息丢失而时刻与jms provider保持连接状态
发布订阅:
1、每个消息可以有多个消费者
2、消息的生产者和消费者之间存在时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息,JMS规范允许提供客户端创建持久订阅
3、订阅可以分为非持久订阅和持久订阅
JMS API
ConnectionFactory:连接工厂,用来创建连接对象的。
Connection:封装客户端和JMS provider之间的虚拟连接。
Session:是生产和消费之间消费消息的单线程的上下文,用来创建Producer、Consumer、Message、Queue等等。
Destination:消息发送或者接受的目的地
MessageProducer/MessageConsumer:消息生产者/消费者
消息组成:
消息头:包含消息的识别信息和路由信息
消息体:具体传递消息的类型(TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage)
属性
JMS的可靠性机制
JMS消息只有被确认后,才会认为被成功消费,消息的消费包括三个阶段:客户端接受消息、客户端处理消息、消息被确认
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
事务性会话:Boolean.TRUE当一个消息提交以后,这个消息会在session.commit()之后自动签收。
非事务性会话:Boolean.FLASE只有当这个设置成FLASE,后面的参数才会生效,在该模式下消息何时被确认取决于创建会话时的应答模式。
AUTO_ACKNOWLEDGE:当客户端成功从recive方法返回以后,或者[MessageListener.onMessage]方法返回以后,会话自动确认该消息。
CLIENT_ACKNOWLEDGE:
客户端通过调用下面代码确认消息。
textMessage.acknowledge();
在这种模式中,如果一个发送端发送了10个消息,消息消费者在循环消费第5个消息调用了上面代码,那么之前的所有消息都会被消费。后面的不会消费。
DUPS_OK_ACKNOWLEDGE:延迟确认
JMS (pub/sub)模型
生产者和消费者:都只需要修改一句代码,其它保持一致
Destination destination = session.createTopic("first-topic");
先启动两个消费者服务实例,再启动生产者,会看到两个消费者服务都会接收到消息。
消息持久化存储
1、kahaDB:默认的存储方式
保存数据路径 apache-activemq-5.8.0\data\kahadb
2、AMQ:基于文件的存储方式
写入速度块,且容易恢复,文件默认大小32M
3、JDBC:基于数据库的存储
4、Memory:基于内存的存储
5、LevelDB:MQ5.8以后的持久化策略,通常用于集群
NetWorkConnector
主要用来配置broker和broker之间的通信连接
两种网络连接方式:静态和动态
消息的发送策略
持久化消息
默认情况下,生产者发送的消息是持久化的,消息发送到broker后,producerh会等待broker对这条消息的处理情况的反馈。
可以设置消息发送端发送持久化消息的异步方式
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
connectionFactory.setUseAsyncSend(true);
怎么确认broker已经接收满了?
可以设置回执窗口大小,达到了一定数量就等待服务器消息的回执,size递减后才能再发送消息。
connectionFactory.setProducerWindowSize();
非持久化消息
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
非持久化消息模式下,默认就是异步发送过程,也要设置setProducerWindowSize。
如果需要每次对非持久化消息的每次发送的消息都获得broker的回执的话
connectionFactory.setAlwaysSyncSend();
默认情况下,mq服务器(broker)采用异步方式向客户端主动推送消息(push)。也就是说broker在向某个消费者会话推送消息后,不会等待消费者相应消息,直到消费者处理完消息以后,主动向broker返回处理结果。
prefetchsize(预取消息数量):
broker端一旦有消息,就主动按照默认设置的规则推送给当前活动的消费者,每次推送都有一定的数量限制,而这个数量就是prefetchsize。
Queue:
持久化消息:prefetchsize=1000
非持久化消息:prefetchsize=1000
topic:
持久化消息:prefetchsize=100
非持久化消息:prefetchsize=32766
假如prefetchsize=0,此时对于consumer来说,就是一个pull模式。
Destination destination = session.createTopic("first-topic?customer.perfetchSize=1000");
AcitveMQ支持的传输协议:
client端和broker端的通讯协议
TCP、UDP、NIO、SSL、HTTP、VM
上面演示代码用的是TCP,默认的。