ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1(Java消息服务)和J2EE 1.4规范的 JMS Provider实现。
- 1
- 2
1、ActiveMQ下载
http://activemq.apache.org
有windows和linux两个版本的,分为32和64位,具体按需求下载
2、解压、测试连接
将下载的安装包【apache-activemq-5.8.0-bin.zip】解压,有如下目录:
进入bin目录,双击运行【activemq.bat】文件,出现如下界面,说明服务已启动
通过访问http://localhost:8161/admin/可管理消息,登录时需要输入账户密码,默认admin\admin,界面如下:
通过点击菜单栏Queues进入队列管理界面,可创建一个队列
(1)Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
(2)Number Of Consumers 消费者 这个是消费者端的消费者数量
(3)Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
(4)Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量
3、在eclipse中配置activeMq
创建Java project
所需mq jar包在解压目录的lib中导入项目即可,创建两java类,一个消息发送方,一个接受方。
【消息发送发】:
import java.util.Random; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; importorg.apache.activemq.ActiveMQConnectionFactory; public class SendMessage { private static final String url = "tcp://localhost:61616"; private static final String QUEUE_NAME = "bfp_mq"; public void sendMessage() throws JMSException { // JMS 客户端到JMSProvider 的连接 Connection connection = null; try { // 连接工厂,JMS 用它创建连接 // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); connection = (Connection)connectionFactory.createConnection(); // 启动连接 connection.start(); //Session:发送或接收消息的线程 // 获取session Session session = (Session) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地,消息发送到那个队列 Destination destination = session.createQueue(QUEUE_NAME); //MessageProducer:消息发送者(生产者) // 创建消息发送者 MessageProducer producer =session.createProducer(destination); // 设置是否持久化 //DeliveryMode.NON_PERSISTENT:不持久化 //DeliveryMode.PERSISTENT:持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); String msg = ""; int i = 0; do { msg = "第"+i + "次发送的消息:"+new Random(); TextMessage message = session.createTextMessage(msg); Thread.sleep(1000); // 发送消息到目的地方 producer.send(message); System.out.println("发送消息:" +msg); i++; } while (i<1000); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { SendMessage sndMsg = new SendMessage(); try { sndMsg.sendMessage(); } catch (Exception ex) { System.out.println(ex.toString()); } }
【消息消费方】:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息接收类 * * @createTime:Apr 7, 2013 5:11:11 PM * @author:<a href="mailto:[email protected]">迷蝶</a> * @version:0.1 *@lastVersion: 0.1 * @updateTime: *@updateAuthor: <a href="mailto:[email protected]">迷蝶</a> * @changesSum: * */ public class ReceiveMessage { private static final String url = "tcp://localhost:61616"; private static final String QUEUE_NAME = "bfp_mq"; public void receiveMessage() { Connection connection = null; try{ try{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); connection= connectionFactory.createConnection(); }catch (Exception e) { System.out.println(e.toString()); } connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAME); //消息接收者,也就是消费者 MessageConsumer consumer = session.createConsumer(destination); consumeMessagesAndClose(connection,session, consumer); }catch (Exception e) { System.out.println(e.toString()); } } /** * 接收和关闭消息,如遇到消息内容为close则,关闭连接 * * @param connection JMS 客户端到JMSProvider 的连接 * @param session 发送或接收消息的线程 * @param consumer 消息接收对象 * @throws JMSException * @auther <ahref="mailto:[email protected]">迷蝶</a> * Apr 8, 2013 10:31:55 AM */ protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException { do{ Message message = consumer.receive(1000); if("close".equals(message)){ consumer.close(); session.close(); connection.close(); } if(message != null) { onMessage(message); } }while (true); } public void onMessage(Message message) { try{ if(message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Received:" + msg); } }catch (Exception e) { e.printStackTrace(); } } public static void main(String args[]) { ReceiveMessage rm = new ReceiveMessage(); rm.receiveMessage(); }
}
可运行代码,在消息管理界面查看待发送、进入队列、出队列的消息以及消费者个数。