AcitveMQ

AcitveMQ是什么?

AcitveMQ是Apach的消息中间件,是完全支持JMS规范的一个实现,主要应用与分布式系统架构中,帮助构建高可用、高性能、可伸缩的面向消息服务的系统。

AcitveMQ能做什么?

1、消息队列 - 异步消息处理

2、消息队列 - 流量削峰(秒杀场景,用户请求放到消息队列里面排队)

3、消息队列 - 应用解耦

安装AcitveMQ

在官网下载解压,就可以启动直接用了,测试访问:http://localhost:8161 账号密码都是admin

JMS基本概念

概念:java消息服务(java message service)是java平台中关于面向消息中间件的API(只提供了规范,自己没提供实现,类似于JDBC),用于两个应用程序之间,或者分布式系统中发送消息,进行异步通信。JMS是一个与平台无关的API,绝大多数MOM(面向消息中间件)提供商都对JMS提供了支持(例如AcitveMQ)。

什么是MON

面向消息的中间件,使用消息传送提供者来协调消息传输操作。MOM需要提供API和管理工具,客户端调用API,把消息发送到消息传送提供者指定的目的地,在消息发送之后,客户端会执行其它的工作,并且在接受方收到这个消息确认之前,提供者一直保留改消息。

JSM Provider就是现在要学习的AcitveMQ。

创建producer(生产者)服务(springboot):

引入依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
@SpringBootApplication
public class ProducerApplication {
	public static void main(String[] args) throws JMSException {
		SpringApplication.run(ProducerApplication.class, args);
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
		//创建连接
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
		//创建队列(如果队列已经存在则不会创建,first queue是队列名称)
		//destination表示目的地
		Destination destination = session.createQueue("first queue");
		//创建消息的发送者
		MessageProducer producer = session.createProducer(destination);
		TextMessage textMessage = session.createTextMessage("你好,我是发送者小明");
		producer.send(textMessage);
		//session.commit()消费端无数次启动就能无数次消费这条消息,消息会一直存在
		session.commit();
		session.close();
		connection.close();
	}
}

启动服务:访问activemq管理界面可以看到有一条消息发送上去了,"你好,我是发送者小明"。

创建consumer(消费者)服务(springboot):

引入依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.properties

server.port=8081
@SpringBootApplication
public class ConsumerApplication {
	public static void main(String[] args) throws JMSException {
		SpringApplication.run(ConsumerApplication.class, args);
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
		//创建连接
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
		//destination表示目的地
		Destination destination = session.createQueue("first queue");
		//创建消息的消费者
		MessageConsumer consumer = session.createConsumer(destination);
		TextMessage textMessage = (TextMessage) consumer.receive();
		System.out.println(textMessage.getText());
		session.commit();
		session.close();
		connection.close();
	}
}

启动后消费者服务控制台接受到消息"你好,我是发送者小明" 

消息模型(消息传递域)

点对点(P2P):上面已经演示过了

1、每个消息只有一个消费者

2、消息的产生者和消费者之间没有时间上的相关性,无论消费者在生产者发送消息的时候是否处于运行状态,都可以提取消息。

3、如果session关闭时,有一些消息已经收到,但还没有被签收,那么消费者下次连接到相同的队列时,消息还会被签收

4、如果用户仔receive方法中设定了消息选择条件,那么不符合条件的消息会留在队列中不会被接受

5、队列可以长久保存消息直到消息被消费者签收,消费者不需要担心因为消息丢失而时刻与jms provider保持连接状态

发布订阅:

1、每个消息可以有多个消费者

2、消息的生产者和消费者之间存在时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息,JMS规范允许提供客户端创建持久订阅

3、订阅可以分为非持久订阅和持久订阅

 

JMS API

ConnectionFactory:连接工厂,用来创建连接对象的。

Connection:封装客户端和JMS provider之间的虚拟连接。

Session:是生产和消费之间消费消息的单线程的上下文,用来创建Producer、Consumer、Message、Queue等等。

Destination:消息发送或者接受的目的地

MessageProducer/MessageConsumer:消息生产者/消费者

消息组成:

消息头:包含消息的识别信息和路由信息 

消息体:具体传递消息的类型(TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage)

属性

JMS的可靠性机制

JMS消息只有被确认后,才会认为被成功消费,消息的消费包括三个阶段:客户端接受消息、客户端处理消息、消息被确认

Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

事务性会话:Boolean.TRUE当一个消息提交以后,这个消息会在session.commit()之后自动签收。

非事务性会话:Boolean.FLASE只有当这个设置成FLASE,后面的参数才会生效,在该模式下消息何时被确认取决于创建会话时的应答模式。

AUTO_ACKNOWLEDGE当客户端成功从recive方法返回以后,或者[MessageListener.onMessage]方法返回以后,会话自动确认该消息。

CLIENT_ACKNOWLEDGE:

客户端通过调用下面代码确认消息。

textMessage.acknowledge();

在这种模式中,如果一个发送端发送了10个消息,消息消费者在循环消费第5个消息调用了上面代码,那么之前的所有消息都会被消费。后面的不会消费。

DUPS_OK_ACKNOWLEDGE延迟确认

JMS (pub/sub)模型

生产者和消费者:都只需要修改一句代码,其它保持一致

Destination destination = session.createTopic("first-topic");

先启动两个消费者服务实例,再启动生产者,会看到两个消费者服务都会接收到消息。

消息持久化存储

1、kahaDB:默认的存储方式

保存数据路径 apache-activemq-5.8.0\data\kahadb

2、AMQ:基于文件的存储方式

写入速度块,且容易恢复,文件默认大小32M

3、JDBC:基于数据库的存储

4、Memory:基于内存的存储

5、LevelDB:MQ5.8以后的持久化策略,通常用于集群

NetWorkConnector

主要用来配置broker和broker之间的通信连接

两种网络连接方式:静态和动态

消息的发送策略

持久化消息

默认情况下,生产者发送的消息是持久化的,消息发送到broker后,producerh会等待broker对这条消息的处理情况的反馈。

可以设置消息发送端发送持久化消息的异步方式

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
connectionFactory.setUseAsyncSend(true);

怎么确认broker已经接收满了?

可以设置回执窗口大小,达到了一定数量就等待服务器消息的回执,size递减后才能再发送消息。

connectionFactory.setProducerWindowSize();

非持久化消息

textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);

非持久化消息模式下,默认就是异步发送过程,也要设置setProducerWindowSize。

如果需要每次对非持久化消息的每次发送的消息都获得broker的回执的话

connectionFactory.setAlwaysSyncSend();

默认情况下,mq服务器(broker)采用异步方式向客户端主动推送消息(push)。也就是说broker在向某个消费者会话推送消息后,不会等待消费者相应消息,直到消费者处理完消息以后,主动向broker返回处理结果。

prefetchsize(预取消息数量):

broker端一旦有消息,就主动按照默认设置的规则推送给当前活动的消费者,每次推送都有一定的数量限制,而这个数量就是prefetchsize。

Queue:

持久化消息:prefetchsize=1000

非持久化消息:prefetchsize=1000

topic:

持久化消息:prefetchsize=100

非持久化消息:prefetchsize=32766

假如prefetchsize=0,此时对于consumer来说,就是一个pull模式。

Destination destination = session.createTopic("first-topic?customer.perfetchSize=1000");

AcitveMQ支持的传输协议:

client端和broker端的通讯协议

TCP、UDP、NIO、SSL、HTTP、VM

上面演示代码用的是TCP,默认的。

猜你喜欢

转载自blog.csdn.net/qq_26857649/article/details/83010859