reactor服务器实现代码:
/*================================================================
* Copyright (C) 2021 baichao All rights reserved.
*
* 文件名称:reactor.cpp
* 创 建 者:baichao
* 创建日期:2021年03月07日
* 描 述:
*
================================================================*/
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <errno.h>
#include <sys/epoll.h>
#include <chrono>
#include <thread>
// Maximum number of bytes requested by a single recv() call in recv_cb.
// It is one less than the 1024-byte buffers declared in struct sockitem.
#define RBUFFERSIZE 1023
// Send-side counterpart of RBUFFERSIZE; not referenced by the code visible here.
#define SBUFFERSIZE 1023
/*
 * Per-descriptor state: every fd handed to epoll gets its own sockitem,
 * which is stored in epoll_event.data.ptr and handed back to the callback.
 */
struct sockitem
{
    // Signature shared by every readiness handler stored in `callback`.
    using handler_fn = int (*)(int fd, int events, void *arg);

    int sockfd;            // descriptor this item describes
    handler_fn callback;   // handler to run when the descriptor is reported ready
    char recvbuffer[1024]; // read buffer
    char sendbuffer[1024]; // write buffer
    int rlength;           // length associated with recvbuffer
    int slength;           // length associated with sendbuffer
};
// Event-loop state: mainloop / eventloop --> epoll.
struct reactor
{
    int epfd;                // descriptor of the epoll instance
    epoll_event events[512]; // array that epoll_wait fills with ready events
};
// The single reactor shared by all callbacks; stays null until main() allocates it.
struct reactor *eventloop = nullptr;

// Declared ahead of its definition because send_cb re-installs it as the handler.
int recv_cb(int fd, int events, void *arg);
/**
 * Write-ready handler: sends the bytes staged in the connection's send buffer
 * to the peer and then re-arms the descriptor for reading with recv_cb.
 *
 * @param fd     Connected socket reported writable by epoll.
 * @param events epoll event mask that triggered the call (unused).
 * @param arg    Pointer to the connection's struct sockitem.
 * @return Number of bytes sent, or -1 if send() or the epoll re-arm failed.
 *         The connection is never closed here; a failed send is left for
 *         recv_cb to detect and clean up on the next read event.
 */
int send_cb(int fd, int events, void *arg)
{
    (void)events;
    struct sockitem *si = static_cast<struct sockitem *>(arg);

    // Log the current wall-clock time before replying.
    time_t timep = time(NULL);
    char tmp[256] = {0};
    const struct tm *local = localtime(&timep);
    if (local != NULL)
    {
        strftime(tmp, sizeof(tmp), "%Y-%m-%d %H:%M:%S", local);
    }
    std::cout << tmp << std::endl;

    // NOTE(review): artificial 2 s delay kept from the original demo; it blocks
    // the whole event loop, so every other connection stalls while it sleeps.
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));

    // Clamp the staged length to the buffer so a bad slength cannot overrun it.
    size_t len = 0;
    if (si->slength > 0)
    {
        len = static_cast<size_t>(si->slength);
        if (len > sizeof(si->sendbuffer))
        {
            len = sizeof(si->sendbuffer);
        }
    }

    // The buffer is not guaranteed to be NUL-terminated, so bound the print.
    std::cout << "发送数据:";
    std::cout.write(si->sendbuffer, static_cast<std::streamsize>(strnlen(si->sendbuffer, len)));
    std::cout << std::endl;

    // MSG_NOSIGNAL: a peer that already closed yields EPIPE instead of a
    // SIGPIPE that would terminate the whole server.
    ssize_t sent = 0;
    if (len > 0)
    {
        sent = send(fd, si->sendbuffer, len, MSG_NOSIGNAL);
        if (sent < 0)
        {
            perror("send");
        }
    }

    // Switch the descriptor back to read interest and the read handler.
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLET;
    si->sockfd = fd;
    si->callback = recv_cb;
    ev.data.ptr = si;
    if (epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev) < 0)
    {
        perror("epoll_ctl(EPOLL_CTL_MOD)");
        return -1;
    }

    return sent < 0 ? -1 : static_cast<int>(sent);
}
int recv_cb(int fd, int events, void *arg)
{
struct sockitem *si = (struct sockitem *)arg;
struct epoll_event ev;
int ret = recv(fd, si->recvbuffer, RBUFFERSIZE, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{ //
return -1;
}
else
{
}
ev.events = EPOLLIN;
//ev.data.fd = fd;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
close(fd);
free(si);
}
else if (ret == 0)
{
ev.events = EPOLLIN;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
close(fd);
free(si);
}
else
{
std::cout << "我是服务器,我收到了数据:" << si->recvbuffer <<std::endl;
//将recvbuffer中收到的数据拷贝到sendbuffer中去
si->rlength = ret;
memcpy(si->sendbuffer, si->recvbuffer, sizeof(si->recvbuffer));
si->slength = si->rlength;
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLET;
//ev.data.fd = clientfd;
si->sockfd = fd;
si->callback = send_cb;
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev); //recv到数据后修改事件,将fd对应的事件的回调函数修改为发送
}
}
/**
 * Accept handler for the listening socket: accepts one pending connection,
 * allocates its sockitem and registers it with epoll for edge-triggered reads
 * handled by recv_cb.
 *
 * @param fd     Listening socket reported readable by epoll.
 * @param events epoll event mask that triggered the call (unused).
 * @param arg    The listening socket's sockitem (unused).
 * @return The accepted client descriptor, or -1 if accept(), the allocation or
 *         the epoll registration failed (nothing is leaked in that case).
 */
