JMS介绍
JMS是java消息服务应用的接口,是java平台面向消息中间件的一套规范的java API接口,用于两个应用程序之间或分布式系统中发消息,进行异步通信。
简单来说,JMS和消息中间件的关系类似于JDBC与数据库的关系
JMS使用(以Active为例)
1.Maven依赖
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9 </version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.4</version>
</dependency>
2.启动MQ
public class StaterMQ {
public static void main(String[] args) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setBrokerName("EmbedMQ");
brokerService.addConnector("tcp://localhost:6000");
brokerService.setManagementContext(new ManagementContext());
brokerService.start();
}
}
3.生产者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProducer {
// 默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
// 默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// 默认连接地址
private static final String BROKEURL = "tcp://localhost:6000";
// 发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer messageProducer;
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("EmbedMQ");
messageProducer = session.createProducer(destination);
for (int i = 0; i < SENDNUM; i++) {
String msg = " 发送" + i + " " + System.currentTimeMillis();
TextMessage message = session.createTextMessage(msg);
System.out.println("发送消息:" + msg);
messageProducer.send(message);
}
session.commit();
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
4.消费者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认连接密码
private static final String BROKEURL = "tcp://localhost:6000";// 默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 会话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageConsumer messageConsumer;// 消息的消费者
// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME, JmsConsumer.PASSWORD,
JmsConsumer.BROKEURL);
try {
// 通过连接工厂获取连接
connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个连接HelloWorld的消息队列
destination = session.createTopic("EmbedMQ");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
System.out.println("Accept msg : " + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}