总体来说,客户端的代码和服务端的代码大体相同,主要区别是一个是发起连接,一个是接受连接。
客户端的代码
#include <muduo/net/Channel.h>
#include <muduo/net/TcpClient.h>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
class TestClient
{
public:
TestClient(EventLoop* loop, const InetAddress& listenAddr)
: loop_(loop),
client_(loop, listenAddr, "TestClient"),
//标准输入通道对应的文件描述符是0
stdinChannel_(loop, 0)
{
client_.setConnectionCallback(
boost::bind(&TestClient::onConnection, this, _1));
client_.setMessageCallback(
boost::bind(&TestClient::onMessage, this, _1, _2, _3));
//连接已经建立成功,由于某种原因断开而进行的重连
//client_.enableRetry();
// 标准输入缓冲区中有数据的时候,回调TestClient::handleRead
stdinChannel_.setReadCallback(boost::bind(&TestClient::handleRead, this));
stdinChannel_.enableReading(); // 关注可读事件
}
void connect()
{
client_.connect();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp time)
{
string msg(buf->retrieveAllAsString());
printf("onMessage(): recv a message [%s]\n", msg.c_str());
LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toFormattedString();
}
// 标准输入缓冲区中有数据的时候,回调该函数
void handleRead()
{
char buf[1024] = {0};
fgets(buf, 1024, stdin);
buf[strlen(buf)-1] = '\0'; // 去除\n
client_.connection()->send(buf);
}
EventLoop* loop_;
TcpClient client_;
Channel stdinChannel_; // 标准输入Channel
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid() << ", tid = " << CurrentThread::tid();
EventLoop loop;
InetAddress serverAddr("127.0.0.1", 8888);
TestClient client(&loop, serverAddr);
client.connect();
loop.loop();
}
分析这段代码在muduo中的运作流程
我们要想使用muduo中客户端的代码,必须在我们的类中包含两个对象EventLoop* loop_; TcpClient client_;
在main()函数中执行client.connect(),也就是client_.connect()
void TcpClient::connect()
{
// FIXME: check state
LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
<< connector_->serverAddress().toIpPort();
connect_ = true;
connector_->start();
}
connector_是Connector的对象,执行Connector中的start()函数
//可以跨线程调用,是线程安全的
void Connector::start()
{
connect_ = true;
loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}
void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_)
{
connect();
}
else
{
LOG_DEBUG << "do not connect";
}
}
紧接就是向客户端发起连接
//发起连接
void Connector::connect()
{
//创建非阻塞套接字
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
//发起连接
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
//保存错误码
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0:
case EINPROGRESS://非阻塞套接字,未连接成功返回码是EINPROGRESS表示正在连接
case EINTR:
case EISCONN://连接成功
connecting(sockfd);
break;
case EAGAIN:
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
retry(sockfd);//连接成功
break;
case EACCES:
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);//不能重连,关闭sockfd
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
然后将这个发起连接的套接字sockfd加入channel,关注其可写事件,并且设置了可写回调函数
void Connector::connecting(int sockfd)
{
setState(kConnecting);
assert(!channel_);
//创建Channel与sockfd关联
channel_.reset(new Channel(loop_, sockfd));
//设置可写回调函数,这时候如果socket没有错误,sockfd就处于可写状态
channel_->setWriteCallback(
boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe
//设置错误回调函数
channel_->setErrorCallback(
boost::bind(&Connector::handleError, this)); // FIXME: unsafe
// channel_->tie(shared_from_this()); is not working,
// as channel_ is not managed by shared_ptr
//让poller关注可写事件
channel_->enableWriting();
}
当对应通道产生可写事件,回调Connector中的函数handleWrite,回调函数的注册是在上个函数完成的。
在函数handleWrite中,验证连接成功后,就可以不关注sockfd对应的通道了,因为连接已经建立了。然后回调TcpClient中注册的函数connector_->setNewConnectionCallback(boost::bind(&TcpClient::newConnection, this, _1));这是在TcpClient的构造函数中注册的。
void Connector::handleWrite()
{
LOG_TRACE << "Connector::handleWrite " << state_;
if (state_ == kConnecting)
{
//连接成功,就不需要关注这个channel的可写事件
//从poller中移除关注,并将channel置空
int sockfd = removeAndResetChannel();
//socket可写并不意味着连接一定建立成功
//还需要用getsockopt(sockfd,SOL_SOCKET,SO_ERROR,...)再次确认一下
int err = sockets::getSocketError(sockfd);
//有错误
if (err)
{
LOG_WARN << "Connector::handleWrite - SO_ERROR = "
<< err << " " << strerror_tl(err);
//重连
retry(sockfd);
}
//自连接
else if (sockets::isSelfConnect(sockfd))
{
LOG_WARN << "Connector::handleWrite - Self connect";
//重连
retry(sockfd);
}
//连接成功
else
{
setState(kConnected);
if (connect_)
{
//回调
newConnectionCallback_(sockfd);
}
else
{
sockets::close(sockfd);
}
}
}
else
{
// what happened?
assert(state_ == kDisconnected);
}
}
注意,TcpClient有重连功能,如果在handleWrite()中判断连接不成功,就进入函数retry(sockfd)进行重连。
//重连
//采用back-off策略重连,即重连事件逐渐延长,0.5s,1s,2s...30s
void Connector::retry(int sockfd)
{
sockets::close(sockfd);
setState(kDisconnected);
if (connect_)
{
LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
<< " in " << retryDelayMs_ << " milliseconds. ";
//注册一个定时操作,重连
loop_->runAfter(retryDelayMs_/1000.0,
boost::bind(&Connector::startInLoop, shared_from_this()));
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
else
{
LOG_DEBUG << "do not connect";
}
}
重连的实现就是注册一个定时操作,初始事件是0.5s,如果连接不上,按照2倍的方式递增,最大为30s,注册到定时操作中的函数是Connector::startInLoop,利用connect()函数进行重连。
void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_)
{
connect();
}
else
{
LOG_DEBUG << "do not connect";
}
}
如果在执行Connector中的函数handleWrite中判断连接成功,则执行TcpClient中的函数newConnection
在这个函数中得到一个TcpConnection对象,并且注册了连接建立回调函数、消息到达回调函数、数据发送完毕回调函数,当TcpConnection中的handleRead()、handleWrite()产生事件之后,就会回调上层注册的这些函数。
//连接成功回调函数
void TcpClient::newConnection(int sockfd)
{
loop_->assertInLoopThread();
InetAddress peerAddr(sockets::getPeerAddr(sockfd));
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
//得到一个TcpConnection对象
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
//设置连接建立回调函数
conn->setConnectionCallback(connectionCallback_);
//设置消息到达回调函数
conn->setMessageCallback(messageCallback_);
//数据发送完毕回调函数
conn->setWriteCompleteCallback(writeCompleteCallback_);
//连接关闭回调函数
conn->setCloseCallback(
boost::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
{
MutexLockGuard lock(mutex_);
connection_ = conn;
}
conn->connectEstablished();
}
下面的步骤就和服务端类似
当服务端给客户端发送数据之后,相应通道产生可读事件Channel的handleEvent()->TcpConnection的handleRead()->客户端用户注册的消息到达回调函数
当客户端往服务端的消息发送完毕之后,相应的通道产生可写事件Channel的handleWrite()->TcpConnection的handleWrite()->客户端用户注册的消息发送完毕回调函数
对于为啥要设置消息发送回调函数,因为有些大量的应用程序,不断生成数据,然后发送conn->send(),如果对等方接收不及时,受到通告窗口的控制,内核发送缓冲不足,这个时候,就会将用户数据添加到应用层发送缓冲区(output buffer);可能会撑爆output buffer。解决方法是,调整发送频率。关注WriteCompleCallback。所有数据都发送完,WriteCompleCallback回调,然后就可以继续发送了。