起初本篇打算介绍raft相关,但是后来发现,还是有必要再深入介绍一下网络模型。
一、基础网络模型
红色框,符合上面经典模型,请求+应答。
蓝色框,只有请求,没有应答。
对于上述蓝色框中请求,为什么没有应答呢?难道不会超时吗?
二、消息流程
2.1 http handler
上一篇介绍到,在使用net/http模块需要用户自定义http handler(相当于http路由),针对不同http请求,定义不同handler。那么对于etcd中和peer相关的handler有哪些呢?在文件rafthttp/transport.go中:
func (t *Transport) Handler() http.Handler { pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID) streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID) snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID) mux := http.NewServeMux() //http 请求路由 mux.Handle(RaftPrefix, pipelineHandler) /* /raft */ mux.Handle(RaftStreamPrefix+"/", streamHandler) /* /raft/stream/ */ mux.Handle(RaftSnapshotPrefix, snapHandler) /* /raft/snapshot */ mux.Handle(ProbingPrefix, probing.NewHandler()) /* /raft/probing */ return mux }
上面所有罗列出的handler并不是所有,只把相关介绍一下,我们只要知道,不同的url会有与之对应handler即可。
2.2 会话建立
对于第一节中,我们以GET /raft/stream/message/8a840eaa4b694be1进行说明,因为这个是最复杂的。2.2.1 报文
首先来看一下,发送http报文,本端Ip为192.63.63.1,远端Ip为192.63.63.30名称 |
含义 |
Host |
服务端ip地址以及端口 |
X-Etcd-Cluster-Id |
集群id,每个etcd节点都会随机生成
扫描二维码关注公众号,回复:
1766635 查看本文章
|
X-Min-Cluster-Version |
集群要求最低版本 |
X-Peerurls |
告诉对端etcd节点,我(本端)监听的peer地址是什么 |
X-Raft-To |
远端etcd节点id |
X-Server-Form |
本端etcd节点id,用于标识唯一etcd节点。与url后面数字一致 |
2.2.2 会话建立
上面的报文,在哪里构造出来?在哪里发出去呢?流程图如下:rafthttp/http.go,会发现是ServeHTTP方法,这个方法在上一篇已经介绍!
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { w.Header().Set("Allow", "GET") http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return } /* * 忽略部分代码,这部分代码主要使用构造http头部信息 */ /* 这个地方需要注意一下,此处并没有包把应答报文发出去,但是具体处理逻辑需要参考net/http中Flush */ w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() /* 构造conn对象 */ c := newCloseNotifier() conn := &outgoingConn{ t: t, /* 连接类型 */ Writer: w, /* reponse writer */ Flusher: w.(http.Flusher), /* reponse flusher */ Closer: c, /* 连接close channel对象 */ } p.attachOutgoingConn(conn) /* 会发streamWriter run中connc操作 用于*/ <-c.closeNotify() /* 等待close channel,若一直没数据可读则阻塞 */ }
通过attach方法,可知会把conn对象写到channel cw.connc中,channel另外一端就在run方法中,下面为run的部分代码片段:
case conn := <-cw.connc: /* 从channel读取conn对象,表示会话已经建立 */ cw.mu.Lock() closed := cw.closeUnlocked() t = conn.t switch conn.t { /* 根据StreamType生成对应的解析器 */ case streamTypeMsgAppV2: enc = newMsgAppV2Encoder(conn.Writer, cw.fs) case streamTypeMessage: enc = &messageEncoder{w: conn.Writer} default: plog.Panicf("unhandled stream type %s", conn.t) } flusher = conn.Flusher /* 用于send消息 等待接收消息 */ unflushed = 0 cw.status.activate() cw.closer = conn.Closer cw.working = true cw.mu.Unlock() if closed { plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t) } plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = tickc.C, cw.msgc //保存心跳和message的通道
2.3 消息发送
发消息的接口为rafthttp/transport.gotransport.send方法,在介绍raft协议时会介绍如何调用此方法,目前只需要知道此方法用于发送消息即可。func (t *Transport) Send(msgs []raftpb.Message) { for _, m := range msgs { if m.To == 0 { // ignore intentionally dropped message continue } to := types.ID(m.To) /* 将m.To转成type.ID格式 */ /* 以to作为key在map中查找peer对象 */ t.mu.RLock() p, pok := t.peers[to] g, rok := t.remotes[to] t.mu.RUnlock() //存在peer则不去检查remote if pok { if m.Type == raftpb.MsgApp { t.ServerStats.SendAppendReq(m.Size()) } p.send(m) /* 调用peer.go (p *peer) send */ continue } if rok { g.send(m) continue } plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to) } } func (p *peer) send(m raftpb.Message) { p.mu.Lock() paused := p.paused p.mu.Unlock() if paused { return } // 如果消息类型是snapshot则返回pipeline,如果是MsgApp则返回msgAppV2Writer,否则返回wirter // wirtec创建是在 writec, name := p.pick(m) select { /* 将消息写入channel中 * 接收端的channel位于stream.go streamWriter.run msgc */ case writec <- m: //写入channel default: p.r.ReportUnreachable(m.To) if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } if p.status.isActive() { plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name) } plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) } }
假设返回的writec为streamWriter类型,则上面writec定义在stream.go func (cw *streamWriter)run() ,到了这里会发现在2.2.2节中介绍的会话建立流程也是在这个方法中。
发送消息具体代码如下:
//etcd大部分消息是通过http协议 此处使用的http通道
case m := <-msgc: err := enc.encode(&m) /* 格式化消息,如选举消息 */ if err == nil { unflushed += m.Size() if len(msgc) == 0 || batched > streamBufSize/2 {/*batched批处理 streamBufSize全局变量 4096 */ flusher.Flush() /* 刷新缓冲区,发送到对端。Flush代码为net/http模块 */ sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) unflushed = 0 batched = 0 } else { batched++ } continue /* 发送完成就返回上层 并没有结束会话 */ } cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) cw.close() /* 表示本次收发消息结束 即http会话结束 */ plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) sentFailures.WithLabelValues(cw.peerID.String()).Inc()
上述代码,有一个很关键的代码--continue。这段代码并不是像我们之前理解http请求一样,收到request之后,做处理并且响应一个reponse,最后关闭http会话。然而这里的做法是,发送一个消息后直接continue,并没有结束会话。难道说就是利用http通道(建立的socket),进行长连接操作吗?(c/s模式)。后来通过抓包,验证了我的想法:
发现一些数据在通过2380这端口发送数据(上图中tcp数据长度是59字节),具体内容wireshark无法解析。
至此,发送流程介绍完毕,下面来看一下接收流程。
2.4 消息接收
在上一篇其实已经介绍了,接收流程,这里再深入介绍一下。etcd中有两个对象:streamReader和streamWriter,通过名字可知,用于读写网络流的。上一小节其实操作就是streamWriter,那么关于接收流程肯定和streamReader相关,流程图如下:
上一篇介绍到在rafthttp/stream.go中的run方法,cr.dial用于建立http会话(对应上述报文中没有响应的http请求),cr.decodeLoop循环等待对端的消息,代码如下:
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { var dec decoder cr.mu.Lock() //根据stream类型,创建不同解码器 switch t { case streamTypeMsgAppV2: dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID) case streamTypeMessage: dec = &messageDecoder{r: rc} default: plog.Panicf("unhandled stream type %s", t) } select { case <-cr.stopc: cr.mu.Unlock() if err := rc.Close(); err != nil { return err } return io.EOF default: cr.closer = rc } cr.mu.Unlock() //死循环 等待消息 for { m, err := dec.decode() //阻塞等待消息 if err != nil { cr.mu.Lock() cr.close() cr.mu.Unlock() return err } receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) cr.mu.Lock() paused := cr.paused cr.mu.Unlock() if paused { continue } if isLinkHeartbeatMessage(&m) { // raft is not interested in link layer // heartbeat message, so we should ignore // it. continue } recvc := cr.recvc if m.Type == raftpb.MsgProp { recvc = cr.propc } select { case recvc <- m: /* 将消息写到channel中 channel另外一段是rafthttp/peer.go startPeer*/ default: if cr.status.isActive() { plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From)) } plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) recvFailures.WithLabelValues(types.ID(m.From).String()).Inc() } } }
结合流程和相关代码,基本上可以梳理清楚。流图中最后一个方法则进入raft相关处理,对于raft相关内容,后面会有介绍。
三、疑问解答
经过上一节介绍,如下两个问题就有答案了:为什么没有应答?etcd使用http作为通道,说明白点就是使用socket通道,传输数据,并没有完全遵守http协议流程。
难道不会超时吗?首先反问一句,超时不超时是由谁决定?由客户端决定!!客户端在发出请求一段时间内没有收到响应则认为超时,进行超时处理逻辑。但若客户端没有超时处理逻辑呢?那永远都不会超时,所以超时并不是由协议决定而是由业务逻辑决定。
至此所有关于网络模型相关的内容,介绍到这里就算完全结束了,下一篇介绍核心重点之一Raft协议。