一、下载ActiveMQ:http://activemq.apache.org/download-archives.html,此处以window系统为例。
二、解压,得到文件如图:
bin:二进制文件目录,在里边寻找对应的activemq.bat启动activeMQ;
conf:配置文件项;
data:存放日志、临时数据等;
三、启动ActiveMQ:
使用bin目录下activemq.bat启动,启动成功后效果如图:
在浏览器输入:localhost:8161/admin 或者127.0.0.1::8161/admin,查看admin页面效果(不同的MQ版本,界面有细微差别):
四、入门小demo
首先引入对应的依赖包:
<!-- 配置JMS依赖 -->
<!-- https://mvnrepository.com/artifact/org.glassfish.main.javaee-api/javax.jms -->
<dependency>
<groupId>org.glassfish.main.javaee-api</groupId>
<artifactId>javax.jms</artifactId>
<version>3.1.2.2</version>
</dependency>
<!-- 配置MQ依赖 -->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.2</version>
</dependency>
消息生产者Sender:
public class Sender implements Runnable{
public boolean flag = true;
public void setFlag(boolean flag) {
this.flag = flag;
}
//环境准备
private Connection getConnection(String name,String password,String address) throws Exception{
//建立ConnectionFactory工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://"+address);
//通过工厂建立连接
Connection connection = connectionFactory.createConnection();
return connection;
}
public void run() {
Sender sender = null;
Connection connection = null;
MessageProducer producer = null;
try {
sender = new Sender();
connection = sender.getConnection("admin", "admin", "127.0.0.1:61616");
connection.start();
//获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列
Destination destination = session.createQueue("firstQueue");
//创建生产者
producer = session.createProducer(null);
//创建消息
TextMessage msg = null;
int index = 0;
while(flag) {
msg = session.createTextMessage("我是第"+index+++"条信息内容");
//此处可以设定参数:目标地址、具体的参数信息、传达数据的模式、优先级、消息过期时间
//在发送数据时指定队列(Destination)
producer.send(destination, msg);
//休眠一秒
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭连接
try {
if(connection != null) {
connection.close();
}else {
throw new RuntimeException("哈哈哈");
}
}catch (JMSException e) {
e.printStackTrace();
}
}
}
}
消息消费者Receiver:
public class Receiver implements Runnable {
//环境准备
private Connection getConnection(String name,String password,String address) throws Exception{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://"+address);
//通过工厂建立连接
Connection connection = connectionFactory.createConnection();
return connection;
}
public void run() {
TextMessage msg = null;
Connection connection = null;
try {
connection = getConnection("admin","admin","127.0.0.1:61616");
connection.start();
//获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列
Destination destination = session.createQueue("firstQueue");
//创建MessageConsumer
MessageConsumer consumer = session.createConsumer(destination);
while(true) {
//可以设置receive参数: 等/不等/等具体时间后不等
msg = (TextMessage) consumer.receive();
System.out.println("获取的数据为:"+msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
启动:
/**
* Hello world!
* created by txd 20190115
*/
public class App {
public static void main(String[] args) {
Sender s1 = new Sender();
Thread sender = new Thread(s1);
sender.setName("sender");
Thread receiver = new Thread(new Receiver());
receiver.setName("receiver");
//启动producer
sender.start();
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动消费者
receiver.start();
Random random = new Random(System.currentTimeMillis());
while(true) {
if(random.nextInt(100) > 50) {
s1.setFlag(false);
}
}
}
}
前往浏览器查看效果:
可见,小程序正在健康的运行着...