在本系列上篇http://maoyidao.iteye.com/blog/1636923 实现了基于google protobuf的序列化反序列化,现在看看怎么把他们组装到MINA的nio中。本篇主要描述怎么处理断包。
使用MINA的CumulativeProtocolDecoder是个好主意,先从MINA自己的sample开始。在这个例子中如果接受到的IoBuffer只包含一部分消息, decoders应该继续接收消息知道接收到完整的消息之后再进行解码。
List1
public class CrLfTerminatedCommandLineDecoder extends CumulativeProtocolDecoder { private Command parseCommand(IoBuffer in) { // Convert the bytes in the specified buffer to a // Command object. ... } protected boolean doDecode( IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { // Remember the initial position. int start = in.position(); // Now find the first CRLF in the buffer. byte previous = 0; while (in.hasRemaining()) { byte current = in.get(); if (previous == '\r' && current == '\n') { // Remember the current position and limit. int position = in.position(); int limit = in.limit(); try { in.position(start); in.limit(position); // The bytes between in.position() and in.limit() // now contain a full CRLF terminated line. out.write(parseCommand(in.slice())); } finally { // Set the position to point right after the // detected line and set the limit to the old // one. in.position(position); in.limit(limit); } // Decoded one line; CumulativeProtocolDecoder will // call me again until I return false. So just // return true until there are no more lines in the // buffer. return true; } previous = current; } // Could not find CRLF in the buffer. Reset the initial // position to the one we recorded above. in.position(start); return false; } }
IoBuffer继承java.nio.ByteBuffer,MINA定义这个类有两个原因:
ByteBuffer没有提供get/putString这样方便的方法。
ByteBuffer只能分配固定大小的空间,IoBuffer通过setAutoExpand,expand等方法支持写入变长数据,例如:
String greeting = messageBundle.getMessage("hello"); IoBuffer buf = IoBuffer.allocate(16); // Turn on autoExpand (it is off by default) buf.setAutoExpand(true); CharsetEncoder utf8encoder = Charset.forName("UTF-8").newEncoder(); buf.putString(greeting, utf8encoder);
IoBuffer重新分配了底层的ByteBuffer,因为在这个例子里数据超过了16 byte。扩展的时候,ByteBuffer容量(capacity)增倍,limit则指向写入内容的最后一个位置。
List1代码展示的有关断包的逻辑是找到了下\r\n,即认为上一个包已经全部接收到。这是一中常见的断包逻辑,但maoyidao更喜欢使用指定包长度的方法,逻辑更清晰,代码更简单。结合http://maoyidao.iteye.com/blog/1636923 中序列化时加入的验证码,decode代码应该是这样的:
public class MaoyidaoDecoder extends CumulativeProtocolDecoder { protected boolean doDecode( IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { CodedInputStream cis = CodedInputStream.newInstance(getBytes(in, 10)); int pos1 = in.position(); int pos2 = in.limit(); int flag1 = cis.readRawVarint32(); int flag2 = cis.readRawVarint32(); if(flag1 != 3 || flag2 != 7){ // 校验码不对,丢弃这个包 in.position(pos2); return true; }else{ // 校验码无问题,重设postion,读下面的内容 in.position(pos1 + 2); } int contentLength = CodedInputStream.newInstance(getBytes(in, 5)).readRawVarint32(); int contentLength0 = contentLength + CodedOutputStream.computeRawVarint32Size(contentLength); // 是否有足够的数据 if(in.remaining() >= contentLength0){ try { Maoyidao.MaoyidaoPacket.Builder builder = Maoyidao.MaoyidaoPacket.newBuilder(); CodedInputStream.newInstance(getBytes(in, protocolLength)).readMessage(builder, ExtensionRegistryLite.getEmptyRegistry()); out.write(builder.build()); in.position(in.position() + protocolLength); return true; } catch (Exception e) { } } // 没有足够的数据,返回false,IoBuffer退回到最初的位置 else { in.position(pos1); return false; } } }
通过这个decodefilter,向上层的outputstream流中写入了rpc对象protobuf序列化后的字符流,这样后续处理只需根据protobuf对象,即可。
本系列(一)(二)2篇文章介绍了基于protobuf/MINA构建JAVA RPC的主要代码逻辑。
本文系maoyidao原创,转载请引用原链接: