kqueue示例

kqueu的两个例子:

1、完全以IO复用的方式读入标准输入流数据,输出到标准输出流中

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/event.h>
#include <errno.h>
#include <string.h>

const static int FD_NUM=2; //两个文件描述符,分别为标准输入与输出
const static int IO_BUFFER_SIZE=1500; //缓冲区大小

void turn_on_flags(int fd, int flags);//为文件描述符打开对应状态位的工具函数

//完全以IO复用的方式读入标准输入流数据,输出到标准输出流中
int main()
{
    struct kevent changes[FD_NUM];
    struct kevent events[FD_NUM];
	
    //创建一个kqueue
    int kque;
    if((kque=kqueue()) == -1 )
	{
		printf("There was an error in kqueue()!%d,errno:%s\n",errno,strerror(errno));
		exit(errno);
	}
	
    //准备从标准输入流中读数据
    int stdin_fd=STDIN_FILENO;
    int stdout_fd=STDOUT_FILENO;
	
    //设置为非阻塞
    turn_on_flags(stdin_fd, O_NONBLOCK);
    turn_on_flags(stdout_fd, O_NONBLOCK);
	
    //注册监听事件
    int k=0;
    for(int k=0;k<FD_NUM;k++)
	{
		EV_SET(&changes[k], stdin_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &stdin_fd);
		EV_SET(&changes[k], stdout_fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, &stdout_fd);
	}
	
    int nEvent, nRead, nWrite=0; //发生事件的数量, 已读字节数, 已写字节数
    char IOBuffer[IO_BUFFER_SIZE];
    while(1)
	{
        nEvent=kevent(kque, changes, FD_NUM, events, FD_NUM, NULL); //已经就绪的文件描述符数量
        if( nEvent<=0 )
		{
			printf("There was an error in kevent()!%d,errno:%s\n",errno,strerror(errno));
			exit(errno);
		}
		
        for(int i=0;i<nEvent;i++)
		{
            struct kevent event=events[i];
            if(event.flags&EV_ERROR)
			{
				printf("Event error!%d,errno:%s\n",errno,strerror(errno));
				exit(errno);
			}
			
            int ev_fd=*((int *)event.udata);
            //输入流就绪 且 缓冲区还有空间能继续读
            if( ev_fd == stdin_fd && nRead < IO_BUFFER_SIZE )
			{
                int new_nread=0;
                if((new_nread=read(ev_fd,IOBuffer+nRead,sizeof(IOBuffer)-nRead))<=0)
				{
					//由于可读事件已经发生,因此如果读出0个字节也是不正常的
					printf("Event error!%d,errno:%s\n",errno,strerror(errno));
					exit(errno);
				}
                nRead=nRead+new_nread; //递增已读数据字节数
            }

            //输出流就绪 且 缓冲区有内容可以写出
            if( ev_fd == stdout_fd && nRead > 0 )
			{
                if((nWrite=write(stdout_fd,IOBuffer,nRead))<=0)
				{
					printf("There was an error in write()!%d,errno:%s\n",errno,strerror(errno));
					exit(errno);
				}
                memmove(IOBuffer, IOBuffer+nWrite, nWrite); //为了使实现的代码更简洁,这里把还没有写出去的数据往前移动
                nRead=nRead-nWrite; //减去已经写出去的字节数
            }
        }
    }
    return 0;
}

//为文件描述符打开对应状态位的工具函数
void turn_on_flags(int fd, int flags)
{
    int current_flags=0;
    //获取给定文件描述符现有的flag
    //其中fcntl的第二个参数F_GETFL表示要获取fd的状态
	current_flags=fcntl(fd, F_GETFL);
    if(current_flags<0)
	{
		printf("There was an error in getflag!%d,errno:%s\n",errno,strerror(errno));
		exit(errno);
	}
	else
	{
		//施加新的状态位
		current_flags |= flags;
		if(fcntl(fd,F_SETFL,current_flags)<0)
		{
			printf("There was an error in setflag!%d,errno:%s\n",errno,strerror(errno));
			exit(errno);
		}
	}
}

2、TCP服务器

#include <sys/event.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define BUFSIZE 1024

/* 函数原型 */
void diep(const char *s);
int tcpopen(const char *host, int port);
void sendbuftosck(int sckfd, const char *buf, int len);

int main(int argc, char *argv[])
{
	struct kevent chlist[2];/* 我们要监视的事件 */
	struct kevent evlist[2];/* 触发的事件 */
	char buf[BUFSIZE];
	int sckfd, kq, nev, i;

	/* 检查参数数量 */
	if (argc != 3)
	{
		fprintf(stderr, "usage: %s host port\n", argv[0]);
		exit(EXIT_FAILURE);
	}

	/* 打开一个链接(host,port)pair */
	sckfd = tcpopen(argv[1], atoi(argv[2]));

	/* 创建一个新的内核事件队列 */
	if ((kq = kqueue()) == -1)
		diep("kqueue()");

	/* 初始化kevent结构体 */
	EV_SET(&chlist[0], sckfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
	EV_SET(&chlist[1], fileno(stdin), EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

	/* 无限循环 */
	for (;;)
	{
		nev = kevent(kq, chlist, 2, evlist, 2, NULL);

		if (nev < 0)
		{
			diep("kevent()");
		}
		else if (nev > 0)
		{
			if (evlist[0].flags & EV_EOF)/* 读取socket关闭指示 */
			{
				exit(EXIT_FAILURE);
			}

			for (i = 0; i < nev; i++)
			{
				if (evlist[i].flags & EV_ERROR)
				{
					/* 报告错误 */
					fprintf(stderr, "EV_ERROR: %s\n", strerror(evlist[i].data));
					exit(EXIT_FAILURE);
				}
				
				if (evlist[i].ident == sckfd)
				{
					/* 我们从host接收到数据 */
					memset(buf, 0, BUFSIZE);
					if (read(sckfd, buf, BUFSIZE) < 0)
					{
						diep("read()");
					}
					fputs(buf, stdout);
				}

				else if (evlist[i].ident == fileno(stdin))
				{
					/* stdin中有数据输入 */
					memset(buf, 0, BUFSIZE);
					fgets(buf, BUFSIZE, stdin);
					sendbuftosck(sckfd, buf, strlen(buf));
				}
			}
		}
	}

	close(kq);
	return EXIT_SUCCESS;
}

void diep(const char *s)
{
	perror(s);
	exit(EXIT_FAILURE);
}

int tcpopen(const char *host, int port)
{
	struct sockaddr_in server;
	struct hostent *hp;
	int sckfd;
	
	if ((hp = gethostbyname(host)) == NULL)
	{
		diep("gethostbyname()");
	}
	
	if ((sckfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
	{
		diep("socket()");
	}
	
	server.sin_family = AF_INET;
	server.sin_port = htons(port);
	server.sin_addr = *((struct in_addr *)hp->h_addr);
	memset(&(server.sin_zero), 0, 8);

	if (connect(sckfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) < 0)
	{
		diep("connect()");
	}
	
	return sckfd;
}

void sendbuftosck(int sckfd, const char *buf, int len)
{
	int bytessent, pos;

	pos = 0;
	do
	{
		if ((bytessent = send(sckfd, buf + pos, len - pos, 0)) < 0)
		{
			diep("send()");
		}
		pos += bytessent;
	}while(bytessent > 0);
}

猜你喜欢

转载自blog.csdn.net/Namcodream521/article/details/83032670