muduo中的客户端的编写

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wk_bjut_edu_cn/article/details/82778423

总体来说,客户端的代码和服务端的代码大体相同,主要区别是一个是发起连接,一个是接受连接。

客户端的代码

#include <muduo/net/Channel.h>
#include <muduo/net/TcpClient.h>

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

class TestClient
{
 public:
  TestClient(EventLoop* loop, const InetAddress& listenAddr)
    : loop_(loop),
      client_(loop, listenAddr, "TestClient"),
	  //标准输入通道对应的文件描述符是0
      stdinChannel_(loop, 0)
  {
    client_.setConnectionCallback(
        boost::bind(&TestClient::onConnection, this, _1));
    client_.setMessageCallback(
        boost::bind(&TestClient::onMessage, this, _1, _2, _3));
	//连接已经建立成功,由于某种原因断开而进行的重连
    //client_.enableRetry();
    // 标准输入缓冲区中有数据的时候,回调TestClient::handleRead
    stdinChannel_.setReadCallback(boost::bind(&TestClient::handleRead, this));
	stdinChannel_.enableReading();		// 关注可读事件
  }

  void connect()
  {
    client_.connect();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }

  void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
  {
    string msg(buf->retrieveAllAsString());
    printf("onMessage(): recv a message [%s]\n", msg.c_str());
    LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toFormattedString();
  }

  // 标准输入缓冲区中有数据的时候,回调该函数
  void handleRead()
  {
    char buf[1024] = {0};
    fgets(buf, 1024, stdin);
	buf[strlen(buf)-1] = '\0';		// 去除\n
	client_.connection()->send(buf);
  }

  EventLoop* loop_;
  TcpClient client_;
  Channel stdinChannel_;		// 标准输入Channel
};

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
  EventLoop loop;
  InetAddress serverAddr("127.0.0.1", 8888);
  TestClient client(&loop, serverAddr);
  client.connect();
  loop.loop();
}

分析这段代码在muduo中的运作流程

我们要想使用muduo中客户端的代码,必须在我们的类中包含两个对象EventLoop* loop_;      TcpClient client_;

在main()函数中执行client.connect(),也就是client_.connect()

void TcpClient::connect()
{
  // FIXME: check state
  LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
           << connector_->serverAddress().toIpPort();
  connect_ = true;
  connector_->start();
}

connector_是Connector的对象,执行Connector中的start()函数

//可以跨线程调用,是线程安全的
void Connector::start()
{
  connect_ = true;
  loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}
void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_)
  {
    connect();
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}

紧接就是向客户端发起连接

//发起连接
void Connector::connect()
{
  //创建非阻塞套接字
  int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
  //发起连接
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
  //保存错误码
  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)
  {
    case 0:
    case EINPROGRESS://非阻塞套接字,未连接成功返回码是EINPROGRESS表示正在连接
    case EINTR:
    case EISCONN://连接成功
      connecting(sockfd);
      break;

    case EAGAIN:
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
      retry(sockfd);//连接成功
      break;

    case EACCES:
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);//不能重连,关闭sockfd
      break;

    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }
}

然后将这个发起连接的套接字sockfd加入channel,关注其可写事件,并且设置了可写回调函数 

