http://activemq.apache.org/download-archives.html
选择自己选用版本。
这里本地测试选用window版本
解压zip文件,不做任何修改 /bin目录下有window启动文件
运行activemq.bat启动activemq服务
默认服务连接地址:tcp://localhost:61616
管理地址:http://localhost:8161
运行消息生产者代码:
@Test public void method1() throws Exception{ /* * 从工厂中获取一个jms connection。 * activmq默认连接端口是61616,在 conf/activemq.xml文件可查看配置 */ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = factory.createConnection(); conn.start(); /* * 新建session * 参数一:是否有事物 * 参数二: * AUTO_ACKNOWLEDGE 完成消息接收时,session自动发送一个消息回执 * CLIENT_ACKNOWLEDGE 由客户端程序通过手工调用Message.acknowledge()方法显示确认接收 * DUPS_OK_ACKNOWLEDGE 让Session延迟发送确认回执 */ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建一个P2P的Queue Destination dest = session.createQueue("com.amq.test"); //创建一个消息生产者 MessageProducer producer = session.createProducer(dest); /* * NON_PERSISTENT 消息只传送一次 * PERSISTENT provider持久化消息,以保证消息不会因为JMS provider的失败而丢失 */ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //创建一个简单的文本消息 TextMessage message = session.createTextMessage("first message"); producer.send(message); //释放资源 producer.close(); session.close(); conn.close(); }
查看http://localhost:8161/admin/queues.jsp页面
发现有一个queue
第一列:queue名字
第二列:未处理消息数
第三列:消费者个数
第四列:入列消息数
第五列:出列消息数
运行消息消费者:
public class AMQConsumer implements Runnable { @Override public void run() { try { //获取连接 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建一个consumer Destination dest = session.createQueue("com.amq.test"); MessageConsumer consumer = session.createConsumer(dest); //接收消息 Message message = consumer.receive(1000); while(true){ if(message != null){ if(message instanceof TextMessage){ TextMessage tm = (TextMessage) message; System.out.println(tm.getText()); }else{ System.out.println(message); } } message = consumer.receive(1000); } } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { //线程方式,轮询监听 new Thread(new AMQConsumer()).start(); } }
会打印输出:first message
查看消息queue页
刚才生产消息已经被消费