访问"/getUser/1"路径时,将调用请求放入队列,异步查询id为1的用户;
访问"/getUserSyc/1"路径时,将调用请求放入队列,同步查询id为1的用户并返回查询结果。
源码地址:https://download.csdn.net/download/weixin_41796956/11129066
SSM框架的搭建过程略。
Spring及相关依赖的版本如下:
<properties>
<!-- Version numbers; presumably referenced as ${...} placeholders by dependency declarations elsewhere in the pom - confirm -->
<spring.version>5.1.6.RELEASE</spring.version>
<mybatis.version>3.2.1</mybatis.version>
<slf4j.version>1.6.6</slf4j.version>
<log4j.version>1.2.12</log4j.version>
<mysql.version>5.1.35</mysql.version>
</properties>
在pom.xml中添加spring-rabbit依赖:
<!-- Spring AMQP RabbitMQ integration -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.4.RELEASE</version>
<!-- Drops the transitive spring-context; presumably so the project's own Spring version is the one on the classpath - confirm -->
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
mq配置文件如下
# Broker credentials and address, read by the connectionFactory bean in spring-rabbit.xml
mq.user=guest
mq.pwd=guest
mq.ip=127.0.0.1
mq.port=5672
# Exchange name
exchange=TestExchange
# Queue name; spring-rabbit.xml also uses this value as the binding's routing key
routeKey=testQueue
spring配置文件引入spring-rabbit.xml
<!-- Pull the RabbitMQ bean definitions from spring-rabbit.xml into the Spring configuration -->
<import resource="classpath:spring-rabbit.xml" />
spring-rabbit.xml内容如下
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- Producer configuration -->
<!-- Producer helper that publishes RabbitMessage objects through rabbitTemplate -->
<bean class="com.sykj.util.RabbitMQ" />
<!-- Connection factory; credentials, host and port are taken from the mq.* properties.
     The "localhost" constructor argument is replaced by the host property set below. -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="localhost" />
<property name="username" value="${mq.user}" />
<property name="password" value="${mq.pwd}" />
<property name="host" value="${mq.ip}" />
<property name="port" value="${mq.port}" />
</bean>
<!-- RabbitAdmin on the same connection factory; presumably what declares the queue,
     exchange and binding beans below on the broker - confirm against the Spring AMQP docs -->
<bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
<constructor-arg ref="connectionFactory" />
</bean>
<!-- RabbitTemplate: the message template used for sending -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<!-- <constructor-arg ref="connectionFactory"></constructor-arg> -->
<!-- Reply timeout used by the synchronous send-and-receive call (milliseconds per the Spring AMQP API) -->
<property name="replyTimeout" value="50000" />
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- Static application-context holder, used by the consumer to look up beans by name -->
<bean class="com.sykj.util.SpringUtil" />
<!-- Consumer configuration -->
<!-- Message converter: SimpleMessageConverter -->
<bean id="serializerMessageConverter"
class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>
<!-- Durable queue; constructor args are (name, durable, exclusive, autoDelete) -->
<bean id="queue" class="org.springframework.amqp.core.Queue">
<constructor-arg index="0" value="${routeKey}"></constructor-arg>
<constructor-arg index="1" value="true"></constructor-arg>
<constructor-arg index="2" value="false"></constructor-arg>
<constructor-arg index="3" value="false"></constructor-arg>
</bean>
<!-- 1、<bean id="queue1" class="org.springframework.amqp.core.Queue"> -->
<!-- <constructor-arg index="0" value="${routeKey4}"></constructor-arg> -->
<!-- <constructor-arg index="1" value="true"></constructor-arg> -->
<!-- <constructor-arg index="2" value="false"></constructor-arg> -->
<!-- <constructor-arg index="3" value="false"></constructor-arg> -->
<!-- </bean> -->
<!-- Durable direct exchange; constructor args are (name, durable, autoDelete) -->
<bean id="directExchange" class="org.springframework.amqp.core.DirectExchange">
<constructor-arg index="0" value="${exchange}"></constructor-arg>
<constructor-arg index="1" value="true"></constructor-arg>
<constructor-arg index="2" value="false"></constructor-arg>
</bean>
<!-- Empty argument map handed to the binding below -->
<util:map id="arguments"></util:map>
<!-- Bind the queue to the exchange; the queue name doubles as the routing key.
     Constructor args are (destination, destinationType, exchange, routingKey, arguments). -->
