spp worker Demo
spp_rpc/src/worker/main.cpp
main函数执行入口:
#include "defaultworker.h"
#include "comm_def.h"
using namespace spp;
using namespace spp::worker;
CServerBase* g_worker = NULL;
// ...
int main(int argc, char* argv[])
{
// 生成worker
g_worker = new CDefaultWorker;
if (g_worker)
{
// worker 运行并且阻塞
g_worker->run(argc, argv);
// worker 结束运行释放内存
delete g_worker;
}
return 0;
}
spp_rpc/src/worker/defaultworker.h
namespace spp
{
namespace worker
{
// Default worker implementation: combines the server container
// (CServerBase) with the framework glue (CFrame).
class CDefaultWorker : public CServerBase, public CFrame
{
public:
// Constructor and destructor (omitted in this excerpt)
// Whether micro-thread (coroutine) mode is enabled
bool get_mt_flag();
// Micro-thread switch; block=true selects the longer asynchronous wait
void handle_switch(bool block);
// Override of CServerBase::realrun — the worker's main loop
void realrun(int argc, char* argv[]);
// Service type definition (omitted in this excerpt)
// Registers the spp framework signal handler
void assign_signal(int signo);
// Called once per loop iteration by the framework; used for config reload
int loop();
// Load (or reload) the configuration
int initconf(bool reload = false);
// Bucket a shared-memory receive delay (presumably in ms — see the
// computation in ator_recvdata_v2) into coarse monitor counters.
static void shm_delay_stat(int64_t time_delay)
{
if(time_delay <= 1)
{
MONITOR(MONITOR_WORKER_RECV_DELAY_1);
}
else if(time_delay <= 10)
{
MONITOR(MONITOR_WORKER_RECV_DELAY_10);
}
else if(time_delay <= 50)
{
MONITOR(MONITOR_WORKER_RECV_DELAY_50);
}
else if(time_delay <= 100)
{
MONITOR(MONITOR_WORKER_RECV_DELAY_100);
}
else
{
MONITOR(MONITOR_WORKER_RECV_DELAY_XXX);
}
}
// Callback entry points
static int ator_recvdata(unsigned flow, void* arg1, void* arg2); // mandatory
static int ator_recvdata_v2(unsigned flow, void* arg1, void* arg2); // mandatory
static int ator_senddata(unsigned flow, void* arg1, void* arg2); // optional
static int ator_overload(unsigned flow, void* arg1, void* arg2); // optional
static int ator_senderror(unsigned flow, void* arg1, void* arg2); // mandatory
// Receiver: the communication component requests arrive from
CTCommu* ator_;
inline int get_TOS(){return TOS_;}
private:
unsigned msg_timeout_;
int TOS_;
int notify_fd_; // socket commu needs to notify the micro thread
};
}
}
spp_rpc/src/comm/serverbase.h
namespace spp {
namespace comm {
// Server base class: owns runtime-environment initialization plus the
// logging, statistics and monitoring objects.
class CServerBase {
public:
// Constructor and destructor (omitted in this excerpt)
// Overridable virtual functions
virtual void run(int argc, char *argv[]);
virtual void startup(bool bg_run = true);
virtual void realrun(int argc, char *argv[]) {}
// Business-name / service-type description — presumably tied to the
// business monitoring logs and deployment (not visible in this excerpt)
// Business log
CTLog log_;
// Statistics
CTStat stat_;
// Monitoring
CTProcMonCli moncli_;
protected:
// Internal monitoring interval (omitted in this excerpt)
public:
///////////////////////////////////////////////////////////////////////
// Service reload / quit state and the related signal handling
static bool reload();
static bool quit();
static void sigusr1_handle(int signo);
static void sigusr2_handle(int signo);
};
}
}
看看比较重要的realrun虚函数的实现
spp_rpc/src/worker/defaultworker.cpp
// Worker main loop: poll the commu component for requests, dispatch them
// to the plugin, report monitoring data, and honor quit/reload signals.
// Runs until a quit or reload signal is observed, then tears the plugin down.
void CDefaultWorker::realrun(int argc, char* argv[])
{
    // Install the per-process log object as the CTLog singleton, then
    // load the initial configuration.
    SingleTon<CTLog, CreateByProto>::SetProto(&flog_);
    initconf(false);

    time_t nowtime = time(NULL), montime = 0;
    int64_t now_ms = 0;

    while (true)
    {
        ///< start: micro thread handle loop entry add 20130715
        if (sppdll.spp_handle_loop)
        {
            sppdll.spp_handle_loop(this);
        }
        ///< end: micro thread handle loop entry 20130715

        // poll() == 0 means no request was fetched; in that case switch to
        // the micro-thread scheduler with a longer asynchronous wait.
        bool isBlock = (ator_->poll(false) == 0);
        static CSyncFrame* sync_frame = CSyncFrame::Instance();
        sync_frame->HandleSwitch(isBlock);

        // Check and reconnect net proxy, default 10 ms
        now_ms = get_time_ms();

        // Handle quit / reload signals.
        if (unlikely(CServerBase::quit()) || unlikely(CServerBase::reload()))
        {
            now_ms = get_time_ms();
            if (unlikely(CServerBase::quit()))
            {
                // BUGFIX: now_ms is int64_t, but "%u" reads only 32 bits off
                // the varargs — undefined behavior and truncated output.
                // Use "%lld" with an explicit cast (same fix below).
                flog_.LOG_P_PID(LOG_FATAL, "recv quit signal at %lld\n", (long long)now_ms);
                // Drain remaining requests before exiting.
                ator_->poll(true);
            }
            else
            {
                flog_.LOG_P_PID(LOG_FATAL, "recv reload signal at %lld\n", (long long)now_ms);
            }

            // Give running micro threads up to ~1s (100 x 10ms) to finish.
            int timeout = 0;
            while (CSyncFrame::Instance()->GetThreadNum() > 1 && timeout < 1000)
            {
                CSyncFrame::Instance()->sleep(10000);
                timeout += 10;
            }
            now_ms = get_time_ms();
            flog_.LOG_P_PID(LOG_FATAL, "exit at %lld\n", (long long)now_ms);
            break;
        }

        // Periodic monitor report, at most once per moni_inter_time_ seconds.
        nowtime = time(NULL);
        if ( unlikely(nowtime - montime > ix_->moni_inter_time_) )
        {
            CLI_SEND_INFO(&moncli_)->timestamp_ = nowtime;
            moncli_.run();
            montime = nowtime;
            flog_.LOG_P_PID(LOG_DEBUG, "moncli run!\n");
        }

        loop(); // framework hook, used for config reload
    }

    g_check_point = CoreCheckPoint_HandleFini; // mark CheckPoint of the plugin call
    if (sppdll.spp_handle_fini != NULL)
        // Let the plugin release its resources.
        sppdll.spp_handle_fini(NULL, this);
    g_check_point = CoreCheckPoint_SppFrame; // restore CheckPoint to CoreCheckPoint_SppFrame

    CStatMgrInstance::destroy();
}
ator_ 类型为CTCommu*,是通讯组件对象,负责接收和发送请求数据。
// Abstract interface of a communication component.
class CTCommu
{
public:
    CTCommu() {
        // Start with every callback slot and its user argument cleared.
        memset(func_list_, 0, sizeof(func_list_));
        memset(func_args_, 0, sizeof(func_args_));
    }
    virtual ~CTCommu() {}

