moduo网络库的reactor模式(下):实现非阻塞TCP网络

1、在reactor框架下加入tcp

Unix下的tcp连接也是经由socket文件描述符(sockfd)实现的。此节只是封装了listening sockefd进行监听(accept(2)),得到的新连接(普通sockfd)直接提供给用户让用户自行处理。下一节才进一步地将得到的新连接也封装起来。

1.1、首先将unix下的socket调用api简易封装成Socket类,得到wapper。即将api调用如socket()、bind()、listen()、accept()等裹上对错误返回值的处理。

#ifndef SOCKET_H_
#define SOCKET_H_

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

class Socket
{
public:
  Socket(unsigned short port) : port_(port) 
  {
    sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd_<0)
    {
      perror("socket");
	  exit(-1);
    }
    // non-block
    int flags = ::fcntl(sockfd_, F_GETFL, 0);
    flags |= O_NONBLOCK;
    int ret = ::fcntl(sockfd_, F_SETFL, flags);
    if(ret==-1)
    {
      perror("fcntl");
	  exit(-1);
    }

    // close-on-exec
    flags = ::fcntl(sockfd_, F_GETFD, 0);
    flags |= FD_CLOEXEC;
    ret = ::fcntl(sockfd_, F_SETFD, flags);
    if(ret==-1)
    {
      perror("fcntl");
	  exit(-1);
    }
  }

  void setReuseAddr(bool on)
  {
    int optval = on ? 1 : 0;
    setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR,
                 &optval, sizeof optval);
    // FIXME CHECK
  }

  ~Socket() 
  {
    close(sockfd_); 
  }
  
  int fd() { return sockfd_; }

  void Bind()
  {
    struct sockaddr_in my_addr;
    bzero(&my_addr,sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(port_);   //port
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int err_log=bind(sockfd_, (struct sockaddr*)&my_addr, sizeof(my_addr));
    if(err_log!=0)
    {
        perror("bind");
	    close(sockfd_);
	    exit(-1);
    }
  }
  
  void Listen()
  {
    int err_log = listen(sockfd_, 10);
    if(err_log!=0)
    {
        perror("listen");
	    close(sockfd_);
	    exit(-1);
    }
  }

  int Accept(struct sockaddr_in* peeraddr)
  {
    //struct sockaddr_in addr;
    //bzero(&addr, sizeof addr);
    //socklen_t addr_len=sizeof(addr);
    //int connfd = accept(sockfd_, &addr, &addr_len);

    bzero(peeraddr, sizeof(*peeraddr));
    socklen_t peeraddr_len=sizeof(*peeraddr);

    int connfd = accept(sockfd_, (sockaddr*)peeraddr, &peeraddr_len);
    if (connfd < 0)
    {
      perror("accept");  
    }
    return connfd;
  }

private:
  unsigned short port_;
  int sockfd_;
};
    
#endif

1.2、封装listening sockfd为Acceptor类,封装方法与muduo手法类似。负责监听(accept(2))外部是否有新建连接。其中

Acceptor::enableReading()为用户调用函数,使能listening sockfd可读并加入到poll(2)中进行事件循环。若可读(listen到有新连接),则回调Acceptor::handleRead()进行accept(2)并回调用户函数cb_(),而cb_是由用户调用setNewConnectionCallbackFunc()设置。cb_()则是用户自行设计的普通sockfd读、写等操作处理。

#ifndef ACCEPTOR_H_
#define ACCEPTOR_H_

#include "Socket.hpp"
#include "Channel.hpp"
#include "Thread.hpp"
#include <iostream>

class EventLoop;

class Acceptor
{
public:
  typedef std::function<void(int, sockaddr_in*)> AcceptCallbackFunc;
  Acceptor(EventLoop* loop, unsigned short port)
   : loop_(loop), socket_(port), socketChannel_(loop_,socket_.fd())
  {}

  ~Acceptor() {}

  void setNewConnectionCallbackFunc(AcceptCallbackFunc cb)
  {
    cb_=cb;
  }

  //user used function
  void enableReading()
  {
    socket_.Bind();
    socket_.Listen();
    socketChannel_.setReadCallback(std::bind(&Acceptor::handleRead,this));
    socketChannel_.enableReading();
  }
 
