目录
0.2 ::std::string中在std前面加::是啥意思?
0.3 复习equal_range的用法,在注册RPC接口的时候用到
0.复习与引用与问题
0.1 复习
如链接中的【Servant,XXXServant及XXXServantImp之间的关系】中阐述的那样,在tars中,Servant,XXXServant及XXXServantImp有如下关系:
搜索__tars__QueryF_all,__tars__Registry_all
0.2 ::std::string中在std前面加::是啥意思?
0.3 复习equal_range的用法,在注册RPC接口的时候用到
1.基于QueryImp的研究
1.1 源码及QueryImp相关类对应的关系
D:\005-02-代码\016-TARS\TARS\TarsFramework\RegistryServer\QueryImp.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\RegistryServer\QueryImp.cpp
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\protocol\servant\QueryF.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\servant\Servant.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\servant\BaseNotify.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\util\include\util\tc_thread_mutex.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\util\include\util\tc_autoptr.h
class QueryImp: public QueryF{}
class QueryF : public tars::Servant{}
class Servant : public BaseNotify{}
class BaseNotify : public TC_ThreadRecMutex, public TC_HandleBase{}
1.2 各个类概况
1.2.1 BaseNotify
class BaseNotify : public TC_ThreadRecMutex, public TC_HandleBase
{
public:
BaseNotify();// 构造函数
virtual ~BaseNotify(); // 析构函数
bool notify(const string& command, const string& params, CurrentPtr current, string& result);// 接收管理命令
using TAdminFunc = std::function<bool (const string&, const string&, string& )>;// 处理命令的函数类型
void addAdminCommandNormal(const string& command, TAdminFunc func);// 添加Servant管理命令和对应的处理方法
void addAdminCommandPrefix(const string& command, TAdminFunc func);// 添加管理命令和对应的处理方法
protected:
void setNotifyObserver(const shared_ptr<NotifyObserver> ¬ifyObserver) { _observer = notifyObserver; }
protected:
map<string, TAdminFunc> _procFunctors;// 命令处理方法
shared_ptr<NotifyObserver> _observer;// notify observer
};
1.2.2 Servant
class Servant : public BaseNotify
{
public:
Servant();// 构造函数
~Servant();// 析构函数
void setName(const string &name);// 设置名称
string getName() const;// 获取名称
void setHandle(TC_EpollServer::Handle* handle);// 设置所属的Handle
void setApplication(Application *application);// 设置全局的应用
Application* getApplication() const;// 获取应用
TC_EpollServer::Handle* getHandle();// 获取所属的Handle
virtual void initialize() = 0;// 初始化
virtual void destroy() = 0;// 退出
public:
virtual int dispatch(CurrentPtr current, vector<char> &buffer);// 分发收到的请求
virtual int onDispatch(CurrentPtr current, vector<char> &buffer) { return -1; }// 分发并处理请求
public:
virtual int doRequest(CurrentPtr current, vector<char> &buffer) { return -1; }// 非tars的普通协议的请求,没有方法名,不需要Dispatch
virtual int doResponse(ReqMessagePtr resp) { return -1; }// 作为客户端访问其他server时,成功返回的响应接口
virtual int doResponseException(ReqMessagePtr resp) { return -1; }// 作为客户端访问其他server时,返回其他异常的响应接口
virtual int doResponseNoRequest(ReqMessagePtr resp) { return -1; }// 作为客户端访问其他server时,如果resp没有找到request,则响应该接口,例如:push消息或者超时后服务端迟到的响应
/*
每次handle被唤醒后都会调用,业务可以通过在其他线程中调用handle的notify
[一般都需要配合业务自有的队列使用,队列可以封装在ServantImp对象中]
关于参数bExpectIdle: bExpectIdle为true时,在循环Adapter时调用的, 处理请求前调用(心跳)
*/
virtual int doCustomMessage(bool bExpectIdle) { return -1; }
virtual int doCustomMessage() { return -1; }// 不带参数的是为了兼容老版本。尽量用带参数的函数,现在不带参数和带参数的都会调用
virtual int doClose(CurrentPtr current){ return -1; }// 客户端关闭连接时的处理
TC_CasQueue<ReqMessagePtr>& getResponseQueue();// 获得响应的数据队列
protected:
string _name;// 名字
Application *_application;// 应用
TC_EpollServer::Handle* _handle;// 所属的Handle
/*异步响应队列, 每个Servant一个队列, 这个用于在ServantImp中, 再异步发请求给其他服务,回调Callback使用ServantCallback时,
能保证响应包在Servant::doResponse中响应, 且和发送ServantImp是同一个线程,缺点就是Servant::onDispatch, 通知ServantImp时, 需要把所有线程都唤醒*/
TC_CasQueue<ReqMessagePtr> _asyncResponseQueue;
};
1.2.3 QueryF
class QueryF : public tars::Servant
{
public:
virtual ~QueryF(){}
virtual vector<tars::EndpointF> findObjectById(const std::string & id,tars::TarsCurrentPtr current) = 0;
static void async_response_findObjectById(tars::TarsCurrentPtr current, const vector<tars::EndpointF> &_ret){...}
virtual tars::Int32 findObjectById4All(const std::string & id,vector<tars::EndpointF> &activeEp,vector<tars::EndpointF> &inactiveEp,tars::TarsCurrentPtr current) = 0;
static void async_response_findObjectById4All(tars::TarsCurrentPtr current, tars::Int32 _ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> &inactiveEp){...}
virtual tars::Int32 findObjectById4Any(const std::string & id,vector<tars::EndpointF> &activeEp,vector<tars::EndpointF> &inactiveEp,tars::TarsCurrentPtr current) = 0;
static void async_response_findObjectById4Any(tars::TarsCurrentPtr current, tars::Int32 _ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> &inactiveEp){...}
virtual tars::Int32 findObjectByIdInSameGroup(const std::string & id,vector<tars::EndpointF> &activeEp,vector<tars::EndpointF> &inactiveEp,tars::TarsCurrentPtr current) = 0;
static void async_response_findObjectByIdInSameGroup(tars::TarsCurrentPtr current, tars::Int32 _ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> &inactiveEp){...}
virtual tars::Int32 findObjectByIdInSameSet(const std::string & id,const std::string & setId,vector<tars::EndpointF> &activeEp,vector<tars::EndpointF> &inactiveEp,tars::TarsCurrentPtr current) = 0;
static void async_response_findObjectByIdInSameSet(tars::TarsCurrentPtr current, tars::Int32 _ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> &inactiveEp){...}
virtual tars::Int32 findObjectByIdInSameStation(const std::string & id,const std::string & sStation,vector<tars::EndpointF> &activeEp,vector<tars::EndpointF> &inactiveEp,tars::TarsCurrentPtr current) = 0;
static void async_response_findObjectByIdInSameStation(tars::TarsCurrentPtr current, tars::Int32 _ret, const vector<tars::EndpointF> &activeEp, const vector<tars::EndpointF> &inactiveEp){...}
public:
//VIP Pay Attenton and Lookup the concrete implement in the source
int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer){...}
};
1.2.4 QueryImp
class QueryImp: public QueryF
{
public:
QueryImp(){};// 构造函数
virtual void initialize();// 初始化
virtual void destroy() {};// 退出
// 根据id获取所有该对象的活动endpoint列表
virtual vector<EndpointF> findObjectById(const string & id, CurrentPtr current);
// 根据id获取所有对象,包括活动和非活动对象
virtual Int32 findObjectById4Any(const std::string & id, vector<EndpointF> &activeEp, vector<EndpointF> &inactiveEp, CurrentPtr current);
// 根据id获取对象所有endpoint列表
Int32 findObjectById4All(const std::string & id, vector<EndpointF> &activeEp, vector<EndpointF> &inactiveEp, CurrentPtr current);
// 根据id获取对象同组endpoint列表
Int32 findObjectByIdInSameGroup(const std::string & id, vector<EndpointF> &activeEp, vector<EndpointF> &inactiveEp, CurrentPtr current);
// 根据id获取对象指定归属地的endpoint列表
Int32 findObjectByIdInSameStation(const std::string & id, const std::string & sStation, vector<EndpointF> &activeEp, vector<EndpointF> &inactiveEp, CurrentPtr current);
// 根据id获取对象同set endpoint列表
Int32 findObjectByIdInSameSet(const std::string & id,const std::string & setId,vector<EndpointF> &activeEp,vector<EndpointF> &inactiveEp, CurrentPtr current);
private:
// 打印按天日志
void doDaylog(const FUNID eFnId,const string& id,const vector<EndpointF> &activeEp, const vector<EndpointF> &inactiveEp, const CurrentPtr& current,const std::ostringstream& os,const string& sSetid="");
string eFunTostr(const FUNID eFnId);// 转化成字符串
protected:
CDbHandle _db;//数据库操作
bool _openDayLog = false;
};
1.3 提供的RPC接口
static ::std::string __tars__QueryF_all[]=
{
"findObjectById",
"findObjectById4All",
"findObjectById4Any",
"findObjectByIdInSameGroup",
"findObjectByIdInSameSet",
"findObjectByIdInSameStation"
};
2.基于RegistryImp的研究
2.1 源码及RegistryImp相关类对应的关系
D:\005-02-代码\016-TARS\TARS\TarsFramework\RegistryServer\RegistryImp.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\RegistryServer\RegistryImp.cpp
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\protocol\framework\Registry.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\servant\Servant.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\servant\BaseNotify.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\util\include\util\tc_thread_mutex.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\util\include\util\tc_autoptr.h
class RegistryImp: public Registry{}
class Registry : public tars::Servant{}
class Servant : public BaseNotify{}
class BaseNotify : public TC_ThreadRecMutex, public TC_HandleBase{}
2.2 各个类概况
2.2.1 BaseNotify
见本文1.2.1节
2.2.2 Servant
见本文1.2.2节
2.2.3 Registry
class Registry : public tars::Servant
{
public:
virtual ~Registry(){}
virtual tars::Int32 destroyNode(const std::string & nodeName,tars::TarsCurrentPtr current) = 0;
static void async_response_destroyNode(tars::TarsCurrentPtr current, tars::Int32 _ret){...}
virtual tars::Int32 getClientIp(std::string &sClientIp,tars::TarsCurrentPtr current) = 0;
static void async_response_getClientIp(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &sClientIp){...}
virtual tars::Int32 getNodeTemplate(const std::string & nodeName,std::string &profileTemplate,tars::TarsCurrentPtr current) = 0;
static void async_response_getNodeTemplate(tars::TarsCurrentPtr current, tars::Int32 _ret, const std::string &profileTemplate){...}
virtual vector<tars::ServerDescriptor> getServers(const std::string & app,const std::string & serverName,const std::string & nodeName,tars::TarsCurrentPtr current) = 0;
static void async_response_getServers(tars::TarsCurrentPtr current, const vector<tars::ServerDescriptor> &_ret){...}
virtual tars::Int32 keepAlive(const std::string & nodeName,const tars::LoadInfo & load,tars::TarsCurrentPtr current) = 0;
static void async_response_keepAlive(tars::TarsCurrentPtr current, tars::Int32 _ret){...}
virtual tars::Int32 registerNode(const std::string & nodeName,const tars::NodeInfo & ni,const tars::LoadInfo & li,tars::TarsCurrentPtr current) = 0;
static void async_response_registerNode(tars::TarsCurrentPtr current, tars::Int32 _ret){...}
virtual tars::Int32 reportVersion(const std::string & app,const std::string & serverName,const std::string & nodeName,const std::string & version,tars::TarsCurrentPtr current) = 0;
static void async_response_reportVersion(tars::TarsCurrentPtr current, tars::Int32 _ret){...}
virtual tars::Int32 updatePatchResult(const tars::PatchResult & result,tars::TarsCurrentPtr current) = 0;
static void async_response_updatePatchResult(tars::TarsCurrentPtr current, tars::Int32 _ret){...}
virtual tars::Int32 updateServer(const std::string & app,const std::string & serverName,const std::string & nodeName,const tars::ServerStateInfo & state,tars::TarsCurrentPtr current) = 0;
static void async_response_updateServer(tars::TarsCurrentPtr current, tars::Int32 _ret){...}
virtual tars::Int32 updateServerBatch(const vector<tars::ServerStateInfo> & vState,tars::TarsCurrentPtr current) = 0;
static void async_response_updateServerBatch(tars::TarsCurrentPtr current, tars::Int32 _ret){...}
public:
int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer){...}
};
2.2.4 RegistryImp
class RegistryImp: public Registry
{
public:
RegistryImp(){};// 构造函数
virtual void initialize();// 初始化
virtual void destroy() {};// 退出
int get(int &i, CurrentPtr current);// 获取数据
virtual int registerNode(const string & name, const NodeInfo & ni, const LoadInfo & li, CurrentPtr current);// node启动的时候往registry注册一个session
virtual int keepAlive(const string& name, const LoadInfo & li, CurrentPtr current);// node上报心跳负载
// 获取在该node部署的server列表
virtual vector<ServerDescriptor> getServers(const std::string & app, const std::string & serverName, const std::string & nodeName, CurrentPtr current);
// 更新server状态
virtual int updateServer(const string & nodeName, const string & app, const string & serverName, const ServerStateInfo & stateInfo, CurrentPtr current);
// 批量更新server状态
virtual int updateServerBatch(const std::vector<ServerStateInfo> & vecStateInfo, CurrentPtr current);
// node停止,释放node的会话
virtual int destroyNode(const string & name, CurrentPtr current);
// 上报server的tars库版本
virtual int reportVersion(const string & app, const string & serverName, const string & nodeName, const string & version, CurrentPtr current);
virtual Int32 getNodeTemplate(const std::string & nodeName, std::string &profileTemplate, CurrentPtr current);// 获取node的模板配置
virtual Int32 getClientIp(std::string &sClientIp, CurrentPtr current);// node通过接口获取连接上主控的node ip
virtual Int32 updatePatchResult(const PatchResult & result, CurrentPtr current);// 发布任务完成后,UPDATE版本号和发布人
protected:
CDbHandle _db;// 数据库操作
};
2.3 提供的RPC接口
在class Registry : public tars::Servant中搜索【::std::string __tars__Registry_all】.
static ::std::string __tars__Registry_all[]=
{
"destroyNode",
"getClientIp",
"getNodeTemplate",
"getServers",
"keepAlive",
"registerNode",
"reportVersion",
"updatePatchResult",
"updateServer",
"updateServerBatch"
};
3.基于AdminServant的研究
3.1 源码及AdminServant相关类对应的关系
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\servant\AdminServant.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\libservant\AdminServant.cpp
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\servant\AdminF.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\servant\Servant.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\servant\servant\BaseNotify.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\util\include\util\tc_thread_mutex.h
D:\005-02-代码\016-TARS\TARS\TarsFramework\tarscpp\util\include\util\tc_autoptr.h
class AdminServant : public AdminF{}
class AdminF : public tars::Servant{}
class Servant : public BaseNotify{}
class BaseNotify : public TC_ThreadRecMutex, public TC_HandleBase{}
3.2 各个类概况
3.2.1 BaseNotify
见本文1.2.1节
3.2.2 Servant
见本文1.2.2节
3.2.3 AdminF
class AdminF : public tars::Servant
{
public:
virtual ~AdminF(){}
virtual std::string notify(const std::string & command,tars::TarsCurrentPtr current) = 0;
static void async_response_notify(tars::TarsCurrentPtr current, const std::string &_ret){...}
virtual void shutdown(tars::TarsCurrentPtr current) = 0;
static void async_response_shutdown(tars::TarsCurrentPtr current){...}
public:
int onDispatch(tars::TarsCurrentPtr _current, vector<char> &_sResponseBuffer){}
}
3.2.4 AdminServant
// 管理Servant
class AdminServant : public AdminF
{
public:
AdminServant();// 构造函数
virtual ~AdminServant();// 析构函数
virtual void initialize();// 初始化
virtual void destroy();// 退出
void shutdown(CurrentPtr current);// 关闭服务
string notify(const string &command, CurrentPtr current);
};
3.3 提供的RPC接口
在类class AdminF : public tars::Servant中搜索【::std::string __tars__AdminF_all】
static ::std::string __tars__AdminF_all[]=
{
"notify",
"shutdown"
};
4.总体概括
5.通过
/usr/local/app/web/node_modules/@tars/registry/node_modules/@tars/rpc
6.管理平台 API
https://doc.tarsyun.com/adminer/start/index.html#/tarsdoc/SUMMARY.md