kqueu的两个例子:
1、完全以IO复用的方式读入标准输入流数据,输出到标准输出流中
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/event.h>
#include <errno.h>
#include <string.h>
const static int FD_NUM=2; //两个文件描述符,分别为标准输入与输出
const static int IO_BUFFER_SIZE=1500; //缓冲区大小
void turn_on_flags(int fd, int flags);//为文件描述符打开对应状态位的工具函数
//完全以IO复用的方式读入标准输入流数据,输出到标准输出流中
int main()
{
struct kevent changes[FD_NUM];
struct kevent events[FD_NUM];
//创建一个kqueue
int kque;
if((kque=kqueue()) == -1 )
{
printf("There was an error in kqueue()!%d,errno:%s\n",errno,strerror(errno));
exit(errno);
}
//准备从标准输入流中读数据
int stdin_fd=STDIN_FILENO;
int stdout_fd=STDOUT_FILENO;
//设置为非阻塞
turn_on_flags(stdin_fd, O_NONBLOCK);
turn_on_flags(stdout_fd, O_NONBLOCK);
//注册监听事件
int k=0;
for(int k=0;k<FD_NUM;k++)
{
EV_SET(&changes[k], stdin_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &stdin_fd);
EV_SET(&changes[k], stdout_fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, &stdout_fd);
}
int nEvent, nRead, nWrite=0; //发生事件的数量, 已读字节数, 已写字节数
char IOBuffer[IO_BUFFER_SIZE];
while(1)
{
nEvent=kevent(kque, changes, FD_NUM, events, FD_NUM, NULL); //已经就绪的文件描述符数量
if( nEvent<=0 )
{
printf("There was an error in kevent()!%d,errno:%s\n",errno,strerror(errno));
exit(errno);
}
for(int i=0;i<nEvent;i++)
{
struct kevent event=events[i];
if(event.flags&EV_ERROR)
{
printf("Event error!%d,errno:%s\n",errno,strerror(errno));
exit(errno);
}
int ev_fd=*((int *)event.udata);
//输入流就绪 且 缓冲区还有空间能继续读
if( ev_fd == stdin_fd && nRead < IO_BUFFER_SIZE )
{
int new_nread=0;
if((new_nread=read(ev_fd,IOBuffer+nRead,sizeof(IOBuffer)-nRead))<=0)
{
//由于可读事件已经发生,因此如果读出0个字节也是不正常的
printf("Event error!%d,errno:%s\n",errno,strerror(errno));
exit(errno);
}
nRead=nRead+new_nread; //递增已读数据字节数
}
//输出流就绪 且 缓冲区有内容可以写出
if( ev_fd == stdout_fd && nRead > 0 )
{
if((nWrite=write(stdout_fd,IOBuffer,nRead))<=0)
{
printf("There was an error in write()!%d,errno:%s\n",errno,strerror(errno));
exit(errno);
}
memmove(IOBuffer, IOBuffer+nWrite, nWrite); //为了使实现的代码更简洁,这里把还没有写出去的数据往前移动
nRead=nRead-nWrite; //减去已经写出去的字节数
}
}
}
return 0;
}
//为文件描述符打开对应状态位的工具函数
void turn_on_flags(int fd, int flags)
{
int current_flags=0;
//获取给定文件描述符现有的flag
//其中fcntl的第二个参数F_GETFL表示要获取fd的状态
current_flags=fcntl(fd, F_GETFL);
if(current_flags<0)
{
printf("There was an error in getflag!%d,errno:%s\n",errno,strerror(errno));
exit(errno);
}
else
{
//施加新的状态位
current_flags |= flags;
if(fcntl(fd,F_SETFL,current_flags)<0)
{
printf("There was an error in setflag!%d,errno:%s\n",errno,strerror(errno));
exit(errno);
}
}
}
2、TCP服务器
#include <sys/event.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define BUFSIZE 1024
/* 函数原型 */
void diep(const char *s);
int tcpopen(const char *host, int port);
void sendbuftosck(int sckfd, const char *buf, int len);
int main(int argc, char *argv[])
{
struct kevent chlist[2];/* 我们要监视的事件 */
struct kevent evlist[2];/* 触发的事件 */
char buf[BUFSIZE];
int sckfd, kq, nev, i;
/* 检查参数数量 */
if (argc != 3)
{
fprintf(stderr, "usage: %s host port\n", argv[0]);
exit(EXIT_FAILURE);
}
/* 打开一个链接(host,port)pair */
sckfd = tcpopen(argv[1], atoi(argv[2]));
/* 创建一个新的内核事件队列 */
if ((kq = kqueue()) == -1)
diep("kqueue()");
/* 初始化kevent结构体 */
EV_SET(&chlist[0], sckfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
EV_SET(&chlist[1], fileno(stdin), EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
/* 无限循环 */
for (;;)
{
nev = kevent(kq, chlist, 2, evlist, 2, NULL);
if (nev < 0)
{
diep("kevent()");
}
else if (nev > 0)
{
if (evlist[0].flags & EV_EOF)/* 读取socket关闭指示 */
{
exit(EXIT_FAILURE);
}
for (i = 0; i < nev; i++)
{
if (evlist[i].flags & EV_ERROR)
{
/* 报告错误 */
fprintf(stderr, "EV_ERROR: %s\n", strerror(evlist[i].data));
exit(EXIT_FAILURE);
}
if (evlist[i].ident == sckfd)
{
/* 我们从host接收到数据 */
memset(buf, 0, BUFSIZE);
if (read(sckfd, buf, BUFSIZE) < 0)
{
diep("read()");
}
fputs(buf, stdout);
}
else if (evlist[i].ident == fileno(stdin))
{
/* stdin中有数据输入 */
memset(buf, 0, BUFSIZE);
fgets(buf, BUFSIZE, stdin);
sendbuftosck(sckfd, buf, strlen(buf));
}
}
}
}
close(kq);
return EXIT_SUCCESS;
}
void diep(const char *s)
{
perror(s);
exit(EXIT_FAILURE);
}
int tcpopen(const char *host, int port)
{
struct sockaddr_in server;
struct hostent *hp;
int sckfd;
if ((hp = gethostbyname(host)) == NULL)
{
diep("gethostbyname()");
}
if ((sckfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
{
diep("socket()");
}
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr = *((struct in_addr *)hp->h_addr);
memset(&(server.sin_zero), 0, 8);
if (connect(sckfd, (struct sockaddr *)&server, sizeof(struct sockaddr)) < 0)
{
diep("connect()");
}
return sckfd;
}
void sendbuftosck(int sckfd, const char *buf, int len)
{
int bytessent, pos;
pos = 0;
do
{
if ((bytessent = send(sckfd, buf + pos, len - pos, 0)) < 0)
{
diep("send()");
}
pos += bytessent;
}while(bytessent > 0);
}