    // Initialize the component.
    // config: config file name, or a pointer to in-memory config data
    virtual int init(const void* config) = 0;

    // Poll for I/O: receive and send data.
    // block: true selects blocking mode, false non-blocking mode
    virtual int poll(bool block = false) = 0;

    // Submit data to send.
    // flow: unique id of the packet
    // arg1: generic pointer #1, usually points at the data blob
    // arg2: generic pointer #2, reserved
    virtual int sendto(unsigned flow, void* arg1, void* arg2) = 0;

    // Control interface.
    // flow: unique id of the packet
    // type: control command
    // arg1: generic pointer #1, meaning depends on the concrete component
    // arg2: generic pointer #2, meaning depends on the concrete component
    virtual int ctrl(unsigned flow, ctrl_type type, void* arg1, void* arg2) = 0;

    // Register a callback.
    // type: callback slot
    // func: callback function
    // args: user data, passed as the callback's 2nd generic parameter
    virtual int reg_cb(cb_type type, cb_func func, void* args = NULL) {
        if (type > CB_TIMEOUT) {
            return -1; // slot out of range
        }
        func_list_[type] = func;
        func_args_[type] = args;
        return 0;
    }

    // Flush all shared-memory queues; only used when the proxy starts.
    virtual int clear() = 0;

protected:
    cb_func func_list_[CB_TIMEOUT + 1];
    void* func_args_[CB_TIMEOUT + 1];

