打开Eclipse
创建Maven工程
项目创建完毕 需要配置一下 pom.xml
在pom.xml中 添加ActiveMQ架包
扫描二维码关注公众号,回复:
13612720 查看本文章
添加一个com.tong文件夹 在里面创建一个名为HelloWordProducer的类
如果javax.jms 报错可以在pom.xml里面添加架包
编写消息的生产者
package com.tong;
import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Message;
import org.apache.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class HelloWordProducer {
/***
* 生产消息
*/
public void sendHelloWordActiveMQ(String msgTest)
{
//定义链接工厂
ConnectionFactory connectionFactory=null;
//定义链接对象
Connection connection=null;
//定义会话
Session session=null;
//定义目的地
Destination destination=null;
//定义消息的发送者
MessageProducer producer=null;
//定义消息
Message messag=null;
try
{
/***
* username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
* password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
* brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号
*/
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616");
//创建连接对象
connection =connectionFactory.createConnection();
//启动连接对象
connection.start();
/**
* transacted:是否使用事务:true|false
* true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量 则acknowledge参数必须设置
*
* acknowledgeMode :
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列
*/
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列
destination=session.createQueue("helloword-destination");
//创建消息的生产者
producer=session.createProducer(destination);
//创建消息对象
messag =session.createTextMessage(msgTest);
//发送消息
producer.send(messag);
}catch(Exception e)
{
e.printStackTrace();
}finally
{
//回收消息发送者资源
if(producer!=null)
{
try
{
producer.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
if(session!=null)
{
try
{
session.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
if(connection!=null)
{
try
{
connection.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
}
}
}
编写消息的消费者
创建新的Maven项目
pom.xml 文件和消息生产者 一致
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tong</groupId>
<artifactId>mq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- MQ -->
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
</dependencies>
</project>
package com.tong;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class HelloworldConsumer {
/***
* 生产消息
*/
public void sendHelloWordActiveMQ()
{
//定义链接工厂
ConnectionFactory connectionFactory=null;
//定义链接对象
Connection connection=null;
//定义会话
Session session=null;
//定义目的地
Destination destination=null;
//定义消息的消费者
MessageConsumer consumer=null;
//定义消息
Message messag=null;
try
{
/***
* username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
* password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
* brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号
*/
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616");
//创建连接对象
connection =connectionFactory.createConnection();
//启动连接对象
connection.start();
/**
* transacted:是否使用事务:true|false
* true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量 则acknowledge参数必须设置
*
* acknowledgeMode :
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列
*/
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列
destination=session.createQueue("helloword-destination");
//创建消息的消费者
consumer=session.createConsumer(destination);
//创建消息对象
messag = consumer.receive();
//处理消息
String msg= ((TextMessage)messag).getText();
System.out.println("从ActiveMQ服务中获取文本信息"+msg);
}catch(Exception e)
{
e.printStackTrace();
}finally
{
//回收消息发送者资源
if(consumer!=null)
{
try
{
consumer.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
if(session!=null)
{
try
{
session.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
if(connection!=null)
{
try
{
connection.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
}
}
}
创建Test.java当启动类
package com.tong;
import com.tong.HelloWordProducer;
public class Test {
public static void main(String[] args)
{
HelloWordProducer producer=new HelloWordProducer();
producer.sendHelloWordActiveMQ("HelloWord");
}
}
处理对象消息
关键必须实现 Serializable 接口
package com.tong;
import java.io.Serializable;
public class Users implements Serializable {
private int userid;
private String username;
private int userage;
@Override
public String toString() {
return "Users [userid=" + userid + ", username=" + username + ", userage=" + userage + "]";
}
public int getUserid() {
return userid;
}
public void setUserid(int userid) {
this.userid = userid;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public int getUserage() {
return userage;
}
public void setUserage(int userage) {
this.userage = userage;
}
}
生产者
package com.tong;
import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Message;
import org.apache.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class HelloWordProducer2 {
/***
* 生产消息
*/
public void sendHelloWordActiveMQ(Users user)
{
//定义链接工厂
ConnectionFactory connectionFactory=null;
//定义链接对象
Connection connection=null;
//定义会话
Session session=null;
//定义目的地
Destination destination=null;
//定义消息的发送者
MessageProducer producer=null;
//定义消息
Message messag=null;
try
{
/***
* username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
* password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
* brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号
*/
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616");
//创建连接对象
connection =connectionFactory.createConnection();
//启动连接对象
connection.start();
/**
* transacted:是否使用事务:true|false
* true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量 则acknowledge参数必须设置
*
* acknowledgeMode :
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列
*/
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列
destination=session.createQueue("my-users");
//创建消息的生产者
producer=session.createProducer(destination);
//创建消息对象
messag =session.createObjectMessage(user);
//发送消息
producer.send(messag);
}catch(Exception e)
{
e.printStackTrace();
}finally
{
//回收消息发送者资源
if(producer!=null)
{
try
{
producer.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
if(session!=null)
{
try
{
session.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
if(connection!=null)
{
try
{
connection.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
}
}
}
消费者
package com.tong;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class HelloworldConsumer2 {
/***
* 生产消息
*/
public void sendHelloWordActiveMQ()
{
//定义链接工厂
ConnectionFactory connectionFactory=null;
//定义链接对象
Connection connection=null;
//定义会话
Session session=null;
//定义目的地
Destination destination=null;
//定义消息的消费者
MessageConsumer consumer=null;
//定义消息
Message messag=null;
try
{
/***
* username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
* password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
* brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号
*/
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616");
//创建连接对象
connection =connectionFactory.createConnection();
//启动连接对象
connection.start();
/**
* transacted:是否使用事务:true|false
* true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED
* false:不适用事务,设置次变量 则acknowledge参数必须设置
*
* acknowledgeMode :
* Session.AUTO_ACKNOWLEDGE:自动消息确认机制
* Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列
*/
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列
destination=session.createQueue("my-users");
//创建消息的消费者
consumer=session.createConsumer(destination);
//创建消息对象
messag = consumer.receive();
//处理消息
ObjectMessage objMessage=(ObjectMessage) messag;
Users users=(Users)objMessage.getObject();
System.out.println(users);
}catch(Exception e)
{
e.printStackTrace();
}finally
{
//回收消息发送者资源
if(consumer!=null)
{
try
{
consumer.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
if(session!=null)
{
try
{
session.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
if(connection!=null)
{
try
{
connection.close();
}catch(Exception e)
{
e.printStackTrace();
}
}
}
}
}
测试启动类
package com.tong;
import com.tong.HelloWordProducer;
public class Test {
public static void main(String[] args)
{
// HelloWordProducer producer=new HelloWordProducer();
// producer.sendHelloWordActiveMQ("HelloWord");
Users users=new Users();
users.setUserid(1);
users.setUsername("童小纯");
users.setUserage(20);
HelloWordProducer2 producer=new HelloWordProducer2();
producer.sendHelloWordActiveMQ(users);
}
}