高效编解码有很多方式:json、protobuf、msgpack、hessian1、hessian2、XStream、默认序列化等
protobuf表现突出(码流小、响应时间低),跨语言时应用广泛。关于protobuf后续再介绍。
本章主要介绍使用MsgPack完成编解码,在Netty中使用,达到可以传输的效果。
这里要注意MsgPack反序列化的两种方式:下面图是官网粘的,Template可以换成被@Message标记的JavaBean类
这里其实还隐藏一个问题:元素为自定义对象的List,MsgPack通过上面的方式好像无法反序列化。目前只找到两种方式。
第一种:https://github.com/msgpack/msgpack-java/blob/develop/msgpack-jackson/README.md
这种方式是通过jackson来做到的,总感觉很low
第二种:
1、MsgPackDecoder 解码器
package com.back.codec;
import java.util.List;
import org.msgpack.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
/**
 * Inbound decoder that parses the readable bytes of a {@link ByteBuf} as one
 * MessagePack value.
 *
 * <p>The bytes are copied into a heap array, parsed with
 * {@code MessagePack.read(byte[])} and the result is added to {@code out}.
 * This class does no framing of its own: whatever is readable in the incoming
 * buffer is treated as a single complete message.
 */
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {

    /** Reused across frames so a new MessagePack instance is not constructed for every message. */
    private final MessagePack msgPack = new MessagePack();

    /**
     * Copies the readable region of {@code msg} and appends the parsed value to {@code out}.
     *
     * @param ctx the handler context (unused)
     * @param msg the buffer holding one serialized message
     * @param out receives the parsed MessagePack value
     * @throws Exception if the bytes cannot be parsed
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        int length = msg.readableBytes();
        byte[] array = new byte[length];
        // Index-based copy starting at the current reader index.
        msg.getBytes(msg.readerIndex(), array, 0, length);
        out.add(msgPack.read(array));
    }
}
2、MsgPackEncoder 编码器
package com.back.codec;
import org.msgpack.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Outbound encoder that serializes any written object with MessagePack and
 * appends the resulting bytes to the outgoing {@link ByteBuf}.
 *
 * <p>No length prefix or delimiter is added here.
 *
 * @author Administrator
 */
public class MsgPackEncoder extends MessageToByteEncoder<Object> {

    /**
     * Reused for every message instead of being rebuilt per call, so whatever
     * per-instance state the library keeps is not thrown away after each write.
     */
    private final MessagePack msgPack = new MessagePack();

    /**
     * Serializes {@code msg} and writes the bytes into {@code out}.
     *
     * @param ctx the handler context (unused)
     * @param msg the object to serialize
     * @param out the buffer that receives the serialized bytes
     * @throws Exception if serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        byte[] raw = msgPack.write(msg);
        out.writeBytes(raw);
    }
}
3、RouteHandler 这里使用了我们自定义的编解码器 及两个用于处理TCP粘包/拆包的处理器
package com.back.server;
import com.back.codec.MsgPackDecoder;
import com.back.codec.MsgPackEncoder;
import com.back.constant.NettyConstant;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Decides which handlers a request passes through on the server side.
 * Shared handler logic (logging, message encoding/decoding, ...) can be added here.
 *
 * <p>Ways to deal with TCP packet sticking/splitting:
 * <ul>
 *   <li>LineBasedFrameDecoder + StringDecoder: the former recognises line breaks and
 *       caps the bytes per line, so together they form a line-oriented text decoder.</li>
 *   <li>DelimiterBasedFrameDecoder + StringDecoder: splits on a chosen delimiter (with a
 *       maximum size); every message must end with that delimiter.</li>
 *   <li>FixedLengthFrameDecoder + StringDecoder: always reads packets of a fixed number
 *       of bytes; not demonstrated here.</li>
 * </ul>
 *
 * <p>This initializer uses the length-prefixed approach: LengthFieldPrepender puts a
 * two-byte length in front of each message, and LengthFieldBasedFrameDecoder uses that
 * length to cut the inbound stream back into messages.
 *
 * @author back
 */
public class RouteHandler extends ChannelInitializer<SocketChannel> {

    /** Largest frame the frame decoder accepts. */
    private static final int MAX_FRAME_LENGTH = 65535;
    /** Offset of the length field inside a frame. */
    private static final int LENGTH_FIELD_OFFSET = 0;
    /** Width of the length prefix in bytes; also the number of leading bytes stripped on decode. */
    private static final int LENGTH_FIELD_BYTES = 2;
    /** Adjustment added to the value read from the length field. */
    private static final int LENGTH_ADJUSTMENT = 0;

    /**
     * Installs framing, the MsgPack codec and the business handler, in that order.
     *
     * @param arg0 the channel whose pipeline is being set up
     */
    @Override
    protected void initChannel(SocketChannel arg0) throws Exception {
        // Earlier variants used line-based or delimiter-based framing followed by a
        // StringDecoder; this version frames by length prefix instead.
        arg0.pipeline()
                .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                        MAX_FRAME_LENGTH,
                        LENGTH_FIELD_OFFSET,
                        LENGTH_FIELD_BYTES,
                        LENGTH_ADJUSTMENT,
                        LENGTH_FIELD_BYTES))
                .addLast("msgpack decoder", new MsgPackDecoder())
                .addLast("frameEncoder", new LengthFieldPrepender(LENGTH_FIELD_BYTES))
                .addLast("msgpack encoder", new MsgPackEncoder())
                .addLast(new MessageHandler());
    }
}
4、MessageHandler
package com.back.server;
import org.msgpack.type.MapValue;
import org.msgpack.type.Value;
import org.msgpack.unpacker.Converter;
import com.back.test.UserInfo;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * The handler that actually processes messages on the server side.
 *
 * <p>Each inbound message is cast to a MessagePack {@link Value}, converted to a
 * {@link UserInfo}, passed to the {@code RequestHandler}, and the returned
 * {@link UserInfo} is written back to the channel.
 *
 * @author Administrator
 */
