libevent源码分析之IO多路复用

一、IO多路复用概念

IO多路复用是一种同步IO模型,使一个线程可以同时监视多个文件句柄:一旦某个文件句柄就绪,就通知应用程序进行相应的读写操作;没有文件句柄就绪时,调用会阻塞并挂起线程,直到有文件句柄就绪或等待超时。

二、libevent的IO多路复用应用

libevent支持多种IO多路复用机制,保障了跨平台和性能。支持的IO多路复用包括:

机制 平台
select unix、linux
win32select windows
poll unix、linux
epoll linux
kqueue BSD、Mac OS
/dev/poll Solaris
evport Solaris

注意:macOS 使用的 XNU 内核源自 BSD(属于类 Unix 系统),Android 使用的是 Linux 内核。libevent 还支持 Windows 系统的 IOCP,不过 IOCP 不是 IO 多路复用,而是异步 IO。

三、libevent的IO多路复用接口抽象

libevent使用eventop结构体来抽象IO多路复用相关的接口,从而可以在不同平台下选择合适的IO多路复用实现。event_base结构体中的evsel属性指向当前使用的eventop,evbase属性指向该模式专用的数据结构(例如select模式下的selectop)。eventop结构体的属性和含义如下表:

属性 含义
const char *name 多路复用名称
void *(*init)(struct event_base *) 完成初始化,返回该模式下专用的结构体(保存到event_base的evbase中)
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo) 添加fd上的事件监听
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo) 删除fd上的事件监听
int (*dispatch)(struct event_base *, struct timeval *) 等待IO事件(最长等待timeval指定的时间),检测哪些fd已就绪,并通过evmap_io_active把对应的event加入激活队列;激活回调随后由事件循环执行
void (*dealloc)(struct event_base *) 释放该模式专用的数据结构;fork之后重新初始化时,会先调用dealloc再调用init
int need_reinit fork之后是否需要重新初始化该后端
enum event_method_feature features 当前机制支持的feature
size_t fdinfo_len 每个fd额外保存的后端专用数据(即add/del的fdinfo参数)的长度

具体的调用关系图,如下:
(原文此处为调用关系图,图片未能保留。)

四、libevent的select封装

select相关的数据放在selectop结构体里,具体的定义如下:

属性 含义
int event_fds 当前监听的最大fd编号
int event_fdsz 每个fd集合当前分配的字节数
int resize_out_sets 标记out集合是否需要按新的event_fdsz重新分配
fd_set *event_readset_in 监听read的fd集合
fd_set *event_writeset_in 监听write的fd集合
fd_set *event_readset_out 调用select前,会把event_readset_in的数据拷贝过来再传给select,因为select会改写传入的集合
fd_set *event_writeset_out 调用select前,会把event_writeset_in的数据拷贝过来再传给select,因为select会改写传入的集合

五、libevent封装的select源码

/*
 * Backend `init` hook for select(): allocate and initialise a struct
 * selectop and return it (select_dispatch() later reads it back from
 * base->evbase).
 *
 * NOTE(review): the body was elided in this excerpt; as shown, this
 * non-void function falls off the end without returning a value.
 */
static void * select_init(struct event_base *base)
{
	// Initialise the selectop and return it (implementation elided).
}

/*
 * Grow the select() output sets to the current size of the input sets.
 * Used by select_dispatch() when resize_out_sets has been raised.
 *
 * Returns 0 on success, -1 if either reallocation fails.  If only the
 * write-set reallocation fails, the already-grown read set is kept and
 * resize_out_sets stays set, so the next attempt simply retries.
 */
static int select_sync_out_sets(struct selectop *sop)
{
	size_t nbytes = sop->event_fdsz;
	fd_set *grown;

	grown = mm_realloc(sop->event_readset_out, nbytes);
	if (grown == NULL)
		return (-1);
	sop->event_readset_out = grown;

	/* The read set is deliberately not freed on failure here: it was
	 * reallocated successfully, and repeating that realloc on the next
	 * dispatch is a no-op. */
	grown = mm_realloc(sop->event_writeset_out, nbytes);
	if (grown == NULL)
		return (-1);
	sop->event_writeset_out = grown;

	sop->resize_out_sets = 0;
	return (0);
}

