一、IO多路复用概念
IO多路复用是一种同步IO模型,实现一个线程可以监视多个文件句柄;一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;没有文件句柄就绪时会阻塞、挂起线程,直到有文件句柄就绪或阻塞超时。
二、libevent的IO多路复用应用
libevent支持多种IO多路复用机制,保障了跨平台和性能。支持的IO多路复用包括:
机制 | 平台 |
---|---|
select | unix、linux |
win32select | win |
poll | unix、linux |
epoll | unix、linux |
kqueue | Mac OS |
dev/poll | Solaris |
evport | Solaris |
注意:mac os用的unix内核,安卓用的linux内核。libevent还支持win系统的iocp,不过iocp不是IO多路复用,而是异步IO。
三、libevent的IO多路复用接口抽象
libevent使用eventop结构体来抽象IO多路复用相关的接口,可以在不同平台下,选择合适的IO多路复用实现。event_base结构体里有个属性evsel指向eventop,evbase指向特定模式下的数据结构。eventop结构体的属性和含义如下表:
属性 | 含义 |
---|---|
const char *name | 多路复用名称 |
void *(*init)(struct event_base *) | 完成初始化,返回该模式下专用的结构体 |
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo) | 添加fd上的事件监听 |
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo) | 删除fd上的事件监听 |
int (*dispatch)(struct event_base *, struct timeval *) | 检测哪些事件已激活,并执行event的激活回调函数 |
void (*dealloc)(struct event_base *) | 清理并释放event_base中该多路复用机制使用的数据 |
int need_reinit | fork之后是否要重新初始化event_base |
enum event_method_feature features | 当前机制支持的feature |
size_t fdinfo_len | 额外数据结构的长度 |
具体的调用关系图,如下:
四、libevent的select封装
select相关的数据放在selectop结构体里,具体的定义如下:
属性 | 含义 |
---|---|
int event_fds | 最大的fd编号 |
int event_fdsz | 集合数组的最大容量 |
int resize_out_sets | 标记是否需要重算集合大小 |
fd_set *event_readset_in | 监听read的fd集合 |
fd_set *event_writeset_in | 监听write的fd集合 |
fd_set *event_readset_out | 调用select前,会把event_readset_in的数据拷贝过来,避免select修改集合数据 |
fd_set *event_writeset_out | 调用select前,会把event_writeset_in的数据拷贝过来,避免select修改集合数据 |
五、libevent封装的select源码
static void * select_init(struct event_base *base)
{
// Initializes the selectop and returns that object (body not shown in this listing).
}
/*
 * Run one round of select(2) for this event_base and activate the events
 * whose descriptors are reported ready.
 *
 * base: event base whose evbase points at the selectop state.
 * tv:   timeout handed straight to select(); passed through unchanged.
 *
 * Returns 0 on success or when select() was interrupted (EINTR), and -1
 * when an output set could not be reallocated or select() failed with any
 * other error.
 */
