版权声明:转载请注明来源 https://blog.csdn.net/u013702678/article/details/81252461
我们接着factory的start过程分析worker进程的启动过程,这个对应的入口代码在E:\swoole-src-master\src\network\Manager.c里面,也就是下面的代码。
/**
* create worker process
*/
for (i = 0; i < serv->worker_num; i++)
{
//close(worker_pipes[i].pipes[0]);
pid = swManager_spawn_worker(factory, i);
if (pid < 0)
{
swError("fork() failed.");
return SW_ERR;
}
else
{
serv->workers[i].pid = pid;
}
}
这块代码的其他的内容已经在其他篇章分析过了,这里我们只分析swManager_spawn_worker的实现。
static pid_t swManager_spawn_worker(swFactory *factory, int worker_id)
{
pid_t pid;
int ret;
pid = fork();//fork调用
//fork失败,这里失败原因可能是内存不足等原因。
if (pid < 0)
{
swWarn("Fork Worker failed. Error: %s [%d]", strerror(errno), errno);
return SW_ERR;
}
//子进程逻辑
else if (pid == 0)
{
//子进程执行swWorker_loop函数
ret = swWorker_loop(factory, worker_id);
exit(ret);
}
//父进程逻辑,返回子进程号,这个也是这个函数调用的返回值
else
{
return pid;
}
}
int swWorker_loop(swFactory *factory, int worker_id)
{
swServer *serv = factory->ptr;//获取server对象
//SwoolG的进程间通信属性初始化
#ifndef SW_WORKER_USE_SIGNALFD
SwooleG.use_signalfd = 0;
#elif defined(HAVE_SIGNALFD)
SwooleG.use_signalfd = 1;
#endif
//timerfd
#ifdef HAVE_TIMERFD
SwooleG.use_timerfd = 1;
#endif
//worker_id
SwooleWG.id = worker_id;//设置SwooleWG的id属性,从这里可以看到,SwooleWG抽象是worker对象
SwooleG.pid = getpid();//设置SwoolG的pid属性,从这里可以看到,SwoolG抽象的是worker进程
//获取worker对象,并且做初始化,这里会设置worker的各种回调函数,后面用到了再分析。
swWorker *worker = swServer_get_worker(serv, worker_id);
swServer_worker_init(serv, worker);
//申请worker进程对应的main_reactor的空间,每个worker进程只有一个reactor就够了
SwooleG.main_reactor = sw_malloc(sizeof(swReactor));
if (SwooleG.main_reactor == NULL)
{
swError("[Worker] malloc for reactor failed.");
return SW_ERR;
}
//创建main_reactor,这里后续展开分析
if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) < 0)
{
swError("[Worker] create worker_reactor failed.");
return SW_ERR;
}
worker->status = SW_WORKER_IDLE;
//获取worker对象的管道信息
int pipe_worker = worker->pipe_worker;
swSetNonBlock(pipe_worker);//管道设置为非阻塞的
SwooleG.main_reactor->ptr = serv;//保存serv属性
//添加worker对象管道的事件到reactor中
SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_worker, SW_FD_PIPE | SW_EVENT_READ);
//注册读事件对应的回调函数
SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swWorker_onPipeReceive);
//注册写事件对应的回调函数
SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_WRITE, swReactor_onWrite);
//设置worker对应的pipe_socket的buffer大小
int i;
swConnection *pipe_socket;
for (i = 0; i < serv->worker_num + serv->task_worker_num; i++)
{
worker = swServer_get_worker(serv, i);//获取worker对象
pipe_socket = swReactor_get(SwooleG.main_reactor, worker->pipe_master);//获取写的pipe_socket对象
pipe_socket->buffer_size = SW_MAX_INT;//设置pipe_socket对象的buffer_size属性
pipe_socket = swReactor_get(SwooleG.main_reactor, worker->pipe_worker);//获取读的pipe_socket对象
pipe_socket->buffer_size = SW_MAX_INT;//设置pipe_socket对象的buffer_size属性
}
if (serv->dispatch_mode == SW_DISPATCH_STREAM)//server的分发模式为unix域模式
{
//注册unix域描述符的SW_FD_LISTEN和SW_EVENT_READ事件
SwooleG.main_reactor->add(SwooleG.main_reactor, serv->stream_fd, SW_FD_LISTEN | SW_EVENT_READ);
//设置SW_FD_LISTEN事件的回调函数
SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_LISTEN, swWorker_onStreamAccept);
//设置SW_FD_STREAM事件的回调函数
SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_STREAM, swWorker_onStreamRead);
//初始化server的stream_protocol属性,这个属性控制着协议的一些操作,类似读多少属于协议头等部分
swStream_set_protocol(&serv->stream_protocol);
serv->stream_protocol.package_max_length = SW_MAX_INT;//协议最大包体长度信息
serv->stream_protocol.onPackage = swWorker_onStreamPackage;//协议接收到Package时的回调函数
serv->buffer_pool = swLinkedList_new(0, NULL);//server的buffer池的初始化,这里是通过双向链表的方式做控制
}
//worker对象的start过程
swWorker_onStart(serv);
#ifdef HAVE_SIGNALFD //注册信号处理事件到reactor中
if (SwooleG.use_signalfd)
{
swSignalfd_setup(SwooleG.main_reactor);
}
#endif
//执行reactor主事件循环,这里根据reactor的不同,会这些不同的操作,如果是通过select做的Multi IO,则这里执行的就是select调用
SwooleG.main_reactor->wait(SwooleG.main_reactor, NULL);
//如果退出了主事件循环,则执行下面的一些清理动作
swWorker_clean();//执行用于进程通行的pipe的清理
swWorker_onStop(serv);//执行PHP侧的onWorkerStop回调函数
return SW_OK;
}
这里把用户自定义的回调函数处理过程也一起分析下,在代码文件E:\swoole-src-master\src\network\Manager.c中,对应的代码如下图所示:
/**
* create user worker process
*/
if (serv->user_worker_list)
{
swUserWorker_node *user_worker;
LL_FOREACH(serv->user_worker_list, user_worker)
{
/**
* store the pipe object
*/
if (user_worker->worker->pipe_object)
{
swServer_store_pipe_fd(serv, user_worker->worker->pipe_object);
}
swManager_spawn_user_worker(serv, user_worker->worker);
}
}
展开swManager_spawn_user_worker分析下。
pid_t swManager_spawn_user_worker(swServer *serv, swWorker* worker)
{
pid_t pid = fork();//fork进程,这里对于用户回调函数的处理都是在单独的进程中处理,这样不会阻塞worker进程
if (pid < 0) //fork失败
{
swWarn("Fork Worker failed. Error: %s [%d]", strerror(errno), errno);
return SW_ERR;
}
//child
else if (pid == 0)
{
SwooleG.process_type = SW_PROCESS_USERWORKER;
SwooleWG.worker = worker;
SwooleWG.id = worker->id;
worker->pid = getpid();
//close tcp listen socket
if (serv->factory_mode == SW_MODE_SINGLE)
{
swServer_close_port(serv, SW_TRUE);
}
serv->onUserWorkerStart(serv, worker);//调用用户侧自定义的onWorkerStart回调函数
exit(0);
}
//父进程逻辑
else
{
if (worker->pid)
{
swHashMap_del_int(serv->user_worker_map, worker->pid);
}
worker->pid = pid;
swHashMap_add_int(serv->user_worker_map, pid, worker);
return pid;
}
}