项目中应用到了redis,对这种基于内存的nosql数据库比较感兴趣,打算读一下redis源码,写到哪里算哪里。第一篇是关于redis的网络模型:
redis网络模型采用epoll 异步事件处理机制,针对客户端的读写均是采用单线程的形式操作,因此不存在并发安全问题(例如多个客户端同时修改同一个key时,命令也是依次执行的,不会产生数据竞争)。之所以可以采用单线程模型,也许是因为redis针对数据的操作均是基于内存操作,没有涉及到磁盘I/O。当然数据持久化会涉及到磁盘I/O,持久化部分会在后面陆续读源码时记录,包括redis的持久化过程以及它与客户端数据读写同时发生时的处理机制。
1、server.el = aeCreateEventLoop(server.maxclients+1024);
此函数在initServer()中调用,用于创建epoll 事件结构体。
使用的数据结构是aeEventLoop ,结构如下:
/* State of an event loop: the registered and fired file events, the time
 * events, and the private data of the polling backend. */
typedef struct aeEventLoop {
    // Highest file descriptor currently registered
    int maxfd; /* highest file descriptor currently registered */
    // Maximum number of file descriptors tracked
    int setsize; /* max number of file descriptors tracked */
    // Used to generate time event ids
    long long timeEventNextId;
    // Time at which time events were last processed
    time_t lastTime; /* Used to detect system clock skew */
    // Registered file events
    aeFileEvent *events; /* Registered events */
    // File events that are ready
    aeFiredEvent *fired; /* Fired events */
    // Time events
    aeTimeEvent *timeEventHead;
    // On/off switch of the event processor
    int stop;
    // Private data of the multiplexing library
    void *apidata; /* This is used for polling API specific data */
    // Function to run before events are processed
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;
/* Allocate and initialise an event loop that tracks up to `setsize` file
 * descriptors.
 *
 * Returns the new event loop, or NULL if an allocation or the polling
 * backend setup (aeApiCreate) fails; in that case everything allocated so
 * far is released before returning. */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *loop = zmalloc(sizeof(*loop));
    int fd;

    /* Nothing to clean up when the loop itself could not be allocated. */
    if (loop == NULL) return NULL;

    /* Tables of registered and fired file events, one slot per descriptor. */
    loop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    loop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (loop->events == NULL || loop->fired == NULL) goto fail;

    loop->setsize = setsize;
    loop->lastTime = time(NULL);
    /* No time events yet. */
    loop->timeEventHead = NULL;
    loop->timeEventNextId = 0;
    loop->stop = 0;
    loop->maxfd = -1;
    loop->beforesleep = NULL;

    if (aeApiCreate(loop) == -1) goto fail;

    /* A slot whose mask is AE_NONE is not registered, so start with every
     * slot in that state. */
    fd = 0;
    while (fd < setsize) {
        loop->events[fd].mask = AE_NONE;
        fd++;
    }
    return loop;

fail:
    zfree(loop->events);
    zfree(loop->fired);
    zfree(loop);
    return NULL;
}
2、创建socket,bind,listen等
操作均在server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);中完成。
设置socket参数SO_REUSEADDR,允许地址立即被重复利用,listen中客户端连接队列值设置为511
3、上面监听完socket套接字,就需要创建epoll以及注册epoll事件,以触发客户端连接操作,即accept,
有关创建epoll、添加epoll事件、删除epoll事件、删除epoll、监听epoll事件的函数主要在src/ae_epoll.c 文件下:
int aeApiCreate(aeEventLoop *eventLoop)
创建epoll的函数
/* Create the epoll backend state for `eventLoop`: an aeApiState holding an
 * epoll_event array with eventLoop->setsize entries and a new epoll
 * descriptor. The state is stored in eventLoop->apidata.
 *
 * Returns 0 on success, or -1 if an allocation or epoll_create() fails
 * (anything allocated here is freed first). */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *st = zmalloc(sizeof(aeApiState));

    if (st == NULL) return -1;

    st->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (st->events == NULL) goto fail_state;

    st->epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
    if (st->epfd == -1) goto fail_events;

    eventLoop->apidata = st;
    return 0;

fail_events:
    zfree(st->events);
