新手刚刚学习ActiveMQ,从网上找了很多案例,查阅请教之后总结,接收端监听有些小问题,使用线程来接收,希望可以多多评论互相学习。如有相同请联系删除。
项目A发送消息端:
// TODO Auto-generated method stub
ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS
// Provider 的连接
Connection connection = null; // Session: 一个发送或接收消息的线程
Session session; // Destination :消息的目的地;消息发送给谁.
Destination destination; // MessageProducer:消息发送者
MessageProducer producer; // TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try { // 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session
.createObjectMessage();
msg.setObject((Serializable) user);
//将对象发送
producer.send(msg);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
项目B 接受对象:
因为使用监听一直触发不了,没有解决,所以使用项目运行时开启线程持续接收消息。
开启线程:
public class MyListener implements ServletContextListener { private MyThread myThread; public void contextDestroyed(ServletContextEvent e) { if (myThread != null && myThread.isInterrupted()) { myThread.interrupt(); } } public void contextInitialized(ServletContextEvent e) { String str = null; if (str == null && myThread == null) { myThread = new MyThread(); myThread.start(); // servlet 上下文初始化时启动 socket } } }
线程开启接收消息:
public class MyThread extends Thread {
public void run() {
while (!this.isInterrupted()) {// 线程未中断执行循环
try {
Thread.sleep(200); // 每隔2000ms执行一次
} catch (InterruptedException e) {
e.printStackTrace();
}
// ------------------ 开始执行 ---------------------------
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
User user;
try {
user = (User) ((ObjectMessage) message).getObject();
if (message != null) {
// User s = (User) message.getObject();
System.out.println("收到的消息对象:"
+ user.getLoginName());
user.setCreateBy(new User("1"));
user.setUpdateBy(new User("1"));
//使用getBean方式获取Bean 因注解方式扫描不到.
SystemService systemService = (SystemService)ApplicationContextHandle.getBean("systemService");
systemService.saveUser(user);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
}
配置Web.xml 配置线程监听。项目开启时启动线程
<listener> <listener-class>com.thinkgem.jeesite.modules.test.web.MyListener</listener-class> </listener>