指尖上的代码
--之ACTIVEMQ
1、 简要介绍
ACTIVEMQ是APACHE旗下一个消息中间件开源项目,符合JMS规范标准,ACTIVEMQ5以后通过集成CAMEL全面支持Enterprise Integration Patterns。
ACTIVEMQ作为一个JMS Provider,在看ACTIVEMQ源码之前假定对JMS有点了解。
2、 作为JMS CLIENT的ACTIVEMQ API
2.1从例子讲起
从简单入手,先来看一下作为JMS CLIENT的ACTIVEMQ API。从ACTIVEMQ官网下载apache-activemq-5.6.0(现在发布的最新版本),example目录下有些基础的例子,选取ProducerTool.java来窥探一下ACTIVEMQ如何实现JMS API的。以下是ProducerTool中创建连接、会话、Producer,发送消息的全貌。具体发送消息的细节在sendLoop方法中,后续会看到。
Connection connection = null;
try {
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
// Create the session
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
// Create the producer.
MessageProducer producer = session.createProducer(destination);
if (persistent) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
if (timeToLive != 0) {
producer.setTimeToLive(timeToLive);
}
// Start sending messages
sendLoop(session, producer);
System.out.println("[" + this.getName() + "] Done.");
synchronized (lockResults) {
ActiveMQConnection c = (ActiveMQConnection) connection;
System.out.println("[" + this.getName() + "] Results:\n");
c.getConnectionStats().dump(new IndentPrinter());
}
} catch (Exception e) {
System.out.println("[" + this.getName() + "] Caught: " + e);
e.printStackTrace();
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
2.2发送消息前的准备工作
首先看一下ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
user,password顾名思义就是接入ACTIVEMQ的验证所需要的用户名,密码。这里都为null,说明ACTIVEMQ配置的不需要验证。url统一资源标识符定义了JMS Client如何找到JMS Provider。
URI(统一资源标识符)语法结构:
[scheme:][//authority][path][?query][#fragment]
authority部分:
[user-info@]host[:port]
这里url使用了默认值:failover://tcp://localhost:61616,以此为例:
scheme部分为failover,但是没有query部分。对于ACTIVEMQ,query部分现在有以下三个有效参数:
“jms.prefetchPolicy.”,“jms.redeliveryPolicy.”,“jms.blobTransferPolicy.”分别对应ActiveMQPrefetchPolicy,RedeliveryPolicy和BlobTransferPolicy。
这里似乎还没有用到“tcp://localhost:61616”这部分,我们接着往下看。
第二句,connection = connectionFactory.createConnection();创建一个连接。
最终掉用了该函数,
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
if (brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
}
ActiveMQConnection connection = null;
try {
Transport transport = createTransport();
connection = createActiveMQConnection(transport, factoryStats);
connection.setUserName(userName);
connection.setPassword(password);
configureConnection(connection);
transport.start();
if (clientID != null) {
connection.setDefaultClientID(clientID);
}
return connection;
} catch (JMSException e) {
// Clean up!
try {
connection.close();
} catch (Throwable ignore) {
}
throw e;
} catch (Exception e) {
// Clean up!
try {
connection.close();
} catch (Throwable ignore) {
}
throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
}
}
其中userName和password就是第一句中ActiveMQConnectionFactory初始化时传入的user和password。
在创建连接的同时,首先创建了Transport。如果Connection在JMS API中统筹全局,那么Transport作为Connection中的一部分,负责协助Connection底层通讯。