版权声明:guojawee https://blog.csdn.net/weixin_36750623/article/details/84639766
Connector用于client向server主动发起连接,并有自动重连的功能
Connector只负责建立socket连接,不负责创建TcpCOnnection(下文中的TcpClient类实现TcpCOnnection的创建)
Connector
- 在非阻塞网络编程中,发起连接的基本方式是调用connect(2),当socket变得可写时表明连接建立完毕,其中要处理各种类型的错误,我们把它封装为Connector class.
- 还有一点就是错误处理,socket可写不一定就是连接建立好了 , 当连接建立出错时,套接口描述符变成既可读又可写,这时我们可以通过调用getsockopt来得到套接口上待处理的错误(SO_ERROR).
- 其次非阻塞网络编程中connect(2)的sockfd是一次性的,一旦出错(比如对方拒绝连接),就无法恢复,只能关闭重来。 但Connector是可以反复使用的, 因此每次尝试连接都要使用新的socket文件描述符和新的Channel对象。要注意的就是Channel的生命期管理了.
代码剖析
只有5个共有接口
void setNewConnectionCallback(const NewConnectionCallback& cb);
void start(); // can be called in any thread
void restart(); // must be called in loop thread
void stop(); // can be called in any thread
const InetAddress& serverAddress() const;
- 成员函数
EventLoop* loop_;
InetAddress serverAddr_; //要连接的服务器端地址
bool connect_; // atomic 表示是否要去建立连接
States state_; // FIXME: use atomic variable
std::unique_ptr<Channel> channel_;//客户端用于通信的sockfd创建的channel_
NewConnectionCallback newConnectionCallback_; //连接建立成功的回调函数
int retryDelayMs_; //重连延迟事件(单位:ms)
- 构造函数
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop),
serverAddr_(serverAddr),
connect_(false), // 现在不去建立连接
state_(kDisconnected),
retryDelayMs_(kInitRetryDelayMs)
{
LOG_DEBUG << "ctor[" << this << "]";
}
- start()
第一步:创建非阻塞socket,返回套接口描述符;
第二步:connect(2)开始建立连接;
第三步:根据connect的返回值和errno,判断连接是否成功建立,见下面3中情况 ==>
① 如果connect返回0,表示连接建立成功;如果错误为EINPROGRESS表示连接正在进行,可以等待select()变的可写,再通过getsockopt(___,SO_ERROR)来确定连接是否真正的建立成功。
② EAGAIN、EADDRINUSE、EADDRNOTAVAIL、ECONNREFUSED、ENETUNREACH 像EAGAIN 这类表明本机临时端口暂时用完的错误、可以尝试重连。
③ EACCES、EPERM、EAFNOSUPPORT、EALREADY、EBADF、EFAULT、ENOTSOCK 其他错误像无权限,协议错误等,就直接关闭套接字。
当连接成功建立后,会调用newConnectionCallback_函数,该回调函数可以使用setNewConnectionCallback进行设置
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }
重连策略retry:采用bake-off退避策略重连,即重连时间逐渐延长,0.5s,1s,2s,…,直至30s
void Connector::retry(int sockfd)
{
sockets::close(sockfd);
setState(kDisconnected);
if (connect_)
{
LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
<< " in " << retryDelayMs_ << " milliseconds. ";
//注册一个定时操作,重连
loop_->runAfter(retryDelayMs_/1000.0,
std::bind(&Connector::startInLoop, shared_from_this()));
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
else
{
LOG_DEBUG << "do not connect";
}
}
start()代码实现:
void Connector::start() //public接口
{
connect_ = true; //表示开始向server发起连接请求
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); //FIXME: unsafe
}
void Connector::startInLoop() //private接口
{
loop_->assertInLoopThread(); //断言处于IO线程中
assert(state_ == kDisconnected); //没连接状态
if (connect_)
{
connect(); //向server发起连接的步骤
}
else
{
LOG_DEBUG << "do not connect";
}
}
void Connector::connect()//private接口
{
//创建非阻塞套接字sockfd
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
//
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0: //connect返回0,表示连接建立成功
case EINPROGRESS: //表示连接正在进行,可以等待select()变的可写
case EINTR:
case EISCONN: //连接成功
connecting(sockfd);
break;
case EAGAIN: //表明本机临时端口暂时用完的错误,要关闭socket再延期重试
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
retry(sockfd); //重连
break;
case EACCES: //其他错误像无权限,协议错误等,就直接关闭套接字
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
void Connector::connecting(int sockfd) //private接口
{
setState(kConnecting);
assert(!channel_);
channel_.reset(new Channel(loop_, sockfd));
//一旦select()可写,就会调用handleWrite
channel_->setWriteCallback(std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
channel_->setErrorCallback(std::bind(&Connector::handleError, this)); // FIXME: unsafe
channel_->enableWriting(); //关注可写事件
}
void Connector::handleWrite() //private接口
{
LOG_TRACE << "Connector::handleWrite " << state_;
if (state_ == kConnecting)
{
//从poller中移除关注,并将channel置空,防止可写造成busy loop
int sockfd = removeAndResetChannel();
//sockfd可写并不意味着连接一定建立成功
//还需要用getsockopt(sockfd,SOL_SOCKET,SO_ERROR,...)再次确认一下
int err = sockets::getSocketError(sockfd);
if (err)//有错误
{
LOG_WARN << "Connector::handleWrite - SO_ERROR = "
<< err << " " << strerror_tl(err);
retry(sockfd); //重连
}
else if (sockets::isSelfConnect(sockfd))//自连接
{
LOG_WARN << "Connector::handleWrite - Self connect";
retry(sockfd);//重连
}
else //连接成功
{
setState(kConnected);
if (connect_)
{
newConnectionCallback_(sockfd);
}
else
{
sockets::close(sockfd);
}
}
}
else
{
// what happened?
assert(state_ == kDisconnected);
}
}
- restart() // 重启连接
void Connector::restart()
{
loop_->assertInLoopThread();
setState(kDisconnected); //设置连接为断开标志
retryDelayMs_ = kInitRetryDelayMs; //超时时间为0.5s
connect_ = true; //设置去建立连接的标志
startInLoop(); //发起连接
}
- stop()关闭套接字,删除注册的通道,停止连接
void Connector::stop()//停止、关闭连接的sockfd
{
connect_ = false;
loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
// FIXME: cancel timer
}
void Connector::stopInLoop()
{
loop_->assertInLoopThread();
if (state_ == kConnecting)
{
setState(kDisconnected);
int sockfd = removeAndResetChannel();
retry(sockfd); //这里并非重连,只是调用socket::close(sockfd)
}
}