1、概念
Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC 数据交换格式。可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
将 程序数据转化成能被存储并传输的格式的过程被称为“序列化”(Serialization),而它的逆过程则可被称为“反序列化” (Deserialization)。
简单来说,序列化就是将对象实例的状态转换为可保持或传输的格式的过程。与序列化相对的是反序列化,它根据流重构对象。这两个过程结合起来,可以轻 松地存储和传输数据。
序列化:将对象变成字节流的形式传出去。 反序列化:从字节流恢复成原来的对象。
protobuf的反射功能
-
Message:Person是自定义的pb类型,继承自Message. MessageLite作为Message基类,更加轻量级一些。
通过Message的两个接口GetDescriptor/GetReflection
,可以获取该类型对应的Descriptor/Reflection。 -
Descriptor:Descriptor是对message类型定义的描述,包括message的名字、所有字段的描述、原始的proto文件内容等,提供的接口:获取所有字段的个数:
int field_count() const
获取单个字段描述类型FieldDescriptor
的接口。
-
FieldDescriptor:描述message中的单个字段,例如字段名,字段属性(optional/required/repeated)等。
-
Reflection:提供了动态读写pb字段的接口,对pb对象的自动读写主要通过该类完成。对每种类型,Reflection都提供了一个单独的接口用于读写字段对应的值。
//读操作
virtual int32 GetInt32 (const Message& message,const FieldDescriptor* field) const = 0;
virtual int64 GetInt64 (const Message& message,const FieldDescriptor* field) const = 0;
//对于枚举和嵌套的message
virtual const EnumValueDescriptor* GetEnum(const Message& message, const FieldDescriptor* field) const = 0;
virtual const Message& GetMessage(const Message& message,const FieldDescriptor* field,MessageFactory* factory = NULL) const = 0;
反射使用
- 通过类型名创建类型对象
// 先获得类型的Descriptor .
auto descriptor = google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName("T.Test");
if (nullptr == descriptor)
{
return 0 ;
}
// 利用Descriptor拿到类型注册的instance. 这个是不可修改的.
auto prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(descriptor);
if ( nullptr == descriptor)
{
return 0 ;
}
// 构造一个可用的消息.
auto message = prototype->New();
// 只有当我们预先编译了test消息并且正确链接才能这么干.
auto test = dynamic_cast<T::Test*>(message);
// 直接调用message的具体接口
// 其实这些接口是语法糖接口.所以并没有对应的反射机制来对应调用.
// 反射机制实现了的Set/Get XXX系列接口,是属于Reflection的接口,接收Message作为参数.
test->set_id(1);
- 通过对象和对象的属性的名字获取、修改对应的属性
// 拿到一个对象,不在乎怎么拿到,可以是通过反射拿到。
// 这里简单直接的创建一个.
T::Test p_test ;
// 拿到对象的描述包.
auto descriptor = p_test.GetDescriptor() ;
// 拿到对象的反射配置.
auto reflecter = p_test.GetReflection() ;
// 拿到属性的描述包.
auto field = descriptor->FindFieldByName("id");
// 设置属性的值.
reflecter->SetInt32(&p_test , field , 5 ) ;
// 获取属性的值.
std::cout<<reflecter->GetInt32(p_test , field)<< std::endl ;
2、实现
编写.proto文件
syntax = "proto3";
package pt;
option optimize_for = LITE_RUNTIME;
message req_login
{
string username = 1;
string password = 2;
}
message obj_user_info
{
string nickname = 1;
string icon = 2; //头像
int64 coin = 3; //金币
string location = 4; //所属地
}
//游戏数据统计
message obj_user_game_record
{
string time = 1;
int32 kill = 2; //击杀数
int32 dead = 3; //死亡数
int32 assist= 4; //助攻数
}
message rsp_login
{
enum RET {
SUCCESS = 0;
ACCOUNT_NULL = 1; //账号不存在
ACCOUNT_LOCK = 2; //账号锁定
PASSWORD_ERROR = 3; //密码错误
ERROR = 10;
}
int32 ret = 1;
obj_user_info user_info = 2;
repeated obj_user_game_record record = 3;
}
protobuf的message中有很多字段,每个字段的格式为: 修饰符 字段类型 字段名 = 域号;
序列化后的Value是按原样保存到字符串或者文件中,Key按照一定的转换条件保存起来,序列化后的结果就是 KeyValueKeyValue…。Key的序列化格式是按照message中字段后面的域号与字段类型来转换 .
通过:protoc -I=. --cpp_out=. game.proto 生成.h文件和.cc文件
测试程序:
#include <iostream>
#include <string>
#include "game.pb.h"
int main()
{
pt::rsp_login rsp{};
rsp.set_ret(pt::rsp_login_RET_SUCCESS);
auto user_info = rsp.mutable_user_info();
user_info->set_nickname("dsw");
user_info->set_icon("345DS55GF34D774S");
user_info->set_coin(2000);
user_info->set_location("zh");
for (int i = 0; i < 5; i++) {
auto record = rsp.add_record();
record->set_time("2017/4/13 12:22:11");
record->set_kill(i * 4);
record->set_dead(i * 2);
record->set_assist(i * 5);
}
std::string buff{};
rsp.SerializeToString(&buff);
//------------------解析----------------------
pt::rsp_login rsp2{};
if (!rsp2.ParseFromString(buff)) {
std::cout << "parse error\n";
}
auto temp_user_info = rsp2.user_info();
std::cout << "nickname:" << temp_user_info.nickname() << std::endl;
std::cout << "coin:" << temp_user_info.coin() << std::endl;
for (int m = 0; m < rsp2.record_size(); m++) {
auto temp_record = rsp2.record(m);
std::cout << "time:" << temp_record.time() << " kill:" << temp_record.kill() << " dead:" << temp_record.dead() << " assist:" << temp_record.assist() << std::endl;
}
}
编译:g++ Writer.cc game.pb.cc -o s -std=c++11 -I /usr/local/include/ -L /usr/local/lib/ -lprotobuf -lpthread
运行结果:
3、muduo中的protobuf编解码器
codec是一个间接层位于Tcpconnection与Server之间,将接到的数据解析出消息对象,再调用Server对应的处理函数进行处理。
代码解析:
编解码代码ProtobufCodec
//codec的基本功能之一是TCP分包
class ProtobufCodec : boost::noncopyable
{
public:
//出错处理
enum ErrorCode
{
kNoError = 0,
kInvalidLength,//长度超出范围
kCheckSumError,//check num不正确
kInvalidNameLen,//
kUnknownMessageType,//不能识别
kParseError,//解析出错
};
explicit ProtobufCodec(const ProtobufMessageCallback& messageCb)
: messageCallback_(messageCb),errorCallback_(defaultErrorCallback)
{
}//默认处理方式
ProtobufCodec(const ProtobufMessageCallback& messageCb, const ErrorCallback& errorCb)
: messageCallback_(messageCb),errorCallback_(errorCb)
{
}//注册出错处理方式
};
//关键处理函数
//fillEmptyBuffer用消息内容来填充Buffer
void ProtobufCodec::fillEmptyBuffer(Buffer* buf, const google::protobuf::Message& message)
{
// buf->retrieveAll();
assert(buf->readableBytes() == 0);
const std::string& typeName = message.GetTypeName();//获取消息类型名
int32_t nameLen = static_cast<int32_t>(typeName.size() + 1);//类型名长度
buf->appendInt32(nameLen);//添加类型名长度到buffer
buf->append(typeName.c_str(), nameLen);//添加消息类型名
// code copied from MessageLite::SerializeToArray() and MessageLite::SerializePartialToArray().
GOOGLE_DCHECK(message.IsInitialized()) << InitializationErrorMessage("serialize", message);
int byte_size = message.ByteSize();
buf->ensureWritableBytes(byte_size);
uint8_t* start = reinterpret_cast<uint8_t*>(buf->beginWrite());//获取buffer可写指针
uint8_t* end = message.SerializeWithCachedSizesToArray(start);//将消息从buffer的writerindex处开始写
if (end - start != byte_size)//判断是否将消息完整写入
{
ByteSizeConsistencyError(byte_size, message.ByteSize(), static_cast<int>(end - start));
}
buf->hasWritten(byte_size);//更新写指针
int32_t checkSum = static_cast<int32_t>(
::adler32(1,
reinterpret_cast<const Bytef*>(buf->peek()),
static_cast<int>(buf->readableBytes())));//计算效验值
buf->appendInt32(checkSum);//写入效验值
assert(buf->readableBytes() == sizeof nameLen + nameLen + byte_size + sizeof checkSum);
int32_t len = sockets::hostToNetwork32(static_cast<int32_t>(buf->readableBytes()));
buf->prepend(&len, sizeof len);//在前面加上总长度
}
//onMessage
void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime) //接收完整的一条消息,交给messageCallback_回调函数处理
{
while (buf->readableBytes() >= kMinMessageLen + kHeaderLen)//判断缓冲区中是否满足最小的消息长度条件
{
const int32_t len = buf->peekInt32();
if (len > kMaxMessageLen || len < kMinMessageLen)//长度是否在规定范围内
{
errorCallback_(conn, buf, receiveTime, kInvalidLength);//长度超出范围,调用回调进行处理
break;
}
else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen))
{
ErrorCode errorCode = kNoError;
MessagePtr message = parse(buf->peek() + kHeaderLen, len, &errorCode);//解析出MessagePtr
if (errorCode == kNoError && message)
{
messageCallback_(conn, message, receiveTime);//调用消息处理函数
buf->retrieve(kHeaderLen + len);
}
else
{
errorCallback_(conn, buf, receiveTime, errorCode);//根据错误码进行相应的处理
break;
}
}
else
{
break;
}
}
}
//parse
MessagePtr ProtobufCodec::parse(const char* buf, int len, ErrorCode* error)
{
MessagePtr message;
// check sum
int32_t expectedCheckSum = asInt32(buf + len - kHeaderLen);
int32_t checkSum = static_cast<int32_t>(
::adler32(1,
reinterpret_cast<const Bytef*>(buf),
static_cast<int>(len - kHeaderLen)));//adler效验算法
if (checkSum == expectedCheckSum)//满足效验
{
// get message type name
int32_t nameLen = asInt32(buf);
if (nameLen >= 2 && nameLen <= len - 2 * kHeaderLen)//保证namelen大于2,同时保证消息的长度大于最小的值
{
std::string typeName(buf + kHeaderLen, buf + kHeaderLen + nameLen - 1);//取出消息类型名
// create message object
message.reset(createMessage(typeName));//建立消息对象
if (message)
{
// parse from buffer
const char* data = buf + kHeaderLen + nameLen;//从buffer中取出消息内容
int32_t dataLen = len - nameLen - 2 * kHeaderLen;
if (message->ParseFromArray(data, dataLen))//对新建的消息对象赋值
{
*error = kNoError;
}
else
{
*error = kParseError;
}
}
else
{
*error = kUnknownMessageType;
}
}
else
{
*error = kInvalidNameLen;
}
}
else
{
*error = kCheckSumError;
}
return message;
}
//createMessage
google::protobuf::Message* ProtobufCodec::createMessage(const std::string& typeName)
{
google::protobuf::Message* message = NULL;
const google::protobuf::Descriptor* descriptor =
google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);//通过类型名从DescriptorPool中得到Descriptor
if (descriptor)
{
const google::protobuf::Message* prototype =
google::protobuf::MessageFactory::generated_factory()->GetPrototype(descriptor);//通过Descriptor从MessageFactory中得到Message
if (prototype)
{
message = prototype->New();//创建新的消息对象
}
}
return message;
}
4、muduo中的protobuf消息分发器ProtobufDispatcher:按消息类型对消息进行分发
通过多态和模板实现,Callback是基类,定义了一个纯虚函数onMessage;CallbackT是一个模板类,定义了一个回调函数的指针,重写了基类的虚函数,该类是一个模板类,根据不同的类型,实现对不同消息的处理。ProtobufDispatcher是分发器实现的类,该类成员map是以Descriptor为key,以Callback*为值,提供了注册任意消息函数,以及消息分发函数。
template <typename T>
class CallbackT : public Callback //任意消息的对应的处理
{
public:
CallbackT(const ProtobufMessageTCallback& callback)
: callback_(callback)
{
}
virtual void onMessage(const muduo::net::TcpConnectionPtr& conn,
const MessagePtr& message,
muduo::Timestamp receiveTime) const //重写基类虚函数
{
boost::shared_ptr<T> concrete = muduo::down_pointer_cast<T>(message);
assert(concrete != NULL);
callback_(conn, concrete, receiveTime);//消息处理的回调函数
}
private:
ProtobufMessageTCallback callback_;
};
class ProtobufDispatcher //消息分发器
{
public:
explicit ProtobufDispatcher(const ProtobufMessageCallback& defaultCb)
: defaultCallback_(defaultCb)
{
}
void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
const MessagePtr& message,
muduo::Timestamp receiveTime) const
{
CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());//根据传入消息的Descriptor,找到对应的callbackT
if (it != callbacks_.end())//找到
{
it->second->onMessage(conn, message, receiveTime);//调用相应消息的onMessage
}
else//未找到,选择默认处理方式
{
defaultCallback_(conn, message, receiveTime);
}
}
template<typename T>
void registerMessageCallback(const typename CallbackT<T>::ProtobufMessageTCallback& callback) //注册消息
{
boost::shared_ptr<CallbackT<T> > pd(new CallbackT<T>(callback));
callbacks_[T::descriptor()] = pd;
}
private:
typedef std::map<const google::protobuf::Descriptor*, boost::shared_ptr<Callback> > CallbackMap;
CallbackMap callbacks_;
ProtobufMessageCallback defaultCallback_;
};
#endif
消息分发器的使用
void onUnknownMessageType(const muduo::net::TcpConnectionPtr&,
const MessagePtr& message,
muduo::Timestamp) //定义未注册消息的处理方式
{
cout << "onUnknownMessageType: " << message->GetTypeName() << endl;
}
void onQuery(const muduo::net::TcpConnectionPtr&,
const MessagePtr& message,
muduo::Timestamp) //Query消息的处理
{
cout << "onQuery: " << message->GetTypeName() << endl;
boost::shared_ptr<muduo::Query> query = muduo::down_pointer_cast<muduo::Query>(message);
assert(query != NULL);
}
void onAnswer(const muduo::net::TcpConnectionPtr&,
const MessagePtr& message,
muduo::Timestamp) //Answer消息的处理
{
cout << "onAnswer: " << message->GetTypeName() << endl;
boost::shared_ptr<muduo::Answer> answer = muduo::down_pointer_cast<muduo::Answer>(message);
assert(answer != NULL);
}
int main()
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
//定义分发器
ProtobufDispatcherLite dispatcher(onUnknownMessageType);
//注册消息到分发器
dispatcher.registerMessageCallback(muduo::Query::descriptor(), onQuery);
dispatcher.registerMessageCallback(muduo::Answer::descriptor(), onAnswer);
muduo::net::TcpConnectionPtr conn;
muduo::Timestamp t;
boost::shared_ptr<muduo::Query> query(new muduo::Query);
boost::shared_ptr<muduo::Answer> answer(new muduo::Answer);
boost::shared_ptr<muduo::Empty> empty(new muduo::Empty);
dispatcher.onProtobufMessage(conn, query, t);
dispatcher.onProtobufMessage(conn, answer, t);
dispatcher.onProtobufMessage(conn, empty, t);
google::protobuf::ShutdownProtobufLibrary();
}
boost::bind使用说明:function是函数对象的“容器”类型,bind绑定成员函数,返回函数对象。bind可以绑定函数指针、函数应用、成员函数指针、函数对象作为回调,在绑定非成员函数或者类静态成员函数时,函数参数最多可以达到9个,在绑定成员函数时,函数参数最多可以达到8个,另外一个用于指明实例对象或者this指针。