■配置JMS连接--ActiveMQ提供类用作配置URLs和其他连接到代理的参数。连接工厂能稍后被你的程序用来获取合适的连接。
■配置JMS destination--ActiveMQ destination对象能简单地进行配置,它简单地作为beans,代表用于你的producers和consumers的JMS destinations。
■定义JMS consumers--Spring提供helper类允许你简单地配置消息监听容器并hook消息监听器到它。
■实现JMSproducers--Spring也为建立新的producers提供helper bean classes。
在下面的章节中,这个任务将被演示并且portfolio应用将被修改为使用ActiveMQ和Spring整合的所有优点。
7.4.1配置JMS连接
如前面示例所看到的,创建JMS应用的第一步是创建到ActiveMQ代理的连接。ActiveMQConnectionFactory是建立ActiveMQConnection的工厂,它们都能在Spring中简单的使用。下面的snippet显示了如何使用Spring XML配置定义ActiveMQConnectionFactory:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> <property name="userName" value="admin" /> <property name="password" value="password" /> </bean>
在这个snippet中,注意在ActiveMQConnectionFactory上配置的属性。
在一些使用示例中为了达到想要的效果a pool of connnections是必要的。为了这个目的,ActiveMQ提供了PooledConnectionFactory类,它报刊了一个JMS连接和sessions池。这里有一个PooledConnectionFactory的Spring XML配置:
<bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
在这个示例中只有一个属性在PooledConnectionFactory上被配置--connectionFactory属性。connectionFactory属性配用来定义到ActiveMQ代理的underlying连接工厂,这个连接工厂将被pooled connection factory。在这个用例中,我们使用了我们先前定义的jmsConnectionFactory bean。
因为pooled connection factory在Apache Commons Pool project( http://mng.bz/j3PV)上有一个依赖,即将需要添加JAR到claspath。或者,如果你为你的项目管理使用Maven,只要添加下面的dependency到pom.xml文件:
<dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.4</version> </dependency>
先前的XML定义关于common-pool-1.4.jar 文件的Maven依赖,并且为你自动获取。
一旦JMS连接被定义,你能进一步到定义JMSdestinations,producers,和consumers。
7.4.2配置JMS destinations
JMS destinations能用ActiveMQTopic和ActiveMQQueue类在activemq.xml文件中预定义。下面的snippet包含了用于portfolio示例的两个新的topic定义:
<bean id="cscoDest" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="STOCKS.CSCO" /> </bean> <bean id="orclDest" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="STOCKS.ORCL" /> </bean>
如你所见,这些类为在ActiveMQTopic类上设置一个想要的destination name使用constructor injection。预定义的topics在ActiveMQ是不需要的,但它对环境是便利的,在环境中代理需要客户端为许多操作进行身份验证。更多关于client authentication信息请看第6章。现在一个连接和一系列的destinations已经存在了,你能开始发送和接收消息了。
7.4.3建立JMS consumers
下面的两节touch upon Spring JMS(http://mng.bz/I0Pe) 为建立consumers和producers的基本使用,因为它使建立JMSconsumers和producers极度简单。虽然Spring JMS提供了一些强大的特性,但是这两节将不深入详细,因为这在本书的讨论范围之外。取而代之地,我们将展示使你快速上手和运行于portfolio示例的方面。关于Spring JMS的更多信息,请查阅Spring文档。
在Spring中接收消息的基本的abstraction是消息监听容器(MLC: see http://mng.bz/LJti)。MLC设计提供了一个在你消息监听和代理间处理connections,threading的媒介,使你仅仅要关心你的在监听器上的业务逻辑。在下面的列表中,从第3章来的portfolio 消息监听被用于两个消息监听容器,为两个在先前章节定义的destinations。
Listing 7.22 Defining two Spring message listener containers and a message listener
<!-- The message listener --> <bean id="portfolioListener" class="org.apache.activemq.book.ch3.portfolio.Listener"> </bean> <!-- Spring DMLC --> <bean id="cscoConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="cscoDest" /> <property name="messageListener" ref="portfolioListener" /> </bean> <!-- Spring DMLC --> <bean id="orclConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="orclDest" /> <property name="messageListener" ref="portfolioListener" /> </bean>
在表7.22中的每个MLC实例需要一个连接工厂,一个destination和一个消息监听。所以你所要做的是实现一个消息监听bean并让其他事情交由Spring MLC做。注意在这个示例中我们使用了plain(not pooled)连接工厂。这是因为对于简单的示例connection pooling是不需要的。这个示例使用了Spring DefaultMessageListenerContainer(DMLC),它在MLC中是最普遍使用的。虽然在DMLC上许多其他的属性能被配置,但这个示例仅使用基本的。当这两个DMLC实例被启动,他们将准备好接收消息并转换到消息监听。
现在让我们发送消息到ActiveMQ。
7.4.4建立JMS producers
正如接收消息的示例,Spring也提供发送消息的便利。发送消息的creucial abstraction是Spring的JmsTemplate类。JmsTemplate遵照基本模板模式为发送消息提供方便的类。
使用Spring发送消息的一个最普遍的方法是通过实现Spring MessageCreator接口和使用它 with the appropriate send() method of the JmsTemplate class。下面列表通过借用第三章的stock portfolio publisher 实现整个消息建立逻辑展示了它。
Listing 7.23 Implementation of a MessageCreator for sending messages using Spring
public class StockMessageCreator implements MessageCreator { private int MAX_DELTA_PERCENT = 1; private Map<Destination, Double> LAST_PRICES = new Hashtable<Destination, Double>(); Destination stock; public StockMessageCreator(Destination stock) { this.stock = stock; } public Message createMessage(Session session) throws JMSException { Double value = LAST_PRICES.get(stock); if (value == null) { value = new Double(Math.random() * 100); } // lets mutate the value by some percentage double oldPrice = value.doubleValue(); value = new Double(mutatePrice(oldPrice)); LAST_PRICES.put(stock, value); double price = value.doubleValue(); double offer = price * 1.001; boolean up = (price > oldPrice); MapMessage message = session.createMapMessage(); message.setString("stock", stock.toString()); message.setDouble("price", price); message.setDouble("offer", offer); message.setBoolean("up", up); System.out.println( "Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + stock ); return message; } protected double mutatePrice(double price) { double percentChange = (2 * Math.random() * MAX_DELTA_PERCENT) - MAX_DELTA_PERCENT; return price * (100 + percentChange) / 100; } }
MessageCreator接口仅定义了createMessage()方法,它将返回一个JMS消息。这里,我们为了建立随机stock 价格实现了一些逻辑,并且women建立了一个合适的JMS map message来hold所有的相关了的数据。为了发送消息,JmsTemplate’s send()方法将使用StockMessageCreator如下所示。
Listing 7.24 JMS publisher implementation in Spring
public class SpringPublisher { private JmsTemplate template; private int count = 10; private int total; private Destination[] destinations; private HashMap<Destination,StockMessageCreator> creators = new HashMap<Destination,StockMessageCreator>(); public void start() { while (total < 1000) { for (int i = 0; i < count; i++) { sendMessage(); } total += count; System.out.println("Published '" + count + "' of '" + total + "' price messages"); try { Thread.sleep(1000); } catch (InterruptedException x) { } } } protected void sendMessage() { int idx = 0; while (true) { idx = (int)Math.round(destinations.length * Math.random()); if (idx < destinations.length) { break; } } Destination destination = destinations[idx]; template.send(destination, getStockMessageCreator(destination)); } private StockMessageCreator getStockMessageCreator(Destination dest) { if (creators.containsKey(dest)) { return creators.get(dest); } else { StockMessageCreator creator = new StockMessageCreator(dest); creators.put(dest, creator); return creator; } } // getters and setters goes here }
在表7.24中要注意的重要是send()方法如何使用message creator。在这个示例中的其他部分是和第三章中的原始的stock portfolio publisher相同的。现在你有了使用Spring发布消息到ActiveMQ的所有必要的组件。所有剩下的工作是正确地配置它,如下表所示。
Listing 7.25 JMS publisher configuration in Spring
<!-- Spring JMS Template --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="pooledJmsConnectionFactory" /> </bean> <bean id="stockPublisher" class="org.apache.activemq.book.ch7.spring.SpringPublisher"> <property name="template" ref="jmsTemplate" /> <property name="destinations"> <list> <ref local="cscoDest" /> <ref local="orclDest" /> </list> </property> </bean>
在表7.25中的snippet展示了Spring JmsTemplate和publisher的实例。publisher简单地需要一个正在使用的JMS destination的引用,并且JmsTemplate需要一个connection工厂。
注意:
The pooled connection factory is used with the JmsTemplate.这是很重要的因为JmsTemplate是为use with Java EE containers in mind 设计的,它按照Java EE 规范代表性地提供connection pooling 能力。每一个对JmsTemplate.send()的调用建立和销毁所有的JMS资源(connections,consumers和producers)。所以如果你不在使用Java EE container,确保为sending message with the JmsTemplate使用一个pooled connection工厂.
connections和destinations被定义了;consumers和producer已经建立。现在让我们运行示例。
7.4.5组装在一起
在实现了这个示例的所有部分之后,应用已经准备好运行了。看看下面的列表来看看将助兴示例的main方法。
Listing 7.26 The main method for the Spring example
public class SpringClient { public static void main(String[] args) { BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.setPersistent(false); broker.start(); FileSystemXmlApplicationContext context = new FileSystemXmlApplicationContext( "src/main/resources/org/apache/activemq/book/ch7/spring-client.xml" ); SpringPublisher publisher = (SpringPublisher)context.getBean("stockPublisher"); publisher.start(); } }
这个简单类启动了一个最小化的ActiveMQ代理配置和初始化了Spring application contex 去启动JMS客户端。
这个示例能使用下面的命令运行。
Listing 7.27 Run the Spring example
$ mvn exec:java \ -Dexec.mainClass=org.apache.activemq.book.ch7.spring.SpringClient \ -Dlog4j.configuration=file:src/main/java/log4j.properties ... Sending: {price=65.958996694, stock=CSCO, offer=66.0249556914, up=false} on destination: topic://STOCKS.CSCO topic://STOCKS.IONA 79.97 80.05 down Sending: {price=80.67595675108, stock=ORCL, offer=80.7566327078, up=true} on destination: topic://STOCKS.ORCL topic://STOCKS.JAVA 65.96 66.02 down Sending: {price=65.63333898492, stock=CSCO, offer=65.69897232391, up=false} on destination: topic://STOCKS.CSCO topic://STOCKS.IONA 80.68 80.76 up Sending: {price=80.50525969261, stock=ORCL, offer=80.58576495231, up=false} on destination: topic://STOCKS.ORCL topic://STOCKS.JAVA 65.63 65.70 down Sending: {price=81.2186806051, stock=ORCL, offer=81.29989928577, up=true} on destination: topic://STOCKS.ORCL topic://STOCKS.IONA 80.51 80.59 down Sending: {price=65.48960846536, stock=CSCO, offer=65.5550980738, up=false} on destination: topic://CSCO topic://STOCKS.IONA 81.22 81.30 up topic://STOCKS.JAVA 65.49 65.56 down ...
如你所见,producer和consumer都打印他们的消息到基本output,如示例所运行的。
在该节,你使用了Spring来augment从第3章来的stock portfolio的示例程序。你能重新使用大部分的原始逻辑,但是这次你使用一些Spring来大大简化了示例。如前面陈述的,这个示例简单的touched on使用Spring JMS最基本的东西。如果你想知道关于Spring JMS的更多信息,请看相关文档( http://mng.bz/I0Pe)。