版权声明:所有的博客都是个人笔记,交流可以留言或者加QQ:2630492017 https://blog.csdn.net/qq_35976351/article/details/86561539
模式简介
这是基于SIGALRM
信号的定时器,关于该信号,可以参考这篇博客:https://blog.csdn.net/qq_35976351/article/details/86532889
本文来自于《Linux高性能服务器》这本书,不过把升序链表改成带有头结点和尾结点的结构了,这两个节点不存储数据,只是为了操作方便。
升序链表的模式作用如下:每隔固定的时间,就去扫描链表上的定时器。链表上的每个节点都有一个回调函数,本例子中的回调函数是取消注册事件并移除定时器。超时事件处理需要调用回调函数。
链表中的节点是按照事件升序排列的,所以tick()
时直接顺序执行就行,而且该函数只需要执行超时的任务,执行完成后立刻从定时器中移除。
该模式是时间轮的基础,典型的应用是心跳检测机制。
注意一点,因为定时器的优先级一般低于I/O处理的优先级,所以一般先做超时标记,留作最后处理定时器事件,代价是定时器的精度受到影响。
个人认为,,代码中最巧妙的一点是client_data
与util_timer
的指针互联机制,这样可以完成结构之间的相互索引,极大提高处理效率。关于这一点,可以在while
循环中事件处理的代码中体现。
代码实例
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <assert.h>
#include <time.h>
#include <functional>
#define FD_LIMIT 65535
#define MAX_EVENT_NUBEMR 1024
#define MAX_WAIT_NUMBER 32
#define MAX_SIGNAL_NUMBER 1024
#define TIMESLOT 5
#define BUFFER_SIZE 64
struct util_timer;
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
util_timer* timer;
client_data() : timer (nullptr) {
bzero (buf, BUFFER_SIZE);
bzero (&address, sizeof (address) );
}
};
struct util_timer {
time_t expire;
client_data* user_data;
std::function<void (client_data*) > cb_func;
util_timer *prev, *next;
util_timer() : prev (nullptr), next (nullptr) {}
};
class sort_timer_lst {
public:
sort_timer_lst();
~sort_timer_lst();
void add_timer (util_timer* timer); // 添加一个新的定时器,保证链表升序的
void adjust_timer (util_timer* timer); // 调整timer,保证升序的
void del_timer (util_timer* timer); // 删除timer
void tick();
private:
util_timer* head {nullptr};
util_timer* tail{nullptr};
};
sort_timer_lst::sort_timer_lst() {
head = new util_timer;
tail = new util_timer;
head->next = tail;
tail->prev = head;
}
sort_timer_lst::~sort_timer_lst() {
auto tmp = head;
while (tmp != nullptr) {
tmp = tmp->next;
delete tmp->prev;
}
}
void sort_timer_lst::add_timer (util_timer* timer) {
if (timer == nullptr) {
return;
}
auto pos = head;
while (pos->next != tail) {
if (pos->next->expire < timer->expire) {
pos = pos->next;
} else {
break;
}
}
pos->next = timer->next;
timer->prev = pos;
timer->next->prev = timer;
}
void sort_timer_lst::adjust_timer (util_timer* timer) {
if (timer == nullptr || timer->next == tail || \
timer->expire < timer->next->expire) {
return;
}
// 先把timer从表中拆出来
auto pos = timer->next->next;
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
while (pos != tail) {
if (pos->expire < timer->expire) {
pos = pos->next;
} else {
break;
}
}
timer->next = pos;
timer->prev = pos->prev;
pos->prev = timer;
}
void sort_timer_lst::del_timer (util_timer* timer) {
if (timer == nullptr) {
return;
}
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}
void sort_timer_lst::tick() {
if (head->next == tail) {
return;
}
printf ("timer tick\n");
time_t cur = time (NULL);
auto tmp = head->next;
while (tmp != tail) {
if (cur < tmp->expire) { // 超时时间和系统当前时间进行判别
break;
}
tmp->cb_func (tmp->user_data);
del_timer (tmp); // 执行完定时任务后就删除
}
}
static int epollfd = -1;
static int pipefd[2];
static sort_timer_lst timer_lst;
int setnonblocking (int fd) {
int old_option = fcntl (fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
assert (fcntl (fd, F_SETFL, new_option) != -1);
return old_option;
}
void addfd (int epollfd, int fd) {
epoll_event event;
event.data.fd;
event.events = EPOLLIN | EPOLLET;
assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, &event) != -1);
setnonblocking (fd);
}
void sig_handler (int sig) {
int save_errno = errno;
int msg = sig;
send (pipefd[1], (char*) &msg, sizeof (char), 0);
errno = save_errno;
}
void addsig (int sig) {
struct sigaction sa;
bzero (&sa, sizeof (sa) );
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset (&sa.sa_mask);
assert (sigaction (sig, &sa, NULL) != -1);
}
void timer_handler() {
timer_lst.tick();
alarm (TIMESLOT); // 重新定时需要不断触发SIGALRM信号
}
void cb_func (client_data* user_data) {
epoll_ctl (epollfd, EPOLL_CTL_DEL, user_data->sockfd, NULL);
close (user_data->sockfd);
printf ("close fd %d\n", user_data->sockfd);
}
int main (int argc, char* argv[]) {
if (argc != 2) {
printf ("Usage: %s <port of server>\n", argv[0]);
return 1;
}
int port = atoi (argv[1]);
if (port < 1024 || port > 65535) {
perror ("port error\n");
return 1;
}
struct sockaddr_in address;
bzero (&address, sizeof (address) );
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl (INADDR_ANY);
address.sin_port = htons (port);
int listenfd = socket (AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
perror ("socket() error\n");
return 1;
}
int ret = bind (listenfd, (struct sockaddr*) &address, sizeof (address) );
if (ret < 0) {
perror ("bind() error\n");
return 1;
}
ret = listen (listenfd, MAX_WAIT_NUMBER);
if (ret < 0) {
perror ("listen error\n");
return 1;
}
epollfd = epoll_create1 (0);
if (epollfd < 0) {
perror ("epoll_create1() error\n");
return 1;
}
addfd (epollfd, listenfd);
epoll_event events[MAX_EVENT_NUBEMR];
bzero (events, sizeof (events) );
ret = socketpair (PF_UNIX, SOCK_STREAM, 0, pipefd);
if (ret < 0) {
perror ("socketpair() error\n");
return 1;
}
setnonblocking (pipefd[1]);
addfd (epollfd, pipefd[0]);
// 设置信号集
addsig (SIGALRM);
addsig (SIGINT);
client_data* users = new client_data[FD_LIMIT];
alarm (TIMESLOT); // 定时
bool stop_server = false;
bool timeout = false;
while (!stop_server) {
int number = epoll_wait (epollfd, events, MAX_EVENT_NUBEMR, -1);
if (number < 0 && errno != EINTR) {
printf ("epoll failure\n");
break;
}
for (int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) { // 接收到新的连接
struct sockaddr_in client_address;
socklen_t client_addrlen = sizeof (client_address);
int connfd = 0;
while ( (connfd = accept (listenfd, (struct sockaddr*) &client_address, \
&client_addrlen) ) > 0) {
addfd (epollfd, connfd);
users[connfd].address = client_address;
users[connfd].sockfd = connfd;
// 创建定时器,设置回调函数与超时时间,然后加入链表中
util_timer* timer = new util_timer;
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
time_t cur = time (NULL);
timer->expire = cur + 3 * TIMESLOT;
users[connfd].timer = timer;
timer_lst.add_timer (timer);
}
} else if (sockfd == pipefd[0] && events[i].events & EPOLLIN) {
// 这里处理信号
char signals[MAX_SIGNAL_NUMBER];
bzero (signals, MAX_EVENT_NUBEMR);
ret = recv (pipefd[0], signals, MAX_SIGNAL_NUMBER, 0);
if (ret == -1) {
perror ("recv() error\n");
continue;
} else if (ret == 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGALRM:
// 定时器任务优先级不高,放到最后处理
timeout = true;
break;
case SIGINT:
stop_server = true;
// 以下可以扩充其他信号处理方式
}
}
}
} else if (events[i].events & EPOLLIN) {
// 处理客户端上的数据
if (ret <= 0) { // 发生读错误或者对方关闭连接,关闭连接。移除定时器
cb_func (&users[sockfd]);
timer_lst.del_timer (users[sockfd].timer);
} else {
ret = recv (sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
printf ("get %d bytes of client data %s from %d", \
ret, users[sockfd].buf, sockfd);
util_timer* timer = users[sockfd].timer;
// 调整定时器,延迟被关闭连接的时间
time_t cur = time (NULL);
timer->expire = cur + 3 * TIMESLOT;
timer_lst.adjust_timer (timer);
}
} else {
// 处理一些其他的事件
}
if (timeout) {
// 最后处理定时器事件,因为I/O事件有更高的优先级,缺陷是
// 不能精确地按照预期时间执行
timer_handler();
timeout = false;
}
}
}
delete[] users;
close(pipefd[0]);
close(pipefd[1]);
close(listenfd);
return 0;
}