尽管各类标准的系统间交互组件已经非常流行,但采用自定义报文、基于原生socket进行系统间数据交互的模式依然存在。
原生socket在做一般的测试性开发时,确实能简便的达成目标。但在做企业应用时,若是在报文交互的需求上,再增加关于传输状态、处理策略等实际需求时,则显得吃力。
mina作为高性能开源网格框架,其功能相当丰富。尽管本文的短连接需求从实现上来看,使用mina作为核心,显得大材小用,但是,从实际应用上,能很好的满足业务需要及便捷的使用(本文很大意义上是马上造了一个简单的轮子)。
为了不陷入细节,后文将首先介绍可使用本文组件的场景、以及如何使用,然后再对组件的内部实现进行介绍,最后提供一个组件的下载地址。
一、本文假定的需求场景
1、两个系统间使用自定义报文进行短连接同步交互。即:一方为服务方,一直处于监听状态,另外一方作为客户端连接后,进行数据发送,当收到回应或者设定的超时时间到达后,断开与服务方的连接;
*2、假设自定义报文的格式为:8字节长度+对象的json字符串(UTF-8编码),其中“8字节长度”的值不包含这8个字节,其值为包含“对象的json字符串”经过UTF-8编码之后得到的长度值,8字节长度的编码为long的高节节在前。
*:关于第2点数据报文的定义在后文的介绍中,主要是影响工具类MessageUtils以及消息解码方法的实现。若采用其他的报文格式,则需要调整MessageUtils中相关方法的实现,另外就是关于长度,需要调整MessageDecoder的doDecode解码方法的实现
二、使用本文的组件准备相关
本文基于的mina版本是2.0.17,jdk为1.7。
在使用本文提供的组件时,对maven项目来说,需要做如下依赖:
<dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.0.17</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.46</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency>
对非maven项目,则需要在你设定的lib里包含以上的jar包。
三、如何使用
本文组件的目标是在mina的基础上,将server、client的初始化实现的尽量简单且贴近实际,因此,一般情况下,你仅需构造一个MessageServer、若干个MessageShortClient对象(可在短连接的业务实现处即时构造),然后对server、client对象各自实现一个业务层的MessageProcessor对象并进行关联,对业务层的MessageProcessor,可选择性的实现其方法。
以下服务端测试代码、客务端测试代码是对以上使用方法的直观解释。
1、服务端测试代码:
package com.bn.zbase; import com.alibaba.fastjson.JSON; import com.bn.zbase.message.*; import com.bn.zbase.message.biz.MessageProcessor; import com.bn.zbase.message.biz.DefaultServerMessageProcessor; import com.bn.zbase.message.util.MessageUtils; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Created by zcn on 2018/4/20. */ public class BizServerTests { private static final Logger log = LoggerFactory.getLogger(BizServerTests.class); public static void main(String[] args) { MessageProcessor messageProcessor = new DefaultServerMessageProcessor() { @Override public void processReceivedMessage(IoSession session, String strJsonMessage) { log.info("接收到客户端消息:" + strJsonMessage); Object objRet = JSON.parse(strJsonMessage); session.write(MessageUtils.encodeMessageObj2IoBuffer(objRet)); } }; MessageServer messageServer = new MessageServer("localhost", 8989); messageServer.setMessageProcessor(messageProcessor); messageServer.start(); try { Thread.sleep(30 * 10000); } catch (InterruptedException e) { e.printStackTrace(); } messageServer.shutdown(); } }
2、客户端代码:
package com.bn.zbase; import com.alibaba.fastjson.JSON; import com.bn.zbase.message.biz.MessageProcessor; import com.bn.zbase.message.biz.DefaultClientMessageProcessor; import com.bn.zbase.message.MessageShortClient; import com.bn.zbase.message.util.MessageSendResult; import org.apache.mina.core.session.IoSession; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * Created by zcn on 2018/4/23. */ public class BizClientTests { private static final Logger log = LoggerFactory.getLogger(BizClientTests.class); @Test public void testSendObj() throws IOException { DiagramEntity objMessage = new DiagramEntity(); objMessage.setName("张三三"); objMessage.setCode("X001"); MessageProcessor clientProcessor = new DefaultClientMessageProcessor() { @Override public void processReceivedMessage(IoSession session, String strJsonMessage) { DiagramEntity objRet = JSON.parseObject(strJsonMessage, DiagramEntity.class); log.info("收到服务器反馈:" + JSON.toJSONString(objRet)); } @Override public void complete(MessageSendResult sendResult, Object objOriginalMessage) { log.info("操作完成状态:" + sendResult.getDesc()); } }; for (int i = 0; i < 1000; i++) { MessageShortClient messageShortClient = new MessageShortClient("localhost", 8989, 10); messageShortClient.setMessageProcessor(clientProcessor); messageShortClient.sendMessage(objMessage); try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } class DiagramEntity { private String name; private String code; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } }
四、实现细节
1、结构
上图为本组件的实现类图:
1.1 在mina派生上,按照其约定,扩展了IoHandler,将其分为ClientMessageHandler、ServerMessageHandler,当然,其实也没做什么特别的工作,主要是在事件处理方法内,添加了业务可能使用的几个探点(探点的实现是MessageProcessor);
1.2 还是mina派生,对编解码器MessageCodecFactory以及具体的编、解码单元(MessageEncoder、MessageDecoder)进行了扩展,这部分也是标准处理(为适应自定义的报文格式),参考了网友的文章,在组件内部的代码内也做了说明;
1.3 报文数据转换工具类MessageUtils。本类提供了适用于假定报文格式的常用数据转换方法,其使用贯穿于整个组件,你可以随时使用其进行数据转换;
1.4 业务层报文数据处理接口MessageProcessor以及其实现类。mina的IoHandler已经对常用的会话、消息事件等做了抽象,然而我们在这里仍做了一个自己的实现:一方面是由于作者想提供一个相对简单明了的业务层处理派生体系;另外一方面,当面对连接参数错误等情况时,一时没有找到IoHandler如何实现判断的方法(当然,之后会更深入的查看mina)。
MessageProcessor接口的实现是你在实际业务应用时,应该必需要做的,否则按照默认的体系,你除了很方便的创建出server、client对象,将干不了任何业务相关的事儿……
1.5 枚举MessageSendResult。定义了几个常用的发送结果状态,当你在实现MessageProcessor.complete方法进行发送完成后的处理时,可以获取到某种状态,这些状态当前有以下:
OK("会话正常完成", 0), CONNECT_ERROR("连接错误", 1), CONNECT_PARAERROR("连接参数错误", 2), CONNECT_NORESPONSE("未收到服务器应答", 3);
1.6 最后,也是必需要使用到的两个类:MessageServer、MessageShortClient。
对MessageServer,如“三、如何使用”节中的“服务端测试代码”内示例,你通常仅需要使用带参的构造函数(MessageServer(hostname, port))来构造一个server对象,然后设置好其业务消息处理对象MessageProcessor,最后start就完成了(程序结束的时候你应该再使用shutdown关掉服务器)。当然在,spring中集成时,并不需要shutdown。
对MessageClient,当你使用带参构造函数MessageClient(destHostname,destPort)时,默认的空闲超时时间是10秒,即当你发送完消息(sendMessage)后,最多10秒钟,短连接结束,关于连接完成的发送结果状态,你应该使用MessageProcessor的complete完成处理。
四、与spring集成
尽管我们当前使用的spring版本已经基本都是支持注解构造bean了,这里我仍然使用xml配置的方式对组件在spring中的集成做说明。其实是非常简单的,仅需要集成MessageServer:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <bean id="defaultMinaServerMessageProcessor" class="com.bn.zbase.message.biz.DefaultServerMessageProcessor"></bean> <bean id="minaMessageServer" class="com.bn.zbase.message.MessageServer"> <property name="hostname" value="127.0.0.1"></property> <property name="port" value="8989"></property> <property name="messageProcessor" ref="defaultMinaServerMessageProcessor"></property> </bean> </beans>
五、与spring boot集成
spring boot的方式当然是通吃了,这里使用注解配置实现:
package com.bn.zbase.config; import com.alibaba.fastjson.JSON; import com.bn.zbase.message.MessageServer; import com.bn.zbase.message.biz.MessageProcessor; import com.bn.zbase.message.biz.DefaultServerMessageProcessor; import com.bn.zbase.message.terminal.AbstractMessageTerminal; import com.bn.zbase.message.util.MessageUtils; import org.apache.mina.core.session.IoSession; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by zcn on 2018/4/24. */ @Configuration public class MinaMessageConfiguration { @Bean(name="messageServer", initMethod = "start") public AbstractMessageTerminal messageServer() { MessageProcessor messageProcessor = new DefaultServerMessageProcessor() { @Override public void processReceivedMessage(IoSession session, String strJsonMessage) { System.out.println("收到客户端信息:" + strJsonMessage); Object objRet = JSON.parse(strJsonMessage); session.write(MessageUtils.encodeMessageObj2IoBuffer(objRet)); } }; MessageServer obj = new MessageServer("localhost", 8989); obj.setMessageProcessor(messageProcessor); return obj; } }
六、其他问题
1、MessageShortClient类并非是线程安全的,事实上,也没有必要,因为本来就是用完即焚;
2、开源问题,在最后的组件下载中,当前提供的是一个源码类型的jar包,并没有提供本组件的实现工程,原因是本组件是基于当前一个未完成的开源框架内实现的(子功能模块),在未来完成开源框架后,将会提供整个项目。
七、组件下载
见: https://download.csdn.net/download/smartcore/10374257