jms整合activemq实现动态注册监听器
自定注解代替@JmsListener,目前项目里面没有使用@JmsListener注解,自己玩的时候又觉得挺方便的,于是自己实现一个简单的版本。本文为消费者端
思路解析:注册jms监听器,jms监听器依赖于连接池,为了避免bean的创建顺序问题决定在spring容器刷新完的时候执行。
->后置处理器处理注解
-> 监听器注册
项目依赖
<properties>
<spring.version>5.2.1.RELEASE</spring.version>
<activemq.version>5.15.10</activemq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<!--<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
<version>2.0.3</version>
</dependency>-->
<!-- ActiveMQ -->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-broker -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>${activemq.version}</version>
</dependency>
<!-- 队列持久化 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
<version>${activemq.version}</version>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq.version}</version>
</dependency>
</dependencies>
注解定义
@EnableMyJms开启jms注解注册监听器
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EnableMyJmsListener.class)
public @interface EnableMyJms {
}
@MyJmsListener
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyJmsListener {
//目前就用到了两个属性,一个destination 表示队列的名称,一个jmsType 表示队列或者主题模式
String id() default "";
JmsType jmsType() default JmsType.Queue;
String destination();
String subscription() default "";
String selector() default "";
String concurrency() default "";
}
EnableMyJmsListener解析
/**
* BeanPostProcessor 吼住处理器用于收集带有@MyJmsListener的方法
* ApplicationContextAware 用于获取连接池
* ApplicationListener 监听容器事件,注册jms监听器
* @author authorZhao
* @date 2019/12/24
*/
public class EnableMyJmsListener implements BeanPostProcessor, ApplicationContextAware , ApplicationListener<ContextRefreshedEvent> {
private static final Logger logger = LoggerFactory.getLogger(EnableMyJmsListener.class);
private ApplicationContext applicationContext;
//需要注册jms监听器的方法的结合,用什么集合无所谓,这里只是为了熟悉一下CopyOnWriteArrayList集合
private List<JmsListenerEntity> jms = new CopyOnWriteArrayList<>();
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//1.判断bean有没有@MyJmsListener注解
// ->如果目标类是代理生成的没有@MyJmsListener注解,何解?
//2.注册监听器
// -> 需不需要交给spring管理
//获取正在初始化的bean
Class<?> targetClass = bean.getClass();
//判断有没有@MyJmsListener注解
List<Method> collect = Arrays.stream(targetClass.getMethods()).filter(s -> s.isAnnotationPresent(MyJmsListener.class)).collect(Collectors.toList());
if (collect==null||collect.size()<=0)return bean;
// Non-empty set of methods
collect.forEach(m->processJmsListener(m.getAnnotation(MyJmsListener.class), m, bean));
logger.info("正在为{}类注册jms监听器,需要注册的数量为{}",beanName,collect.size());
return bean;
}
//需要注册的方法存进集合
private void processJmsListener(MyJmsListener listener, Method method, Object bean) {
try{
Class<?>[] parameterTypes = method.getParameterTypes();
if(parameterTypes!=null && parameterTypes.length==1){
logger.info("添加JMS监听器定义");
JmsListenerEntity jmsListenerEntity = new JmsListenerEntity();
jmsListenerEntity.setMyJmsListener(listener);
jmsListenerEntity.setMethod(method);
jmsListenerEntity.setBean(bean);
jms.add(jmsListenerEntity);
}
}catch (Exception e){
logger.error("监听器创建失败");
}
}
/**
* 为了避免连接池没有,选择在容器刷新完毕的时候创建
* @param event
*/
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if(jms==null||jms.size()<=0)return;
jms.forEach(this::registListener);
}
//注册jms监听器
private void registListener(JmsListenerEntity jmsListenerEntity) {
CachingConnectionFactory cachingConnectionFactory = applicationContext.getBean(CachingConnectionFactory.class);
Object bean = jmsListenerEntity.getBean();
Method method = jmsListenerEntity.getMethod();
MyJmsListener myJmsListener = jmsListenerEntity.getMyJmsListener();
if(myJmsListener==null||bean==null||method==null)return;
JmsType jmsType = myJmsListener.jmsType();
String destinationName = myJmsListener.destination();
if(StringUtils.isBlank(destinationName)){
logger.error("{}的{}方法所监听的jms名称为空",bean.getClass().getName(),method.getName());
throw new RuntimeException("监听器创建失败");
}
try {
Connection conn = cachingConnectionFactory.createConnection();
Session session = conn.createSession(false, 1);
Destination destination;
switch (jmsType){
case Queue:
destination = session.createQueue(destinationName);break;
case Topic:
destination = session.createTopic(destinationName);break;
default:
destination = session.createQueue(destinationName);
}
MessageConsumer consumer = session.createConsumer(destination);
conn.start();
consumer.setMessageListener(new MyJmsListenerConsumer(bean,method));
}catch (Exception e){
throw new RuntimeException("jms监听器创建失败");
}
}
/**
* 判断哪些方法上面有注解@
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext=applicationContext;
}
}
MyJmsListenerConsumer自己的jms监听器实现类
/**
* @author authorZhao
* @date 2019/12/28
*/
public class MyJmsListenerConsumer implements MessageListener {
private Object bean;
private Method method;
//采用构造注入,通过反射执行方法
public MyJmsListenerConsumer(Object bean, Method method) {
this.bean = bean;
this.method = method;
}
private static final Logger log = LoggerFactory.getLogger(MyJmsListenerConsumer.class);
@Override
public void onMessage(Message m) {
Class<?>[] parameterTypes = method.getParameterTypes();
if(parameterTypes==null||parameterTypes.length!=1)return;
try {
if (m instanceof TextMessage) { //接收文本消息
TextMessage message = (TextMessage) m;
Object param = JSON.parseObject(message.getText(), parameterTypes[0]);
method.invoke(bean, param);
}else if (m instanceof ObjectMessage) { //接收对象消息
ObjectMessage message = (ObjectMessage) m;
method.invoke(bean, message.getObject());
} else {
log.warn("所发送消息不支持接受");
}
}catch (Exception e){
log.error("消息处理失败");
}
}
}
service
public class MqService2 {
private static final Logger logger = LoggerFactory.getLogger(MqService2.class);
@MyJmsListener(destination = "queuq22", jmsType = JmsType.Queue)
public void receiveStringQueue1(String msg) {
logger.info("====111===");
System.out.println(msg.toString());
}
@MyJmsListener(destination = "topic22",jmsType = JmsType.Topic)
public void receiveStringQueue2(String msg) {
logger.info("====222===");
System.out.println(msg.toString());
}
@MyJmsListener(destination = "topic22", jmsType = JmsType.Topic)
public void receiveStringQueue3(String msg) {
logger.info("====222第二个===");
System.out.println(msg.toString());
}
}
配置类
@Configuration
@Import(MqService2.class)
@EnableMyJms
public class MqConfig4 {
/**
* 连接工厂
* @return
*/
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory(){
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
return activeMQConnectionFactory;
}
/**
* 连接池工厂 使用ActiveMQ连接工厂
* @param connectionFactory
* @return
*/
@Bean
public PooledConnectionFactory pooledConnectionFactory(ActiveMQConnectionFactory connectionFactory){
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(connectionFactory);
pooledConnectionFactory.setMaxConnections(500);
return pooledConnectionFactory;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory(ActiveMQConnectionFactory connectionFactory){
return new CachingConnectionFactory(connectionFactory);
}
@Bean
public JmsTemplate jmsTemplate(CachingConnectionFactory cachingConnectionFactory){
return new JmsTemplate(cachingConnectionFactory);
}
}
测试
@Test
public void trest4() {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MqConfig4.class);
context.start();
logger.info("项目启动成功");
MqMsgDto mqMsgDto = new MqMsgDto();
mqMsgDto.setContent("测试");
int i = 1;
while (true){
}
}
结束,jms练习
github地址