activeMQ
一 ActiveMQ简介
1.1 什么是ActiveMQ
ActiveMQ是Apache推出的,一款开源的,完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现的消息中间件(Message Oriented Middleware,MOM),实际上为什么把MQ叫做消息中间件。它最初的来源当然是由于系统A与系统B之间有消息的传递。这个时候我们把系统A与系统B之间消息传递的过程打断。A与B通过MQ来间接通信的过程。所以这个时候的MQ就叫做消息中间件。
1.2 ActiveMQ的作用
最主要的功能就是:实现JMS Provider,用来帮助实现高可用、高性能、可伸缩、 易用和安全的企业级面向消息服务的系统。
1.3 ActiveMQ特点
完全支持JMS1.1和J2EE 1.4规范(持久化,XA消息,事务)
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
可插拔的体系结构,可以灵活定制,如:消息存储方式、安全管理等
很容易和Application Server集成使用
多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP
从设计上保证了高性能的集群,客户端-服务器,点对点
可以很容易的和Spring结合使用
支持通过JDBC和journal提供高速的消息持久化
支持与Axis的整合。
1.5 ActiveMQ的主要功能
将信息以消息的形式,从一个应用程序传送到另一个或多个应用程序。
1.6 ActiveMQ的主要特点
1:消息异步接受,类似手机短信的行为,消息发送者不需要等待消息接受者的响应,减少软件多系统集成的耦合度。
2:消息可靠接收,确保消息在中间件可靠保存,只有接收方收到后才删除消息,多个消息也可以组成原子事务。
1.7 ActiveMQ的主要应用场景
在多个系统间进行整合和通讯的时候,通常会要求:
1:可靠传输,数据不能丢失,有的时候,也会要求不能重复传输;
2:异步传输,否则各个系统同步发送接受数据,互相等待,造成系统瓶颈
1.8 比较知名的消息中间件
IBM MQSeries
BEA WebLogicJMS Server
Oracle AQ
Tibco
SwiftMQ
AcitveMQ:是免费的java实现的消息中间件
二 ActiveMQ安装与基本使用
注意:安装gcc,jdk等
2.1 安装解压
ActiveMQ服务器端 1:从http://activemq.apache.org/download.html下载最新的ActiveMQ
2.2 启动运行
1:普通启动:到ActiveMQ/bin下面,./activemq start
2:启动并指定日志文件 ./activemq start > /tmp/activemqlog
2.3 启动检查
ActiveMQ默认采用61616端口提供JMS服务,使用8161端口提供管理控制台服 务,执行以下命令以便检验是否已经成功启动ActiveMQ服务:
1:比如查看61616端口是否打开: netstat -an | grep 61616
2:也可以直接查看控制台输出或者日志文件
3:还可以直接访问ActiveMQ的管理页面:
默认的用户名和密码是admin/admin
2.4 停止ActiveMQ
可以用./activemq stop
暴力点的可以用ps aux| grep activemq 来得到进程号,然后kill掉
2.5 生产者
public class MsgSendder {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection connection = ConnectionFactoryconnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message--" + i);
Thread.sleep(1000);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
2.6 消费者
public class MsgReceiver {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
int i = 0;
while (i < 3) {
i++;
TextMessage message = (TextMessage) consumer.receive();
session.commit();
System.out.println("收到消息:" + message.getText());
}
session.close();
connection.close();
}
}
三 activeMQ消息通信
3.1 p2p的消息通信
3.1.1 producer
public class MsgSendder {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection connection = ConnectionFactoryconnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message--" + i);
Thread.sleep(1000);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
3.1.2 consumer
public class MsgReceiver {
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
int i = 0;
while (i < 3) {
i++;
TextMessage message = (TextMessage) consumer.receive();
session.commit();
System.out.println("收到消息:" + message.getText());
}
session.close();
connection.close();
}
}
3.1.3 管理平台
3.2 非持久化消息
3.2.1 producer
public class NonPersisiTopicSender {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = conFactory.createConnection();
createConnection.start();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("蜘蛛侠");
MessageProducer createProducer = createSession.createProducer(createTopic);
for(int i=0;i<3;i++){
TextMessage createTextMessage = createSession.createTextMessage("message"+i);
createProducer.send(createTextMessage);
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.2.2 consumer
public class NonPersisiTopicReceiver {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = activeMQConnectionFactory.createConnection();
createConnection.start();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("蜘蛛侠");
MessageConsumer createConsumer = createSession.createConsumer(createTopic);
TextMessage message = (TextMessage)createConsumer.receive();
while(message!=null){
System.out.println(message.getText());
message = (TextMessage)createConsumer.receive();
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.2.3 管理平台
3.3 持久化消息
3.3.1 producer
public class PersisiTopicSender {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = conFactory.createConnection();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("persisitent");
MessageProducer createProducer = createSession.createProducer(createTopic);
createProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
createConnection.start();
for(int i=0;i<3;i++){
TextMessage createTextMessage = createSession.createTextMessage("message"+i);
createProducer.send(createTextMessage);
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.3.2 consumer
public class PersisiTopicReceiver {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");
Connection createConnection = activeMQConnectionFactory.createConnection();
createConnection.setClientID("订阅者B_ID");
createConnection.start();
Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = createSession.createTopic("persisitent");
TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "T1");
TextMessage message = (TextMessage)createDurableSubscriber.receive();
while(message!=null){
System.out.println(message.getText());
message = (TextMessage)createDurableSubscriber.receive();
}
createSession.commit();
createSession.close();
createConnection.close();
}
}
3.3.3 管理平台
3.4 总结
3.4.1持久化消息
这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。 这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
3.4.2非持久化消息
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。有两种方法指定传送模式:
1.使用setDeliveryMode 方法,这样所有的消息都采用此传送模式; 如: producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法为每一条消息设置传送模式