    // Release internal resources.
    virtual void fini() = 0;
};
ator_->poll(false) 按照注释说明,使用非阻塞模式接收数据。
那么ator 具体实现是initconf中
spp_rpc/src/worker/defaultworker.cpp
类比golang的gpm模型
// 使用CTShmCommu实现
ator_ = new CTShmCommu;
ret = ator_->init(&shm);
// 注册了一些数据操作的回调
...
// 调用了spp_handle_init函数
handle_init_ret = sppdll.spp_handle_init((void*)module_etc.c_str(), this);
spp_rpc/src/comm/tbase/tshmcommu.cpp
有空再分析如何实现poll。
[poll 实现](https://github.com/Tencent/MSEC/blob/master/document/msec/cpp_dev_manual.md)
spp_rpc/src/comm/serverbase.cpp
run->startup+realrun
// Entry point used by main(): perform one-time process startup, then
// hand control to the subclass's realrun() main loop.
void CServerBase::run(int argc, char* argv[])
{
    // Daemonize, tune limits and install signal handlers first ...
    startup(true);
    // ... then block inside the subclass's main loop.
    realrun(argc, argv);
}
// Process startup: raise the fd limit, tune malloc, install signal
// handlers, and optionally daemonize.
void CServerBase::startup(bool bg_run)
{
// Raising limits via setrlimit normally needs root privileges.
struct rlimit rlim;
if (0 == getrlimit(RLIMIT_NOFILE, &rlim))
{
rlim.rlim_cur = rlim.rlim_max;
setrlimit(RLIMIT_NOFILE, &rlim);
if (rlim.rlim_cur < 100000) // fix for limits over 100000
{
rlim.rlim_cur = 100000;
rlim.rlim_max = 100000;
// NOTE(review): raising rlim_max above the current hard limit fails for
// unprivileged processes, and the return value is ignored here.
setrlimit(RLIMIT_NOFILE, &rlim);
}
}
mallopt(M_MMAP_THRESHOLD, 1024*1024); // 1MB: avoid frequent mmap
mallopt(M_TRIM_THRESHOLD, 8*1024*1024); // 8MB: avoid frequent brk
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGTTOU, SIG_IGN);
signal(SIGTTIN, SIG_IGN);
signal(SIGCHLD, SIG_IGN);
if (bg_run)
{
// Background mode: ignore terminal interrupts and detach from the tty.
// NOTE(review): daemon()'s return value is ignored.
signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN);
daemon(1, 1);
}
CServerBase::flag_ = 0;
//signal(SIGSEGV, CServerBase::sigsegv_handle);
signal(SIGUSR1, CServerBase::sigusr1_handle);
signal(SIGUSR2, CServerBase::sigusr2_handle);
}
启动流程大体走一遍了,那么接受到数据会进行怎样的回调呢?
spp_rpc/src/worker/defaultworker.h
// 回调入口
static int ator_recvdata(unsigned flow, void* arg1, void* arg2); //必要
static int ator_recvdata_v2(unsigned flow, void* arg1, void* arg2); //必要
static int ator_senderror(unsigned flow, void* arg1, void* arg2); //必要
spp_rpc/src/worker/defaultworker.cpp
// Receive callback registered on the commu component.
// flow: packet id; arg1: request blob; arg2: the worker instance.
// Returns 0 when the plugin processed the request, -1 otherwise.
int CDefaultWorker::ator_recvdata_v2(unsigned flow, void* arg1, void* arg2)
{
blob_type* blob = (blob_type*)arg1;
CDefaultWorker* worker = (CDefaultWorker*)arg2;
if (likely(blob->len > 0))
{
TConnExtInfo* ptr = NULL;
MONITOR(MONITOR_WORKER_FROM_PROXY);
// The payload carries a TConnExtInfo trailer at its end; strip it off
// and expose it through blob->extdata.
// NOTE(review): assumes blob->len >= sizeof(TConnExtInfo) — a shorter
// packet would underflow len here; confirm the commu layer guarantees it.
blob->len -= sizeof(TConnExtInfo);
blob->extdata = blob->data + blob->len;
ptr = (TConnExtInfo*)blob->extdata;
// Delay between the proxy receiving the request and now, in ms.
int64_t recv_ms = int64_t(ptr->recvtime_) * 1000 + ptr->tv_usec / 1000;
int64_t now = get_time_ms();
int64_t time_delay = now - recv_ms;
worker->fstat_.op(WIDX_MSG_SHM_TIME, time_delay);
add_memlog(blob->data, blob->len);
// (timeout handling) — NOTE(review): no timeout logic is visible here
worker->flog_.LOG_P_FILE(LOG_DEBUG, "ator recvdone, flow:%u, blob len:%d\n", flow, blob->len);
worker->fstat_.op(WIDX_SHM_RX_BYTES, blob->len); // accumulate received byte count
g_check_point = CoreCheckPoint_HandleProcess; // mark CheckPoint of the plugin call
// Hand the request to the plugin's spp_handle_process
int ret = sppdll.spp_handle_process(flow, arg1, arg2);
g_check_point = CoreCheckPoint_SppFrame; // restore CheckPoint to CoreCheckPoint_SppFrame
if (likely(!ret))
{
MONITOR(MONITOR_WORKER_PROC_SUSS);
return 0;
}
else
{
// Processing failed: answer with an empty blob so the flow is released.
MONITOR(MONITOR_WORKER_PROC_FAIL);
CTCommu* commu = (CTCommu*)blob->owner;
blob_type rspblob;
rspblob.len = 0;
rspblob.data = NULL;
commu->sendto(flow, &rspblob, NULL);
}
}
// Empty packets and failed processing both end up here.
return -1;
}
具体几个spp函数的实现:
spp_rpc/src/module/example/simple/echo_example.cpp
/**
* Tencent is pleased to support the open source community by making MSEC available.
*
* Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the GNU General Public License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* https://opensource.org/licenses/GPL-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
//必须包含spp的头文件
#include "sppincl.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
//格式化时间输出
char *format_time( time_t tm);
//初始化方法(可选实现)
//arg1: 配置文件
//arg2: 服务器容器对象
//返回0成功,非0失败
extern "C" int spp_handle_init(void* arg1, void* arg2)
{
//插件自身的配置文件
const char* etc = (const char*)arg1;
//服务器容器对象
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_init, config:%s, servertype:%d\n", etc, base->servertype());
return 0;
}
//数据接收(必须实现)
//flow: 请求包标志
//arg1: 数据块对象
//arg2: 服务器容器对象
//返回值:> 0 表示数据已经接收完整且该值表示数据包的长度
// == 0 表示数据包还未接收完整
// < 0 负数表示出错,将会断开连接
extern "C" int spp_handle_input(unsigned flow, void* arg1, void* arg2)
{
//数据块对象,结构请参考tcommu.h
blob_type* blob = (blob_type*)arg1;
//extinfo有扩展信息
TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;
//服务器容器对象
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P(LOG_DEBUG, "spp_handle_input[recv time:%s] flow:%d, buffer len:%d, client ip:%s\n",
format_time(extinfo->recvtime_),
flow,
blob->len,
inet_ntoa(*(struct in_addr*)&extinfo->remoteip_));
return blob->len;
}
//路由选择(可选实现)
//flow: 请求包标志
//arg1: 数据块对象
//arg2: 服务器容器对象
//返回值表示worker的组号
extern "C" int spp_handle_route(unsigned flow, void* arg1, void* arg2)
{
//服务器容器对象
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P_FILE(LOG_DEBUG, "spp_handle_route, flow:%d\n", flow);
return 1;
}
//数据处理(必须实现)
//flow: 请求包标志
//arg1: 数据块对象
//arg2: 服务器容器对象
//返回0表示成功,非0失败(将会主动断开连接)
extern "C" int spp_handle_process(unsigned flow, void* arg1, void* arg2)
{
//数据块对象,结构请参考tcommu.h
blob_type* blob = (blob_type*)arg1;
//数据来源的通讯组件对象
CTCommu* commu = (CTCommu*)blob->owner;
//extinfo有扩展信息
TConnExtInfo* extinfo = (TConnExtInfo*)blob->extdata;
//服务器容器对象
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_process[recv time:%s] flow:%d, buffer len:%d, client ip:%s\n",
format_time(extinfo->recvtime_),
flow,
blob->len,
inet_ntoa(*(struct in_addr*)&extinfo->remoteip_));
//echo logic
int ret = commu->sendto(flow, arg1, arg2 );
if (ret != 0)
{
base->log_.LOG_P_PID(LOG_ERROR, "send response error, ret:%d\n", ret);
return ret;
}
//if all responses were sent, send a NULL blob to release flow
//blob_type release_cmd;
//release_cmd.data = NULL;
//release_cmd.len = 0;
//commu->sendto(flow, &release_cmd, arg2);
return 0;
}
//析构资源(可选实现)
//arg1: 保留参数
//arg2: 服务器容器对象
extern "C" void spp_handle_fini(void* arg1, void* arg2)
{
//服务器容器对象
CServerBase* base = (CServerBase*)arg2;
base->log_.LOG_P_PID(LOG_DEBUG, "spp_handle_fini\n");
}
// Format a UNIX timestamp as "[YYYY-MM-DD hh:mm:ss]" in local time.
// NOTE: returns a pointer to a static buffer — the result is overwritten
// by the next call and the function is not thread-safe.
char *format_time( time_t tm)
{
    static char str_tm[32]; // "[YYYY-MM-DD hh:mm:ss]" is 22 chars + NUL; 1024 was waste
    struct tm tmm;
    memset(&tmm, 0, sizeof(tmm)); // leaves all-zero fields if localtime_r fails
    localtime_r(&tm, &tmm);       // redundant (time_t *) cast removed: tm is already time_t
    snprintf(str_tm, sizeof(str_tm), "[%04d-%02d-%02d %02d:%02d:%02d]",
             tmm.tm_year + 1900, tmm.tm_mon + 1, tmm.tm_mday,
             tmm.tm_hour, tmm.tm_min, tmm.tm_sec);
    return str_tm;
}
转自: https://blog.csdn.net/qq_17612199/article/details/80828205