一、通配符式订阅:
Wildcards 用来支持联合的名字分层体系(federated name hierarchies)。它不是JMS 规范的一部分,而是ActiveMQ 的扩展。ActiveMQ 支持以下三种
1、"." 用于作为路径上名字间的分隔符。 如:AAA.bbb.CCC 2、"*" 用于单级地匹配路径上的任何名字。 如:AAA.*.CCC 或 *.bbb.* 或 *.*.* 3、">" 用于多级地匹配路径上的任何名字但是不能以 > 开始。 如:AAA.>.* 或 AAA.> 或 AAA.bbb.>
通配符式订阅,用于消费者绑定的目的地,不能用于生产者。
测试代码如下:启动的本机的ActiveMQ,连接默认的ip地址
1.工具类:
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQUtil {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static Connection getActiveMQConnection() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL);
Connection connection=null;
try {
connection = factory.createConnection();
connection.start();
} catch (JMSException e) {
e.printStackTrace();
}
return connection;
}
public static void close(Connection... conn) {
for(int i=0;i<conn.length;i++) {
Connection c = conn[i];
if(c != null) {
try {
c.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.生产者端:
import java.util.HashMap;
import javax.jms.*;
public class JmsWildcardProducterA {
private static final String DES_ANY = "begin.lqy.mq.any";
private static final String DES_VIP = "begin.lqy.mq.vip";
private static final String DES_REDIS = "begin.lqy.mq.redis";
private static HashMap<String,String> map = new HashMap<String,String>();
static{
map.put(DES_ANY, "这个消息能被所有消费者收到");
map.put(DES_VIP, "这个消息能被VIP消费者收到");
map.put(DES_REDIS, "这个消息能被REDIS消费者收到");
}
public static void main(String[] args) throws JMSException {
Connection connection = ActiveMQUtil.getActiveMQConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination topic1 = session.createTopic(DES_ANY);
Destination topic2 = session.createTopic(DES_VIP);
Destination topic3 = session.createTopic(DES_REDIS);
MessageProducer producer1 = session.createProducer(topic1);
MessageProducer producer2 = session.createProducer(topic2);
MessageProducer producer3 = session.createProducer(topic3);
TextMessage message1 = session.createTextMessage(map.get(DES_ANY));
TextMessage message2 = session.createTextMessage(map.get(DES_VIP));
TextMessage message3 = session.createTextMessage(map.get(DES_REDIS));
producer1.send(message1);
producer2.send(message2);
producer3.send(message3);
System.out.println("发送any");
ActiveMQUtil.close(connection);
}
}
3.消费者端:通配符自己尝试
import javax.jms.*;
public class JmsWildcardConsumerA {
private static final String DES_ANY = "begin.lqy.*.*";
public static void main(String[] args) throws JMSException {
Connection connection = ActiveMQUtil.getActiveMQConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination topicA = session.createTopic(DES_ANY);
MessageConsumer consumerA = session.createConsumer(topicA);
consumerA.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
String msg = ((TextMessage)message).getText();
System.out.println(msg);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
二:死信队列:
用来保存处理失败或者过期的消息,死信队列本质上也是普通队列,用来存放消息
如果不进行配置,ActiveMQ有默认的死信队列,当消费者接收消息时发生异常,没有成功消费,根据重发策略进行重发,如果消息还是没有消费成功,则这条消息就会进入到死信队列,也可以手动配置死信队列,在conf/activeMQ.xml中进行配置,如下,在policyEntry节点下添加deadLetterStrategy标签并配置属性,topic模式同样
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!--
queuePrefix:设置死信队列前缀
useQueueForQueueMessages:设置使用queue来保存死信
可以设置useQueueForTopicMessages,使用topic保存死信
-->
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DEL." useQueueForTopicMessages="true"/>
</deadLetterStrategy>
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
这里配置的是queue模式,死信队列的前缀 DEL 表示,则测试代码如下。
生产者端不需要改动:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.enjoy.wildcards.ActiveMQUtil;
public class DqProducter {
public static void main(String[] args) throws JMSException {
Connection connection = ActiveMQUtil.getActiveMQConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination des = session.createQueue("test.dq");
MessageProducer producer = session.createProducer(des);
TextMessage message = session.createTextMessage("死信队列的内容");
producer.send(message);
ActiveMQUtil.close(connection);
}
}
消费者端进行代码如下:因为死信队列属于ActiveMQ对jms规范的扩展,所以有的类不能用接口方式声明,需要用其子类比如连接需要用ActiveMQConnection声明,队列也要用ActiveMQDestination声明
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import com.enjoy.wildcards.ActiveMQUtil;
public class DqConsumer {
public static void main(String[] args) {
Connection conn = ActiveMQUtil.getActiveMQConnection();
try {
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination queue = (ActiveMQDestination)session.createQueue("test.dq");
//配置消费者端的重发策略
RedeliveryPolicy rp = new RedeliveryPolicy();//创建配置策略的实例
rp.setMaximumRedeliveries(1);//修改重发次数
//拿到消费者端配置重发策略的map
RedeliveryPolicyMap map = ((ActiveMQConnection)conn).getRedeliveryPolicyMap();
map.put(queue, rp);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
String msg = ((TextMessage)message).getText();
System.out.println("收到消息:"+msg);
} catch (JMSException e) {
}
throw new RuntimeException("test");
}
});
} catch (Exception e) {
}
}
}
在接收消息时手动抛出异常,则重新发送一次后,这条消息没有成功处理,进入死信队列,如下图
看到自定义死信队列中有了这个消息,如果死信队列不处理,就会越堆积越多,导致性能下降,可以定义死信队列消费者进行处理死信队列,消费者代码不用改,只需要将消息的目的地改成定义的死信队列前缀的就可以,如下
原消费者中:
ActiveMQDestination queue = (ActiveMQDestination)session.createQueue("test.dq");
改成
ActiveMQDestination queue = (ActiveMQDestination)session.createQueue("DEL.test.dq");
或者
ActiveMQDestination queue = (ActiveMQDestination)session.createQueue("DEL.>");
加一个自定义前缀,就可以消费死信队列中的死信消息