int accept_cb(int fd, int events, void *arg)
{
    (void)events;
    (void)arg;

    struct sockaddr_in client_addr;
    memset(&client_addr, 0, sizeof(client_addr));
    socklen_t client_len = sizeof(client_addr);

    // 0 is a valid descriptor; only negative values signal failure.
    const int clientfd = accept(fd, reinterpret_cast<struct sockaddr *>(&client_addr), &client_len);
    if (clientfd < 0)
    {
        return -1;
    }

    char str[INET_ADDRSTRLEN] = {0};
    const char *peer = inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str));
    printf("recv from %s at port %d\n", peer != NULL ? peer : "?",
           ntohs(client_addr.sin_port));

    // calloc zero-initialises the buffers and length fields.
    struct sockitem *si = static_cast<struct sockitem *>(calloc(1, sizeof(struct sockitem)));
    if (si == NULL)
    {
        close(clientfd);
        return -1;
    }
    si->sockfd = clientfd;
    si->callback = recv_cb;

    // The descriptor returned by accept must itself be added to epoll.
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = si;
    if (epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, clientfd, &ev) < 0)
    {
        perror("epoll_ctl(EPOLL_CTL_ADD)");
        free(si);
        close(clientfd);
        return -1;
    }

    std::cout<<"accept success"<<std::endl;
    return clientfd;
}
/**
 * Reactor echo server entry point.
 *
 * Usage: <program> <port>
 *
 * Creates a TCP listening socket on the given port, registers it with a new
 * epoll instance using accept_cb as its handler, and then dispatches ready
 * events to the callback stored in each descriptor's sockitem.
 *
 * @return -1 on bad arguments or socket() failure, -2 on bind() failure,
 *         -3 on listen() failure, -4 if the reactor could not be set up,
 *         -5 if epoll_wait() fails with a non-recoverable error.
 */