static int select_dispatch(struct event_base *base, struct timeval *tv)
{
	struct selectop *sop = base->evbase;
	int ready, fd, visited, nfds;

	check_selectop(sop);

	if (sop->resize_out_sets) {
		size_t sz = sop->event_fdsz;
		fd_set *grown;

		grown = mm_realloc(sop->event_readset_out, sz);
		if (grown == NULL)
			return (-1);
		sop->event_readset_out = grown;

		/* On failure the read set stays as reallocated above; the
		 * resize flag is still set, so the next dispatch tries again. */
		grown = mm_realloc(sop->event_writeset_out, sz);
		if (grown == NULL)
			return (-1);
		sop->event_writeset_out = grown;

		sop->resize_out_sets = 0;
	}

	/* select() overwrites the sets it is given, so it works on copies
	 * of the "in" sets. */
	memcpy(sop->event_readset_out, sop->event_readset_in,
	    sop->event_fdsz);
	memcpy(sop->event_writeset_out, sop->event_writeset_in,
	    sop->event_fdsz);

	nfds = sop->event_fds + 1;

	/* The base lock is dropped for the duration of the select() call. */
	EVBASE_RELEASE_LOCK(base, th_base_lock);
	ready = select(nfds, sop->event_readset_out,
	    sop->event_writeset_out, NULL, tv);
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	check_selectop(sop);

	if (ready == -1) {
		if (errno == EINTR)
			return (0);
		event_warn("select");
		return (-1);
	}

	event_debug(("%s: select reports %d", __func__, ready));

	check_selectop(sop);

	/* Start scanning from a random descriptor and wrap around, visiting
	 * every descriptor in [0, nfds) exactly once. */
	fd = random() % nfds;
	for (visited = 0; visited < nfds; ++visited) {
		int what = 0;

		fd = (fd + 1 >= nfds) ? 0 : fd + 1;

		if (FD_ISSET(fd, sop->event_readset_out))
			what |= EV_READ;
		if (FD_ISSET(fd, sop->event_writeset_out))
			what |= EV_WRITE;

		if (what != 0)
			evmap_io_active(base, fd, what);
	}

	check_selectop(sop);

	return (0);
}
static int select_resize(struct selectop *sop, int fdsz)
{
// Recomputes the size of readset_in and writeset_in, and sets resize_out_sets to 1 (body not shown in this listing).
}
/*
 * Start watching fd for the requested events in the select backend.
 *
 * base:   event base whose evbase points at the selectop state.
 * fd:     descriptor to watch.
 * old:    not used by this function.
 * events: EV_READ and/or EV_WRITE; EV_SIGNAL must not be set.
 * p:      unused.
 *
 * Returns 0 on success, -1 if the fd sets had to grow and select_resize()
 * reported failure.
 */
static int select_add(struct event_base *base, int fd, short old, short events, void *p)
{
	struct selectop *sop = base->evbase;
	(void) p;

	EVUTIL_ASSERT((events & EV_SIGNAL) == 0);
	check_selectop(sop);

	/* A new highest fd may require larger fd_sets for select(2). */
	if (fd > sop->event_fds) {
		int wanted = sop->event_fdsz;

		if (wanted < (int)sizeof(fd_mask))
			wanted = (int)sizeof(fd_mask);

		/* Doubling could overflow in theory; in practice the highest
		 * fd on a unixy system will not get there. XXXX */
		while (wanted < (int) SELECT_ALLOC_SIZE(fd + 1))
			wanted *= 2;

		if (wanted != sop->event_fdsz && select_resize(sop, wanted)) {
			check_selectop(sop);
			return (-1);
		}

		sop->event_fds = fd;
	}

	if (events & EV_READ) {
		FD_SET(fd, sop->event_readset_in);
	}
	if (events & EV_WRITE) {
		FD_SET(fd, sop->event_writeset_in);
	}

	check_selectop(sop);
	return (0);
}
/*
 * Stop watching fd for the given events in the select backend.
 *
 * base:   event base whose evbase points at the selectop state.
 * fd:     descriptor to stop watching.
 * old:    not used by this function.
 * events: EV_READ and/or EV_WRITE; EV_SIGNAL must not be set.
 * p:      unused.
 *
 * Always returns 0. A descriptor above the highest one recorded in
 * event_fds is left alone.
 */
static int select_del(struct event_base *base, int fd, short old, short events, void *p)
{
	struct selectop *sop = base->evbase;
	(void)p;

	EVUTIL_ASSERT((events & EV_SIGNAL) == 0);
	check_selectop(sop);

	/* Only descriptors up to the recorded maximum are touched. */
	if (fd <= sop->event_fds) {
		if (events & EV_READ) {
			FD_CLR(fd, sop->event_readset_in);
		}
		if (events & EV_WRITE) {
			FD_CLR(fd, sop->event_writeset_in);
		}
	}

	check_selectop(sop);
	return (0);
}
static void select_free_selectop(struct selectop *sop)
{
// Cleans up sop; the code was removed from this listing.
}
static void select_dealloc(struct event_base *base)
{
// The code here was removed from this listing.
}
六、libevent的epoll封装
封装epoll的数据存放在epollop里,包含的属性如下:
属性 | 含义 |
---|---|
struct epoll_event *events | 事件列表 |
int nevents | 事件数量 |
int epfd | epoll的文件句柄 |
七、libevent的epoll封装源码
static void * epoll_init(struct event_base *base)
{
// Initializes the epollop and returns the epollop object (body not shown in this listing).
}
/*
 * Describe the add/del part of a change value for log messages.
 *
 * Only the EV_CHANGE_ADD and EV_CHANGE_DEL bits are considered. Returns
 * "add", "del", "none" (neither bit set) or "???" (any other combination).
 */
