Redis源码阅读——基于epoll的事件模型

Redis的事件模型实现基于linux的epoll,sun的export,FreeBSD和Mac osx的queue,还有select;我们简单分析下Redis基于epoll实现的事件模型。main函数调用initServer实现服务初始化:

void initServer(void) {
    int j;
	
	//SIG_DFL:默认信号处理程序
	//SIG_IGN:忽略信号的处理程序
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
	
	//设置信号处理函数
    setupSignalHandlers();

    if (server.syslog_enabled) {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
            server.syslog_facility);
    }

    server.pid = getpid();
    server.current_client = NULL;
	//客户端链表
    server.clients = listCreate();
	//异步关闭的客户端
    server.clients_to_close = listCreate();
	//从机
    server.slaves = listCreate();
	
    server.monitors = listCreate();
	
    server.clients_pending_write = listCreate();
	
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
	
    server.unblocked_clients = listCreate();
	
    server.ready_keys = listCreate();
	
    server.clients_waiting_acks = listCreate();
	
    server.get_ack_from_slaves = 0;
	
    server.clients_paused = 0;
	//
    server.system_memory_size = zmalloc_get_memory_size();

    createSharedObjects();
	//
    adjustOpenFilesLimit();

	//构建aeEventLoop对象
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
	//
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
		//启动TCP监听服务 --> 并设置为非阻塞
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);

    /* Open the listening Unix domain socket. */

	//启动本地监听服务
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
    }

    /* Abort if there are no listening sockets at all. */

	//如果没有监听套接字,则退出 --->初始化默认为0 -- > 在listenToPort每成功建立一个监听套接字,自加1
    if (server.ipfd_count == 0 && server.sofd < 0) {
        serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

    /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].eviction_pool = evictionPoolAlloc();
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
    }
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
    server.pubsub_patterns = listCreate();
    listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
    listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
    server.cronloops = 0;
    server.rdb_child_pid = -1;
    server.aof_child_pid = -1;
    server.rdb_child_type = RDB_CHILD_TYPE_NONE;
    server.rdb_bgsave_scheduled = 0;
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;
    resetServerStats();
    /* A few stats we don't want to reset: server startup time, and peak mem. */
    server.stat_starttime = time(NULL);
    server.stat_peak_memory = 0;
    server.resident_set_size = 0;
    server.lastbgsave_status = C_OK;
    server.aof_last_write_status = C_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;
    updateCachedTime();

    /* Create the serverCron() time event, that's our main way to process
     * background operations. */
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create the serverCron time event.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */

	//建立epoll事件监听  --> 一个tcp套接字一个监听文件
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
	//建立epoll事件监听  --> 一个本地套接字一个监听文件
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

    /* Open the AOF file if needed. */
    if (server.aof_state == AOF_ON) {
        server.aof_fd = open(server.aof_filename,
                               O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            serverLog(LL_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    /* 32 bit instances are limited to 4GB of address space, so if there is
     * no explicit limit in the user provided configuration we set a limit
     * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
     * useless crashes of the Redis instance for out of memory. */
    if (server.arch_bits == 32 && server.maxmemory == 0) {
        serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
        server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
        server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
    }

    if (server.cluster_enabled) clusterInit();
    replicationScriptCacheInit();
    scriptingInit(1);
    slowlogInit();
    latencyMonitorInit();
    bioInit();
}

我们主要关注三个地方

1 构建aeEventLoop对象

	//构建aeEventLoop对象
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

2 建立套接字监听,分为TCP监听和本地套接字监听

    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
		//启动TCP监听服务 --> 并设置为非阻塞
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);

    /* Open the listening Unix domain socket. */

	//启动本地监听服务
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
    }

3 将建立监听的套接字加入EventLoop事件监听

	//建立epoll事件监听  --> 一个tcp套接字一个监听文件
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
	//建立epoll事件监听  --> 一个本地套接字一个监听文件
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); 

在分析如何构建aeEventLoop对象之前,我们先看下aeEventLoop的定义:

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   				//监听的最大文件号  
    int setsize; 				//可以注册的事件的上限,默认为1024*10
    long long timeEventNextId;	//定时器事件的ID编号管理(分配ID号所用)  
    time_t lastTime;     		/* Used to detect system clock skew */
    aeFileEvent *events; 		//注册的文件事件,这些是需要进程关注的文件  
    aeFiredEvent *fired; 		//poll结果,待处理的文件事件的文件号和事件类型  
    aeTimeEvent *timeEventHead;	//定时器时间链表 
    int stop;		 			//时间轮询是否结束?  
    void *apidata; 				//文件事件的轮询数据和结果数据:poll; 三种轮询方式:epoll(linux),select(windows),kqueue  
    aeBeforeSleepProc *beforesleep;
} aeEventLoop; 

接着分析aeCreateEventLoop函数:

//底层epoll多路复用初始化,然后存放在aeEventLoop中 void * 类型的apidata,隐藏了底层的实现。

/*****************************************************************************
 * 函 数 名  : aeCreateEventLoop
 * 函数功能  : 构建aeEventLoop对象eventLoop  
 * 输入参数  : int setsize  消息队列大小
 * 输出参数  : 无
 * 返 回 值  : aeEventLoop
 * 调用关系  : 
 * 记    录
 * 1.日    期: 2017年10月21日
 *   作    者: zyz
 *   修改内容: 新生成函数
*****************************************************************************/
aeEventLoop *aeCreateEventLoop(int setsize) {
	//setsize指的是放到eventloop中最大描述符大小,也就是该事件循环中可能有多少个fd。
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
	//eventLoop->events和event_loop->fired都是一个set size大小的
	//数组,表明了event loop是用数组下标来当fd。
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
	//eventLoop->timeEventHead保存第一个时间事件,值得注意的是时间事件
	//链表不是有序列表,因此会后面的检索会是O(n)
	
    eventLoop->stop = 0;			 	//指示事件循环是否继续进行。
    eventLoop->maxfd = -1;			 	//事件链表中目前最大的fd,用于减小检索范围
    eventLoop->beforesleep = NULL;		//每轮循环前的回调函数
    // aeApiCreate封装了不同多路复用的实现,如linux的epoll,sun的export,FreeBSD和Mac osx的queue,还有select。
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;// AE_NONE来指示该fd没有放入事件循环库
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
} 

