在上一篇博文中,我们启动了门户Facade的基于NIO技术的服务器,可以监听到外部系统发送过来的REST类型请求,门户Facade在收到请求后,会将其转化为系统消息,并发送到消息总线Plato中,这样对这个消息感举趣的微服务控件器Caesar就可以得到消息,从而完成所需的业务逻辑,并将最终处理结果发送到消息总线Plato上,最后门户Facade从消息总线上获取消息,然后转化为HTTP响应,发送给外部调用者。在本篇博文中,我们将向大家讲解系统消息生成过程的代码实现。
门户Facade在接收到外部调用者的请求内容后,会调用消息引擎生成系统消息,代码如下所示:
private void readRequest(SelectionKey key, Selector selector) { ...... String[] urls = null; String msgStr = ImsaMsgEngine.createMsg(AppConsts.MT_HTTP_GET_REQ, AppConsts.MT_MSG_V1, req.toString(), null); ...... } catch (IOException e) { } }为了清晰起见,我们删除了无关的代码。下面我们来看消息引擎的代码实现,如下所示:
/** * 用于生成和解析系统消息,消息的描述信息在msg_data保存的Json字符串中,如果该消息中具有二进制数据,可以保存在msg_urls的链接中 * @author 闫涛 2018.01.24 * */ public class ImsaMsgEngine { public static String createMsg(int msgTypeId, int msgVersion, String req, String[] urls) { JSONObject msgObj = new JSONObject(); long msgId = JedisEngine.getKeyUniqueId(AppConsts.MSG_ID); msgObj.put(AppConsts.MSG_ID, msgId); msgObj.put(AppConsts.MSG_TYPE, 102); msgObj.put(AppConsts.MSG_VERSION, 1); msgObj.put(AppConsts.MSG_DATA, req); JSONArray msgUrls = new JSONArray(); if (urls != null) { for (String url : urls) { msgUrls.put(url); } } msgObj.put(AppConsts.MSG_URLS, msgUrls); return msgObj.toString(); } }方法的参数为:msgTypeId在AppConsts中进行定义,每种类型消息对应一到多个微服务;msgVersion为消息的版本,微服务组件根据消息版本调用对应版本的业务逻辑实现,系统可以实现多个版本同时共存并且互不影响;req为消息的具体内容;如果请求中有二进制对象,则将二进制对象的URL保存在urls字符数组中。这个函数的主要作用就是将这些信息生成一个JSON对象,并将其变为JSON字符串并返回。
在系统中,每个消息将有一个系统唯一标识,我们将其以redis上的msg_id键值进行保存,每次从Redis中取出该值,作为本消息的唯一标识,同时将该值加1,重新存加redis的msg_id键值中。
下面我们来看通过Jedis库来操作Redis服务器。
首先我们来看Jedis连接池的设置,如下所示:
/** * 生成Redis连接池对象 * @author 闫涛 2018.01.24 v0.0.1 */ private static void createJedisPool() { // 建立连接池配置参数 JedisPoolConfig config = new JedisPoolConfig(); // 设置最大连接数 config.setMaxTotal(MAX_TOTAL); // 设置最大阻塞时间,记住是毫秒数milliseconds config.setMaxWaitMillis(MAX_WAIT_MILLIS); // 设置空间连接 config.setMaxIdle(MAX_IDLE); // 创建连接池 pool = new JedisPool(config, HOST, PORT); } /** * 在调用本类方法之前,先调用本方法,确保已经创建了Redis连接池 * @author 闫涛 2018.01.24 v0.0.1 */ private static void initJedisEngine() { if (null == pool) { createJedisPool(); } }我们在使用Redis时,首先从Jedis连接池中获取实例,使用完毕后向连接池归还该实例。注意在Jedis旧版本中,归还实例采用returnResource方法,而在新版本中,该方法已经过期了,可以直接使用jedis.close方法来归还实例。代码如下所示:
/** * 从Redis连接池中获取一个连接,需要在用完后调用releaseRedis将其返回到连接池中 * @return * @author 闫涛 2018.01.24 v0.0.1 */ private static Jedis getJedis() { if (null == pool) { initJedisEngine(); } return pool.getResource(); } /** * 将之前获取的jedis对象归还到连接池中 * @param jedis * @author 闫涛 2018.01.24 v0.0.1 */ private static void releaseJedis(Jedis jedis) { if (jedis != null) { jedis.close(); } }下面我们来看获取键值和设置键值的方法:
/** * 从Redis中获取指定Key所对应的值,该值可能为空 * @param key * @return 可能为空的字符串 * @author 闫涛 2018.01.24 v0.0.1 */ private static Optional<String> get(String key) { Jedis jedis = getJedis(); if (null == jedis) { return Optional.empty(); } return Optional.ofNullable(jedis.get(key)); } /** * 设置Redis中指定Key的值,需要由调用者保证Key和Val不为空 * @param key * @param val * @return 设置成功还是失败 * @author 闫涛 2018.01.24 v0.0.1 */ private static boolean set(String key, String val) { Jedis jedis = getJedis(); if (null == jedis) { return false; } jedis.set(key, val); return true; }在这里,我们在获取键值时,该值可能为空,如果处理不好的话会产生烦人的空指针异常。这里采用了Java8中新引进的Optional工具类,向调用者表明,我们的返回值可能为空,要求其必须处理这种情况。而在设置键值时,键值也有可能为空,而这里我们却不用Optional工具类,因为这时需要让调用者保证值不为空,在Java8中,Optional主要用于方法返回值。
有了上述准备工作,我们就可以开始看消息唯一标识的生成逻辑了。代码如下所示:
/** * 取出指定key的当前长整型唯一标识数字,并将该数字加1再存回原来的key中,类似Mysql中的自增字段功能 * @param key * @return 长整型唯一标识数字 * @author 闫涛 2018.01.24 v0.0.1 */ public static long getKeyUniqueId(String key) { Optional<String> val = get("msg_id"); long uniqueId = Long.parseLong(val.orElseGet(() -> createUniqueIdKey(key))); set(key, "" + (uniqueId + 1)); return uniqueId; } /** * 如果key在redis中不存在,则创建该key,并将初始化值设置为1 * @param key * @return * @author 闫涛 2018.01.24 v0.0.1 */ public static String createUniqueIdKey(String key) { System.out.println("key=" + key + "!"); set(key, UNIQUE_ID_INIT_VAL); return UNIQUE_ID_INIT_VAL; }其中private static String UNIQUE_ID_INIT_VAL = "1"; // 唯一标识数字的初始值
这里重点讲述一个Java8新特性Optional的使用方法。当调用get方法从Redis中获取到msg_id的键值时val时,val的值可能为空,对Java8不熟悉的同学可能会写出这样的代码:
if (val.isPresent()) { // 做存在的操作 } else { // 处理不存在情况 }
如果这样处理,除了语法上与之前略有不同之外,其实和原来的方法没有任何区别。在Java8使用中,我们更推荐使用orElseGet方法,该方法的参数为一个函数式接口(只有一个抽象方法的接口),其参数为空返回值为字符串。在上面的代码中,就是用Lambda表达式作为实参传入orElseGet方法,用以处理Redis中不存在msg_id的情况。
由外部请求生成系统消息的代码就是这些,在下一节中,我们将讨论消息总线Plato的启动过程,以及怎样向消息总线Plato上发送消息,以及消息总线Plato怎样高效管理这些消息。
具体代码请参考Github上项目:https://github.com/yt7589/imsa 。如果大家觉得项目有意义,对大家有帮助,请勇敢地给项目点赞,谢谢!