static const char * change_to_string(int change)
{
	const int masked = change & (EV_CHANGE_ADD|EV_CHANGE_DEL);

	if (masked == EV_CHANGE_ADD)
		return "add";
	if (masked == EV_CHANGE_DEL)
		return "del";
	return (masked == 0) ? "none" : "???";
}
/*
 * Name an epoll_ctl() operation code for log messages.
 *
 * Returns "ADD", "DEL" or "MOD" for the matching EPOLL_CTL_* constant and
 * "???" for anything else.
 */
static const char * epoll_op_to_string(int op)
{
	switch (op) {
	case EPOLL_CTL_ADD:
		return "ADD";
	case EPOLL_CTL_DEL:
		return "DEL";
	case EPOLL_CTL_MOD:
		return "MOD";
	default:
		return "???";
	}
}
/*
 * Turn one pending event_change into a single epoll_ctl() call on
 * epollop->epfd.
 *
 * base:    not referenced in this function body.
 * epollop: backend state; only its epfd is used here.
 * ch:      the change to apply (fd, old_events, read_change, write_change).
 *
 * The function first derives an epoll operation (ADD/MOD/DEL) and an epoll
 * event mask from the change, then issues epoll_ctl(). Certain failures are
 * retried with the complementary operation or tolerated (see below).
 *
 * Returns 0 when the change was applied, was tolerated, or produced an
 * empty event mask (nothing to do); returns -1 when epoll_ctl() failed and
 * could not be recovered.
 */
