数据通讯格式封装协议 Protocol Buffer
常用序列化方案比较 参考
https://www.cnblogs.com/johnny666888/p/12841735.html
Protocol Buffer
Protocol Buffer还有一个非常重要的优点就是可以保证同一消息报文新旧版本之间的兼容性
protobuf协议核心思想
基于128bits的数值存储方式(Base 128 Varints)
数据表示方式:每块数据由接连的若干个字节表示(小的数据用1个字节就可以表示),每个字节最高位标识本块数据是否结束(1:未结束,0:结束),低7位表示数据内容。(可以看出数据封包后体积至少增大14.2%)
数字1的表示方法为:0000 0001,这个容易理解
数字300的表示方法为:1010 1100 0000 0010
protobuf字节序是小端字节序,所以这个数字实际是0000 0010 1010 1100
1010 1100 0000 0010 → 010 1100 000 0010 如下: 000 0010 010 1100 → 000 0010 ++ 010 1100 → 10 0101100 → 256 + 32 + 8 + 4 = 300 |
基于序号的协议字段映射(类似key-value结构)
所以字段可以乱序,可缺段(记optional)
message person{ required string name = 1; required string country = 2; optional int32 age = 3; } |
效果相当于json数据:person= [{1: "john"}, {2: "USA"}, {3: 30}],其中{3: 30} 还可以不传,person还可以传成 [{2: "USA"}, {1: "john"}],对端仍旧可以正常解析。
基于无符号数的带符号数表示(ZigZag 编码)
原始的带符号数 | ZigZag编码后的表示 |
---|---|
0 | 0 |
-1 | 1 |
1 | 2 |
-2 | 3 |
2147483647 | 4294967294 |
-2147483648 | 4294967295 |
使用 zigzag 编码,充分利用基于128bits的数值存储(Base 128 Varints)的 技术,只需要加多1个位来表示符号。当绝对值小的数字非常有利,这种方式可以有效减少协议内容长度。
sint32类型编码如下: (n << 1) ^ (n >> 31) sint64类型编码如下: (n << 1) ^ (n >> 63) |
协议数据结构
protobuf怎么在一长串二进制中表示若干个数据?
做法就是每块数据前加一个数据头,表示数据类型及协议字段序号。
msg1_head + msg1 + msg2_head + msg2 + ...
数据头也是基于128bits的数值存储方式,一般1个字节就可以表示:
message Test1 { required int32 a =1; } 如上创建了 Test1 的结构并且把 a 设为 2,序列化好的二进制数据为: 0 000 1000 0 000 0010 |
以上数据转成十六进制也就是 08 02,其中 8 是怎么得到的?
000 1 000 低3位表示数据类型:0,其他表示协议字段序号:1,加上最高位0, 结果就是8 |
数据类型的表示如下:
类型 | 含义 | 用于哪些数据类型 |
---|---|---|
0 | Varint | int32, int64, uint32, uint64, sint32, sint64, bool, enum |
1 | 64-bit | fixed64, sfixed64, double |
2 | Length-delimited | string, bytes, embedded messages, packed repeated fields |
3 | Start group | groups (deprecated) |
4 | End group | groups (deprecated) |
5 | 32-bit | fixed32, sfixed32, float |
TeamTalk proto
pb目录下 定义了消息
基础数据结构的定义在IM.BaseDefine.proto中。
见 IM doc文件下协议说明
消息数据流;
如获取部门列表
buildProtoMsg ====>
//封装数据 SID CID
IMBuddy.IMDepartmentReq imDepartmentReq = IMBuddy.IMDepartmentReq.newBuilder()
.setUserId(userId)
.setLatestUpdateTime(lastUpdateTime).build();
int sid = IMBaseDefine.ServiceID.SID_BUDDY_LIST_VALUE;
int cid = IMBaseDefine.BuddyListCmdID.CID_BUDDY_LIST_DEPARTMENT_REQUEST_VALUE;
//组装包头 header
com.mogujie.tt.protobuf.base.Header header = new DefaultHeader(sid, cid);
int bodySize = requset.getSerializedSize();
header.setLength(SysConstant.PROTOCOL_HEADER_LENGTH + bodySize);
seqNo = header.getSeqnum();
listenerQueue.push(seqNo,packetlistener);
boolean sendRes = msgServerThread.sendRequest(requset,header);
//写入buffer
DataBuffer headerBuffer = header.encode();
DataBuffer bodyBuffer = new DataBuffer();
int bodySize = requset.getSerializedSize();
bodyBuffer.writeBytes(requset.toByteArray());
DataBuffer buffer = new DataBuffer(SysConstant.PROTOCOL_HEADER_LENGTH + bodySize);
buffer.writeDataBuffer(headerBuffer);
buffer.writeDataBuffer(bodyBuffer);
if (null != buffer && null != channelFuture.getChannel()) {
Channel currentChannel = channelFuture.getChannel();
boolean isW = currentChannel.isWritable();
boolean isC = currentChannel.isConnected();
if(!(isW && isC)){
throw new RuntimeException("#sendRequest#channel is close!");
}
//socket 发送
channelFuture.getChannel().write(buffer.getOrignalBuffer());
IM-server接收端
MsgConn.cpp
_HandleClientDepartmentRequest 处理
void CMsgConn::HandlePdu(CImPdu* pPdu)
{
// request authorization check
if (pPdu->GetCommandId() != CID_LOGIN_REQ_USERLOGIN && !IsOpen() && IsKickOff()) {
log("HandlePdu, wrong msg. ");
throw CPduException(pPdu->GetServiceId(), pPdu->GetCommandId(), ERROR_CODE_WRONG_SERVICE_ID, "HandlePdu error, user not login. ");
return;
}
switch (pPdu->GetCommandId()) {
case CID_OTHER_HEARTBEAT:
_HandleHeartBeat(pPdu);
break;
//....
case CID_BUDDY_LIST_USERS_STATUS_REQUEST:
_HandleClientUsersStatusRequest(pPdu);
break;
case CID_BUDDY_LIST_DEPARTMENT_REQUEST:
_HandleClientDepartmentRequest(pPdu);
break;
// for group process
case CID_GROUP_NORMAL_LIST_REQUEST:
s_group_chat->HandleClientGroupNormalRequest(pPdu, this);
break;
case CID_GROUP_INFO_REQUEST:
s_group_chat->HandleClientGroupInfoRequest(pPdu, this);
break;
case CID_GROUP_CREATE_REQUEST:
s_group_chat->HandleClientGroupCreateRequest(pPdu, this);
break;
case CID_GROUP_CHANGE_MEMBER_REQUEST:
s_group_chat->HandleClientGroupChangeMemberRequest(pPdu, this);
break;
//....
default:
log("wrong msg, cmd id=%d, user id=%u. ", pPdu->GetCommandId(), GetUserId());
break;
}
}
void CMsgConn::_HandleClientDepartmentRequest(CImPdu *pPdu)
{
//消息检查 时间戳对比数据变更
IM::Buddy::IMDepartmentReq msg;
CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));
log("HandleClientDepartmentRequest, user_id=%u, latest_update_time=%u.", GetUserId(), msg.latest_update_time());
//数据库
CDBServConn* pDBConn = get_db_serv_conn();
if (pDBConn) {
CDbAttachData attach(ATTACH_TYPE_HANDLE, m_handle, 0);
msg.set_user_id(GetUserId());
msg.set_attach_data(attach.GetBuffer(), attach.GetLength());
pPdu->SetPBMsg(&msg);
pDBConn->SendPdu(pPdu);
}
}
//关键类
//class CDBServConn : public CImPduConn
协议扩展
根据需要修改
BaseDefine.proto等 协议文件 增加相应的业务类型指令 KEY 同时修改server接收端处理业务逻辑 client 修改对应反序列化代码即可