版权声明:转载请注明来源 https://blog.csdn.net/u013702678/article/details/81263829
接着上面一篇文章,我们分析多进程模式下,reactor的创建,reactor完成multi IO通信控制,而在多进程模式下,只有一个reactor线程。
int swReactor_create(swReactor *reactor, int max_event)
{
int ret;
bzero(reactor, sizeof(swReactor));//空间初始化
#ifdef HAVE_EPOLL
ret = swReactorEpoll_create(reactor, max_event);//创建epoll类型的reactor
#elif defined(HAVE_KQUEUE)
ret = swReactorKqueue_create(reactor, max_event);//创建kqueue类型的reactor
#elif defined(HAVE_POLL)
ret = swReactorPoll_create(reactor, max_event);//创建poll类型的reactor
#else
ret = swReactorSelect_create(reactor);//默认创建select类型的reactor
#endif
reactor->running = 1;//标记reactor为运行中
reactor->setHandle = swReactor_setHandle;//设置reactor的setHandle回调函数
reactor->onFinish = swReactor_onFinish;//设置reactor的onFinish回调函数
reactor->onTimeout = swReactor_onTimeout;//设置reactor的onTimeout回调函数
reactor->write = swReactor_write;//设置reactor的write回调函数
reactor->defer = swReactor_defer;//设置reactor的defer回调函数
reactor->close = swReactor_close;//设置reactor的close回调函数
//初始化reactor的socket_array属性,这个属性具体的意思,后续再分析
reactor->socket_array = swArray_new(1024, sizeof(swConnection));
if (!reactor->socket_array)
{
swWarn("create socket array failed.");
return SW_ERR;
}
return ret;
}
下面我们挑重点的几个回调函数做分析。
//设置reactor上特定事件的处理函数,其中_fdtype将事件类型和描述符类型做了与操作的结果,handle为相应的处理逻辑,为函数指针,这个函数按描述符类型聚合处理逻辑
int swReactor_setHandle(swReactor *reactor, int _fdtype, swReactor_handle handle)
{
int fdtype = swReactor_fdtype(_fdtype);//剔除读写错误这三类事件后的值,也就是描述符类型
if (fdtype >= SW_MAX_FDTYPE)//swoole定义的描述符类型最大值为32,这里不包括读写错误这三个事件类型
{
swWarn("fdtype > SW_MAX_FDTYPE[%d]", SW_MAX_FDTYPE);
return SW_ERR;
}
if (swReactor_event_read(_fdtype))//描述符可读
{
reactor->handle[fdtype] = handle;//设置可读时的回调函数
}
else if (swReactor_event_write(_fdtype))//描述符可写
{
reactor->write_handle[fdtype] = handle;//设置可写时的回调函数
}
else if (swReactor_event_error(_fdtype))//描述符发生错误
{
reactor->error_handle[fdtype] = handle;//设置发生错误时的回调函数
}
else
{
swWarn("unknow fdtype");
return SW_ERR;
}
return SW_OK;
}
//ractor事件处理器中,fd描述符,写入buff数据,长度为n
int swReactor_write(swReactor *reactor, int fd, void *buf, int n)
{
int ret;
swConnection *socket = swReactor_get(reactor, fd);//获取连接信息,这里后面展开讨论下。
swBuffer *buffer = socket->out_buffer;//获取buffer
//设置socket文件描述符的信息
if (socket->fd == 0)
{
socket->fd = fd;
}
//设置socket缓冲区大小
if (socket->buffer_size == 0)
{
socket->buffer_size = SwooleG.socket_buffer_size;
}
//设置socket为非阻塞
if (socket->nonblock == 0)
{
swoole_fcntl_set_option(fd, 1, -1);
socket->nonblock = 1;
}
if (n > socket->buffer_size)//如果发送的数据量大小超过socket的buffer_size,则报错,这里看了swoole的文档,提示单次连接发送的数据量不能超过socket_buffer_size值,这个值可以由swoole_server来set。
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "data is too large, cannot exceed buffer size.");
return SW_ERR;
}
if (swBuffer_empty(buffer))//buffer容量为空
{
if (socket->ssl_send) //判断当前连接是否设置了ssl属性,如果设置了,则跳转到do_buffer:处扩容buffer
{
goto do_buffer;
}
do_send:
ret = swConnection_send(socket, buf, n, 0);//发送数据,调用linux的send函数
if (ret > 0)//未出现错误,即有数据发送
{
if (n == ret) //发送长度和实际要发送的一致,即所有数据都已经发送完
{
return ret;
}
else //实际发送的小于n
{
buf += ret;//buf调整,扣减掉已经发送的量
n -= ret;//n调整,减去已经发送的量
goto do_buffer;//go跳转,做buffer调整
}
}
#ifdef HAVE_KQUEUE
else if (errno == EAGAIN || errno == ENOBUFS)
#else
else if (errno == EAGAIN)
#endif
{
do_buffer:
if (!socket->out_buffer) //out_buffer为空
{
buffer = swBuffer_new(sizeof(swEventData));//申请内存空间
if (!buffer)//申请失败
{
swWarn("create worker buffer failed.");
return SW_ERR;
}
socket->out_buffer = buffer;//设置buffer属性
}
socket->events |= SW_EVENT_WRITE;//设置socket监听的写事件类型
if (socket->events & SW_EVENT_READ)//socket已经注册了读事件
{
//重新设置fd的事件类型,即增加写事件
if (reactor->set(reactor, fd, socket->fdtype | socket->events) < 0)
{
swSysError("reactor->set(%d, SW_EVENT_WRITE) failed.", fd);
}
}
else
{
//创建fd的写事件监听
if (reactor->add(reactor, fd, socket->fdtype | SW_EVENT_WRITE) < 0)
{
swSysError("reactor->add(%d, SW_EVENT_WRITE) failed.", fd);
}
}
goto append_buffer;
}
else if (errno == EINTR)//send调用被信号中断,则重启此次调用,重新发送
{
goto do_send;
}
else //其他错误,则中断流程
{
SwooleG.error = errno;
return SW_ERR;
}
}
else
{
append_buffer:
if (buffer->length > socket->buffer_size)//经过多次goto跳转后,这里的buffer不太懂是那个值了,后续再看看
{
if (socket->dontwait)
{
SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW;
return SW_ERR;
}
else
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "socket#%d output buffer overflow.", fd);
swYield();
swSocket_wait(fd, SW_SOCKET_OVERFLOW_WAIT, SW_EVENT_WRITE);
}
}
if (swBuffer_append(buffer, buf, n) < 0)//申请扩容,将buffer空间扩容为n大小,且将buf内容复制到新申请的空间内
{
return SW_ERR;
}
}
return SW_OK;
}
我们用select模型的reactor来看看如何调用这些回调函数的,关于select的使用,不再展开分析,代码在E:\swoole-src-master\src\reactor\ReactorSelect.c里面。
//read
if (SW_FD_ISSET(event.fd, &(object->rfds)) && !event.socket->removed)
{
handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
ret = handle(reactor, &event);
if (ret < 0)
{
swSysError("[Reactor#%d] select event[type=READ, fd=%d] handler fail.", reactor->id, event.fd);
}
}
//write
if (SW_FD_ISSET(event.fd, &(object->wfds)) && !event.socket->removed)
{
handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type);
ret = handle(reactor, &event);
if (ret < 0)
{
swSysError("[Reactor#%d] select event[type=WRITE, fd=%d] handler fail.", reactor->id, event.fd);
}
}
//error
if (SW_FD_ISSET(event.fd, &(object->efds)) && !event.socket->removed)
{
handle = swReactor_getHandle(reactor, SW_EVENT_ERROR, event.type);
ret = handle(reactor, &event);
if (ret < 0)
{
swSysError("[Reactor#%d] select event[type=ERROR, fd=%d] handler fail.", reactor->id, event.fd);
}
}