/*
 * Backend `dispatch` hook for select(): wait (up to tv) for read/write
 * readiness on fds 0..event_fds and report each ready fd to
 * evmap_io_active().
 *
 * The input sets are copied into separate output sets before every call
 * because select() overwrites the sets it is handed.  The base lock is
 * released while blocked in select().  Scanning of the results starts at a
 * random fd and wraps around.
 *
 * Returns 0 on success or when select() is interrupted (EINTR); -1 on an
 * allocation failure or any other select() error.
 */
static int select_dispatch(struct event_base *base, struct timeval *tv)
{
	struct selectop *sop = base->evbase;
	int nready, nfds, fd, scanned;

	check_selectop(sop);
	if (sop->resize_out_sets && select_sync_out_sets(sop) != 0)
		return (-1);

	memcpy(sop->event_readset_out, sop->event_readset_in,
	       sop->event_fdsz);
	memcpy(sop->event_writeset_out, sop->event_writeset_in,
	       sop->event_fdsz);

	nfds = sop->event_fds + 1;

	/* Drop the base lock so other threads can take it while we block. */
	EVBASE_RELEASE_LOCK(base, th_base_lock);
	nready = select(nfds, sop->event_readset_out,
	    sop->event_writeset_out, NULL, tv);
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	check_selectop(sop);

	if (nready == -1) {
		/* A signal interrupting the wait is not an error. */
		if (errno == EINTR)
			return (0);
		event_warn("select");
		return (-1);
	}

	event_debug(("%s: select reports %d", __func__, nready));

	check_selectop(sop);
	fd = random() % nfds;
	for (scanned = 0; scanned < nfds; scanned++) {
		int what = 0;

		/* Advance first, wrapping back to fd 0 past the last slot. */
		fd = (fd + 1 < nfds) ? fd + 1 : 0;

		if (FD_ISSET(fd, sop->event_readset_out))
			what |= EV_READ;
		if (FD_ISSET(fd, sop->event_writeset_out))
			what |= EV_WRITE;

		if (what != 0)
			evmap_io_active(base, fd, what);
	}
	check_selectop(sop);

	return (0);
}

/*
 * Resize the select() input sets (event_readset_in / event_writeset_in)
 * for a new size of fdsz, and set resize_out_sets to 1 so that
 * select_dispatch() reallocates the output sets before its next select().
 * select_dispatch() uses sop->event_fdsz as a byte count, so fdsz is
 * presumably a size in bytes -- TODO confirm against the full source.
 * select_add() treats a non-zero return value as failure.
 *
 * NOTE(review): the body was elided in this excerpt; as shown, this
 * non-void function falls off the end without returning a value.
 */
static int select_resize(struct selectop *sop, int fdsz)
{
	// Recompute the size of readset_in/writeset_in and set resize_out_sets to 1 (implementation elided).
}

/*
 * Backend `add` hook for select(): start watching fd for the read and/or
 * write interest in `events` by setting its bit in the matching input set.
 *
 * If fd is higher than any fd tracked so far, the input sets are first
 * grown via select_resize() (doubling from at least sizeof(fd_mask) until
 * SELECT_ALLOC_SIZE(fd + 1) fits), and event_fds is raised to fd.
 * `old` and `p` are unused; EV_SIGNAL must not be set in `events`.
 *
 * Returns 0 on success, -1 if resizing the sets fails.
 */
