众多周知,在进行网络通信的时候都需要编解码,而RocketMQ依托Netty框架进行网络交互,只需要在上行和下行通道处理类中进行扩展处理即可完成编解码的工作。
// 编码,下行通道往网络通道中写数据
class NettyEncoder extends MessageToByteEncoder<RemotingCommand>
class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter
// 解码,上行通道从网络通道中读数据
class NettyDecoder extends LengthFieldBasedFrameDecoder
class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder
class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
创建请求
这里以broker启动后往注册中心注册自身信息为例
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
创建request请求,设置请求编码以及版本号
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}
编码NettyEncoder
编码完成后往ByteBuf中写入请求头以及body信息。
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
对请求头编码,计算总共需要的长度length,它主要包括序列化方式他们所在的4个字节长度,headerData的长度,bodyLength的长度。消息头信息ByteBuffer的长度计算是:4代表需要来存储总长度length值所占用的,之所以设置这个总长度值是为了解决TCP拆包、粘包问题情况,在加上length代表的那三个长度,但是这个时候不需要存储body,所以把bodyLength减去。
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
}
序列化方式和headerData长度的值共同组成了4个字节,第一个字节是序列化方式,因为headerData长度的值正常情况下不可能超过三个字节也就是16777216。
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
请求头编码需要把自定义类的属性都打包到extFields属性中,最后序列化成对应的byte数组形式,最后组成新的ByteBuf编码完成。
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
解码NettyDecoder
解码时先设置最大长度16777216,因为正常来说不可能有这么大的消息,有的话也是非法操作,截断前四个字节代表请求数据总长度的值所占用的数据空间,因为这个时候已经解决了TCP拆包、粘包问题,而且现在ByteBuffer的大小也正好是剩下的数据的大小。
private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
获取剩余总长度,主要由序列化他们所在的4个字节,bodyData的长度,headerData的长度组成。
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
先获取前四个字节解析出headerLength以及序列化方式,对应第一个字节是序列化方式,后三个字节代表长度,使用位运算计算即可,最后读取各个数据解码完成。
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}