<bean id="binding" class="org.springframework.amqp.core.Binding">
<constructor-arg index="0" value="${routeKey}"></constructor-arg>
<constructor-arg index="1" value="QUEUE"></constructor-arg>
<constructor-arg index="2" value="${exchange}"></constructor-arg>
<constructor-arg index="3" value="${routeKey}"></constructor-arg>
<constructor-arg index="4" value="#{arguments}"></constructor-arg>
</bean>
<!-- 2. Binding for the optional second queue (disabled) -->
<!-- <bean id="binding1" class="org.springframework.amqp.core.Binding"> -->
<!-- <constructor-arg index="0" value="${routeKey4}"></constructor-arg> -->
<!-- <constructor-arg index="1" value="QUEUE"></constructor-arg> -->
<!-- <constructor-arg index="2" value="${exchange}"></constructor-arg> -->
<!-- <constructor-arg index="3" value="${routeKey4}"></constructor-arg> -->
<!-- <constructor-arg index="4" value="#{arguments}"></constructor-arg> -->
<!-- </bean> -->
<!-- Handler class that processes received messages -->
<bean id="rmqConsumer" class="com.sykj.util.RmqConsumer"></bean>
<bean id="messageListenerAdapter"
class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="rmqConsumer" />
<!-- Maps a queue name to the handler method that should process its messages -->
<property name="queueOrTagToMethodName">
<map>
<entry key="${routeKey}" value="rmqProducerMessage"></entry>
</map>
</property>
<property name="defaultListenerMethod" value="rmqProducerMessage"></property>
<property name="messageConverter" ref="serializerMessageConverter"></property>
</bean>
<!-- Listener container (SimpleMessageListenerContainer) that consumes from the listed queues; several queues may be given -->
<bean id="listenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="queues">
<list>
<ref bean="queue"></ref>
<!-- 3、<ref bean="queue1"></ref> -->
</list>
</property>
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="messageListener" ref="messageListenerAdapter"></property>
</bean>
</beans>
RabbitMessage类
package com.sykj.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import lombok.Data;
@Data
public class RabbitMessage implements Serializable {
    // Serializable envelope describing one bean-method call: the target bean and method
    // names, the call arguments with their types, and the exchange / routing key the
    // message is published with.
    private static final long serialVersionUID = -6487839157908352120L;
    // Runtime classes of the arguments, index-aligned with params; an entry is null
    // when the matching argument is null (its type cannot be derived).
    private Class<?>[] paramTypes;
    // Exchange the message is published to.
    private String exchange;
    // Arguments for the target method.
    private Object[] params;
    // Routing key used when publishing.
    private String routeKey;
    // Name of the method to invoke on the target bean.
    private String methodName;
    // Name of the target bean.
    private String beanName;

    /** Creates an empty message; fields are expected to be populated through setters. */
    public RabbitMessage() {
    }

    /**
     * Creates a message that carries only routing information and arguments.
     * The bean name, method name and parameter types are left unset.
     *
     * @param exchange exchange to publish to
     * @param routeKey routing key to publish with
     * @param params   payload arguments, stored as given
     */
    public RabbitMessage(String exchange, String routeKey, Object... params) {
        this.params = params;
        this.exchange = exchange;
        this.routeKey = routeKey;
    }

    /**
     * Creates a message describing a call of {@code methodName} on the bean {@code beanName}.
     * The parameter types are derived from the runtime class of each argument.
     *
     * @param exchange   exchange to publish to
     * @param routeKey   routing key to publish with
     * @param beanName   name of the target bean
     * @param methodName name of the method to invoke
     * @param params     call arguments; a null array is treated as "no arguments", and a
     *                   null element is kept with a null entry in the parameter types
     */
    public RabbitMessage(String exchange, String routeKey,String beanName, String methodName, Object... params) {
        // A null varargs array used to fail on params.length; treat it as an empty call.
        Object[] args = params == null ? new Object[0] : params;
        this.params = args;
        this.exchange = exchange;
        this.routeKey = routeKey;
        this.methodName = methodName;
        this.beanName = beanName;
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            // A null argument has no runtime class; record null instead of throwing.
            types[i] = args[i] == null ? null : args[i].getClass();
        }
        this.paramTypes = types;
    }

    /**
     * Serializes this message with standard Java serialization.
     *
     * @return the serialized bytes, or an empty array when serialization fails
     *         (for example because an argument is not serializable)
     */
    public byte[] getSerialBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(this);
            // Flush the object stream before reading the buffer.
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            // Best-effort contract kept from the original: report and return no bytes.
            e.printStackTrace();
            return new byte[0];
        }
    }
}
RabbitMQ类
package com.sykj.util;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
/**
 * Producer-side helper: wraps a bean-method call in a {@link RabbitMessage} and publishes
 * it through the injected {@link RabbitTemplate}, using the configured exchange and
 * routing key.
 */