static int select_add(struct event_base *base, int fd, short old, short events, void *p)
{
	struct selectop *sop = base->evbase;
	(void) p;

	EVUTIL_ASSERT((events & EV_SIGNAL) == 0);
	check_selectop(sop);

	/* The highest fd determines how large the fd_sets for select(2)
	 * must be, so track it. */
	if (fd > sop->event_fds) {
		int want = sop->event_fdsz;

		/* Never size the sets below a single fd_mask. */
		if (want < (int)sizeof(fd_mask))
			want = (int)sizeof(fd_mask);

		/* XXXX: in theory the doubling could overflow; in practice
		 * the highest fd on a unixy system will not get there. */
		while (want < (int) SELECT_ALLOC_SIZE(fd + 1))
			want *= 2;

		if (want != sop->event_fdsz && select_resize(sop, want) != 0) {
			check_selectop(sop);
			return (-1);
		}

		sop->event_fds = fd;
	}

	if (events & EV_READ)
		FD_SET(fd, sop->event_readset_in);
	if (events & EV_WRITE)
		FD_SET(fd, sop->event_writeset_in);

	check_selectop(sop);
	return (0);
}

/*
 * Backend `del` hook for select(): stop watching fd for the read and/or
 * write interest in `events` by clearing its bit in the matching input set.
 *
 * An fd above event_fds cannot have had a bit set by select_add(), so it
 * is left alone.  `old` and `p` are unused; EV_SIGNAL must not be set in
 * `events`.  Always returns 0.
 */
static int select_del(struct event_base *base, int fd, short old, short events, void *p)
{
	struct selectop *sop = base->evbase;
	(void)p;

	EVUTIL_ASSERT((events & EV_SIGNAL) == 0);
	check_selectop(sop);

	/* Only fds inside the tracked range can be present in the sets. */
	if (fd <= sop->event_fds) {
		if (events & EV_READ)
			FD_CLR(fd, sop->event_readset_in);
		if (events & EV_WRITE)
			FD_CLR(fd, sop->event_writeset_in);
	}

	check_selectop(sop);
	return (0);
}

/*
 * Release a struct selectop.
 *
 * NOTE(review): the body was elided in this excerpt ("cleanup code
 * removed").  Presumably it frees the four fd sets and the selectop
 * itself -- TODO confirm against the full source.
 */
static void select_free_selectop(struct selectop *sop)
{
	// Clean up sop (code removed in this excerpt).
}

/*
 * Backend `dealloc` hook for select(): release the select backend's data
 * associated with base.
 *
 * NOTE(review): the body was elided in this excerpt.
 */
static void select_dealloc(struct event_base *base)
{
	 // Code removed in this excerpt.
}

六、libevent的epoll封装

封装epoll的数据存放在epollop里,包含的属性如下:

属性 含义
struct epoll_event *events 传给epoll_wait、用于接收就绪事件的数组
int nevents events数组的容量(某次被用满时会倍增,上限为MAX_NEVENT)
int epfd epoll的文件句柄

七、libevent的epoll封装源码

/*
 * Backend `init` hook for epoll: allocate and initialise a struct epollop
 * and return it (epoll_dispatch() later reads it back from base->evbase).
 *
 * NOTE(review): the body was elided in this excerpt; as shown, this
 * non-void function falls off the end without returning a value.
 */
static void * epoll_init(struct event_base *base)
{
	// Initialise the epollop and return the epollop object (implementation elided).
}

/*
 * Describe the add/del part of a changelist flag word for log messages.
 * Bits other than EV_CHANGE_ADD and EV_CHANGE_DEL are ignored.
 *
 * Returns "add" or "del" when exactly one of them is set, "none" when
 * neither is set, and "???" when both are set.
 */
static const char * change_to_string(int change)
{
	switch (change & (EV_CHANGE_ADD|EV_CHANGE_DEL)) {
	case EV_CHANGE_ADD:
		return "add";
	case EV_CHANGE_DEL:
		return "del";
	case 0:
		return "none";
	default:
		return "???";
	}
}

/*
 * Map an epoll_ctl() operation code to a short name for log messages:
 * "ADD", "DEL" or "MOD", and "???" for anything else.
 */
static const char * epoll_op_to_string(int op)
{
	switch (op) {
	case EPOLL_CTL_ADD:
		return "ADD";
	case EPOLL_CTL_DEL:
		return "DEL";
	case EPOLL_CTL_MOD:
		return "MOD";
	default:
		return "???";
	}
}

