版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_31301681/article/details/79776667
1.安装 ActiveMQ (Linux)
1.上传安装包(apache-activemq-5.15.3-bin.tar.gz)
2. 解压安装包 :tar xzf apache-activemq-5.15.3-bin.tar.gz
3.启动 activeMQ :
a)进入 $/ cdapache-activemq-5.15.3/lib
b)启动mq :./activemq start
4.在浏览器中输入:http://127.0.0.1:8161/admin/(默认用户密码:admin/admin)进入如下界面,表示 activeMQ 启动成功:
2.代码实现
a)生产者:
扫描二维码关注公众号,回复:
3804454 查看本文章
package com.wsy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息生产者
* @author wsy
*
*/
public class Producter {
//ActiveMq 的默认用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//ActiveMq 的默认登录密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//ActiveMQ 的链接地址
private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);;
// JMS 客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start(); //开启连接
// 构架 发送消息回话
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字(将消息发送到指定的队列中)
Destination destination = session.createQueue("wsy");
//构建消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置消息是否持久化 (一般都将消息设置成持久化,;理由:在高并发的情况下,保证消息一定被消费)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//发送消息
for(int i = 0 ; i < 5 ; i++ ){
//创建消息(根据自己的需要来创建需要的消息类型)
TextMessage message = session.createTextMessage("我是生产者,这是我生产的第" + i +"条消息!");
producer.send(message);
}
// 关闭会话
session.close();
//关闭连接
connection.close();
}
}
b)消费者:
package com.wsy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消费者
* @author Administrator
*
*/
public class Comsumer {
// 使用默认用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//使用默认用户密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// ActiveMQ 服务器地址
private static final String BROKEN_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);;
// 在连接池中,新建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
// 构建 连接会话
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字(将消息发送到指定的队列中)
Destination destination = session.createQueue("wsy");
// 构建 消息消费者
MessageConsumer consumer = session.createConsumer(destination);
// 消费者, 接收消息(消费消息)
boolean flag = true;
while(flag){
// 接收消息
TextMessage message = (TextMessage) consumer.receive();
if(message != null){
System.out.println(message.getText());
// 提交会话 (告知 MQ 消息已经被消费)
session.commit();
}else{
flag = false;
}
}
// 关闭会话
session.close();
// 关闭连接
connection.close();
}
}