很简单,分配一个aeEventLoop 对象,然后一折对象为参数,调用底层实现函数aeApiCreate,我们看下epoll类型的实现函数:

/*****************************************************************************
 * 函 数 名  : aeApiCreate
 * 函数功能  : 封装epoll事件驱动构建函数
 * 输入参数  : aeEventLoop *eventLoop  事件循环描述结构对象
 * 输出参数  : 无
 * 返 回 值  : static
 * 调用关系  : 
 * 记    录
 * 1.日    期: 2017年10月23日
 *   作    者: zyz
 *   修改内容: 新生成函数
*****************************************************************************/
static int aeApiCreate(aeEventLoop *eventLoop) {
	//包含一个epoll专用文件描述符和一个epoll_event对象指针
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
	//创建setsize个epoll_event
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
	//该函数生成一个epoll专用的文件描述符。它其实是在内核申请一空间,用来存放你想关注的socket fd上是否发生以及发生了什么事件
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
} 

非常简单粗暴,构建一个aeApiState对象,这个对象包含一个文件描述符和一个epoll_event 对象指针。

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

然后调用epoll_create建立事件模型,将得到文件描述符赋值给aeApiState对象的epfd;分配指定size的poll_event对象内存,将地址赋值给aeApiState对象的events;然后aeApiState对象地址赋值给aeEventLoop对象的apidata;

第二件事是建立套接字监听,不是这里关注的重点,我们只需知道得到一个存放套接字描述符的数组。这些套接字就是被监听的对象。

第三件事是将这些套接字封装后加入epoll监听服务。我们看下函数aeCreateFileEvent:

/*****************************************************************************
 * 函 数 名  : aeCreateFileEvent
 * 函数功能  : 建立事件监听对象
 * 输入参数  : aeEventLoop *eventLoop  事件监听服务描述符对象指针
               int fd                  被监听文件描述符
               int mask                监听事件掩码
               aeFileProc *proc        事件响应函数
               void *clientData        参数
 * 输出参数  : 无
 * 返 回 值  : 
 * 调用关系  : 
 * 记    录
 * 1.日    期: 2018年03月06日
 *   作    者: zyz
 *   修改内容: 新生成函数
*****************************************************************************/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
	//调用底层实现,将被监听文件描述符加入监听服务
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

在继续分析之前我们先看下一个结构体:

/* File event structure */
/* 文件事件结构体 */  
typedef struct aeFileEvent {
	//只为读事件或者写事件中的1种	
    int mask; /* one of AE_(READABLE|WRITABLE) */
	//读方法  
    aeFileProc *rfileProc;
	//写方法 
    aeFileProc *wfileProc;
	 //客户端数据  
    void *clientData;
} aeFileEvent;

在调用aeCreateEventLoop函数时,分配了指定大小的内存,用于在抽象层代表可以监听指定数量的文件描述符。在函数中就根据指定文件描述符取出指定的aeFileEvent 对象,然后以该对象为参数,调用底层实现函数aeApiAddEvent

//EPOLL_CTL_ADD:	注册新的fd到epfd中;
//EPOLL_CTL_MOD:	修改已经注册的fd的监听事件;
//EPOLL_CTL_DEL:	从epfd中删除一个fd;

/*****************************************************************************
 * 函 数 名  : aeApiAddEvent
 * 函数功能  : epoll事件注册函数封装
 * 输入参数  : aeEventLoop *eventLoop  事件驱动模型结构对象
               int fd                  被监听事件的文件描述符
               int mask                事件掩码
 * 输出参数  : 无
 * 返 回 值  : static
 * 调用关系  : 
 * 记    录
 * 1.日    期: 2017年10月23日
 *   作    者: zyz
 *   修改内容: 新生成函数
*****************************************************************************/
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
	
	//events可以是以下几个宏的集合:
	//EPOLLIN:         触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
	//EPOLLOUT:        触发该事件,表示对应的文件描述符上可以写数据;
	//EPOLLPRI:        表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
	//EPOLLERR:        表示对应的文件描述符发生错误;
	//EPOLLHUP:        表示对应的文件描述符被挂断;
	//EPOLLET:         将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
	//EPOLLONESHOT:    只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

基于epoll的实现方式,实际调用的是epoll_ctl,将被监听文件描述符加入监听服务。在这里事件掩码为AE_READABLE,对应到epoll为EPOLLIN,即只对输入事件感兴趣。

到这里,事件监听服务已经构建完成,下面就是事件处理的部分了

事件处理部分在函数中实现,在main函数中调用

/*****************************************************************************
 * 函 数 名  : aeMain
 * 函数功能  : 事件循环处理
 * 输入参数  : aeEventLoop *eventLoop  事件监听服务描述符对象指针
 * 输出参数  : 无
 * 返 回 值  : 
 * 调用关系  : 
 * 记    录
 * 1.日    期: 2018年03月06日
 *   作    者:  
 *   修改内容: 新生成函数
*****************************************************************************/
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {						//以死循环的方式进入轮询,直到stop被置为非0
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);		//前期处理,回调函数
		//处理消息
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);	//事件处理(见下面)  
    }
}


猜你喜欢

转载自blog.csdn.net/idwtwt/article/details/79460217