public class RabbitMQ {
    private static final Logger LOG = Logger.getLogger(RabbitMQ.class.getName());

    @Resource
    private RabbitTemplate rabbitTemplate;
    @Value("${exchange}")
    private String exchange;
    @Value("${routeKey}")
    private String routeKey;

    /**
     * Publishes a call request without waiting for a reply.
     * Publishing is best-effort: a failure is logged and not propagated to the caller.
     *
     * @param className  name of the target bean (passed to the message as its bean name)
     * @param methodName name of the method to invoke on that bean
     * @param param      arguments for the call
     */
    public void pushMessageToMQ(String className, String methodName,Object... param) {
        RabbitMessage msg = new RabbitMessage(exchange, routeKey, className, methodName, param);
        try {
            rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
        } catch (Exception e) {
            // Previously swallowed silently; keep the no-throw contract but make the failure visible.
            LOG.log(Level.SEVERE, "Failed to publish call " + className + "." + methodName
                    + " to exchange '" + exchange + "' with routing key '" + routeKey + "'", e);
        }
    }

    /**
     * Publishes a call request and waits for the reply (synchronous call).
     *
     * @param className  name of the target bean (passed to the message as its bean name)
     * @param methodName name of the method to invoke on that bean
     * @param param      arguments for the call
     * @return the reply object, or {@code null} when the send failed; the template may
     *         presumably also return {@code null} when no reply arrives in time - confirm
     */
    public Object pushAndReceiveMessageToMQ(String className, String methodName,Object... param) {
        RabbitMessage msg = new RabbitMessage(exchange, routeKey, className, methodName, param);
        try {
            return rabbitTemplate.convertSendAndReceive(msg.getExchange(), msg.getRouteKey(), msg);
        } catch (Exception e) {
            // Previously swallowed silently; callers still receive null, but the cause is logged.
            LOG.log(Level.SEVERE, "Failed to send call " + className + "." + methodName
                    + " to exchange '" + exchange + "' with routing key '" + routeKey
                    + "' or to receive its reply", e);
            return null;
        }
    }
}
RmqConsumer消费者类
package com.sykj.util;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
/**
 * Consumer-side handler: receives a {@link RabbitMessage}, looks up the named bean and
 * invokes the named method on it via reflection.
 *
 * NOTE(review): the bean and method names come straight from the message, so anything
 * able to publish to the queue can invoke any method this lookup can reach. Consider
 * restricting the allowed bean/method pairs if the queue is not fully trusted.
 */
public class RmqConsumer {
    private static Logger log=Logger.getLogger(RmqConsumer.class);

    /**
     * Handles one message taken from the queue.
     *
     * @param object the converted message body; must be a {@link RabbitMessage}
     * @return whatever the invoked bean method returns
     * @throws Exception if the bean or method cannot be resolved, or the invocation fails
     */
    public static Object rmqProducerMessage(Object object) throws Exception{
        RabbitMessage rabbitMessage=(RabbitMessage) object;
        log.info("从队列里头取出:bean名-"+rabbitMessage.getBeanName()+",方法名:"+rabbitMessage.getMethodName());
        Object bean = SpringUtil.getBean(rabbitMessage.getBeanName());
        Object[] args = rabbitMessage.getParams();
        Method target = resolveMethod(bean.getClass(), rabbitMessage.getMethodName(),
                rabbitMessage.getParamTypes(), args);
        return target.invoke(bean, args);
    }

    /**
     * Finds the method to invoke. The exact signature built from the recorded parameter
     * types is tried first. When that fails, the public methods are searched for a single
     * one with the same name whose parameters accept the actual arguments. This covers
     * methods declared with a supertype, interface or primitive parameter.
     */
    private static Method resolveMethod(Class<?> beanClass, String methodName,
            Class<?>[] paramTypes, Object[] args) throws NoSuchMethodException {
        try {
            return beanClass.getDeclaredMethod(methodName, paramTypes);
        } catch (NoSuchMethodException exactMiss) {
            Method match = null;
            for (Method candidate : beanClass.getMethods()) {
                if (candidate.isBridge()
                        || !candidate.getName().equals(methodName)
                        || !accepts(candidate.getParameterTypes(), args)) {
                    continue;
                }
                if (match != null) {
                    // More than one compatible overload: do not guess, report the original failure.
                    throw exactMiss;
                }
                match = candidate;
            }
            if (match == null) {
                throw exactMiss;
            }
            return match;
        }
    }

