1 首先到官网下载activeMq http://activemq.apache.org/
下载压缩包,由于是做实验,暂时用简单的windows bin zip包
解压到根目录,在bin下,执行命令,可以用cmd或者双击点击activemq
访问 http://localhost:8161/ 用户名密码默认是admin /admin
这样代表消息服务器启动好了,当然暂时用的是内置服务器jetty
2 下面做个实验
利用IDE工具或者其他工具建2个类,一个是生产者类,一个是消费者类,内部有写注释
package activeMq; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * jms 发送代码 生产者 * @author zxg * @version $Id: Provider.java, v 0.1 2015年6月11日 下午4:42:33 zxg Exp $ */ public class Provider { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection =connectionFactory.createConnection(); connection.start(); /** 1、AUTO_ACKNOWLEDGE 是自动确认模式,不需客户端进行确认 * 2、CLIENT_ACKNOWLEDGE 客户端进行确认 3、DUPS_OK_ACKNOWLEDGE 允许重复消息, */ Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for(int i=0; i<3; i++) { MapMessage message = session.createMapMessage(); message.setLong("count", new Date().getTime()); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } session.commit(); session.close(); connection.close(); } }
下面是 消费者类
package activeMq; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消费者 * @author Zxg * @version $Id: Consumer.java, v 0.1 2015年6月11日 下午5:03:44 zxg Exp $ */ public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i = 0 ; while(i<3){ i++; MapMessage message = (MapMessage)consumer.receive(); session.commit(); System.out.println("收到消息" + new Date(message.getLong("count"))); } session.close(); connection.close(); } }
3 首先执行生产者的main函数,结果可以这么看到
17:11:19.743 [main] DEBUG o.a.a.t.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport. 17:11:19.871 [main] DEBUG o.a.a.t.failover.FailoverTransport - Started unconnected 17:11:19.872 [main] DEBUG o.a.a.t.failover.FailoverTransport - Waking up reconnect task 17:11:19.875 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - urlList connectionList:[tcp://localhost:61616], from: [tcp://localhost:61616] 17:11:19.907 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Attempting 0th connect to: tcp://localhost:61616 17:11:19.910 [ActiveMQ Task-1] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 17:11:19.913 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Connection established 17:11:19.913 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 17:11:19.913 [ActiveMQ Task-1] INFO o.a.a.t.failover.FailoverTransport - Successfully connected to tcp://localhost:61616 17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=104857600} 17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=9, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 17:11:20.951 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-60887-1434013879767-1:1:1 17:11:22.952 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-60887-1434013879767-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-60887-1434013879767-1:1:1 17:11:22.952 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-60887-1434013879767-1:1:1 syncCount: 0 17:11:22.960 [main] DEBUG o.a.a.t.failover.FailoverTransport - Stopped tcp://localhost:61616 17:11:22.961 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616
在看下消息服务器的结果
意思就是有3个入列,因为生产者就生产了三个消息。
然后我们执行下消费者的main函数,结果如下
18:12:47.827 [main] DEBUG o.a.a.t.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport. 18:12:47.955 [main] DEBUG o.a.a.t.failover.FailoverTransport - Started unconnected 18:12:47.955 [main] DEBUG o.a.a.t.failover.FailoverTransport - Waking up reconnect task 18:12:47.956 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - urlList connectionList:[tcp://localhost:61616], from: [tcp://localhost:61616] 18:12:47.992 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Attempting 0th connect to: tcp://localhost:61616 18:12:47.996 [ActiveMQ Task-1] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 18:12:47.997 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Connection established 18:12:47.997 [ActiveMQ Task-1] INFO o.a.a.t.failover.FailoverTransport - Successfully connected to tcp://localhost:61616 18:12:47.997 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=104857600} 18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=9, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 18:12:48.037 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:1 18:12:48.038 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:1 18:12:48.038 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:1 syncCount: 1 收到消息Thu Jun 11 17:11:19 CST 2015 18:12:48.052 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:2 18:12:48.052 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:2 18:12:48.052 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:2 syncCount: 1 收到消息Thu Jun 11 17:11:20 CST 2015 18:12:48.055 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:3 18:12:48.055 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:3 18:12:48.055 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:3 syncCount: 1 收到消息Thu Jun 11 17:11:21 CST 2015 18:12:48.059 [main] DEBUG o.a.a.t.failover.FailoverTransport - Stopped tcp://localhost:61616 18:12:48.059 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616
再看下消息服务器,刷新下队列
好了,大概知道意思了,以后细说细节