  void handleRead()
  {
    struct sockaddr_in peeraddr;
    int connfd=socket_.Accept(&peeraddr);
    //std::cout<<"tid "<<CurrentThreadtid()<<": server is acceptting in port 6666"<<std::endl;
    if(connfd>=0)
      if(cb_)
        cb_(connfd, &peeraddr);
    else
      ::close(connfd);
  }

  void setReuseAddr(bool on)
  {
    socket_.setReuseAddr(true);
  }

private:
  EventLoop* loop_;
  Socket socket_;
  Channel socketChannel_;
  AcceptCallbackFunc cb_;
};

#endif

1.3、测试

server端:

Acceptor server1(loop, 6666),server2(loop,6688);为两个不同的listening sockfd封装类。

Acceptor::setReuseAddr()为该listening sockfd关闭后不必等待则可直接再次使用。

client1()和client2()为用户回调函数,此处用于处理普通socketfd的读操作:当有新建连接时,发送一句话给连接者然后close。

#include "EventLoopThread.hpp"
#include "EventLoop.hpp"
#include "Thread.hpp"
#include "Acceptor.hpp"
#include <iostream>
    
using namespace std;
   
void client1(int connfd, sockaddr_in* peeraddr)
{     
  cout<<"tid "<<CurrentThreadtid()<<": server accept a connector"<<endl;
  if(send(connfd, "How are you", 11, 0) < 0)
    cout<<"send error"<<endl;
  
  close(connfd);
} 
  
void client2(int connfd, sockaddr_in* peeraddr)
{ 
  cout<<"tid "<<CurrentThreadtid()<<": server accept a connector"<<endl;
  if(send(connfd, "What's up", 9, 0) < 0)
    cout<<"send error"<<endl;
  
  close(connfd);
} 

int main()
{
  cout<<"Main: pid: "<<getpid()<<" tid: "<<CurrentThreadtid()<<endl;//main thread

  EventLoopThread ELThread1;
  EventLoop* loop = ELThread1.startLoop();//thread 2
  Acceptor server1(loop, 6666);//TCP server create in main thread, but accept in thread 2
  server1.setReuseAddr(true);
  server1.setAcceptCallbackFunc(client1);
  server1.enableReading();

  Acceptor server2(loop, 6688);//TCP server create in main thread, but accept in thread 2
  server2.setReuseAddr(false);
  server2.setAcceptCallbackFunc(client2);
  server2.enableReading();
  //loop->loop(); //test "one thread one loop"
  sleep(20);
  loop->quit();

  sleep(3);
  return 0;
}

