因为某些原因,需要将flask搭建的项目用spring boot重构一遍,其中有一个聊天室的功能,在flask下我采用了flask-sse这个第三方库来实现,该模块采用基于redis的消息订阅系统实现,当然类spring boot下自然没有这个方便的库了,但是spring boot对redis的消息机制的支持还是不错的,所以在看了下相关文档后我觉得可以自己实现一个,接下来就是思路和具体步骤了
参考文档: http://spring.io/guides/gs/messaging-redis/
废话不多说,上代码
配置类
@Configuration public class RedisConf { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean Receiver receiver() { return new Receiver(); } @Bean("channelMap") Map<String, List<SseEmitter>> channelMap(){ return new ConcurrentHashMap<String, List<SseEmitter>>(); } }
该类注入了一些必要的实例,顺便说一句,默认情况下,spring的注入是单例模式。
接下来是receiver类,该类接收到redis发送的消息,并且发布给相应的频道中的所有人,该类与上面的配置中注入
public class Receiver { @Autowired private Map<String, List<SseEmitter>> channelMap; public void receiveMessage(String message){ System.out.println("sending message..."); Map map = (Map) JSON.parse(message); String channel = (String) map.get("channel"); Map data = (Map) map.get("data"); String event = (String) map.get("event"); if (!channelMap.containsKey(channel)){ return; } List<SseEmitter> sseEmitters = channelMap.get(channel); Iterator<SseEmitter> it = sseEmitters.listIterator(); while (it.hasNext()){ SseEmitter x = it.next(); try { x.send(SseEmitter.event().data(data).name(event)); } catch (IOException e) { it.remove(); } } } }
其实try/catch部分是为了处理有人突然离线的异常。否则有人突然离线,但是List集合中依然保存着它的sse连接,导致send方法异常,故而出现异常则直接删除导致异常的sse实例
接下来是一个业务类
@Service public class ChannelService { @Autowired private Map<String, List<SseEmitter>> channelMap; @Autowired private RedisMessageListenerContainer container; @Autowired private MessageListenerAdapter listenerAdapter; public void createChannel(String channelName, SseEmitter sseEmitter) throws PPKTException { if (channelMap.containsKey(channelName)){ throw new PPKTException(new PPKTError(400, "频道已存在")); } // 添加reids订阅频道 container.addMessageListener(listenerAdapter, new PatternTopic(channelName)); List<SseEmitter> sseEmitters = new ArrayList<SseEmitter>(); sseEmitters.add(sseEmitter); channelMap.put(channelName, sseEmitters); } public void addInChannel(String channelName, SseEmitter sseEmitter) throws PPKTException { if (!channelMap.containsKey(channelName)){ throw new PPKTException(new PPKTError(400, "频道不存在")); } List<SseEmitter> sseEmitters = channelMap.get(channelName); sseEmitters.add(sseEmitter); channelMap.put(channelName, sseEmitters); } public void delChannel(String channelName){ channelMap.remove(channelName); } public boolean contains(String channelName){ return channelMap.containsKey(channelName); } }该业务类则是封装了一些添加频道,添加人员进入频道的操作,其实该类可写可不写。。。
接下来是两个控制类,一个是加入频道的控制类,一个是聊天的控制类
加入频道控制类
@Controller public class ChannelController { @Autowired private HttpServletRequest httpServletRequest; @Autowired private ChannelService channelService; @Autowired private HttpServletResponse httpServletResponse; @RequestMapping(value = "/stream", method = RequestMethod.GET) public SseEmitter handler(String channel) throws PPKTException, AVException { SseEmitter sseEmitter = new SseEmitter(0L); if (channelService.contains(channel)){ channelService.addInChannel(channel, sseEmitter); }else { channelService.createChannel(channel, sseEmitter); } return sseEmitter; } }该类实现构造了一个SseEmitter实例,并将其加入指定频道,同时将SseEmitter实例返回给客户端
聊天控制类
@RestController @RequestMapping("/api/message") public class ChatController { @Autowired private StringRedisTemplate template; @Autowired private HttpServletRequest httpServletRequest; @RequestMapping(value = "/", method = RequestMethod.POST) public Map sendMessage(@RequestBody Map map, @CookieValue String channel, @CookieValue String sessionToken) throws AVException, PPKTException { String message = (String) map.get("message"); AVQuery<AVUser> query = AVUser.getQuery(); query.whereEqualTo("sessionToken", sessionToken); List<AVUser> users = query.find(); if (users.size() == 0){ throw new PPKTException(new PPKTError(403, "用户未登录")); } String sendMessage = createJsonEvent( message, channel, users.get(0).getUsername(), "chat"); template.convertAndSend(channel, sendMessage); Map m1 = new HashMap(); m1.put("message", "OK"); return m1; } private String createJsonEvent(String message, String channel, String username, String event) { Map data = new HashMap(); data.put("message", message); data.put("sender", username); Map m = new HashMap(); m.put("channel", channel); m.put("data", data); m.put("event", event); return JSON.toJSONString(m); } }
该类则是从客户端获取用户要发送的信息,然后使用StringRedisTemplate进行推送,经过推送的消息自然会被recevier类接受到,接下来就不用说了,事件部分我是用json来表示的。用的是FastJson库。
其实主要就是用了redis自带的消息订阅机制,用rabbitmq也可,详情请参考: http://spring.io/guides/gs/messaging-rabbitmq/