public class MessageHandler extends ChannelHandlerAdapter {

    private final RequestHandler handler = RequestHandler.getRequestHandler();

    /**
     * Converts the inbound value to a {@link UserInfo}, handles it and queues the response.
     *
     * @param ctx the handler context used to write the response
     * @param msg the inbound message; must be a {@link Value}, otherwise the cast fails
     * @throws Exception if conversion or handling fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Value req = (Value) msg;
        UserInfo read = new Converter(req).read(UserInfo.class);
        System.out.println("接收报文:" + read);
        UserInfo resp = handler.handlerUser(read);
        // Only written here; the flush happens in channelReadComplete.
        ctx.write(resp);
    }

    /** Flushes the responses written during the preceding channelRead calls. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context whose channel is closed
     * @param cause the error that reached this handler
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the error before closing; otherwise decode/cast failures vanish silently.
        cause.printStackTrace();
        ctx.close();
    }
}
5、NettyClient && ClientRouteHandler
package com.back.client;
import com.back.codec.MsgPackDecoder;
import com.back.codec.MsgPackEncoder;
import com.back.constant.NettyConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Minimal client: connects to 127.0.0.1:8888 with a pipeline built by
 * {@code ClientRouteHandler} and blocks until the channel is closed.
 */
public class SimpleNettyClient {

    /**
     * Starts the client and waits for the connection to close.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
                    .handler(new ClientRouteHandler());
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync();
            System.out.println("client start!");
            // Block until the channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the exception itself;
            // its cause is normally null and carries no information.
            Thread.currentThread().interrupt();
            System.err.println("client interrupted: " + e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
/**
 * Builds the client pipeline: length-prefixed framing, the MsgPack codec, then
 * the client business handler.
 */
class ClientRouteHandler extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the handlers on a newly created channel.
     *
     * @param ch the channel whose pipeline is being set up
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // Earlier variants used line-based or delimiter-based framing followed by a
        // StringDecoder; this version frames by a two-byte length prefix instead.
        final int maxFrameLength = 65535;
        final int lengthFieldBytes = 2;

        ch.pipeline()
                .addLast("frameDecoder",
                        new LengthFieldBasedFrameDecoder(maxFrameLength, 0, lengthFieldBytes, 0, lengthFieldBytes))
                .addLast("msgpack decoder", new MsgPackDecoder())
                .addLast("frameEncoder", new LengthFieldPrepender(lengthFieldBytes))
                .addLast("msgpack encoder", new MsgPackEncoder())
                .addLast(new ClientMessageHandler());
    }
}
6、ClientMessageHandler
package com.back.client;
import org.msgpack.type.Value;
import org.msgpack.unpacker.Converter;
import com.back.constant.NettyConstant;
import com.back.test.UserInfo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Client-side business handler: sends 100 {@link UserInfo} messages once the
 * channel becomes active and prints every response it receives.
 */
public class ClientMessageHandler extends ChannelHandlerAdapter {

    /**
     * Writes 100 {@link UserInfo} objects and flushes them in one go.
     *
     * @param ctx the handler context used for writing
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 100; i++) {
            UserInfo user = new UserInfo(i, "back" + i, "1234", "[email protected]");
            ctx.write(user);
        }
        // Single flush after all messages have been queued.
        ctx.flush();
    }

    /**
     * Converts the inbound value to a {@link UserInfo} and prints it.
     *
     * @param ctx the handler context (unused)
     * @param msg the inbound message; must be a {@link Value}, otherwise the cast fails
     * @throws Exception if conversion fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Value req = (Value) msg;
        UserInfo response = new Converter(req).read(UserInfo.class);
        System.out.println("接收到报文:" + response);
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the handler context whose channel is closed
     * @param cause the error that reached this handler
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the error before closing; otherwise decode/cast failures vanish silently.
        cause.printStackTrace();
        ctx.close();
    }
}