fail_state:
    zfree(st);
    return -1;
}
aeApiAddEvent 添加epoll监听事件,主要是通过epoll_ctl 函数添加或者修改fd的事件
/* Start monitoring `fd` for the events in `mask` (AE_READABLE and/or
 * AE_WRITABLE), in addition to whatever is already recorded for it in
 * eventLoop->events[fd].mask.
 *
 * Returns 0 on success, -1 if epoll_ctl() fails. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *st = eventLoop->apidata;
    int current = eventLoop->events[fd].mask;
    int merged = mask | current; /* keep the events already monitored */
    struct epoll_event ev;
    int op;

    /* A descriptor with no recorded events has to be added; one that is
     * already monitored has to be modified instead. */
    if (current == AE_NONE) {
        op = EPOLL_CTL_ADD;
    } else {
        op = EPOLL_CTL_MOD;
    }

    ev.events = 0;
    if (merged & AE_READABLE) ev.events |= EPOLLIN;
    if (merged & AE_WRITABLE) ev.events |= EPOLLOUT;
    ev.data.u64 = 0; /* avoid valgrind warning */
    ev.data.fd = fd;

    return (epoll_ctl(st->epfd, op, fd, &ev) == -1) ? -1 : 0;
}
aeApiDelEvent 删除epoll监听事件
/* Stop monitoring `fd` for the events in `delmask`. If other events remain
 * in eventLoop->events[fd].mask the registration is modified; otherwise
 * the descriptor is removed from the epoll set. The result of epoll_ctl()
 * is not checked. */
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *st = eventLoop->apidata;
    int remaining = eventLoop->events[fd].mask & (~delmask);
    struct epoll_event ev;
    int op;

    ev.events = 0;
    if (remaining & AE_READABLE) ev.events |= EPOLLIN;
    if (remaining & AE_WRITABLE) ev.events |= EPOLLOUT;
    ev.data.u64 = 0; /* avoid valgrind warning */
    ev.data.fd = fd;

    /* Note, Kernel < 2.6.9 requires a non null event pointer even for
     * EPOLL_CTL_DEL, so `ev` is passed in both cases. */
    op = (remaining == AE_NONE) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
    epoll_ctl(st->epfd, op, fd, &ev);
}
aeApiPoll 监听事件
/* Wait for events with epoll_wait() and copy the ready descriptors and
 * their event masks into eventLoop->fired[].
 *
 * `tvp` gives the maximum wait (converted to milliseconds); a NULL `tvp`
 * passes -1 as the epoll_wait() timeout.
 *
 * Returns the number of entries written to eventLoop->fired[], which is 0
 * when epoll_wait() returns 0 or a negative value. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *st = eventLoop->apidata;
    int ready, i;

    ready = epoll_wait(st->epfd, st->events, eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (ready <= 0) return 0;

    for (i = 0; i < ready; i++) {
        struct epoll_event *src = &st->events[i];
        int mask = 0;

        if (src->events & EPOLLIN) mask |= AE_READABLE;
        /* Errors and hang-ups are reported as writable, like EPOLLOUT. */
        if (src->events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
            mask |= AE_WRITABLE;

        eventLoop->fired[i].fd = src->data.fd;
        eventLoop->fired[i].mask = mask;
    }
    return ready;
}
4、第2步骤中已经创建了socket,并且处在监听的状态,此时要想接受客户端的请求,需要将此socket放入到监听事件中。
aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,acceptTcpHandler,NULL) == AE_ERR)
通过此函数创建此监听事件, 此函数里面调用aeApiAddEvent 添加监听事件,即添加读监听事件,等待客户端的连接。
/* Register `proc` as the handler for the events in `mask` (AE_READABLE
 * and/or AE_WRITABLE) on `fd`, with `clientData` stored alongside it.
 *
 * Returns AE_OK on success. Returns AE_ERR with errno set to ERANGE when
 * `fd` does not fit in the loop's setsize, and AE_ERR when aeApiAddEvent()
 * fails; in both error cases the event table is left untouched. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    aeFileEvent *slot;

    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    /* Register with the polling backend first; the table below is only
     * updated once that has succeeded. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1) {
        return AE_ERR;
    }

    slot = &eventLoop->events[fd];
    slot->mask |= mask;
    if (mask & AE_READABLE) {
        slot->rfileProc = proc;
    }
    if (mask & AE_WRITABLE) {
        slot->wfileProc = proc;
    }
    slot->clientData = clientData;

    if (eventLoop->maxfd < fd) {
        eventLoop->maxfd = fd;
    }
    return AE_OK;
}
其中eventLoop->events中记录已经处于被监听状态的fd。rfileProc ,wfileProc 分别是读事件函数,写事件函数。
5、注册监听事件之后,就需要开始调用aeApiPoll()开始监听事件的发生
调用方法是main()->aeMain(server.el)->aeProcessEvents(eventloop)
/* Run the event loop: clear the stop flag, print the value returned by
 * GetCurrentThreadId(), then keep processing events until eventLoop->stop
 * becomes non-zero. The beforesleep hook, when set, is called before each
 * round of event processing. */
void aeMain(aeEventLoop *eventLoop) {
    int tid;

    eventLoop->stop = 0;
    tid = GetCurrentThreadId();
    printf("aeMain threadID:%d", tid);

    for (;;) {
        if (eventLoop->stop) break;
        if (eventLoop->beforesleep != NULL) {
            eventLoop->beforesleep(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
**********
numevents = aeApiPoll(eventLoop, tvp);
**********
}
调用结果是返回事件数量,aeApiPoll会将监听到事件的fd以及其事件类型写入到eventLoop->fired[]中
然后从fired中获取fd以及事件类型,到eventLoop->events中获取事件处理函数,然后运行事件处理函数即可。
/* Poll for ready descriptors; aeApiPoll() fills eventLoop->fired[] and
 * returns how many entries it wrote. */
numevents = aeApiPoll(eventLoop, tvp);
/* Dispatch each fired entry to the handlers registered for its fd. */
for (j = 0; j < numevents; j++) {
    /* Look up the registration for the fired descriptor. */
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    /* Set once the read handler has been called for this entry. */
    int rfired = 0;
    /* note the fe->mask & mask & ... code: maybe an already processed
     * event removed an element that fired and we still didn't
     * processed, so we check if the event is still valid. */
    if (fe->mask & mask & AE_READABLE) {
        rfired = 1;
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    if (fe->mask & mask & AE_WRITABLE) {
        /* Skip the write handler when it is the same function that was
         * just called as the read handler for this entry. */
        if (!rfired || fe->wfileProc != fe->rfileProc)
            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }
    /* Count the entry as processed. */
    processed++;
}
6、针对服务端socket的rfileProc函数为
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
此函数中调用anetTcpAccept()函数accept接收客户端的连接,然后调用acceptCommonHandler函数,在此函数中创建客户端结构体,并且设置客户端的socket 读监听事件,以监听客户端发送过来的数据。
7、至此服务端epoll监听的事件有两个,一个是服务端socket的客户端连接事件,另外一个是已经连接的客户端的socket的读事件。
8、返回5继续监听事件