void Connector::connecting(int sockfd)
{
  setState(kConnecting);
  assert(!channel_);
  //创建Channel与sockfd关联
  channel_.reset(new Channel(loop_, sockfd));
  //设置可写回调函数,这时候如果socket没有错误,sockfd就处于可写状态
  channel_->setWriteCallback(
      boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe
  //设置错误回调函数
  channel_->setErrorCallback(
      boost::bind(&Connector::handleError, this)); // FIXME: unsafe

  // channel_->tie(shared_from_this()); is not working,
  // as channel_ is not managed by shared_ptr
  //让poller关注可写事件
  channel_->enableWriting();
}

当对应通道产生可写事件,回调Connector中的函数handleWrite,回调函数的注册是在上个函数完成的。

在函数handleWrite中,验证连接成功后,就可以不关注sockfd对应的通道了,因为连接已经建立了。然后回调TcpClient中注册的函数connector_->setNewConnectionCallback(boost::bind(&TcpClient::newConnection, this, _1));这是在TcpClient的构造函数中注册的。

void Connector::handleWrite()
{
  LOG_TRACE << "Connector::handleWrite " << state_;

  if (state_ == kConnecting)
  {
    //连接成功,就不需要关注这个channel的可写事件
    //从poller中移除关注,并将channel置空
    int sockfd = removeAndResetChannel();
	//socket可写并不意味着连接一定建立成功
	//还需要用getsockopt(sockfd,SOL_SOCKET,SO_ERROR,...)再次确认一下
    int err = sockets::getSocketError(sockfd);
	//有错误
    if (err)
    {
      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
               << err << " " << strerror_tl(err);
	  //重连
	  retry(sockfd);
    }
	//自连接
    else if (sockets::isSelfConnect(sockfd))
    {
      LOG_WARN << "Connector::handleWrite - Self connect";
      //重连
	  retry(sockfd);
    }
	//连接成功
    else
    {
      setState(kConnected);
      if (connect_)
      {
        //回调
        newConnectionCallback_(sockfd);
      }
      else
      {
        sockets::close(sockfd);
      }
    }
  }
  else
  {
    // what happened?
    assert(state_ == kDisconnected);
  }
}

注意,TcpClient有重连功能,如果在handleWrite()中判断连接不成功,就进入函数retry(sockfd)进行重连。

//重连
//采用back-off策略重连,即重连事件逐渐延长,0.5s,1s,2s...30s
void Connector::retry(int sockfd)
{
  sockets::close(sockfd);
  setState(kDisconnected);
  if (connect_)
  {
    LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
             << " in " << retryDelayMs_ << " milliseconds. ";
	//注册一个定时操作,重连
    loop_->runAfter(retryDelayMs_/1000.0,
                    boost::bind(&Connector::startInLoop, shared_from_this()));
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}

重连的实现就是注册一个定时操作,初始事件是0.5s,如果连接不上,按照2倍的方式递增,最大为30s,注册到定时操作中的函数是Connector::startInLoop,利用connect()函数进行重连。

void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_)
  {
    connect();
  }
  else
  {
    LOG_DEBUG << "do not connect";
  }
}

如果在执行Connector中的函数handleWrite中判断连接成功,则执行TcpClient中的函数newConnection

在这个函数中得到一个TcpConnection对象,并且注册了连接建立回调函数、消息到达回调函数、数据发送完毕回调函数,当TcpConnection中的handleRead()、handleWrite()产生事件之后,就会回调上层注册的这些函数。

//连接成功回调函数
void TcpClient::newConnection(int sockfd)
{
  loop_->assertInLoopThread();
  InetAddress peerAddr(sockets::getPeerAddr(sockfd));
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  //得到一个TcpConnection对象
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));

  //设置连接建立回调函数
  conn->setConnectionCallback(connectionCallback_);
  //设置消息到达回调函数
  conn->setMessageCallback(messageCallback_);
  //数据发送完毕回调函数
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  //连接关闭回调函数
  conn->setCloseCallback(
      boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
  {
    MutexLockGuard lock(mutex_);
    connection_ = conn;
  }
  conn->connectEstablished();
}

下面的步骤就和服务端类似

当服务端给客户端发送数据之后,相应通道产生可读事件Channel的handleEvent()->TcpConnection的handleRead()->客户端用户注册的消息到达回调函数

当客户端往服务端的消息发送完毕之后,相应的通道产生可写事件Channel的handleWrite()->TcpConnection的handleWrite()->客户端用户注册的消息发送完毕回调函数

对于为啥要设置消息发送回调函数,因为有些大量的应用程序,不断生成数据,然后发送conn->send(),如果对等方接收不及时,受到通告窗口的控制,内核发送缓冲不足,这个时候,就会将用户数据添加到应用层发送缓冲区(output buffer);可能会撑爆output buffer。解决方法是,调整发送频率。关注WriteCompleCallback。所有数据都发送完,WriteCompleCallback回调,然后就可以继续发送了。

猜你喜欢

转载自blog.csdn.net/wk_bjut_edu_cn/article/details/82778423