static int epoll_apply_one_change(struct event_base *base,
struct epollop *epollop,
const struct event_change *ch)
{
struct epoll_event epev;
int op, events = 0;
if (1) {
if ((ch->read_change & EV_CHANGE_ADD) ||
(ch->write_change & EV_CHANGE_ADD)) {
/* If we are adding anything at all, we'll want to do
* either an ADD or a MOD. */
events = 0;
op = EPOLL_CTL_ADD;
/* For each direction: request it if it is being added, drop it
 * if it is being deleted, otherwise keep it if old_events says
 * it was already being watched. */
if (ch->read_change & EV_CHANGE_ADD) {
events |= EPOLLIN;
} else if (ch->read_change & EV_CHANGE_DEL) {
;
} else if (ch->old_events & EV_READ) {
events |= EPOLLIN;
}
if (ch->write_change & EV_CHANGE_ADD) {
events |= EPOLLOUT;
} else if (ch->write_change & EV_CHANGE_DEL) {
;
} else if (ch->old_events & EV_WRITE) {
events |= EPOLLOUT;
}
/* EV_ET on either change requests edge-triggered mode. */
if ((ch->read_change|ch->write_change) & EV_ET)
events |= EPOLLET;
if (ch->old_events) {
/* If MOD fails, we retry as an ADD, and if
* ADD fails we will retry as a MOD. So the
* only hard part here is to guess which one
* will work. As a heuristic, we'll try
* MOD first if we think there were old
* events and ADD if we think there were none.
*
* We can be wrong about the MOD if the file
* has in fact been closed and re-opened.
*
* We can be wrong about the ADD if the
* the fd has been re-created with a dup()
* of the same file that it was before.
*/
op = EPOLL_CTL_MOD;
}
} else if ((ch->read_change & EV_CHANGE_DEL) ||
(ch->write_change & EV_CHANGE_DEL)) {
/* If we're deleting anything, we'll want to do a MOD
* or a DEL. */
op = EPOLL_CTL_DEL;
/* Default is DEL; it becomes MOD when the direction that is not
 * being deleted is still present in old_events, in which case
 * the mask is that remaining direction. */
if (ch->read_change & EV_CHANGE_DEL) {
if (ch->write_change & EV_CHANGE_DEL) {
events = EPOLLIN|EPOLLOUT;
} else if (ch->old_events & EV_WRITE) {
events = EPOLLOUT;
op = EPOLL_CTL_MOD;
} else {
events = EPOLLIN;
}
} else if (ch->write_change & EV_CHANGE_DEL) {
if (ch->old_events & EV_READ) {
events = EPOLLIN;
op = EPOLL_CTL_MOD;
} else {
events = EPOLLOUT;
}
}
}
/* An empty mask means neither branch above selected anything
 * (op may be unset in that case), so there is nothing to apply. */
if (!events)
return 0;
memset(&epev, 0, sizeof(epev));
epev.data.fd = ch->fd;
epev.events = events;
if (epoll_ctl(epollop->epfd, op, ch->fd, &epev) == -1) {
if (op == EPOLL_CTL_MOD && errno == ENOENT) {
/* If a MOD operation fails with ENOENT, the
* fd was probably closed and re-opened. We
* should retry the operation as an ADD.
*/
if (epoll_ctl(epollop->epfd, EPOLL_CTL_ADD, ch->fd, &epev) == -1) {
event_warn("Epoll MOD(%d) on %d retried as ADD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll MOD(%d) on %d retried as ADD; succeeded.",
(int)epev.events,
ch->fd));
}
} else if (op == EPOLL_CTL_ADD && errno == EEXIST) {
/* If an ADD operation fails with EEXIST,
* either the operation was redundant (as with a
* precautionary add), or we ran into a fun
* kernel bug where using dup*() to duplicate the
* same file into the same fd gives you the same epitem
* rather than a fresh one. For the second case,
* we must retry with MOD. */
if (epoll_ctl(epollop->epfd, EPOLL_CTL_MOD, ch->fd, &epev) == -1) {
event_warn("Epoll ADD(%d) on %d retried as MOD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll ADD(%d) on %d retried as MOD; succeeded.",
(int)epev.events,
ch->fd));
}
} else if (op == EPOLL_CTL_DEL &&
(errno == ENOENT || errno == EBADF ||
errno == EPERM)) {
/* If a delete fails with one of these errors,
* that's fine too: we closed the fd before we
* got around to calling epoll_dispatch. */
event_debug(("Epoll DEL(%d) on fd %d gave %s: DEL was unnecessary.",
(int)epev.events,
ch->fd,
strerror(errno)));
} else {
/* Any other failure is reported and propagated to the caller. */
event_warn("Epoll %s(%d) on fd %d failed. Old events were %d; read change was %d (%s); write change was %d (%s)",
epoll_op_to_string(op),
(int)epev.events,
ch->fd,
ch->old_events,
ch->read_change,
change_to_string(ch->read_change),
ch->write_change,
change_to_string(ch->write_change));
return -1;
}
} else {
event_debug(("Epoll %s(%d) on fd %d okay. [old events were %d; read change was %d; write change was %d]",
epoll_op_to_string(op),
(int)epev.events,
(int)ch->fd,
ch->old_events,
ch->read_change,
ch->write_change));
}
}
return 0;
}
/*
 * Add interest in the given events on fd by building a one-off
 * event_change and applying it immediately.
 *
 * base:   event base; its evbase is passed on as the epollop.
 * fd:     descriptor to watch.
 * old:    events previously registered for fd, stored as old_events.
 * events: EV_READ and/or EV_WRITE, optionally with EV_ET, which is
 *         carried into each change that is set.
 * p:      unused.
 *
 * Returns whatever epoll_apply_one_change() returns.
 */
