ActiveMQ使用手册
1.JMS(Java Message Service) 是什么
它是一种与厂商无关的API,用来访问消息收发系统消息。它类似于JDBC,JDBC是可以用来访问不同关系数据库的API,而JMS则提供同样与厂商无关的访问消息收发服务的方法,这样就可以通过消息收发服务实现从一个JMS客户机向另一个JMS客户机发送消息,所需要的是厂商支持JMS。换句话说,JMS是Java平台上有关面向消息中间件的技术规范。
ActiveMQ是由Apache出品的,一款最流行的,
能力强劲的消息中间件(MOM:Message Orient middleware)。并且是对消息通信规范JMS的一种具体实现
2.简述JMS的基本原理
一.JMS包括以下基本构件:
1)连接工厂,是客户用来创建连接的对象,ActiveMQ提供的是ActiveMQConnectionFactory;
2)连接connection;
3)会话session,是发送和接收消息的上下文,用于创建消息生产者,消息消费者,相比rocketMQ会话session是提供事务性的;
4)目的地destination,指定生产消息的目的地和消费消息的来源对象;
生产者、消费者,由会话创建的对象,顾名思义。
二.消息通信机制
1)点对点模式(p2p),每个消息只有1个消费者,它的目的地称为queue队列;
2)发布/订阅模式,每个消息可以有多个消费者,而且订阅一个主题的消费者,只能消费自它订阅之后发布的消息。
3)消息确认机制
Session.AUTO_ACKNOWLEDGE,直接使用receive方法。
Session.CLIENT_ACKNOWLEDGE,通过消息的acknowledge 方法确认消息。
Session.DUPS_ACKNOWLEDGE,该选择只是会话迟钝第确认消息的提交。如果JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider 必须把消息头的JMSRedelivered 字段设置为true。
3. ActiveMQ的引言
ActiveMQ是由Apache出品的,一款最流行的,能力强劲的消息中间件(MOM:Message Orient middleware)。并且是对消息通信规范JMS的一种具体实现
4. ActiveMQ的特点
- 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;
- 负责建立网络通信的通道,进行数据的可靠传送。
- 保证数据不重发,不丢失
- 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务
5.ActiveMQ的安装
- 下载ActiveMQ,本次安装采用linux版本的安装,下载对应的tar包apache-activemq-5.15.0-bin.tar.gz,官方网站:http://activemq.apache.org/
- 解压对应的tar包 指令tar -zxvf apache-activemq-5.15.0-bin.tar.gz
- 解压后的目录
4.更改配置
然后在/usr/local/soft/apache-activemq-5.12.1/bin下有一个env文件,编辑此文件设置对应的jdk
目录
5.启动ActiveMQ,进入对应的bin目录执行 ./activemq start
6.web监控,ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。 admin:http://127.0.0.1:8161/admin/,用户名和密码都是admin。但是ActiveMQ默认使用的TCP连接端口是61616
6.ActiveMQ的点对点开发步骤
1).点对点消息生产者的开发步骤
//1. 创建JMS连接工厂 参数1:用户名 参数二:密码 参数三:url 默认"failover://tcp://139.199.77.17:61616"
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover://tcp://localhost:61616");
//2.通过ConnectionFactory创建对应connection
Connection connection = connectionFactory.createConnection();
//3.根据connection创建会话对 参数1:开启事务true 参数2:是消息确认机制属性
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地
Destination destination = session.createQueue("queue");
//5.创建消息生产者
MessageProducer producer = session.createProducer(destination);
//6.创建消息
TextMessage message = session.createTextMessage("这是一个消息");
//7.通过生产者发送消息到activeMQ
producer.send(message);
//8.会话提交
Session.commit();
//9.资源关闭
connection.close();
session.close();
2) .点对点消息消费者的开发步骤
//1. 创建JMS连接工厂 参数1:用户名 参数二:密码 参数三:url 默认"failover://tcp://139.199.77.17:61616"
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover://tcp://localhost:61616");
//2.通过ConnectionFactory创建对应connection 并且启动connection
Connection connection = connectionFactory.createConnection();
connection.start(); // 启动连接
//3.根据connection创建会话对 参数1:开启事务true 参数2:是消息确认机制属性 第一个参数是是说明第二个参数是否生效
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4.创建消息目的地
Destination destination = session.createQueue("queue");
//5.创建消息的消费者
MessageConsumer consumer = session.createConsumer(destination);
//6.取出消息并打印
while (true) {
Message message = consumer.receive();
TextMessage textMessage = (TextMessage) message;
if (textMessage != null) {
System.out.println(textMessage.getText());
}else{
break;
}
}
7.ActiveMQ的消息类型
JMS规范中的消息类型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种。接下来的代码就是各个消息类型的生产和消费。
1)五种消息类型的生产
try {
//文本消息
if (messageType.getName().equals(MessageType.TEXT.getName())) {
TextMessage textMessage = session.createTextMessage("文本消息");
producer.send(textMessage);
} else if (messageType.getName().equals(MessageType.MAP.getName())) {
//键值对消息
MapMessage mapMessage = session.createMapMessage();
mapMessage.setLong("age", new Long(32));
mapMessage.setDouble("sarray1", new Double(5867.15));
mapMessage.setString("username", "键值对消息");
producer.send(mapMessage);
} else if (messageType.getName().equals(MessageType.STREAM.getName())) {
//流消息
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("streamMessage流消息");
streamMessage.writeLong(55);
producer.send(streamMessage);
} else if (messageType.getName().equals(MessageType.BYTE.getName())) {
//字节消息
String s = "BytesMessage字节消息";
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(s.getBytes());
producer.send(bytesMessage);
} else {
User user = new User("cjm1", "对象消息", "1"); //User对象必须实现Serializable接口
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(user);
producer.send(objectMessage);
}
} catch (JMSException e) {
e.printStackTrace();
}
2)五种消息类型的消费
try {
if (m instanceof TextMessage) {
//接收文本消息
TextMessage message = (TextMessage) m;
System.out.println(message.getText());
} else if (m instanceof MapMessage) {
//接收键值对消息
MapMessage message = (MapMessage) m;
System.out.println(message.getLong("age"));
System.out.println(message.getDouble("sarray1"));
System.out.println(message.getString("username"));
} else if (m instanceof StreamMessage) {
//接收流消息
StreamMessage message = (StreamMessage) m;
System.out.println(message.readString());
System.out.println(message.readLong());
} else if (m instanceof BytesMessage) {
//接收字节消息
byte[] b = new byte[1024];
int len = -1;
BytesMessage message = (BytesMessage) m;
while ((len = message.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
} else if (m instanceof ObjectMessage) {
//接收对象消息1
ObjectMessage message = (ObjectMessage) m;
User user = (User) message.getObject();
System.out.println(user.getName() + " _ " + user.getPassword());
} else {
System.out.println(m);
}
} catch (JMSException e) {
e.printStackTrace();
}
8. ActiveMQ发布订阅模式
1) .生产者相关的开发,注意:发布订阅与点对点唯一的区别,就是点对点需要指定消息存放在哪个队列里,而发布订阅的模式只需要指定该消息的TOPIC即可。
2) .发布订阅模式生产者的相关代码
//1. 创建JMS连接工厂 参数1:用户名 参数二:密码 参数三:url 默认"failover://tcp://139.199.77.17:61616"
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover://tcp://localhost:61616");
//2.通过ConnectionFactory创建对应connection
Connection connection = connectionFactory.createConnection();
//3.根据connection创建会话对 参数1:开启事务true 参数2:是消息确认机制属性
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4.创建主题
Destination destination = session.createTopic("FirstTopic");
//5.创建消息生产者并指定主题
MessageProducer producer = session.createProducer(destination);
//6.创建消息
TextMessage message = session.createTextMessage("这是一个消息");
//7.通过生产者发送消息到activeMQ
producer.send(message);
//8.会话提交
Session.commit();
//9.资源关闭
connection.close();
session.close();
3) .发布订阅消费者相关代码
//1. 创建JMS连接工厂 参数1:用户名 参数二:密码 参数三:url 默认"failover://tcp://139.199.77.17:61616"
TopicConnectionFactory cf= new ActiveMQConnectionFactory("failover://tcp://localhost:61616");
//2.通过ConnectionFactory创建对应connection 并且启动connection
TopicConnection topicConnection = cf.createConnection();
topicConnection .start(); // 启动连接
//3.根据connection创建会话对 参数1:开启事务true 参数2:是消息确认机制属性 第一个参数是是说明第二个参数是否生效
TopicSession topicSession = topicConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//4.创建消息主题
Topic firstTopic = topicSession .createTopic("FirstTopic");
//5.创建消息的订阅者根据主题
TopicSubscriber subscriber = session.createSubscriber(firstTopic);
//6.取出消息并打印
while (true) {
Message message = subscriber.receive();
TextMessage textMessage = (TextMessage) message;
if (textMessage != null) {
System.out.println(textMessage.getText());
}else{
break;
}
}