int main(int argc, char *argv[])
{
    if (argc < 2)
    {
        return -1;
    }

    // Reject non-numeric or out-of-range ports instead of silently truncating.
    char *end = NULL;
    const long port = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || port <= 0 || port > 65535)
    {
        fprintf(stderr, "invalid port: %s\n", argv[1]);
        return -1;
    }

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(static_cast<unsigned short>(port));
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(sockfd, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)) < 0)
    {
        close(sockfd);
        return -2;
    }
    if (listen(sockfd, 5) < 0)
    {
        close(sockfd);
        return -3;
    }

    struct sockitem *listener = NULL;

    // Releases everything main owns; safe to call at any point after listen().
    auto cleanup = [&]() {
        free(listener);
        listener = NULL;
        if (eventloop != NULL)
        {
            if (eventloop->epfd >= 0)
            {
                close(eventloop->epfd);
            }
            free(eventloop);
            eventloop = NULL;
        }
        close(sockfd);
    };

    eventloop = static_cast<struct reactor *>(malloc(sizeof(struct reactor)));
    if (eventloop == NULL)
    {
        cleanup();
        return -4;
    }
    eventloop->epfd = epoll_create(1);
    if (eventloop->epfd < 0)
    {
        perror("epoll_create");
        cleanup();
        return -4;
    }

    listener = static_cast<struct sockitem *>(calloc(1, sizeof(struct sockitem)));
    if (listener == NULL)
    {
        cleanup();
        return -4;
    }
    listener->sockfd = sockfd;
    listener->callback = accept_cb;

    // data.ptr carries the sockitem, so epoll_wait hands back both fd and handler.
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.ptr = listener;
    if (epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev) < 0)
    {
        perror("epoll_ctl(EPOLL_CTL_ADD)");
        cleanup();
        return -4;
    }

    const int max_events =
        static_cast<int>(sizeof(eventloop->events) / sizeof(eventloop->events[0]));
    int exit_code = 0;
    while (1)
    {
        // Returns at most max_events ready descriptors per call.
        const int nready = epoll_wait(eventloop->epfd, eventloop->events, max_events, -1);
        if (nready < 0)
        {
            if (errno == EINTR)
            {
                continue; // interrupted by a signal, not an error
            }
            perror("epoll_wait");
            exit_code = -5;
            break;
        }

        // accept is just another EPOLLIN handler; reads and writes differ only
        // in the callback stored in the sockitem.
        for (int i = 0; i < nready; i++)
        {
            const unsigned int fired = eventloop->events[i].events;
            if (fired & (EPOLLIN | EPOLLOUT))
            {
                // Dispatch once per event: the callback may free the sockitem
                // or swap its handler, so it must not be invoked a second time.
                struct sockitem *item = static_cast<struct sockitem *>(eventloop->events[i].data.ptr);
                item->callback(item->sockfd, static_cast<int>(fired), item);
            }
        }
    }

    cleanup();
    return exit_code;
}
客户端测试代码:
/*================================================================
* Copyright (C) 2021 baichao All rights reserved.
*
* 文件名称:reactor_client.cpp
* 创 建 者:baichao
* 创建日期:2021年03月09日
* 描 述:
*
================================================================*/
#include<iostream>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <chrono>
#include <thread>
/**
 * Test client for the reactor echo server.
 *
 * Connects to port 11230 on the local host, then repeatedly sends a fixed
 * greeting, waits for the reply, prints it with a timestamp and sleeps for
 * two seconds. The loop ends when the connection fails or the server closes it.
 *
 * @return 0 after the server closes the connection, 1 on a socket, connect,
 *         send or receive error.
 */
int main(){
    int serv_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (serv_sock < 0)
    {
        std::cerr << "socket() failed" << std::endl;
        return 1;
    }

    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    // Connect to the loopback address explicitly; INADDR_ANY is a bind-side
    // wildcard and only works as a destination on some systems.
    serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    serv_addr.sin_port = htons(11230);
    if (connect(serv_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1)
    {
        std::cout<<"连接客户端失败"<<std::endl;
        // Without a connection every send/recv below would fail; stop here.
        close(serv_sock);
        return 1;
    }
    std::cout<<"连接客户端成功"<<std::endl;

    char context_send[100] = "服务器,你好,我是客户端!\n";
    // One extra byte so a full-size reply can always be NUL-terminated.
    char context_recv[sizeof(context_send) + 1];
    int status = 0;
    while (1)
    {
        // The whole fixed-size buffer (text plus NUL padding) is sent each time.
        // MSG_NOSIGNAL turns a closed peer into an error instead of SIGPIPE.
        const ssize_t sent = send(serv_sock, context_send, sizeof(context_send), MSG_NOSIGNAL);
        std::cout<<"send number:"<<sent<<std::endl;
        if (sent < 0)
        {
            status = 1;
            break;
        }

        const ssize_t received = recv(serv_sock, context_recv, sizeof(context_recv) - 1, 0);
        std::cout<<"recv number:"<<received<<std::endl;
        if (received <= 0)
        {
            // 0: server closed the connection; < 0: receive error.
            status = (received < 0) ? 1 : 0;
            break;
        }
        context_recv[received] = '\0';
        std::cout<<context_recv<<" ";

        time_t timep;
        time(&timep);
        char tmp[256] = {0};
        const struct tm *local = localtime(&timep);
        if (local != NULL)
        {
            strftime(tmp, sizeof(tmp), "%Y-%m-%d %H:%M:%S", local);
        }
        std::cout << tmp << std::endl;

        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    }

    close(serv_sock);
    return status;
}