static int epoll_nochangelist_add(struct event_base *base, evutil_socket_t fd,
    short old, short events, void *p)
{
	struct event_change ch;
	const int add_change = EV_CHANGE_ADD | (events & EV_ET);

	ch.fd = fd;
	ch.old_events = old;
	ch.write_change = (events & EV_WRITE) ? add_change : 0;
	ch.read_change = (events & EV_READ) ? add_change : 0;

	return epoll_apply_one_change(base, base->evbase, &ch);
}
/*
 * Remove interest in the given events on fd by building a one-off
 * event_change and applying it immediately.
 *
 * base:   event base; its evbase is passed on as the epollop.
 * fd:     descriptor to update.
 * old:    events previously registered for fd, stored as old_events.
 * events: EV_READ and/or EV_WRITE selecting which directions to delete.
 * p:      unused.
 *
 * Returns whatever epoll_apply_one_change() returns.
 */
static int epoll_nochangelist_del(struct event_base *base, evutil_socket_t fd,
    short old, short events, void *p)
{
	struct event_change ch;

	ch.fd = fd;
	ch.old_events = old;
	ch.write_change = (events & EV_WRITE) ? EV_CHANGE_DEL : 0;
	ch.read_change = (events & EV_READ) ? EV_CHANGE_DEL : 0;

	return epoll_apply_one_change(base, base->evbase, &ch);
}
/*
 * Run one round of epoll_wait() for this event_base and activate the
 * events reported ready.
 *
 * base: event base whose evbase points at the epollop state.
 * tv:   optional timeout; NULL means wait with timeout -1. A converted
 *       value that is negative or above MAX_EPOLL_TIMEOUT_MSEC is replaced
 *       by MAX_EPOLL_TIMEOUT_MSEC.
 *
 * Returns 0 on success or when epoll_wait() was interrupted (EINTR), and
 * -1 on any other epoll_wait() failure.
 */
static int epoll_dispatch(struct event_base *base, struct timeval *tv)
{
	struct epollop *epollop = base->evbase;
	struct epoll_event *events = epollop->events;
	long timeout = -1;
	int nready, idx;

	if (tv != NULL) {
		timeout = evutil_tv_to_msec(tv);
		/* Linux kernels can wait forever if the timeout is too big;
		 * see comment on MAX_EPOLL_TIMEOUT_MSEC. */
		if (timeout < 0 || timeout > MAX_EPOLL_TIMEOUT_MSEC)
			timeout = MAX_EPOLL_TIMEOUT_MSEC;
	}

	/* NOTE: the event_changelist_remove_all() call that followed here is
	 * disabled in this listing. */
	epoll_apply_changes(base);

	/* The base lock is dropped for the duration of the wait. */
	EVBASE_RELEASE_LOCK(base, th_base_lock);
	nready = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	if (nready == -1) {
		if (errno == EINTR)
			return (0);
		event_warn("epoll_wait");
		return (-1);
	}

	event_debug(("%s: epoll_wait reports %d", __func__, nready));
	EVUTIL_ASSERT(nready <= epollop->nevents);

	for (idx = 0; idx < nready; idx++) {
		const int what = events[idx].events;
		short ev;

		/* A hangup or error activates both directions. */
		if (what & (EPOLLHUP|EPOLLERR)) {
			ev = EV_READ | EV_WRITE;
		} else {
			ev = 0;
			if (what & EPOLLIN)
				ev |= EV_READ;
			if (what & EPOLLOUT)
				ev |= EV_WRITE;
		}

		if (ev)
			evmap_io_active(base, events[idx].data.fd, ev | EV_ET);
	}

	/* Every slot was filled this round: double the array (up to
	 * MAX_NEVENT) so more events fit next time. If the realloc fails the
	 * current array is kept. */
	if (nready == epollop->nevents && epollop->nevents < MAX_NEVENT) {
		int doubled = epollop->nevents * 2;
		struct epoll_event *grown;

		grown = mm_realloc(epollop->events,
		    doubled * sizeof(struct epoll_event));
		if (grown) {
			epollop->events = grown;
			epollop->nevents = doubled;
		}
	}

	return (0);
}
static void epoll_dealloc(struct event_base *base)
{
// The code was removed from this listing.
}