版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lkx444368875/article/details/50454097
刚刚接触activemq,网上也有很多资料,需要花很多时间去整理资料和自我尝试,有的能成功,有的也可能是因为自己的步骤原因导致调试失败,所以特意总结了一下自己所学习到的知识,与大家分享一下.
activemq发送指定消息给指定的人:
1. 消费者代码
package com.clgg.job.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"failover:(tcp://localhost:61616)");
try {
// 得到连接对象
connection = connectionFactory.createConnection();
String clientId = "002";
<span style="white-space: pre;"> </span>//标识当前人
connection.setClientID(clientId);
System.out.println("==================>" + clientId);
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建Queue
destination = session.createQueue("FirstQueue");
<span style="white-space:pre"> </span>//创建消费者的时候,需要可以指定当前登录的标识
consumer = session.createConsumer(destination,"username='A'");
System.out.println("A");
while (true) {
// 设置接收者接收消息的时间,为了便于测试,这里定为100s
TextMessage message = (TextMessage) consumer.receive(1000000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else
break;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
<pre name="code" class="java">2.生产者代码
package com.clgg.job.activemq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
private static final int SEND_NUMBER = 10;
public static void main(String[] args) {
long starTime = System.currentTimeMillis();
// ConnectionFactory :连接工厂,JMS用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS客户端到JMS Provider的连接
Connection connection = null;
// Session:一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// MessageProducer:消息发送者
MessageProducer producer;
// TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"failover:(tcp://localhost:61616,tcp://localhost:61618,tcp://localhost:61619)");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 获取session,FirstQueue是一个服务器的queue
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.setTimeToLive(10000);
// 构造消息
sendMessage(session, producer);
// session.commit();
connection.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generatedcatch block
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
long Time = endTime - starTime;
System.out.println("=============>" + (Time/1000)+"秒");
}
}
public static void sendMessage(Session session, MessageProducer producer) throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i);
System.out.println("发送消息:ActiveMQ发送的消息" + i);
<span style="white-space:pre"> </span>//发送给指定的用户
message.setStringProperty("username","A");
producer.send(message);
}
}
}
<span style="white-space:pre"> </span>当你消费者代码设定了指定标识之后,我试了一下好像是只接受生产者指定给你的消息.其实也就相当与一个过滤器,你消费者设定了指定接受的条件,就能接受到指定的消息;
<span style="white-space:pre"> </span>如果你要发送给两个以上的人,则多多创建几条消息多发几遍就行了,把那个接受的人指定一下就行了.
<span style="white-space:pre"> </span>TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i);
System.out.println("发送消息:ActiveMQ发送的消息" + i);
//发送给指定的用户
message.setStringProperty("username","A");
producer.send(message);
<span style="white-space:pre"> </span>TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i);
System.out.println("发送消息:ActiveMQ发送的消息" + i);
//发送给指定的用户
message.setStringProperty("username","B");
producer.send(message);
订阅消息(topic)也是一样的;不过topic默认是发所有的,而队列(queue)一对一的发送;如果你不设定接受者的条件的话,默认的顺序好像是你先注册的顺序来的;先注册先接受到消息;