- 为什么需要消息队列?
比如我们系统中常见的邮件、短信发送,把这些不需要及时响应的功能写入队列,异步处理请求,减少响应时间。
- 如何实现?
- 为什么用Redis?
它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。Redis本身的高性能和"便捷的"分布式设计(replicas,sharding),可以为实现"分布式队列"提供了良好的基础。
- SpringBoot演示
- application.properties
#配置redis #在RedisProperties.class有redis的默认配置,默认host为localhost,默认端口为6379 spring.redis.host=127.0.0.1 spring.redis.port=6379 spring.redis.maxIdle=3 spring.redis.maxTotal=20 |
- RedisConfig.java
package com.niugang; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.niugang.mq.MessageDelegate; import com.niugang.mq.MessageDelegateImpl; import redis.clients.jedis.JedisPoolConfig; @Configuration @ConfigurationProperties(prefix = "spring.redis") public class RedisConfig { public String host; public int port; public int maxIdle; public int maxTotal; @Bean public JedisConnectionFactory jedisConnectionFactory() { JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(); jedisConnectionFactory.setHostName(host); jedisConnectionFactory.setPort(port); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMaxTotal(maxTotal); jedisConnectionFactory.setPoolConfig(jedisPoolConfig); return jedisConnectionFactory; } // 默认用的是用JdkSerializationRedisSerializer进行序列化的 @Bean @SuppressWarnings({ "rawtypes", "unchecked" }) public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); // 注入数据源 redisTemplate.setConnectionFactory(jedisConnectionFactory()); // 使用Jackson2JsonRedisSerialize 替换默认序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } @Bean public StringRedisTemplate stringRedisTemplate() { return new StringRedisTemplate(jedisConnectionFactory()); } // ################发布订阅配置################################### @Bean public MessageDelegate messageDelegate() { return new MessageDelegateImpl(); } @Bean public MessageListenerAdapter messageListener() { MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageDelegate()); return messageListenerAdapter; } /** * RedisMessageListenerContainer充当消息侦听器容器;它用于接收来自Redis通道的消息, * 并驱动注入到其中的MessageListeners。侦听器容器负责消息接收的所有线程,并将消息分派到侦听器中进行处理。 * 消息侦听器容器是MDP(message-driven POJOs)和消息传递提供程序之间的中介 负责注册接收消息,资源获取和发布,异常。 * * 转换等 */ @Bean public RedisMessageListenerContainer redisContainer() { RedisMessageListenerContainer con = new RedisMessageListenerContainer(); con.setConnectionFactory(jedisConnectionFactory()); ChannelTopic channelTopic = new ChannelTopic("log_queue"); con.addMessageListener(messageListener(), channelTopic); return con; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public int getMaxIdle() { return maxIdle; } public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; } public int getMaxTotal() { return maxTotal; } public void setMaxTotal(int maxTotal) { this.maxTotal = maxTotal; } } |
- 日志注解
package com.niugang.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 定义记录日志注解 * @author niugang * */ @Retention(RetentionPolicy.RUNTIME) //注解可用在方法和类上 @Target({ElementType.METHOD, ElementType.TYPE}) public @interface LogApi { /** * 日志描述 */ String value() default ""; /** * 日志类型 * @return */ String type() default LogType.OPERATE_LOG; } |
- 日志注解解析类
package com.niugang.annotation; /** * 日志解析bean * @author niugang * */ public class LogParseBean { private String value; private String type; public String getValue() { return value; } public void setValue(String value) { this.value = value; } public String getType() { return type; } public void setType(String type) { this.type = type; } } |
- 日志类型常量类
package com.niugang.annotation; /** * 日志类型常量类 * @author niugang * */ public class LogType { public static final String OPERATE_LOG="redis:operate:log"; } |
- 消息委托接口(根据Spring-data-redis官方文档配置)
package com.niugang.mq; import java.io.Serializable; import java.util.Map; /** * 考虑下面的接口定义。 * 注意,尽管接口没有扩展MessageListener接口,仍然可以通过使用MessageListenerAdapter将其用作MDP * 类(容器中配置)。还要注意各种消息处理方法是如何根据 * * 它们可以接收和处理的各种消息类型的内容 * * @author niugang * */ public interface MessageDelegate { // 默认监听的方法就是handleMessage void handleMessage(String message); /* * void handleMessage(Map message); void handleMessage(byte[] message); void * handleMessage(Serializable message); // pass the channel/pattern as well * void handleMessage(Serializable message, String channel); */ } |
- 消息委托接口实现类(根据Spring-data-redis官方文档配置)
package com.niugang.mq; import java.io.Serializable; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import com.alibaba.fastjson.JSON; import com.niugang.annotation.LogParseBean; public class MessageDelegateImpl implements MessageDelegate{ @Autowired private RedisTemplate<String, String> redisTemplate; @Override public void handleMessage(String message) { if(StringUtils.isNotBlank(message)){ System.out.println("message:"+message); //这里演示,存储到redis里面 LogParseBean parseObject = JSON.parseObject(message, LogParseBean.class); String type = parseObject.getType(); redisTemplate.opsForList().leftPush(type, parseObject.getValue()); //项目中可以存储到Elasticsearch等非关系型数据库 } } } |
- 日志注解AOP
package com.niugang.aop; import java.lang.reflect.Method; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.niugang.annotation.LogApi; @Aspect @Component public class LogAscept { @Autowired |
- Controller接口演示
/** * 跳转到登录页面 * * @param map * @return */ @LogApi(value="跳转到登录页面") @RequestMapping(value = "/login", method = RequestMethod.GET) public String toLogin(ModelMap map) { return "login"; } /** |
最后日志记录到Redis中