apache-activemq-5.10.0-bin.tar.gz,下载地址http://activemq.apache.org/download.html
2.解压文件到运行目录
[root@iZ94wmbxqzyZ softs]# tar -xzvf /server/apache-activemq-5.10.0-bin.tar.gz
3.为了方便管理,重命名
[root@iZ94wmbxqzyZ softs]# mv apache-activemq-5.10.0 activemq-5.10.0 [root@iZ94wmbxqzyZ softs]# cd activemq-5.10.0/ [root@iZ94wmbxqzyZ activemq-5.10.0]# ll total 6304 -rwxr-xr-x 1 root root 6371237 Jun 5 2014 activemq-all-5.10.0.jar drwxr-xr-x 5 root root 4096 Jan 11 23:31 bin drwxr-xr-x 2 root root 4096 Jan 11 23:31 conf drwxr-xr-x 2 root root 4096 Jan 11 23:31 data drwxr-xr-x 2 root root 4096 Jan 11 23:31 docs drwxr-xr-x 8 root root 4096 Jan 11 23:31 examples drwxr-xr-x 6 root root 4096 Jan 11 23:31 lib -rw-r--r-- 1 root root 40580 Jun 5 2014 LICENSE -rw-r--r-- 1 root root 3334 Jun 5 2014 NOTICE -rw-r--r-- 1 root root 2610 Jun 5 2014 README.txt drwxr-xr-x 7 root root 4096 Jan 11 23:31 webapps drwxr-xr-x 3 root root 4096 Jan 11 23:31 webapps-demo [root@iZ94wmbxqzyZ activemq-5.10.0]# cd bin/ [root@iZ94wmbxqzyZ bin]# ll total 152 -rwxr-xr-x 1 root root 22126 Jun 5 2014 activemq -rwxr-xr-x 1 root root 5665 Jun 5 2014 activemq-admin -rw-r--r-- 1 root root 15954 Jun 5 2014 activemq.jar -rwxr-xr-x 1 root root 6189 Jun 5 2014 diag drwxr-xr-x 2 root root 4096 Jan 11 23:31 linux-x86-32 drwxr-xr-x 2 root root 4096 Jan 11 23:31 linux-x86-64 drwxr-xr-x 2 root root 4096 Jan 11 23:31 macosx -rwxr-xr-x 1 root root 83820 Jun 5 2014 wrapper.jar
4.启动服务
[root@iZ94wmbxqzyZ bin]# ./activemq start INFO: Using default configuration (you can configure options in one of these file: /etc/default/activemq /root/.activemqrc) INFO: Invoke the following command to create a configuration file ./activemq setup [ /etc/default/activemq | /root/.activemqrc ] INFO: Using java '/softs/jdk1.6.0_30/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/softs/activemq-5.10.0/data/activemq-iZ94wmbxqzyZ.pid' (pid '28962')
5.查看是否启动成功
[root@iZ94wmbxqzyZ bin]# [root@iZ94wmbxqzyZ bin]# ps -ef | grep activemq root 28962 1 32 23:32 pts/0 00:00:04 /softs/jdk1.6.0_30/bin/java -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/softs/activemq-5.10.0/conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/softs/activemq-5.10.0/tmp -Dactivemq.classpath=/softs/activemq-5.10.0/conf; -Dactivemq.home=/softs/activemq-5.10.0 -Dactivemq.base=/softs/activemq-5.10.0 -Dactivemq.conf=/softs/activemq-5.10.0/conf -Dactivemq.data=/softs/activemq-5.10.0/data -jar /softs/activemq-5.10.0/bin/activemq.jar start root 29011 28898 0 23:32 pts/0 00:00:00 grep activemq [root@iZ94wmbxqzyZ bin]# [root@iZ94wmbxqzyZ bin]#
6.停止服务
[root@iZ94wmbxqzyZ data]# [root@iZ94wmbxqzyZ data]# kill 28962 [root@iZ94wmbxqzyZ data]# [root@iZ94wmbxqzyZ data]# ps -ef | grep activemq root 29078 28898 0 23:42 pts/0 00:00:00 grep activemq [root@iZ94wmbxqzyZ data]#
到此环境准备成功
demo应用
package com.wzh.activemq; import java.io.Serializable; public class User implements Serializable{ private static final long serialVersionUID = 1L; private String username ; private String password ; public User(String username,String password){ this.username = username ; this.password = password ; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { // TODO Auto-generated method stub return "[username="+username+",password="+password+"]" ; } }
----------------------------------------------------------
点对点:
生产者:
package com.wzh.activemq; import java.io.Serializable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class P2PMessageProducer { protected String username = ActiveMQConnection.DEFAULT_USER; protected String password = ActiveMQConnection.DEFAULT_PASSWORD; //protected String brokerURL = "tcp://127.0.0.1:61616"; protected String brokerURL = "tcp://120.24.85.167:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; public static void main(String[] args) { try { new P2PMessageProducer().sendObjectMessage(new User("wzh","q123456")); new P2PMessageProducer().sendMapMessage(); new P2PMessageProducer().sendTextMessage("海,你好"); } catch (Exception e) { e.printStackTrace(); } } public P2PMessageProducer() { try { factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); connection.start(); } catch (JMSException jmse) { close(); } } /** * 初始化连接信息 */ public P2PMessageProducer(String username, String password, String brokerURL) throws JMSException { this.username = username; this.password = password; this.brokerURL = brokerURL; factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } } /** * 关闭连接 */ public void close() { try { if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } protected void sendObjectMessage(Serializable serializable) throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("MessageQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Message message = session.createObjectMessage(serializable); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } protected void sendTextMessage(String text) throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("MessageQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Message message = session.createTextMessage(text); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } protected void sendMapMessage() throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("MessageQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage message = session.createMapMessage(); message.setString("stock", "string"); message.setDouble("price", 11.14); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } }
消费者:
package com.wzh.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class P2PMessageConsumer { protected String username = ActiveMQConnection.DEFAULT_USER; protected String password = ActiveMQConnection.DEFAULT_PASSWORD; //protected String brokerURL = "tcp://127.0.0.1:61616"; protected String brokerURL = "tcp://120.24.85.167:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; public static void main(String[] args) { P2PMessageConsumer consumer = new P2PMessageConsumer(); consumer.receiveMessage(); } public P2PMessageConsumer() { try { factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); connection.start(); } catch (JMSException jmse) { close(); } } public P2PMessageConsumer(String username, String password, String brokerURL) throws JMSException { this.username = username; this.password = password; this.brokerURL = brokerURL; factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } } public void close() { try { if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } protected void receiveMessage() { Session session = null; try { session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("MessageQueue"); MessageConsumer consumer = session.createConsumer(destination); while (true) { Message message = consumer.receive(); if (null != message) { if (message instanceof ObjectMessage) { System.out.println("deal ObjectMessage...."); dealObjectMessage((ObjectMessage) message); } else if (message instanceof MapMessage) { System.out.println("deal MapMessage...."); dealMapMessage((MapMessage) message); } else if (message instanceof TextMessage) { System.out.println("deal TextMessage...."); dealTextMessage((TextMessage) message); } } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { if (session != null) { try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * * 处理 TextMessage消息 * * @throws JMSException */ private void dealTextMessage(TextMessage message) throws JMSException { String text = message.getText(); System.out.println("text = " + text); } /** * * 处理 MapMessage消息 * * @throws JMSException */ private void dealMapMessage(MapMessage message) throws JMSException { String stack = message.getString("stock"); Double price = message.getDouble("price"); System.out.println("stock = " + stack + " , price =" + price); } /** * 处理ObjectMessage消息 */ private void dealObjectMessage(ObjectMessage message) throws JMSException { User user = (User) message.getObject(); System.out.println(user.toString()); } }
运行结果:
deal ObjectMessage.... [username=wzh,password=q123456] deal MapMessage.... stock = string , price =11.14 deal TextMessage.... text = 海,你好
===========================================================================
发布与订阅:
-----------------------
消息发布者
package com.wzh.activemq; import java.io.Serializable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Publish { protected String username = ActiveMQConnection.DEFAULT_USER; protected String password = ActiveMQConnection.DEFAULT_PASSWORD; //protected String brokerURL = "tcp://127.0.0.1:61616"; protected String brokerURL = "tcp://120.24.85.167:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; public static void main(String[] args) { try { new Publish().sendObjectMessage(new User("wzh","q123456")); new Publish().sendMapMessage(); new Publish().sendTextMessage("海,你好"); } catch (Exception e) { e.printStackTrace(); } } public Publish() { try { factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); connection.start(); } catch (JMSException jmse) { close(); } } public Publish(String username, String password, String brokerURL) throws JMSException { this.username = username; this.password = password; this.brokerURL = brokerURL; factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } } public void close() { try { if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } protected void sendObjectMessage(Serializable serializable) throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Message message = session.createObjectMessage(serializable); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } protected void sendTextMessage(String text) throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Message message = session.createTextMessage(text); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } protected void sendMapMessage() throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage message = session.createMapMessage(); message.setString("stock", "string"); message.setDouble("price", 11.14); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } }
消息订阅者:
package com.wzh.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Subscriber { protected String username = ActiveMQConnection.DEFAULT_USER; protected String password = ActiveMQConnection.DEFAULT_PASSWORD; //protected String brokerURL = "tcp://127.0.0.1:61616"; protected String brokerURL = "tcp://120.24.85.167:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; public static void main(String[] args) { Subscriber consumer = new Subscriber(); consumer.receiveMessage(); } public Subscriber() { try { factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); connection.start(); } catch (JMSException jmse) { close(); } } public Subscriber(String username, String password, String brokerURL) throws JMSException { this.username = username; this.password = password; this.brokerURL = brokerURL; factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } } public void close() { try { if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } protected void receiveMessage() { Session session = null; try { session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message instanceof ObjectMessage) { System.out.println("deal ObjectMessage...."); dealObjectMessage((ObjectMessage) message); } else if (message instanceof MapMessage) { System.out.println("deal MapMessage...."); dealMapMessage((MapMessage) message); } else if (message instanceof TextMessage) { System.out.println("deal TextMessage...."); dealTextMessage((TextMessage) message); } } }) ; } catch (Exception e) { e.printStackTrace(); } finally { /*if (session != null) { try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } }*/ } } /** * * 处理 TextMessage消息 * * @throws JMSException */ private void dealTextMessage(TextMessage message) { try { String text = message.getText(); System.out.println("text = " + text); } catch (JMSException e) { e.printStackTrace(); } } /** * * 处理 MapMessage消息 * * @throws JMSException */ private void dealMapMessage(MapMessage message){ try { String stack = message.getString("stock"); Double price = message.getDouble("price"); System.out.println("stock = " + stack + " , price =" + price); } catch (JMSException e) { e.printStackTrace(); } } /** * 处理ObjectMessage消息 */ private void dealObjectMessage(ObjectMessage message){ try { User user = (User) message.getObject(); System.out.println(user.toString()); } catch (JMSException e) { e.printStackTrace(); } } }