版权声明:转载请注明来源 https://blog.csdn.net/u013702678/article/details/81176683
上一篇文章分析了单进程单线程模型的server的启动过程,这篇文章分析其他模型的启动流程,我们从刚才的else流程开始,也就是从函数swReactorThread_create开始。
int swReactorThread_create(swServer *serv)
{
int ret = 0;
//从swoole自带的内存池申请reactor线程的空间,其中serv的属性reactor_num是swoole_server创建的时候初始化的。
serv->reactor_threads = SwooleG.memory_pool->alloc(SwooleG.memory_pool, (serv->reactor_num * sizeof(swReactorThread)));
if (serv->reactor_threads == NULL)//申请内存空间失败,一般是内存不足会造成这里的失败。
{
swError("calloc[reactor_threads] fail.alloc_size=%d", (int )(serv->reactor_num * sizeof(swReactorThread)));
return SW_ERR;
}
//为connection_list申请空间,connection是对网络连接的抽象,如果是单进程模式,则通过共享内存的方式申请空间。如果是其他模式,则通过calloc申请空间,这里具体为什么这样申请,后续再看。
if (serv->factory_mode == SW_MODE_PROCESS) //单进程模式
{
//为connection_list申请空间,通过mmap到/dev/zero的方式实现共享内存
serv->connection_list = sw_shm_calloc(serv->max_connection, sizeof(swConnection));
}
else//其他模式
{
//sw_calloc是对calloc的封装
serv->connection_list = sw_calloc(serv->max_connection, sizeof(swConnection));
}
//初始化失败
if (serv->connection_list == NULL)
{
swError("calloc[1] failed");
return SW_ERR;
}
//创建factory对象,factory对象在swoole内部用来做任务调度等事情
if (serv->factory_mode == SW_MODE_THREAD)//如果是SW_MODE_THREAD模式,即为多线程模式,也就是worker是以线程的模式启动的
{
//worker_num控制创建worker线程的个数,如果异常则报错。
if (serv->worker_num < 1)
{
swError("Fatal Error: serv->worker_num < 1");
return SW_ERR;
}
//创建多线程的factory对象,后续展开讨论
ret = swFactoryThread_create(&(serv->factory), serv->worker_num);
}
else if (serv->factory_mode == SW_MODE_PROCESS)//如果是SW_MODE_PROCESS模式,即为多进程模式,也就是worker是以多进程的模式启动的
{
//worker_num控制创建worker进程的个数,如果异常则报错。
if (serv->worker_num < 1)
{
swError("Fatal Error: serv->worker_num < 1");
return SW_ERR;
}
//创建多进程的factory对象,后续展开讨论
ret = swFactoryProcess_create(&(serv->factory), serv->worker_num);
}
else//其他模式,即为SW_MODE_BASE,即为基本模式,这种模式下worker和reactor为同一角色,PHP的回调是直接在reactor线程内完成的
{
//创建factory对象,后续展开讨论
ret = swFactory_create(&(serv->factory));
}
//如果factory对象创建失败,则退出程序。
if (ret < 0)
{
swError("create factory failed");
return SW_ERR;
}
return SW_OK;
}
下面分析多线程模式的factory对象创建的流程,这个逻辑在E:\swoole-src-master\src\factory\FactoryThread.c文件中,其中E:\swoole-src-master\为swoole的根目录。
//创建多线程FactoryThread对象
int swFactoryThread_create(swFactory *factory, int worker_num)
{
swFactoryThread *object;//多线程对象模型,这里swFactory是整个调度对象,而swFactoryThread则是对worker线程池的抽象。
swServer *serv = factory->ptr;//获取初始化过的server对象
//申请内存空间,总共申请个worker_num个swFactoryThread对象
object = sw_calloc(worker_num, sizeof(swFactoryThread));
if (object == NULL)//申请失败
{
swWarn("malloc[0] failed");
return SW_ERR;
}
//因worker是通过线程去管理的,这里创建线程池去管理worker,后面展开再讨论
if (swThreadPool_create(&object->workers, worker_num) < 0)
{
sw_free(object);
return SW_ERR;
}
//reactor线程属性的初始化,这里主要是完成reactor线程的lock的初始化。
int i;
swReactorThread *thread;
for (i = 0; i < serv->reactor_num; i++)//遍历所有的reactor线程
{
thread = swServer_get_thread(serv, i);//获取reactor线程,这里获取也是比较简单的,就是从前面步骤缓存的数组中直接获取,这里展开代码为:swServer_get_thread(serv, reactor_id) (&(serv->reactor_threads[reactor_id]))
//创建reactor线程的锁,后续展开再分析
swMutex_create(&thread->lock, 0);
}
//设置worker线程池的worker线程数目
object->worker_num = worker_num;
factory->object = object;//设置factory的object属性,设置值为worker线程池对象
factory->dispatch = swFactoryThread_dispatch;//设置factory对象的dispatch回调函数
factory->finish = swFactoryThread_finish;//设置factory对象的finish回调函数
factory->end = swFactory_end;//设置factory对象的end回调函数
factory->start = swFactoryThread_start;//设置factory对象的start回调函数
factory->shutdown = swFactoryThread_shutdown;//设置factory对象的shutdown回调函数
factory->notify = swFactory_notify;//设置facotry对象的notify回调函数
object->workers.onStart = swFactoryThread_onStart;//设置worker线程池的onStart回调函数
object->workers.onStop = swFactoryThread_onStop;//设置worker线程池的onStop回调函数
object->workers.onTask = swFactoryThread_onTask;//设置worker线程池的onTask回调函数
object->workers.ptr1 = factory->ptr;//线程池对象的ptr1属性设置为factory的ptr属性,其实factory的ptr属性指向全局的server对象。
object->workers.ptr2 = factory;//线程池对象的ptr2属性设置为factory对象
return SW_OK;
}
这里分析worker线程池的初始化过程,对应的函数入口为swThreadPool_create,这个函数实现在E:\swoole-src-master\src\network\ThreadPool.c里面。
//worker线程池对象初始化,这里传入线程池对象指针和线程数目
int swThreadPool_create(swThreadPool *pool, int thread_num)
{
//线程池对象空间初始化
bzero(pool, sizeof(swThreadPool));
//线程池内线程对象初始化,sw_calloc是通过calloc实现,即申请空间同时初始化为0
pool->threads = (swThread *) sw_calloc(thread_num, sizeof(swThread));
//线程池内线程对应参数对象初始化,sw_calloc是通过calloc实现,即申请空间同时初始化为0
pool->params = (swThreadParam *) sw_calloc(thread_num, sizeof(swThreadParam));
//空间申请失败
if (pool->threads == NULL || pool->params == NULL)
{
swWarn("swThreadPool_create malloc fail");
return SW_ERR;
}
//记录日志,打印指针值信息
swTrace("threads=%p|params=%p", pool->threads, pool->params);
//这个SW_THREADPOOL_USE_CHANNEL在全局代码没找到,不知道什么意思,而swChannel_create也没找到,初步估计是写错了,找到一个swChannel_new的方法,输入参数也是一直的,就当是这里的实现了,这里下面的代码整体实现了线程池的消息队列,队列的实现分为基于链表和基于数组空间的,这里分不同的情况去实现。
#ifdef SW_THREADPOOL_USE_CHANNEL //创建基于链表的队列
//创建链表的队列,这里又分基于共享内存和堆内存的情况
pool->chan = swChannel_new(1024 * 256, 512, 0);
if (pool->chan == NULL)//创建失败
{
swWarn("swThreadPool_create create channel failed");
return SW_ERR;
}
#else //创建基于数组的队列,这里使用的是基于环的队列
int size = SwooleG.max_sockets >= SW_THREADPOOL_QUEUE_LEN ? SwooleG.max_sockets + 1 : SW_THREADPOOL_QUEUE_LEN;//获取队列的size信息,即总的size是基于SW_THREADPOOL_QUEUE_LEN和最大的连接数信息,其中SW_THREADPOOL_QUEUE_LEN的值为10000
if (swRingQueue_init(&pool->queue, size) < 0)//创建失败
{
return SW_ERR;
}
#endif
//创建线程池的条件变量,用于后续的线程同步,代码比较简单,大家自行分析
if (swCond_create(&pool->cond) < 0)//创建失败
{
return SW_ERR;
}
//线程池的线程数目初始化
pool->thread_num = thread_num;
return SW_OK;
}
下面我们接着上面分析worker线程锁对象的初始化逻辑,也就是swMutex_create函数,这个函数实现也在E:\swoole-src-master\src\factory\FactoryThread.c里面。
//初始化worker线程的锁对象,其中lock为worker线程中的锁对象,use_in_process表示是否是以进程模式启动,这里因是以线程模式启动,所以use_in_process传入为0
int swMutex_create(swLock *lock, int use_in_process)
{
int ret;
bzero(lock, sizeof(swLock));//swLock对象空间初始化为0
lock->type = SW_MUTEX;//设置锁的类型为互斥锁
pthread_mutexattr_init(&lock->object.mutex.attr);//linux API,互斥锁属性初始化
if (use_in_process == 1)//如果是以进程模式启动的
{
//设置互斥锁属性,这里用PTHREAD_PROCESS_SHARED表示可以将互斥锁用到多进程的线程同步中,默认互斥锁只能在单进程内可见。
pthread_mutexattr_setpshared(&lock->object.mutex.attr, PTHREAD_PROCESS_SHARED);
}
//linux api,初始化互斥锁,真正的互斥锁为lock->object.mutex._lock,这里需要传入前面初始化过的互斥锁属性信息
if ((ret = pthread_mutex_init(&lock->object.mutex._lock, &lock->object.mutex.attr)) < 0)
{
return SW_ERR;
}
/* 设置lock对象的回调函数实现,这里回调函数具体都是对互斥锁的操作,不做详细分析,大家可自行查阅linux API。
*
*/
lock->lock = swMutex_lock;//设置锁对象的lock回调函数
lock->unlock = swMutex_unlock;//设置锁对象的unlock回调函数
lock->trylock = swMutex_trylock;//设置锁对象的trylock回调函数
lock->free = swMutex_free;//设置锁对象的free回调函数
return SW_OK;
}
//互斥锁加锁操作,在加锁失败时,会阻塞
static int swMutex_lock(swLock *lock)
{
return pthread_mutex_lock(&lock->object.mutex._lock);
}
//互斥锁解锁操作
static int swMutex_unlock(swLock *lock)
{
return pthread_mutex_unlock(&lock->object.mutex._lock);
}
//互斥锁加锁操作,在加锁失败时,不会阻塞
static int swMutex_trylock(swLock *lock)
{
return pthread_mutex_trylock(&lock->object.mutex._lock);
}
//互斥锁加锁操作,指定时间内未能加锁成功,则返回失败,有相应的错误码
#ifdef HAVE_MUTEX_TIMEDLOCK
int swMutex_lockwait(swLock *lock, int timeout_msec)
{
struct timespec timeo;
timeo.tv_sec = timeout_msec / 1000;
timeo.tv_nsec = (timeout_msec - timeo.tv_sec * 1000) * 1000 * 1000;
return pthread_mutex_timedlock(&lock->object.mutex._lock, &timeo);
}
#else
int swMutex_lockwait(swLock *lock, int timeout_msec)
{
int sub = 1;
int sleep_ms = 1000;
if (timeout_msec > 100)
{
sub = 10;
sleep_ms = 10000;
}
while( timeout_msec > 0)
{
if (pthread_mutex_trylock(&lock->object.mutex._lock) == 0)
{
return 0;
}
else
{
usleep(sleep_ms);
timeout_msec -= sub;
}
}
return ETIMEDOUT;
}
#endif
//释放互斥锁,需要同时释放互斥锁属性和互斥锁对象本身
static int swMutex_free(swLock *lock)
{
pthread_mutexattr_destroy(&lock->object.mutex.attr);
return pthread_mutex_destroy(&lock->object.mutex._lock);
}