从本篇开始,将会有一个系列对Etcd源码进行分析。
我之前阅读过很多开源软件,对阅读开源软件,有如下基本思路:
1、了解该软件相关背景知识,例如相关博客、官网,要相信自己不是第一个分析该软件的人
2、对该软件进行使用,例如:编译、运行或者基于接口进行开发
3、找到该软件的合适切入点进行源码分析,例如网络相关的软件(ovs、etcd)找到socket监听服务、接收消息、发送消息
对于etcd这种分布式存储系统,我们的切入点就是socket服务。
一、配置文件
1.1、配置文件简介
研究一个软件最好的方式就是先把配置文件搞清楚,这个是根基。我们看一下etcd的配置文件,默认存储位置为/etc/etcd/etcd.conf,该配置文件一共有5个section分别是:
名称 |
作用 |
member |
本节点的配置,包括监听服务端口、心跳时间等 |
cluster |
集群配置,包括集群状态、集群名称以及本节点广播地址 |
proxy |
用于网络自动发现服务 |
security |
安全配置 |
logging |
日志功能组件 |
其中配置文件中比较重要的是member和cluster配置项。
1.2、部分参数详细说明
1.2.1、member
在Etcd中一共监听了两个服务端口分别2379和2380,对应在member中体现的如下两个配置项:
名称 |
举例 |
作用 |
ETCD_LISTEN_CLIENT_URLS |
ETCD_LISTEN_CLIENT_URLS="http://0.0.0.0:2379" |
2379端口用于客户端访问etcd数据库,例如向etcd中插入数据。 |
ETCD_LISTEN_PEER_URLS |
ETCD_LISTEN_PEER_URLS="http://0.0.0.0:2380" |
2380端口用于集群成员间进行通信,例如集群数据同步、心跳。 |
1.2.2、cluster
初次看到配置文件,都会有一个疑问,为什么在members已经设置了监听服务地址,为什么在cluster还要再次设置一次广播地址呢?原因:etcd主要的通信协议主要是http协议,对于http协议中所周知它是B/S结构,而非C/S结构,只能一端主动给另一端发消息而反过来则不可。所以对于集群来说,双方必须都要知道对方具体监听地址。
名称 |
举例 |
作用 |
ETCD_ADVERTISE_CLIENT_URLS |
ETCD_LISTEN_CLIENT_URLS="http://10.10.10.128:2379" |
同上表 |
ETCD_INITIAL_ADVERTISE_PEER_URLS |
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://10.10.10.128:2380" |
同上表 |
集群相关配置,在后面介绍Raft集群时再进行详细说明。
二、服务监听
从本小节开始介绍详细代码。我们都知道,建立socket服务端一共有5个基本步骤(C语言):创建socket套接字、bind地址及端口、listen监听服务、accept接收客户端连接、启动新线程为客户端服务。正所谓万变不离其宗,到了etcd中(etcd使用默认golang http模块)也是这些步骤,只不过是被封装了一下(语法糖)。
2.1、总体流程图
从这里开始介绍具体流程以及关键代码,对于数据结构会有专门一篇介绍,如下是从main方法入口函数:
流程图简要说明:
1、方法startEtcdOrProxyV2()中,会根据配置文件启动两种不同模式:默认模式和代理模式。默认模式进入方法startEtcd,代理模式进入方法startProxy。这里介绍默认模式。
2、方法newConfig,从名字上来看就知道,此方法是用于读取配置文件或者生成默认配置,在1.2章节中介绍的配置项就是在此方法中读取。
3、执行完方法serve后,会退回到startEtcd中,然后就阻塞在startEtcd方法中,这样整个etcd启动完毕。
2.2、核心方法embed/etcd.go StartEtcd
此方法内容比较长,分为三部分说明:/* * 构建Etcd结构 包含一个server、listener */ serving := false e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} //etcd结构体 cfg := &e.cfg defer func() {//析构函数 if e == nil || err == nil { return } if !serving { // errored before starting gRPC server for serveCtx.grpcServerC for _, sctx := range e.sctxs { close(sctx.grpcServerC) } } e.Close() e = nil }() if e.Peers, err = startPeerListeners(cfg); err != nil {//为peer创建listener,socket三部曲只到了第二个步骤 return } if e.sctxs, err = startClientListeners(cfg); err != nil {//为client创建listener,socket三部曲只到了第二个步骤 return } for _, sctx := range e.sctxs { e.Clients = append(e.Clients, sctx.l) }
上面完成etcd结构初始化以及listener创建。
// 创建EtcdServer
srvcfg := &etcdserver.ServerConfig{ Name: cfg.Name, ClientURLs: cfg.ACUrls, //客户端url监听地址,2379端口 PeerURLs: cfg.APUrls, //peer url监听地址,2380端口 DataDir: cfg.Dir, DedicatedWALDir: cfg.WalDir, SnapCount: cfg.SnapCount, MaxSnapFiles: cfg.MaxSnapFiles, MaxWALFiles: cfg.MaxWalFiles, InitialPeerURLsMap: urlsmap, InitialClusterToken: token, DiscoveryURL: cfg.Durl, DiscoveryProxy: cfg.Dproxy, NewCluster: cfg.IsNewCluster(), /* 是否新的集群 */ ForceNewCluster: cfg.ForceNewCluster, PeerTLSInfo: cfg.PeerTLSInfo, TickMs: cfg.TickMs, ElectionTicks: cfg.ElectionTicks(), AutoCompactionRetention: cfg.AutoCompactionRetention, QuotaBackendBytes: cfg.QuotaBackendBytes, StrictReconfigCheck: cfg.StrictReconfigCheck, ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, AuthToken: cfg.AuthToken, } //创建EtcdServer并且创建raftNode并运行raftNode if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { return } // configure peer handlers after rafthttp.Transport started // 生成http.hander 用于处理peer请求 ph := etcdhttp.NewPeerHandler(e.Server) for _, p := range e.Peers { srv := &http.Server{ Handler: ph, ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error } l := p.Listener //上一段代码创建的listener p.serve = func() error { return srv.Serve(l) } //回调函数,激活服务,主要是Accept方法 p.close = func(ctx context.Context) error {关闭服务,回调掉函数。即socket关闭时调用此方法 // gracefully shutdown http.Server // close open listeners, idle connections // until context cancel or time-out return srv.Shutdown(ctx) } } 上面handler用于处理peer发过来的请求以及设置回调函数。 // buffer channel so goroutines on closed connections won't wait forever e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs)) //运行EtcdSever 监听服务 e.Server.Start() if err = e.serve(); err != nil {//激活服务,主要调用第二段代码的回调函数serve return } serving = true
下面重点分析一下listerner以及serve回调函数。
2.3、Listener分析
Listener有两个分别为:peer listener和client listener,两者大同小异,这里拿peer listener做为分析对象。
方法startPeerListeners,中主要核心代码,如下:
for i, u := range cfg.LPUrls {//循环遍历多个peer url if u.Scheme == "http" { if !cfg.PeerTLSInfo.Empty() { plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) } if cfg.PeerTLSInfo.ClientCertAuth { plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String()) } } /* 构造peerListener对象 监听2380 作为服务端模式 */ peers[i] = &peerListener{close: func(context.Context) error { return nil }} //调用接口,创建listener对象,返回来之后, //socket套接字已经完成listener监听流程 peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo) if err != nil { return nil, err } // once serve, overwrite with 'http.Server.Shutdown' // close回调方法,用于关闭socket套接字 peers[i].close = func(context.Context) error { return peers[i].Listener.Close() } plog.Info("listening for peers on ", u.String()) } func newListener(addr string, scheme string) (net.Listener, error) { if scheme == "unix" || scheme == "unixs" { // unix sockets via unix://laddr return NewUnixListener(addr) } return net.Listen("tcp", addr) //调用golang内置方法,返回listener对象 }
从startPeerListeners到net.Listen整个流程中,有掺杂tls以及unix socket相关逻辑,添加这些只为了保证各种需求,大体流程并没有变化,这里不在对齐进行详细说明。至此,两个服务均已完成监听步骤,下面就是接收对端请求即Accept过程。
三、服务激活
在上面已经介绍了,服务端socket需要调用Accept方法,我们来看一下serve方法。方法serve大致内容为:将每个服务放到gorouting中,也就是启动一个协程来监听服务。func (e *Etcd) serve() (err error) { var ctlscfg *tls.Config if !e.cfg.ClientTLSInfo.Empty() { plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo) if ctlscfg, err = e.cfg.ClientTLSInfo.ServerConfig(); err != nil { return err } } if e.cfg.CorsInfo.String() != "" { plog.Infof("cors = %s", e.cfg.CorsInfo) } // Start the peer server in a goroutine // 为Peer启动协程 for _, pl := range e.Peers { go func(l *peerListener) { // 集群peer 前期已经创建listener,此处将会调用accept, // 那么serve()是什么地方定义的? e.errHandler(l.serve()) }(pl) } // Start a client server goroutine for each listen address // 为Client启动协程 var h http.Handler if e.Config().EnableV2 { h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout()) } else { mux := http.NewServeMux() etcdhttp.HandleBasic(mux, e.Server) h = mux } h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo}) for _, sctx := range e.sctxs { go func(s *serveCtx) { // client 前期已经创建listener,此处将调用accept e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler)) }(sctx) } return nil }那么在l.serve()或者s.serve()在什么地方定义的?在方法StartEtcd中生成http.PeerHandler附近,在第二章节已经有介绍。我们来看一下具体内容:
srv := &http.Server{ Handler: ph, //http handler 用于处理业务逻辑 后面调用handler中ServeHTTP方法 ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error } l := p.Listener p.serve = func() error { return srv.Serve(l) } //激活服务,这地方调用的srv.Serve(l),此方法调用golang内置方法http.Server.
下面简要分析一下http.Server方法:
func (srv *Server) Serve(l net.Listener) error { defer l.Close() if fn := testHookServerServe; fn != nil { fn(srv, l) } var tempDelay time.Duration // how long to sleep on accept failure if err := srv.setupHTTP2_Serve(); err != nil { return err } srv.trackListener(l, true) defer srv.trackListener(l, false) baseCtx := context.Background() // base is always background, per Issue 16220 ctx := context.WithValue(baseCtx, ServerContextKey, srv) for {//死循环 rw, e := l.Accept() //阻塞等待客户端程序连接 .... tempDelay = 0 c := srv.newConn(rw) c.setState(c.rwc, StateNew) // before Serve can return go c.serve(ctx) //表示启动协程,服务新的连接。 } }
此处的Accept定义在什么地方呢?Accept为golang内置方法,定义在Go/src/net/tcpsock.go,对于Accept内容不再展开,只要知道它会一直阻塞,直到有新的连接才会返回。
这里着重介绍一下http/server.go中serve方法。在serve方法内部主要内容是一个for循环:
for { w, err := c.readRequest(ctx) //接收到对端发送的http请求 if c.r.remain != c.server.initialReadLimitSize() { // If we read any bytes off the wire, we're active. c.setState(c.rwc, StateActive) } ... // Expect 100 Continue support req := w.req if req.expectsContinue() { if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 { // Wrap the Body reader with one that replies on the connection req.Body = &expectContinueReader{readCloser: req.Body, resp: w} } } else if req.Header.get("Expect") != "" { w.sendExpectationFailed() return } c.curReq.Store(w) if requestBodyRemains(req.Body) { registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead) } else { if w.conn.bufr.Buffered() > 0 { w.conn.r.closeNotifyFromPipelinedRequest() } w.conn.r.startBackgroundRead() } // HTTP cannot have multiple simultaneous active requests.[*] // Until the server replies to this request, it can't read another, // so we might as well run the handler in this goroutine. // [*] Not strictly true: HTTP pipelining. We could let them all process // in parallel even if their responses need to be serialized. // But we're not going to implement HTTP pipelining because it // was never deployed in the wild and the answer is HTTP/2. serverHandler{c.server}.ServeHTTP(w, w.req) //处理http请求 w.cancelCtx() if c.hijacked() { return } w.finishRequest() //将缓冲区数据发送到对端,完成http此次请求 ... }
调用c.readRequest(ctx)方法等待对端发来的请求,调用serverHandler{c.server}.ServeHTTP(w,w.req)处理客户端请求并且发送到对端。来看一下ServeHTTP实现:
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) { handler := sh.srv.Handler if handler == nil { handler = DefaultServeMux } if req.RequestURI == "*"&& req.Method == "OPTIONS" { handler = globalOptionsHandler{} } handler.ServeHTTP(rw, req) }
会发现在内部会调用handler.ServeHTTP方法,那么此处的handler在什么地方定义的?ServeHTTP又在哪里定义的?继续看下面章节
四、接收流程
上一章节介绍调用handler.ServeHTTP,可想而知,这个是golang内置的http框架,框架不知道具体业务需求是什么,所以一般场景下handler都是用户自定义的,用户根据不同业务需求来实现不同的handler。对Etcd中存在很多handler,后面会文章进行详细分析。
现在解答一下,handler在什么地方定义以及ServeHTTP定义在什么地方?这个其实在第三章就已经有提到,在创建完listener会创建一个handler,如etcdhttp.NewPeerHandler(e.Server),处理集群节点内消息。
先来看一下创建的handler里面有什么内容?用peerhandler作为例子说明,在构造方法NewPeerHandler中主要调用newPeerHandler方法:func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler { mh := &peerMembersHandler{ cluster: cluster, } //将url和业务层handler注册到servemux中,也就是每一个url请求都会有其对应的handler进行处理。 mux := http.NewServeMux() //初始化一个Serve Multiplexer结构 mux.HandleFunc("/", http.NotFound) mux.Handle(rafthttp.RaftPrefix, raftHandler) /* rafthttp.RaftPrefix == /raft */ mux.Handle(rafthttp.RaftPrefix+"/", raftHandler) mux.Handle(peerMembersPrefix, mh) //处理请求/members handler是mh,即peerMembersHandler if leaseHandler != nil { mux.Handle(leasehttp.LeasePrefix, leaseHandler) /* /leases */ mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler) /* /leases/internal */ } mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion)) return mux }
通过上面的代码可知,应用层业务逻辑需要自己注册url和handler,这样才能保证每个http request都能够被处理。而每个handler都必须要实现对应接口ServeHTTP,例如peerMembersHandler,实现的ServeHTTP接口是用于返回集群成员列表。那么此处只是完成注册,那么在什么地方会调用此处handler?接下来看一下golang内置方法。
流程图中所有方法均在Go/src/net/http/server.go中。通过流程图可知:
// ServeHTTP dispatches the request to the handler whose // pattern most closely matches the request URL. func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) { if r.RequestURI == "*" { if r.ProtoAtLeast(1, 1) { w.Header().Set("Connection", "close") } w.WriteHeader(StatusBadRequest) return } h, _ := mux.Handler(r) //注册表查找,用户自定义的handler,并且调用ServeHTTP接口,处理该http request请求。 h.ServeHTTP(w, r) }
五、发送流程
对于发送流程,这里打算简单介绍一下,后续在介绍raft协议是会详细说明。
通过之前介绍,ServeHTTP接口是用于处理http request请求入口,每一个handler都必须实现此接口。此接口有两个参数,分别为:ResponseWriter,Request。其中ResponseWriter就是用于响应,http请求,这里用peerMembersHandler作为举例(比较简单),代码如下:
/* 获取集群成员列表 */ func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r, "GET") { return } w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) if r.URL.Path != peerMembersPrefix { http.Error(w, "bad path", http.StatusBadRequest) return } ms := h.cluster.Members() //获取集群成员列表 w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(ms); err != nil {//调用json接口,进行格式化并且将数据写到缓冲区中 plog.Warningf("failed to encode members response (%v)", err) } }通过查看Encode代码可知,最后会将会数据写入到ResponseWriter的缓冲区中,即调用Wirter接口,并没有真正发送到对端。那么在什么地方才会真正发出去呢?在golang http源码w.finishRequest(),此处会进行刷新操作,将缓冲区数据发送到对端。在上面已经提交到。
六、总结
至此,介绍Etcd网络通信流程就结束了,下一篇介绍网络模型进阶篇。