client端:

  1 #include <stdio.h>
  2 #include <unistd.h>
  3 #include <string.h>
  4 #include <stdlib.h>
  5 #include <arpa/inet.h>
  6 #include <sys/socket.h>
  7 #include <netinet/in.h>
  8 
  9 int main(int argc, char *argv[])
 10 {
 11     unsigned short port = 6666;
 12     char *server_ip = "127.0.0.1";
 13 
 14     int sockfd = socket(AF_INET, SOCK_STREAM, 0);
 15     if(sockfd<0)
 16     {
 17         perror("socket");
 18         exit(-1);
 19     }
 20 
 21     struct sockaddr_in server_addr;
 22     bzero(&server_addr,sizeof(server_addr));
 23     server_addr.sin_family = AF_INET;
 24     server_addr.sin_port = htons(port);
 25     inet_pton(AF_INET, server_ip, &server_addr.sin_addr.s_addr);
 26 
 27     int err_log = connect(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
 28     if(err_log!=0)
 29     {
 30         perror("connect");
 31         close(sockfd);
 32         exit(-1);
 33     }
 34 
 35     char buff[4096];
 36     int n=recv(sockfd, buff, 4096, 0);
 37     buff[n]='\0';
 38     printf("Port %d: recv msg from server: %s\n", port, buff);
 39     close(sockfd);
 40 
 41     return 0;
 42 }

 测试:

server:端

baddy@ubuntu:~/Documents/Reactor/s2.1$ ./testAcceptorDemo 
Main: pid: 56222 tid: 56222
tid 56222: create a new thread
tid 56222: waiting
tid 56223: Thread::func_() started!
tid 56223: notified
tid 56222: received notification
tid 56223: start looping...
tid 56223: server accept a connector
tid 56223: server accept a connector
tid 56223: server accept a connector
tid 56223: end looping...
tid 56223: Thread end!

client端: 

baddy@ubuntu:~/Documents$ ./tcp_send
Port 6666: recv msg from server: How are you
baddy@ubuntu:~/Documents$ ./tcp_send2
Port 6688: recv msg from server: What's up
baddy@ubuntu:~/Documents$ ./tcp_send2
Port 6688: recv msg from server: What's up

2 封装TCP网络

在上一节的基础上进一步封装普通sockfd为TcpConnection类,并将listening sockfd类Acceptor和普通sockfd类TcpConnection封装到类TcpServer供用户使用。

封装方式与muduo手法基本一致:把当前对象(Acceptor对象)封装到一个新类(TcpServer类)中成为其数据成员。在构造函数中构造该成员(此操作相当于上节中用户自行构造Acceptor server1(loop,6666)),并在构造函数中使该成员回调新的回调函数(把回调Acceptor::handleRead()改为TcpServer::newConnection())。在新的回调函数中构造TcpConnection对象(用于封装accept(2)得到的普通sockfd),当普通sockfd有事件发生时则回调由用户传入的回调函数(相当于上节中的client1())。

用户通过设置操作普通sockfd读、写等事件的回调函数,然后调用TcpServer::start()使能listening sockfd,进入事件循环则可实现非阻塞TCP网络。

  • 2.1 封装listening sockfd为Acceptor类,封装方法与muduo手法类似。负责监听(accept(2))外部是否有新建连接。

    Acceptor用于监听,关注连接,建立连接后,由TCPConnection来接管处理;

    这个类没有业务处理,用来处理监听和连接请求到来后的逻辑; 

    所有与事件循环相关的都是Channel,Acceptor不直接和EventLoop打交道,所以在这个类中需要有一个Channel的成员,并包含将Channel挂到事件循环中的逻辑(listen())。

  • 2.2 封装普通socket为TcpConnection类,封装方法与muduo手法类似。负责接收新建的普通sockfd。

    TcpConnection处理连接建立后的收发数据;业务处理回调完成。

  • 2.3 封装供用户使用的TcpServer类,封装负责监听listening socketfd的Acceptor类和普通sockfd的TcpConnection类。

    作为最终用户的接口方,和外部打交道通过TCPServer交互,而业务逻辑处理将回调函数传入到底层,这种传递函数的方式犹如数据的传递一样自然和方便;

    作用Acceptor和TcpConnection的粘合剂,调用Acceptor开始监听连接并设置回调,连接请求到来后,在回调中新建TcpConnection连接,设置TcpConnection的回调(将用户的业务处理回调函数传入,包括:连接建立后,读请求处理、写完后的处理,连接关闭后的处理),从这里可以看到,业务逻辑的传递就跟数据传递一样,多么漂亮。


总结:

(1)通过对象回调机制,一个对象A可以把一个特定工作委托给另一个对象B的一个方法来完成。A不必知道B的名字,也不用知道它的类型,甚至都不需要知道B的存在,只要求B对象具有一个签名正确的方法,就可以通过回调机制把工作交给B的这个方法来执行。在C语言里,这个机制是通过函数指针实现的,所以很自然的,在C++里,我们希望通过指向成员函数的指针(如std::function+std::bind())来解决类似问题。

(2)函数式编程中,类之间的关系主要通过组合来实现,而不是通过派生实现; 这也是函数式编程的一个设计理念,更多的使用组合而不是继承来实现类之间的关系,而支撑其能够这样设计的根源在于function()+bind()带来的函数自由传递,实现回调非常简单; 而OO设计中,只能使用基于虚函数/多态来实现回调,不可避免的使用继承结构。


参考资料

https://github.com/chenshuo/muduo

C++ 工程实践(5):避免使用虚函数作为库的接口

function/bind的救赎(上)

猜你喜欢

转载自blog.csdn.net/u014694510/article/details/83895320