/*
 * Apply one pending change `ch` to the epoll set epollop->epfd with a
 * single epoll_ctl() call.
 *
 * The operation is derived from ch->read_change, ch->write_change and
 * ch->old_events:
 *   - if either direction has EV_CHANGE_ADD: ADD (or MOD when old events
 *     existed) with the resulting read/write mask, plus EPOLLET if EV_ET
 *     is present in either change;
 *   - otherwise, if either direction has EV_CHANGE_DEL: DEL, or MOD when
 *     the other direction is still registered in old_events.
 * On epoll_ctl() failure, MOD+ENOENT is retried as ADD and ADD+EEXIST is
 * retried as MOD.  A DEL failing with ENOENT/EBADF/EPERM is logged at
 * debug level and treated as success.
 *
 * `base` is not referenced in this body.
 * Returns 0 on success or when there is nothing to do, -1 on failure (a
 * warning is logged).
 */
static int epoll_apply_one_change(struct event_base *base,
    struct epollop *epollop,
    const struct event_change *ch)
{
	struct epoll_event epev;
	int op, events = 0;

	/* NOTE(review): `if (1)` is a vestigial wrapper; its body always runs. */
	if (1) {
		/* `op` is assigned in whichever branch below runs; if neither
		 * runs, `events` stays 0 and we return before using `op`. */
		if ((ch->read_change & EV_CHANGE_ADD) ||
		    (ch->write_change & EV_CHANGE_ADD)) {
			/* If we are adding anything at all, we'll want to do
			 * either an ADD or a MOD. */
			events = 0;
			op = EPOLL_CTL_ADD;
			/* Read interest: newly added, explicitly deleted, or
			 * carried over from the old registration. */
			if (ch->read_change & EV_CHANGE_ADD) {
				events |= EPOLLIN;
			} else if (ch->read_change & EV_CHANGE_DEL) {
				;
			} else if (ch->old_events & EV_READ) {
				events |= EPOLLIN;
			}
			/* Same three cases for write interest. */
			if (ch->write_change & EV_CHANGE_ADD) {
				events |= EPOLLOUT;
			} else if (ch->write_change & EV_CHANGE_DEL) {
				;
			} else if (ch->old_events & EV_WRITE) {
				events |= EPOLLOUT;
			}
			if ((ch->read_change|ch->write_change) & EV_ET)
				events |= EPOLLET;

			if (ch->old_events) {
				/* If MOD fails, we retry as an ADD, and if
				 * ADD fails we will retry as a MOD.  So the
				 * only hard part here is to guess which one
				 * will work.  As a heuristic, we'll try
				 * MOD first if we think there were old
				 * events and ADD if we think there were none.
				 *
				 * We can be wrong about the MOD if the file
				 * has in fact been closed and re-opened.
				 *
				 * We can be wrong about the ADD if the
				 * the fd has been re-created with a dup()
				 * of the same file that it was before.
				 */
				op = EPOLL_CTL_MOD;
			}
		} else if ((ch->read_change & EV_CHANGE_DEL) ||
		    (ch->write_change & EV_CHANGE_DEL)) {
			/* If we're deleting anything, we'll want to do a MOD
			 * or a DEL. */
			op = EPOLL_CTL_DEL;

			/* For a full DEL, `events` is set to the deleted mask,
			 * which also keeps the `!events` early return below
			 * from being taken.  When the other direction is still
			 * registered, switch to MOD and keep only that one. */
			if (ch->read_change & EV_CHANGE_DEL) {
				if (ch->write_change & EV_CHANGE_DEL) {
					events = EPOLLIN|EPOLLOUT;
				} else if (ch->old_events & EV_WRITE) {
					events = EPOLLOUT;
					op = EPOLL_CTL_MOD;
				} else {
					events = EPOLLIN;
				}
			} else if (ch->write_change & EV_CHANGE_DEL) {
				if (ch->old_events & EV_READ) {
					events = EPOLLIN;
					op = EPOLL_CTL_MOD;
				} else {
					events = EPOLLOUT;
				}
			}
		}

		/* Nothing to tell the kernel. */
		if (!events)
			return 0;

		memset(&epev, 0, sizeof(epev));
		epev.data.fd = ch->fd;
		epev.events = events;
		if (epoll_ctl(epollop->epfd, op, ch->fd, &epev) == -1) {
			if (op == EPOLL_CTL_MOD && errno == ENOENT) {
				/* If a MOD operation fails with ENOENT, the
				 * fd was probably closed and re-opened.  We
				 * should retry the operation as an ADD.
				 */
				if (epoll_ctl(epollop->epfd, EPOLL_CTL_ADD, ch->fd, &epev) == -1) {
					event_warn("Epoll MOD(%d) on %d retried as ADD; that failed too",
					    (int)epev.events, ch->fd);
					return -1;
				} else {
					event_debug(("Epoll MOD(%d) on %d retried as ADD; succeeded.",
						(int)epev.events,
						ch->fd));
				}
			} else if (op == EPOLL_CTL_ADD && errno == EEXIST) {
				/* If an ADD operation fails with EEXIST,
				 * either the operation was redundant (as with a
				 * precautionary add), or we ran into a fun
				 * kernel bug where using dup*() to duplicate the
				 * same file into the same fd gives you the same epitem
				 * rather than a fresh one.  For the second case,
				 * we must retry with MOD. */
				if (epoll_ctl(epollop->epfd, EPOLL_CTL_MOD, ch->fd, &epev) == -1) {
					event_warn("Epoll ADD(%d) on %d retried as MOD; that failed too",
					    (int)epev.events, ch->fd);
					return -1;
				} else {
					event_debug(("Epoll ADD(%d) on %d retried as MOD; succeeded.",
						(int)epev.events,
						ch->fd));
				}
			} else if (op == EPOLL_CTL_DEL &&
			    (errno == ENOENT || errno == EBADF ||
				errno == EPERM)) {
				/* If a delete fails with one of these errors,
				 * that's fine too: we closed the fd before we
				 * got around to calling epoll_dispatch. */
				event_debug(("Epoll DEL(%d) on fd %d gave %s: DEL was unnecessary.",
					(int)epev.events,
					ch->fd,
					strerror(errno)));
			} else {
				/* Any other failure is reported with the full
				 * change description. */
				event_warn("Epoll %s(%d) on fd %d failed.  Old events were %d; read change was %d (%s); write change was %d (%s)",
				    epoll_op_to_string(op),
				    (int)epev.events,
				    ch->fd,
				    ch->old_events,
				    ch->read_change,
				    change_to_string(ch->read_change),
				    ch->write_change,
				    change_to_string(ch->write_change));
				return -1;
			}
		} else {
			event_debug(("Epoll %s(%d) on fd %d okay. [old events were %d; read change was %d; write change was %d]",
				epoll_op_to_string(op),
				(int)epev.events,
				(int)ch->fd,
				ch->old_events,
				ch->read_change,
				ch->write_change));
		}
	}
	return 0;
}

