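| Channel | Written by | Read by | Purpose |
| --- | --- | --- | --- |
| EtcdServer->readych chan struct{} | | File: etcdmain/etcd.go, method: startEtcd, variable: e.Server.ReadyNotify | Signals that the member has joined the cluster |
| EtcdServer->done | | File: etcdmain/etcd.go, method: startEtcdOrProxyV2, variable: stopped | Signals that the process is exiting |
| node->tickc | File: node.go, method: Tick(), n.tickc <- struct{}{} | File: node.go, method: (n *node) run, select case <-n.tickc: | Receiving a value means a timer has fired. For example, when the election timer expires, the callback runs and starts an election |
| node->readyc | File: node.go, method: (n *node) run, select case readyc <- rd: | File: etcdserver/raft.go, method: start, select case rd := <-r.Ready() | Used by the leader and candidates to send messages to followers, among other purposes |
| raftNode->ticker.C | Go runtime: sleep.go, sendTime() | File: etcdserver/raft.go, method: start, case <-r.ticker.C: | The heartbeat interval has elapsed |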
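The first two rows have no writer listed. A plausible reading is that these channels are never written to and are closed instead, which wakes every reader at once (the source comment on `done` says it "is closed when all goroutines from start() complete"; that `readych` behaves the same way is an assumption here). A minimal sketch of that pattern follows; the `server` type, `newServer`, `run`, and `isClosed` are hypothetical stand-ins, not etcd code.

```go
package main

import "fmt"

// server is a hypothetical stand-in for EtcdServer; only the two signal
// channels from the table are modeled.
type server struct {
	readych chan struct{} // closed once the member has joined the cluster
	done    chan struct{} // closed once the server has shut down
}

func newServer() *server {
	return &server{readych: make(chan struct{}), done: make(chan struct{})}
}

// ReadyNotify and StopNotify expose the channels as receive-only, so callers
// can wait on them but cannot close or write to them.
func (s *server) ReadyNotify() <-chan struct{} { return s.readych }
func (s *server) StopNotify() <-chan struct{}  { return s.done }

// run pretends to join the cluster and then shut down. Closing a channel
// unblocks all current and future receivers.
func (s *server) run() {
	close(s.readych)
	close(s.done)
}

// isClosed reports, without blocking, whether a signal channel is closed.
func isClosed(c <-chan struct{}) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}

func main() {
	s := newServer()
	go s.run()

	<-s.ReadyNotify()
	fmt.Println("joined the cluster")
	<-s.StopNotify()
	fmt.Println("server stopped")
}
```

Because a closed channel can be received from any number of times, several goroutines can wait on the same notification without coordinating with each other.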
Etcd holds the current EtcdServer together with its listeners.

    type Etcd struct {
        Peers   []*peerListener // listeners for cluster peers
        Clients []net.Listener  // listeners for clients
        Server  *etcdserver.EtcdServer

        cfg   Config
        stopc chan struct{}
        errc  chan error
        // serve contexts; I think of them as sessions.
        // The client listeners above come from this field.
        sctxs map[string]*serveCtx

        closeOnce sync.Once
    }
    // EtcdServer is the production implementation of the Server interface.
    // (Author's note: it implements the Raft interface.)
    type EtcdServer struct {
        // inflightSnapshots holds the number of snapshots currently inflight.
        inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
        appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
        committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
        // consistIndex is used to hold the offset of the currently executing entry.
        // It is initialized to 0 before executing any entry.
        consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
        Cfg          *ServerConfig

        readych chan struct{} // signals that the member has joined the cluster
        r       raftNode      // the raft node of this cluster member

        snapCount uint64

        w wait.Wait

        readMu sync.RWMutex
        // read routine notifies etcd server that it waits for reading by sending
        // an empty struct to readwaitc
        readwaitc chan struct{}
        // readNotifier is used to notify the read routine that it can process
        // the request when there is no error
        readNotifier *notifier

        // stop signals the run goroutine should shutdown.
        stop chan struct{}
        // stopping is closed by run goroutine on shutdown.
        stopping chan struct{}
        // done is closed when all goroutines from start() complete.
        // (Author's note: used to exit the process.)
        done chan struct{}

        errorc     chan error
        id         types.ID
        attributes membership.Attributes

        cluster *membership.RaftCluster

        store       store.Store
        snapshotter *snap.Snapshotter

        applyV2 ApplierV2

        // applyV3 is the applier with auth and quotas
        applyV3 applierV3
        // applyV3Base is the core applier without auth or quotas
        applyV3Base applierV3
        applyWait   wait.WaitTime

        kv         mvcc.ConsistentWatchableKV
        lessor     lease.Lessor
        bemu       sync.Mutex
        be         backend.Backend
        authStore  auth.AuthStore
        alarmStore *alarm.AlarmStore

        stats  *stats.ServerStats
        lstats *stats.LeaderStats

        SyncTicker *time.Ticker
        // compactor is used to auto-compact the KV.
        compactor *compactor.Periodic

        // peerRt is used to send requests (version, lease) to peers.
        peerRt   http.RoundTripper
        reqIDGen *idutil.Generator

        // forceVersionC is used to force the version monitor loop
        // to detect the cluster version immediately.
        forceVersionC chan struct{}

        // wgMu blocks concurrent waitgroup mutation while server stopping
        wgMu sync.RWMutex
        // wg is used to wait for the goroutines that depend on the server state
        // to exit when stopping the server.
        wg sync.WaitGroup

        // ctx is used for etcd-initiated requests that may need to be canceled
        // on etcd server shutdown.
        ctx    context.Context
        cancel context.CancelFunc

        leadTimeMu      sync.RWMutex
        leadElectedTime time.Time
    }
    type raftNode struct {
        // Cache of the latest raft index and raft term the server has seen.
        // These three uint64 fields must be the first elements to keep 64-bit
        // alignment for atomic access to the fields.
        index uint64
        term  uint64
        lead  uint64

        raftNodeConfig // embedded (anonymous composition)

        // a chan to send/receive snapshot
        msgSnapC chan raftpb.Message

        // a chan to send out apply
        applyc chan apply

        // a chan to send out readState
        readStateC chan raft.ReadState

        // utility
        ticker *time.Ticker
        // contention detectors for raft heartbeat message
        td *contention.TimeoutDetector

        stopped chan struct{}
        done    chan struct{}
    }
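The comment on `index`, `term`, and `lead` is about `sync/atomic`: on 32-bit platforms a 64-bit atomic operation needs a 64-bit aligned address, and Go guarantees that alignment for the first word of an allocated struct. The sketch below illustrates the idea with a cut-down, hypothetical `raftNode`; the accessor names and the `bumpConcurrently` helper are made up for illustration and are not etcd's API.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// raftNode is a reduced, hypothetical version of the struct above. The three
// uint64 fields come first so that they stay 64-bit aligned.
type raftNode struct {
	index uint64
	term  uint64
	lead  uint64

	name string // any other field goes after the atomically accessed ones
}

// advanceIndex atomically increments the cached index and returns the new value.
func (r *raftNode) advanceIndex() uint64 { return atomic.AddUint64(&r.index, 1) }

// Index atomically reads the cached index.
func (r *raftNode) Index() uint64 { return atomic.LoadUint64(&r.index) }

// setLead and Lead atomically write and read the cached leader ID.
func (r *raftNode) setLead(id uint64) { atomic.StoreUint64(&r.lead, id) }
func (r *raftNode) Lead() uint64      { return atomic.LoadUint64(&r.lead) }

// bumpConcurrently advances the index from several goroutines at once and
// waits for all of them to finish.
func bumpConcurrently(r *raftNode, workers, perWorker int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				r.advanceIndex()
			}
		}()
	}
	wg.Wait()
}

func main() {
	r := &raftNode{name: "n1"}
	bumpConcurrently(r, 4, 1000)
	r.setLead(2)
	fmt.Println(r.Index(), r.Lead())
}
```

No mutex is needed for these fields, which is why the source insists on keeping them at the front of the struct.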
    type raftNodeConfig struct {
        // to check if msg receiver is removed from cluster
        isIDRemoved func(id uint64) bool
        raft.Node   // embedded (anonymous composition)
        raftStorage *raft.MemoryStorage // dynamic storage, in memory
        storage     Storage             // static storage on disk, including WAL files and snapshot files
        heartbeat   time.Duration       // for logging
        // transport specifies the transport to send and receive msgs to members.
        // Sending messages MUST NOT block. It is okay to drop messages, since
        // clients should timeout and reissue their messages.
        // If transport is nil, server will panic.
        transport rafthttp.Transporter
    }
    // Node represents a node in a raft cluster.
    type Node interface {
        // Tick increments the internal logical clock for the Node by a single tick. Election
        // timeouts and heartbeat timeouts are in units of ticks.
        Tick()
        // Campaign causes the Node to transition to candidate state and start campaigning to become leader.
        Campaign(ctx context.Context) error
        // Propose proposes that data be appended to the log.
        Propose(ctx context.Context, data []byte) error
        // ProposeConfChange proposes config change.
        // At most one ConfChange can be in the process of going through consensus.
        // Application needs to call ApplyConfChange when applying EntryConfChange type entry.
        ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
        // Step advances the state machine using the given message. ctx.Err() will be returned, if any.
        // (Author's note: handles a received message and drives the state machine transition.)
        Step(ctx context.Context, msg pb.Message) error

        // Ready returns a channel that returns the current point-in-time state.
        // Users of the Node must call Advance after retrieving the state returned by Ready.
        //
        // NOTE: No committed entries from the next Ready may be applied until all committed entries
        // and snapshots from the previous one have finished.
        Ready() <-chan Ready

        // Advance notifies the Node that the application has saved progress up to the last Ready.
        // It prepares the node to return the next available Ready.
        //
        // The application should generally call Advance after it applies the entries in last Ready.
        //
        // However, as an optimization, the application may call Advance while it is applying the
        // commands. For example, when the last Ready contains a snapshot, the application might take
        // a long time to apply the snapshot data. To continue receiving Ready without blocking raft
        // progress, it can call Advance before finishing applying the last ready.
        Advance()
        // ApplyConfChange applies config change to the local node.
        // Returns an opaque ConfState protobuf which must be recorded
        // in snapshots. Will never return nil; it returns a pointer only
        // to match MemoryStorage.Compact.
        ApplyConfChange(cc pb.ConfChange) *pb.ConfState

        // TransferLeadership attempts to transfer leadership to the given transferee.
        TransferLeadership(ctx context.Context, lead, transferee uint64)

        // ReadIndex requests a read state. The read state will be set in the ready.
        // Read state has a read index. Once the application advances further than the read
        // index, any linearizable read requests issued before the read request can be
        // processed safely. The read state will have the same rctx attached.
        ReadIndex(ctx context.Context, rctx []byte) error

        // Status returns the current status of the raft state machine.
        Status() Status
        // ReportUnreachable reports the given node is not reachable for the last send.
        ReportUnreachable(id uint64)
        // ReportSnapshot reports the status of the sent snapshot.
        ReportSnapshot(id uint64, status SnapshotStatus)
        // Stop performs any necessary termination of the Node.
        Stop()
    }
The cluster object. It stores the members that have joined the cluster as well as those that have been removed from it.

    type RaftCluster struct {
        id    types.ID
        token string // unique identifier of the cluster
        store store.Store
        be    backend.Backend

        sync.Mutex // guards the fields below
        version    *semver.Version
        members    map[types.ID]*Member
        // removed contains the ids of removed members in the cluster.
        // removed id cannot be reused.
        removed map[types.ID]bool
    }
    // node is the canonical implementation of the Node interface
    type node struct {
        propc      chan pb.Message
        recvc      chan pb.Message
        confc      chan pb.ConfChange
        confstatec chan pb.ConfState
        readyc     chan Ready    // indicates completion
        advancec   chan struct{}
        tickc      chan struct{} // timer ticks of all kinds, e.g. the election timer, which triggers an election when it expires
        done       chan struct{}
        stop       chan struct{}
        status     chan chan Status

        logger Logger
    }
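To make the role of `tickc` concrete, here is a reduced sketch of the producer/consumer pair: `Tick()` writes an empty struct and the `run` loop reads it and invokes a callback. It assumes an unbuffered channel and a caller-supplied `onTick` function; `newNode` and `countTicks` are hypothetical helpers, and the real `run` loop selects over many more channels.

```go
package main

import "fmt"

// node is a cut-down, hypothetical version of the struct above, keeping only
// the channels needed to show how ticks flow.
type node struct {
	tickc chan struct{}
	stop  chan struct{}
	done  chan struct{}
}

func newNode() *node {
	return &node{
		tickc: make(chan struct{}),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// Tick hands one tick to the run loop. If the loop has already exited
// (done is closed), it returns instead of blocking forever.
func (n *node) Tick() {
	select {
	case n.tickc <- struct{}{}:
	case <-n.done:
	}
}

// run is the single goroutine that consumes ticks and calls onTick for each.
func (n *node) run(onTick func()) {
	defer close(n.done)
	for {
		select {
		case <-n.tickc:
			onTick()
		case <-n.stop:
			return
		}
	}
}

// Stop asks the run loop to exit and waits until it has.
func (n *node) Stop() {
	close(n.stop)
	<-n.done
}

// countTicks sends k ticks through a fresh node and returns how many times
// the callback ran.
func countTicks(k int) int {
	n := newNode()
	ticks := 0
	go n.run(func() { ticks++ })
	for i := 0; i < k; i++ {
		n.Tick()
	}
	n.Stop() // after Stop returns, the run goroutine no longer touches ticks
	return ticks
}

func main() {
	fmt.Println(countTicks(3))
}
```

All state changes happen inside the one `run` goroutine, so the callback itself needs no locking; the channels are the only synchronization points.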
    type raft struct {
        id uint64 // node ID, unique within the cluster

        Term uint64 // term
        Vote uint64 // probably holds the ID of the node this node votes for

        readStates []ReadState

        // the log
        raftLog *raftLog

        maxInflight int
        maxMsgSize  uint64
        prs         map[uint64]*Progress

        state StateType // raft role

        votes map[uint64]bool // key: peer raft ID; value: true if the peer voted for this node, false if it did not

        msgs []pb.Message // message queue; every outgoing message is held here

        // the leader id
        lead uint64
        // leadTransferee is id of the leader transfer target when its value is not zero.
        // Follow the procedure defined in raft thesis 3.10.
        leadTransferee uint64
        // New configuration is ignored if there exists unapplied configuration.
        pendingConf bool

        readOnly *readOnly

        // number of ticks since it reached last electionTimeout when it is leader
        // or candidate.
        // number of ticks since it reached last electionTimeout or received a
        // valid message from current leader when it is a follower.
        electionElapsed int

        // number of ticks since it reached last heartbeatTimeout.
        // only leader keeps heartbeatElapsed.
        heartbeatElapsed int

        checkQuorum bool
        preVote     bool

        heartbeatTimeout int
        electionTimeout  int
        // randomizedElectionTimeout is a random number between
        // [electiontimeout, 2 * electiontimeout - 1]. It gets reset
        // when raft changes its state to follower or candidate.
        randomizedElectionTimeout int

        // Timer callback. For example, the election timer starts an election
        // when it times out; after the node becomes leader, this becomes the
        // heartbeat timer.
        tick func()
        step stepFunc

        logger Logger
    }
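The interplay of `electionElapsed`, `electionTimeout`, and `randomizedElectionTimeout` can be sketched as follows. This is a simplified illustration under stated assumptions, not the raft package's code: the `campaigns` counter stands in for actually starting an election, and `newRaft`, the `rng` field, and the fixed seed are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
)

// raft is a hypothetical reduction of the struct above to its election timer.
type raft struct {
	electionElapsed           int
	electionTimeout           int
	randomizedElectionTimeout int

	rng       *rand.Rand
	campaigns int // stand-in for "start an election"
}

func newRaft(electionTimeout int, seed int64) *raft {
	r := &raft{electionTimeout: electionTimeout, rng: rand.New(rand.NewSource(seed))}
	r.resetRandomizedElectionTimeout()
	return r
}

// resetRandomizedElectionTimeout picks a value in
// [electionTimeout, 2*electionTimeout-1]; Intn(n) returns a value in [0, n).
func (r *raft) resetRandomizedElectionTimeout() {
	r.randomizedElectionTimeout = r.electionTimeout + r.rng.Intn(r.electionTimeout)
}

// tickElection is what the tick callback would do for a follower or
// candidate: count one tick and campaign once the randomized timeout passes.
func (r *raft) tickElection() {
	r.electionElapsed++
	if r.electionElapsed >= r.randomizedElectionTimeout {
		r.electionElapsed = 0
		r.campaigns++
		r.resetRandomizedElectionTimeout()
	}
}

func main() {
	r := newRaft(10, 1)
	limit := r.randomizedElectionTimeout
	for i := 0; i < limit; i++ {
		r.tickElection()
	}
	fmt.Println("campaigns after", limit, "ticks:", r.campaigns)
}
```

Randomizing the timeout per node makes it unlikely that several followers time out on the same tick and split the vote.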
    // unstable.entries[i] has raft log position i+unstable.offset.
    // Note that unstable.offset may be less than the highest log
    // position in storage; this means that the next write to storage
    // might need to truncate the log before persisting unstable.entries.
    // (Author's note: holds the entries that have not yet been persisted;
    // the next writable position is i+unstable.offset.)
    type unstable struct {
        // the incoming unstable snapshot, if any.
        snapshot *pb.Snapshot
        // all entries that have not yet been written to storage.
        entries []pb.Entry
        offset  uint64

        logger Logger
    }
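The offset arithmetic is easier to see with numbers. The sketch below is a simplified illustration, assuming a toy `entry` type in place of `pb.Entry`; the `at`, `lastIndex`, and `truncateAndAppend` helpers are written for this example and only approximate what the raft package does.

```go
package main

import "fmt"

// entry is a toy replacement for pb.Entry.
type entry struct {
	Index uint64
	Data  string
}

// unstable keeps entries[i] at raft log position i+offset.
type unstable struct {
	entries []entry
	offset  uint64
}

// at returns the entry stored at raft log position pos, if it is held here.
func (u *unstable) at(pos uint64) (entry, bool) {
	if pos < u.offset || pos >= u.offset+uint64(len(u.entries)) {
		return entry{}, false
	}
	return u.entries[pos-u.offset], true
}

// lastIndex returns the highest log position held, or false when empty.
func (u *unstable) lastIndex() (uint64, bool) {
	if len(u.entries) == 0 {
		return 0, false
	}
	return u.offset + uint64(len(u.entries)) - 1, true
}

// truncateAndAppend adds ents, discarding any held entries at or after the
// position of ents[0], since the new entries replace them.
func (u *unstable) truncateAndAppend(ents []entry) {
	if len(ents) == 0 {
		return
	}
	first := ents[0].Index
	switch {
	case first == u.offset+uint64(len(u.entries)):
		// Directly follows what is already held: plain append.
		u.entries = append(u.entries, ents...)
	case first <= u.offset:
		// Replaces everything held: move the offset back.
		u.offset = first
		u.entries = append([]entry(nil), ents...)
	default:
		// Overlaps the tail: keep [offset, first) and append the rest.
		kept := append([]entry(nil), u.entries[:first-u.offset]...)
		u.entries = append(kept, ents...)
	}
}

func main() {
	u := &unstable{offset: 5, entries: []entry{{5, "a"}, {6, "b"}, {7, "c"}}}
	e, _ := u.at(6) // slice index 6-5 = 1
	fmt.Println(e.Data)

	u.truncateAndAppend([]entry{{6, "x"}}) // drops positions 6 and 7, then appends
	last, _ := u.lastIndex()
	fmt.Println(last, len(u.entries))
}
```

With `offset` 5, log position 6 maps to slice index 1; appending an entry at position 6 therefore discards the old positions 6 and 7 before adding the new one.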
    type raftLog struct {
        // storage contains all stable entries since the last snapshot.
        // (Author's note: in etcd this is the MemoryStorage.)
        storage Storage

        // unstable contains all unstable entries and snapshot.
        // they will be saved into storage.
        // (Author's note: entries not yet persisted; they are eventually
        // written to Storage, i.e. the MemoryStorage.)
        unstable unstable

        // committed is the highest log position that is known to be in
        // stable storage on a quorum of nodes.
        // (Author's note: the index of the last committed entry.)
        committed uint64
        // applied is the highest log position that the application has
        // been instructed to apply to its state machine.
        // Invariant: applied <= committed
        // (Author's note: the index of the last entry already applied to the
        // state machine; applied is always less than or equal to committed.)
        applied uint64

        logger Logger
    }

This post has little technical depth. It simply collects some of the data structures in one place so they are easier to look up and understand.