一、ActiveMQ简介
1.1什么是ActiveMQ
MQ是一个消息中间件,ActiveMQ、RabbitMQ、kafka
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持通过JDBC和journal提供高速的消息持久化
7. 从设计上保证了高性能的集群,客户端-服务器,点对点
8. 支持Ajax
9. 支持与Axis的整合
10. 可以很容易得调用内嵌JMS provider,进行测试
1.2ActiveMQ的消息形式
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage – Java原始值的数据流
· MapMessage–一套名称-值对
· TextMessage–一个字符串对象
· ObjectMessage–一个序列化的 Java对象
· BytesMessage–一个字节的数据流
二、ActiveMQ的安装
2.1前置条件
0、进入http://activemq.apache.org/下载ActiveMQ
百度云链接(apache-activemq-5.12.0-bin.tar.gz):链接:https://pan.baidu.com/s/1r-IMoOR_N1uAniDDKY_lKQ 密码:w2jc
1、需要jdk
2、安装Linux系统。生产环境都是Linux系统。
2.2安装步骤
第一步: 把ActiveMQ 的压缩包上传到Linux系统。
第二步:解压缩。
第三步:启动。
使用bin目录下的activemq命令启动:
[root@localhost bin]# ./activemq start
关闭:
[root@localhost bin]# ./activemq stop
查看状态:
[root@localhost bin]# ./activemq status
注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2
进入管理后台:
http://服务器ip地址:8161/admin
用户名:admin
密码:admin
三、ActiveMQ的使用方法
3.0消息传递的两种类型
1、点对点(Queue),即一个生产者和一个消费者一一对应;
2、发布/订阅模式(Topic),即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
3.1点对点(Queue)
3.1.1点对点–生产者(Producer)
<!-- activeMq的依耐 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
//测试Queue形式的发布消息(点对点模式--生产者)
@Test
public void testQueueProducer() throws JMSException{
//1、创建一个连接工厂对象ConnectionFactory对象,指定服务的IP和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.206.128:61616");
//2、使用ConnectionFactory对象创建Connection连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接,调用start方法
connection.start();
//4、使用Connection对象创建session对象
//第一个参数:是否开启事务,一般不开启,false;当第一个参数为false时,第二个参数才有意义。
//第二个参数:消息的应答模式:手动应答、自动应答,一般使用自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session创建Destination目的地对象,有两种:Queue、Topic
Queue queue = session.createQueue("testActiveMq");
//6、使用session创建一个producer对象
MessageProducer producer = session.createProducer(queue);
//7、使用session创建一个TextMessage对象(五种格式:StreamMessage 、MapMessage、TextMessage、ObjectMessage、BytesMessage)
String text = "这是使用activemq发送的队列消息内容,消息发送时间:" + new DateFormatUtils().format(new Date(), "yyyy-MM-dd HH:mm:ss");
TextMessage message = session.createTextMessage(text);//消息内容
// TextMessage message2 = session.createTextMessage("这是使用activemq发送的队列消息内容");//这种方式也可以
//8、使用producer对象发布消息
producer.send(message);
System.out.println("点对点模式--生产者--消息发送成功!" + System.currentTimeMillis());
//9、关闭资源
producer.close();
session.close();
connection.close();
}
3.1.2点对点–消费者(Consumer)
//queue消费者测试(点对点模式--消费者)
@Test
public void testQueueConsumer() throws JMSException, IOException{
//1、创建一个连接工厂对象ConnectionFactory对象,指定服务的IP和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.206.128:61616");
//2、使用ConnectionFactory对象创建Connection连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接,调用start方法
connection.start();
//4、使用Connection对象创建session对象
//第一个参数:是否开启事务,一般不开启,false;当第一个参数为false时,第二个参数才有意义。
//第二个参数:消息的应答模式:手动应答、自动应答,一般使用自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session创建Destination目的地对象,有两种:Queue、Topic
Queue queue = session.createQueue("testActiveMq");
//6、使用session创建一个consumer对象
MessageConsumer consumer = session.createConsumer(queue);
//7、接受消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println("接收到的activemq消息内容:" + text + ",消息接收时间:" + new DateFormatUtils().format(new Date(), "yyyy-MM-dd HH:mm:ss"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8、等待键盘输入(程序等待,可以继续接收消息)
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}
3.2发布/订阅模式(Topic)
3.2.1发布/订阅模式–生产者
//Topic生产者测试(发布/订阅模式--生产者)
@Test
public void testTopicProducer() throws JMSException{
//1、创建一个连接工厂对象ConnectionFactory对象,指定服务的IP和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.206.128:61616");
//2、使用ConnectionFactory对象创建Connection连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接,调用start方法
connection.start();
//4、使用Connection对象创建session对象
//第一个参数:是否开启事务,一般不开启,false;当第一个参数为false时,第二个参数才有意义。
//第二个参数:消息的应答模式:手动应答、自动应答,一般使用自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session创建Destination目的地对象,有两种:Queue、Topic
Topic topic = session.createTopic("testTopic");
//6、使用session创建一个producer对象
MessageProducer producer = session.createProducer(topic);
//7、使用session创建一个TextMessage对象(五种格式:StreamMessage 、MapMessage、TextMessage、ObjectMessage、BytesMessage)
TextMessage message = session.createTextMessage("这是发布/订阅模式--生产者模式发送的消息!消息发送时间:" + new DateFormatUtils().format(new Date(), "yyyy-MM-dd HH:mm:ss"));
//8、使用producer对象发布消息
producer.send(message);
System.out.println("发布/订阅模式--生产者--发布消息成功!" + System.currentTimeMillis());
//9、关闭资源
producer.close();
session.close();
connection.close();
}
3.2.1发布/订阅模式–消费者
//Topic消费者测试(发布/订阅模式--消费者)
@Test
public void testTopicCustomer() throws JMSException, IOException{
//1、创建一个连接工厂对象ConnectionFactory对象,指定服务的IP和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.206.128:61616");
//2、使用ConnectionFactory对象创建Connection连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接,调用start方法
connection.start();
//4、使用Connection对象创建session对象
//第一个参数:是否开启事务,一般不开启,false;当第一个参数为false时,第二个参数才有意义。
//第二个参数:消息的应答模式:手动应答、自动应答,一般使用自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session创建Destination目的地对象,有两种:Queue、Topic
Topic topic = session.createTopic("testTopic");
//6、使用session创建一个consumer对象
MessageConsumer consumer = session.createConsumer(topic);
//7、接受消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println("接收到的Topic模式的消息内容:" + text + ",消息接收时间:" + new DateFormatUtils().format(new Date(), "yyyy-MM-dd HH:mm:ss"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8、等待键盘输入(程序等待,可以继续接收消息)
//注意:Topic模式下,生产者发送消息之后再服务器上没有缓存,区别于点对点模式(可以先执行testTopicCustomer再执行testTopicProducer)
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}