/*
 * `add` hook that bypasses the changelist (presumably selected when the
 * changelist is disabled -- confirm against the eventop tables).
 *
 * Builds a one-off event_change that marks each requested direction
 * (EV_READ / EV_WRITE) as EV_CHANGE_ADD, carrying EV_ET when requested,
 * and applies it immediately.  `p` is unused.
 * Returns the result of epoll_apply_one_change().
 */
static int epoll_nochangelist_add(struct event_base *base, evutil_socket_t fd,
    short old, short events, void *p)
{
	struct event_change ch;
	int et = events & EV_ET;

	ch.fd = fd;
	ch.old_events = old;
	ch.write_change = (events & EV_WRITE) ? (EV_CHANGE_ADD | et) : 0;
	ch.read_change = (events & EV_READ) ? (EV_CHANGE_ADD | et) : 0;

	return epoll_apply_one_change(base, base->evbase, &ch);
}

/*
 * `del` counterpart of epoll_nochangelist_add(): builds a one-off
 * event_change marking each requested direction (EV_READ / EV_WRITE) as
 * EV_CHANGE_DEL and applies it immediately.  `p` is unused.
 * Returns the result of epoll_apply_one_change().
 */
static int epoll_nochangelist_del(struct event_base *base, evutil_socket_t fd,
    short old, short events, void *p)
{
	struct event_change ch;

	ch.fd = fd;
	ch.old_events = old;
	ch.write_change = (events & EV_WRITE) ? EV_CHANGE_DEL : 0;
	ch.read_change = (events & EV_READ) ? EV_CHANGE_DEL : 0;

	return epoll_apply_one_change(base, base->evbase, &ch);
}