    /** Tells whether the declared parameter types can take the given arguments. */
    private static boolean accepts(Class<?>[] declared, Object[] args) {
        int argCount = args == null ? 0 : args.length;
        if (declared.length != argCount) {
            return false;
        }
        for (int i = 0; i < argCount; i++) {
            Object arg = args[i];
            if (arg == null) {
                // null fits any reference parameter but never a primitive one.
                if (declared[i].isPrimitive()) {
                    return false;
                }
            } else if (!box(declared[i]).isInstance(arg)) {
                return false;
            }
        }
        return true;
    }

    /** Maps a primitive type to its wrapper class; other types are returned unchanged. */
    private static Class<?> box(Class<?> type) {
        if (!type.isPrimitive()) {
            return type;
        }
        if (type == int.class) {
            return Integer.class;
        }
        if (type == long.class) {
            return Long.class;
        }
        if (type == boolean.class) {
            return Boolean.class;
        }
        if (type == double.class) {
            return Double.class;
        }
        if (type == float.class) {
            return Float.class;
        }
        if (type == short.class) {
            return Short.class;
        }
        if (type == byte.class) {
            return Byte.class;
        }
        if (type == char.class) {
            return Character.class;
        }
        return Void.class;
    }
}
SpringUtil
package com.sykj.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
 * Holds the Spring application context in a static field so that code without
 * injection support can look beans up by name or by type.
 */
public class SpringUtil implements ApplicationContextAware {
    /** The application context handed over through {@link #setApplicationContext}. */
    private static ApplicationContext applicationContext;

    /**
     * @return the stored application context, or {@code null} if none has been set yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Stores the given context in the static field shared by all lookups.
     *
     * @param applicationContext the context to store
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringUtil.applicationContext = applicationContext;
    }

    /**
     * Looks up a bean by name and casts it (unchecked) to the type expected by the caller.
     *
     * @param name the bean name
     * @return the bean returned by the context
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        Object bean = applicationContext.getBean(name);
        return (T) bean;
    }

    /**
     * Looks up a bean by type and casts it (unchecked) to the type expected by the caller.
     *
     * @param clz the type to look up
     * @return the bean returned by the context
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(Class<?> clz) throws BeansException {
        Object bean = applicationContext.getBean(clz);
        return (T) bean;
    }
}
控制层UserControl
package com.sykj.control;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.sykj.entity.User;
import com.sykj.util.RabbitMQ;
/**
 * REST endpoints that look a user up through the message queue instead of calling the
 * service directly: one fire-and-forget variant and one that waits for the reply.
 */
@RestController
public class UserControl {
    /** Name of the bean whose method is invoked on the consumer side. */
    private static final String TARGET_BEAN = "userService";
    /** Name of the method invoked on that bean. */
    private static final String TARGET_METHOD = "getUserById";

    @Resource
    private RabbitMQ rabbitMQ;

    /**
     * Queues an asynchronous lookup of the user with the given id.
     *
     * @param uid user id taken from the path
     * @return a fixed confirmation text; the lookup result is not returned
     */
    @RequestMapping("/getUser/{uid}")
    public Object getUserSend(@PathVariable (value="uid") Integer uid){
        // Asynchronous call. Author's note: for this mode RmqConsumer's handler should
        // be changed to return nothing, or a separate queue mapped to a consumer method
        // without a return value should be used.
        rabbitMQ.pushMessageToMQ(TARGET_BEAN, TARGET_METHOD, argumentsFor(uid));
        return "已发送";
    }

    /**
     * Queues a lookup of the user with the given id and waits for the reply.
     *
     * @param uid user id taken from the path
     * @return the reply cast to {@link User}
     */
    @RequestMapping("/getUserSyc/{uid}")
    public Object getUserSyc(@PathVariable (value="uid") Integer uid){
        // Synchronous call: blocks until the consumer's reply is available.
        Object reply = rabbitMQ.pushAndReceiveMessageToMQ(TARGET_BEAN, TARGET_METHOD, argumentsFor(uid));
        return (User) reply;
    }

    /**
     * Builds the single list argument sent to the target method; it contains only the
     * requested user id. The list must remain an ArrayList, because the message records
     * the runtime class of each argument for the method lookup.
     */
    private static List<Object> argumentsFor(Integer uid) {
        List<Object> ids = new ArrayList<>();
        ids.add(uid);
        return ids;
    }
}