/*
 * Backend `dispatch` hook for epoll: push the queued changelist entries to
 * the kernel, wait for events, and report each ready fd to
 * evmap_io_active().
 *
 * tv == NULL waits indefinitely (timeout -1).  Otherwise the timeout is
 * converted to milliseconds.  A negative conversion result, or a value
 * above MAX_EPOLL_TIMEOUT_MSEC, is clamped to MAX_EPOLL_TIMEOUT_MSEC.
 *
 * The base lock is released while blocked in epoll_wait().  If the events
 * array was completely filled, it is doubled (up to MAX_NEVENT) for the
 * next call.  A failed reallocation just keeps the current array.
 *
 * Returns 0 on success or when epoll_wait() is interrupted (EINTR), and -1
 * on any other epoll_wait() failure.
 */
static int epoll_dispatch(struct event_base *base, struct timeval *tv)
{
	struct epollop *epollop = base->evbase;
	struct epoll_event *events = epollop->events;
	int i, res;
	long timeout = -1;

	if (tv != NULL) {
		timeout = evutil_tv_to_msec(tv);
		if (timeout < 0 || timeout > MAX_EPOLL_TIMEOUT_MSEC) {
			/* Linux kernels can wait forever if the timeout is
			 * too big; see comment on MAX_EPOLL_TIMEOUT_MSEC. */
			timeout = MAX_EPOLL_TIMEOUT_MSEC;
		}
	}

	/* Apply the queued add/del changes, then empty the changelist.
	 * Without the second call, the same changes would stay queued and be
	 * re-applied on every subsequent dispatch.  Clearing an already-empty
	 * changelist is harmless. */
	epoll_apply_changes(base);
	event_changelist_remove_all(&base->changelist, base);

	EVBASE_RELEASE_LOCK(base, th_base_lock);

	res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	if (res == -1) {
		/* Being interrupted by a signal is not an error. */
		if (errno != EINTR) {
			event_warn("epoll_wait");
			return (-1);
		}

		return (0);
	}

	event_debug(("%s: epoll_wait reports %d", __func__, res));
	EVUTIL_ASSERT(res <= epollop->nevents);

	for (i = 0; i < res; i++) {
		int what = events[i].events;
		short ev = 0;

		/* Hang-up or error: report both directions so whichever
		 * callback is registered gets a chance to observe it. */
		if (what & (EPOLLHUP|EPOLLERR)) {
			ev = EV_READ | EV_WRITE;
		} else {
			if (what & EPOLLIN)
				ev |= EV_READ;
			if (what & EPOLLOUT)
				ev |= EV_WRITE;
		}

		if (!ev)
			continue;

		evmap_io_active(base, events[i].data.fd, ev | EV_ET);
	}

	if (res == epollop->nevents && epollop->nevents < MAX_NEVENT) {
		/* We used all of the event space this time.  We should
		   be ready for more events next time. */
		int new_nevents = epollop->nevents * 2;
		struct epoll_event *new_events;

		new_events = mm_realloc(epollop->events,
		    new_nevents * sizeof(struct epoll_event));
		if (new_events) {
			epollop->events = new_events;
			epollop->nevents = new_nevents;
		}
	}

	return (0);
}

/*
 * Backend `dealloc` hook for epoll: release the epoll backend's data
 * associated with base.
 *
 * NOTE(review): the body was elided in this excerpt.
 */
static void epoll_dealloc(struct event_base *base)
{
	// Code removed in this excerpt.
}

猜你喜欢

转载自blog.csdn.net/gamekit/article/details/113087362