etcd系列-----集大成者etcdserver 模块

1、raftNode 相关

raftNode是充当raft模块与上层模块之间交互的桥梁
    term ( uint64类型): 当前节点己应用Entry记录的最大任期号,term、 index和lead三个宇段的读写都是原子操作。
    index ( uint64类型): 当前节点中己应用Entry记录的最大索引值。
    lead ( uint64类型):记录当前集群中Leader节点的ID值。
    msgSnapC ( chan raftpb. Message类型):在前面分析中提到, raft模块通过返回Ready实例与上层模块进行交互,其中Ready.Message字段记录了待发送的消息,其中可能会包含MsgSnap类型的消息,该类型消息中封装了需要发送到其他节点的快照数据。当raftNode收到MsgSnap消息之后, 会将其写入msgSnapC通道中,并等待上层模块进行发送。
    applyc ( chan apply类型): 在etcd-raft模块返回的Ready实例中, 除了封装了待持久化的Entry记录和待持久化的快照数据, 还封装了待应用的Entry记录。raftNode会将待应用的记录和快照数据封装成apply实例之后写入applyc通道等待上层模块处理。
    readStateC ( chan raft.ReadState类型):Readyc.ReadStates中封装了只读请求相关的
    ReadState实例,其中的最后一项将会被写入readStateC通道中等待上层模块处理。
    ticker ( *time.Ticker类型):该定时器就是逻辑时钟,每触发一次就会推进一次底层的选举计时器和心跳计时器。
    raftStorage ( *raft. MemoryStorage 类型): 与前面介绍的raftLog.storage 字段指向的
    MemoryStorage为同一实例,主要用来保存持久化的Entry记录和快照数据。
    storage ( etcdserver.Storage类型):注意该字段的类型,在raft模块中有一个与之同名的接口(raft.Storage接口),  MemoryStorage就是raft.Storage接口的实现之一。

type Storage interface { 
    //Save ()方法负责将Entry记录和HardState状态信息保存到底层的持久化存储上, 该方法可能会阻塞,Storage接口的实现是通过WAL模块将上述数据持久化到WAL日志文件中的
    Save(st raftpb.HardState,  ents []raftpb.Entry)  error 
    //SaveSnap ()方法负责将快照数据持久到底层的持久化存储上,该方法也可能会阻塞, Storage接口的实现是使用之前介绍的Snapshotter将快照数据保存到快照文件中的
    SaveSnap(snap raftpb.Snapshot)  error 
}

通过raftNode.start()方法启动相关服务, 在该方法中会启动一个独立的后台goroutine, 在该后台goroutine中完成了绝大部分与底层raft模块交互的功能

func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C://计时器到期被触发,调用Tick()方法才住进选举计时器和心跳计时器
				r.tick()
			case rd := <-r.Ready():
			//Ready实例的处理,处理完调用raft.node.Advance()方法,通知raft模块此次Ready处理完成,raft模块更新相应信息(例如,己应用Entry的最大索引值)之后,可以继续返回Ready实例
			

				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}

raftNode对Ready实例中各个字段的处理:
softstate:1、更新raftNode.lead字段。2、根据leader节点的变化情况调用updateLeadership()回调函数
readStates:readStateC通道
CommittedEntries:封装成apply实例,送入applyc通道
Snapshot:1、封装成apply实例,送入applyc通道。2、将快照数据保存到本地盘。3、保存到MemoryStorage中
Messages:1、目标节点不存在的,踢除。2、如果有多条msgAppresp消息,只保留最后一条。3、如果有msgSnap消息,送入raftNode.msgSnapC中。
Entries:保存到MemoryStorage中

SoftState处理:

//在SoftState中封装了当前集群的Leader信息和当前节点角色
if rd.SoftState != nil {
    //检测集群的Leader节点是否发生变化,并记录相关监控信息
	newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
	if newLeader {
		leaderChanges.Inc()
	}

	if rd.SoftState.Lead == raft.None {
		hasLeader.Set(0)
	} else {
		hasLeader.Set(1)
	}
    //更新raftNode.lead字段,将其更新为新的Leader节点ID
	atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
	islead = rd.RaftState == raft.StateLeader
	if islead {
		isLeader.Set(1)
	} else {
		isLeader.Set(0)
	}
	rh.updateLeadership(newLeader)//调用raftReadyHandler中的updateLeadership()回调
	r.td.Reset()
}

待应用的Entry记录处理:

notifyc := make(chan struct{}, 1)
ap := apply{    //将Ready实例中的待应用Entry记录以及快照数据封装成apply实例,其中封装了notifyc通道, 该通道用来协调当前goroutine和EtcdServer启动的后台goroutine的执行
	entries:  rd.CommittedEntries,//已提交、待应用的Entry记录
	snapshot: rd.Snapshot,//待持久化的快照数据
	notifyc:  notifyc,
}

updateCommittedIndex(&ap, rh)//更新EtcdServer中记录的己提交位置(EtcdServer. committedindex字段)

select {
case r.applyc <- ap://将apply实例写入applyc通道中,等待上层应用读取并进行处理
case <-r.stopped:
	return
}

如果当前节点处于Leader状态,则raftNode.start()方法会先调用raftNode.processMessages() 方法对待发送的消息进行过滤, 然后调用rafNode.transport.Send()方法完成消息的发送

if islead { 
    r.transport.Send{r.processMessages(rd.Messages)) 
}

raftNode.processMessages() 方法处理待发送消息的逻辑比较清晰,它首先会对消息进行过滤,去除目标节点己被移出集群的消息,然后分别过滤MsgAppResp消息、MsgSnap消息和MsgHeartbeat消息

func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
	sentAppResp := false
	for i := len(ms) - 1; i >= 0; i-- {//从后向前边历全部待发送的消息
		if r.isIDRemoved(ms[i].To) {//消息的目标节点已从集群中移除,将消息的目标节点ID设立为0
			ms[i].To = 0
		}
        //只会发送最后一条MsgAppResp消息,通过前面对raft模块的分析可知,没有必妥同时发送多条MsgAppResp消息
		if ms[i].Type == raftpb.MsgAppResp {
			if sentAppResp {
				ms[i].To = 0
			} else {
				sentAppResp = true
			}
		}

		if ms[i].Type == raftpb.MsgSnap {//对MsgSnap消息的处理
			// current store snapshot and KV snapshot.
			select {
			case r.msgSnapC <- ms[i]://将MsgSnap消息写入msgSnapC远远中
			default://如采msgSηape通道的缓冲区满了, 则放弃此次快照的发送
				// drop msgSnap if the inflight chan if full.
			}
			ms[i].To = 0//将目标节点设豆为0,如l rafNode. transport后续不会发送该消息
		}
		if ms[i].Type == raftpb.MsgHeartbeat {//对MsgHeartbeat类型的消息
			ok, exceed := r.td.Observe(ms[i].To)
			if !ok {
				// TODO: limit request rate.
				plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
				plog.Warningf("server is likely overloaded")
				heartbeatSendFailures.Inc()
			}
		}
	}
	return ms
}

raftNode对Ready中待持久化的Entry记录,以及快照数据的处理,相关代码

//通过raftNode. storage将Ready实例中携带的HardState信息和待持久化的Entry记录写入WAL日志文件中
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
	plog.Fatalf("raft save state and entries error: %v", err)
}
if !raft.IsEmptyHardState(rd.HardState) {//根据HardState信息, 记录相关的监控信息
	proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
	// gofail: var raftBeforeSaveSnap struct{}
	if err := r.storage.SaveSnap(rd.Snapshot); err != nil {//通过raftNode.storage将Ready实例中携带的快照数据保存到磁盘中
		plog.Fatalf("raft save snapshot error: %v", err)
	}
	// 在后面介绍的EtcdServer中会启动后台goroutine读取前面介绍的applye遥遥,并处理apply中封装快照数据。 这里使用notifyc迢迢通知该后台goroutine,该apply实例中的快照数据已经被持久化到磁盘,后台goroutine可以开始应用该快照数据了
	notifyc <- struct{}{}

	// 将快照数据保存到MemoryStorage中
	r.raftStorage.ApplySnapshot(rd.Snapshot)
	plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
	// gofail: var raftAfterApplySnap struct{}
}
r.raftStorage.Append(rd.Entries)//将待持久化的Entry记录写入MemoryStorage中

if !islead {//与Leader节点的处理逻辑类似
	// finish processing incoming messages before we signal raftdone chan
	msgs := r.processMessages(rd.Messages)

	// 处理Ready实例的过程基本结束,这里会通知EtcdServer启动的后台goroutine,检测是否生成快照
	notifyc <- struct{}{}

	waitApply := false
	for _, ent := range rd.CommittedEntries {
		if ent.Type == raftpb.EntryConfChange {
			waitApply = true
			break
		}
	}
	if waitApply {
		// blocks until 'applyAll' calls 'applyWait.Trigger'
		// to be in sync with scheduled config-change job
		// (assume notifyc has cap of 1)
		select {
		case notifyc <- struct{}{}:
		case <-r.stopped:
			return
		}
	}

	// gofail: var raftBeforeFollowerSend struct{}
	r.transport.Send(msgs)//发送消息
} else {
	// 处理Ready实例的过程基本结束,这里会通知EtcdServer启动的后台goroutine,检测是否生成快照
	notifyc <- struct{}{}
}

Storage接口的具体实现,etcdserver模块提供了一个storage结构体,其中内嵌了前面介绍的WAL和Snapshotter

type storage struct { 
    *wal.WAL 
    *snap.Snapshotter
}

HardState及待持久化Entry记录时调用的storage.Save()方法,实际就是前面介绍的WAL.Save()方法。 storage中重新实现了SaveSnap()方法

func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
	walsnap := walpb.Snapshot{
		Index: snap.Metadata.Index,
		Term:  snap.Metadata.Term,
	}
	//将walpb.Snapshot实例封装成Record记录写入WAL日志文件中
	err := st.WAL.SaveSnapshot(walsnap)
	if err != nil {
		return err
	}
	//通过Snapshotter将快照数据写入到破盘
	err = st.Snapshotter.SaveSnap(snap)
	if err != nil {
		return err
	}
	//根据WAL日志文件的名称及快照的元数据,将放快照之前的WAL日志文件句柄
	return st.WAL.ReleaseLockTo(snap.Metadata.Index)
}

2、etcdServer 相关

type Server interface {
    Start()  //读取自己豆文件,启动当前Server实例
    Stop()  //关闭当前Server实例
    ID()  types.ID //获取当前Server实例的ID
    Leader() types.ID //获取当前集群中的Leader的ID
    Do (ctx context. Context,  r  pb. Request) (Response,  error) //处理Client请求
    Process(ctx context.Context,  m raftpb.Message) error //处理Raft请求
    AddMember (ctx co口text. Context,  memb membership.Member) ( []*membership. Member,  error) //向当前etcd集群中添加一个节点
    RemoveMember (ctx context. Context,  id uint64)  ( []*membership .Member,  error) //从当前etcd集群中删除一个节点
    UpdateMember(ctx context.Context,  updateMemb membership.Member) ( []*membership. Member,  error) //修改集群成员属性,如采成员ID不存在则返回错误
}

结构体EtcdServer 中核心字段的含义
    appliedlndex ( uint64类型): 当前节点己应用的Entry记录的最大索引值。
    committedlndex ( uint64类型): 当前己提交的Entry记录的索引值
    readych ( chan struct{}类型): 当前节点将自身的信息推送到集群中其他节点之后,会将该通道关闭,也作为当前EtcdServer实例,可以对外提供服务的一个信号。
    r(raftNode 类型): 即前面介绍的etcdserver.raftNode,它是EtcdServer 实例与底层raft模块通信的桥梁。
    snapCount( uint64类型):当前EtcdServer实例每应用snapCount条数的Entry记录,就会触发一次生成快照的操作
    id  ( types.I D类型):记录当前节点的ID。
    cluster ( *membership.RaftCluster 类型):记录当前集群中全部节点的信息。
    store ( store.Store类型):前面介绍的etcdv2版本存储
    applyV2 ( Applierv2类型): Applierv2接口主要功能是应用v2版本的Entry记录
    applyV3、 applyV3Base( applierv3类型): applierV3接口主要功能是应用v3版本的Entry记录
    be ( backend. Backend类型): v3版本的后端存储
    kv ( mvcc.ConsistentWatchableKV类型): etcd v3版本的存储
    compactor ( *compactor. Periodic类型):Leader节点会对存储进行定期压缩,该字段用于控制定期压缩的频率。

2.1 初始化

    (1)定义初始化过程中使用的变量,创建当前节点使用的目录
    (2)根据配置项初始化etcd-ra企模块使用到的相关组件,例如,检测当前wal 目录下是否存在WAL日志文件、初始化v2 存储、查找BoltDB 数据库文件、创建Backend 实例、创建RoundTripper实例等
    (3)根据前面对WAL日志文件的查找结果及当前节点启动时的配置信息,初始化raft模块中的Node实例
    (4)创建EtcdServer实例,并初始化其各个字段

2.2 启动

func (s *EtcdServer) Start() {
	s.start()//其中会启动一个后台goroutine, 执行EtcdServer.run( )方法
	s.goAttach(func() { s.adjustTicks() })//启动一个后台goroutine,将当前节点的相关信息发送到集群其他节点
	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })//启动一个后台goroutine,定义清理WAL日志文件和快照文件
	s.goAttach(s.purgeFile)
	s.goAttach(func() { monitorFileDescriptor(s.stopping) })
	s.goAttach(s.monitorVersions)
	s.goAttach(s.linearizableReadLoop)
	s.goAttach(s.monitorKVHash)
}

start

run()方法是EtcdServer启动的核心,其中会启动前面介绍的etcdserver.raftNode实例,然后处理raft模块返回的Ready实例.

func (s *EtcdServer) run() {
	sn, err := s.r.raftStorage.Snapshot()
	if err != nil {
		plog.Panicf("get snapshot from raft storage error: %v", err)
	}

	// asynchronously accept apply packets, dispatch progress in-order
	sched := schedule.NewFIFOScheduler()

	var (
		smu   sync.RWMutex
		syncC <-chan time.Time
	)
	setSyncC := func(ch <-chan time.Time) {
		smu.Lock()
		syncC = ch
		smu.Unlock()
	}
	getSyncC := func() (ch <-chan time.Time) {
		smu.RLock()
		ch = syncC
		smu.RUnlock()
		return
	}
	rh := &raftReadyHandler{
	//raftNode在处理raft模块返回的Ready.SoftState字段时,会调用raftReadyHandler.updateLeadership()回调函数, 其中会根据当前节点的状态和Leader节点是否发生变化完成一些相应的操作
		updateLeadership: func(newLeader bool) {
			if !s.isLeader() {
				if s.lessor != nil {
					s.lessor.Demote()
				}
				if s.compactor != nil {
					s.compactor.Pause()//非Leader节点暂停自动压缩
				}
				setSyncC(nil)//非Leader节点不会发送SYNC消息
			} else {
				if newLeader {//如果发生Leader节点的切换,且当前节点成为Leader节点,则初始化leadElectedTime字段,该字段记录了当前节点最近一次成为Leader节点的时间
					t := time.Now()
					s.leadTimeMu.Lock()
					s.leadElectedTime = t
					s.leadTimeMu.Unlock()
				}
				setSyncC(s.SyncTicker.C)//Leader节点会定期发送SYNC消息,恢复该定时器
				if s.compactor != nil {
					s.compactor.Resume()//重启自动压缩的功能
				}
			}

			// TODO: remove the nil checking
			// current test utility does not provide the stats
			if s.stats != nil {
				s.stats.BecomeLeader()
			}
		},
		//在raftNode处理apply实例时会调用updateCommittedindex()函数,该函数会根据apply实例中封装的待应用Entry记录和快照数据确定当前的committedindex值, 然后调用raftReadyHandler中的同名回调函数更新EtcdServer.committedindex字段位
		updateCommittedIndex: func(ci uint64) {
			cci := s.getCommittedIndex()
			if ci > cci {
				s.setCommittedIndex(ci)
			}
		},
	}
	//启动raftNode,其中会启动后台goroutine处理raft模块返回的Ready实例,前面已经介绍
	s.r.start(rh)

	//记录当前快照相关的元数据信息和己应用Entry记录的位置信息
	ep := etcdProgress{
		confState: sn.Metadata.ConfState,
		snapi:     sn.Metadata.Index,
		appliedt:  sn.Metadata.Term,
		appliedi:  sn.Metadata.Index,
	}

	defer func() {
		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
		close(s.stopping)
		s.wgMu.Unlock()
		s.cancel()

		sched.Stop()

		// wait for gouroutines before closing raft so wal stays open
		s.wg.Wait()

		s.SyncTicker.Stop()

		// must stop raft after scheduler-- etcdserver can leak rafthttp pipelines
		// by adding a peer after raft stops the transport
		s.r.stop()

		// kv, lessor and backend can be nil if running without v3 enabled
		// or running unit tests.
		if s.lessor != nil {
			s.lessor.Stop()
		}
		if s.kv != nil {
			s.kv.Close()
		}
		if s.authStore != nil {
			s.authStore.Close()
		}
		if s.be != nil {
			s.be.Close()
		}
		if s.compactor != nil {
			s.compactor.Stop()
		}
		close(s.done)
	}()

	var expiredLeaseC <-chan []*lease.Lease
	if s.lessor != nil {
		expiredLeaseC = s.lessor.ExpiredLeasesC()
	}

	for {
		select {
		case ap := <-s.r.apply()://读取raftNode.applyc通过中的apply实例并进行处理
			f := func(context.Context) { s.applyAll(&ep, &ap) }
			sched.Schedule(f)
		case leases := <-expiredLeaseC:
			s.goAttach(func() {
				// Increases throughput of expired leases deletion process through parallelization
				c := make(chan struct{}, maxPendingRevokes)
				for _, lease := range leases {
					select {
					case c <- struct{}{}:
					case <-s.stopping:
						return
					}
					lid := lease.ID
					s.goAttach(func() {
						ctx := s.authStore.WithRoot(s.ctx)
						_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
						if lerr == nil {
							leaseExpired.Inc()
						} else {
							plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
						}

						<-c
					})
				}
			})
		case err := <-s.errorc:
			plog.Errorf("%s", err)
			plog.Infof("the data-dir used by this member must be removed.")
			return
		case <-getSyncC()://定时发送SYNC消息
			if s.store.HasTTLKeys() {
				s.sync(s.Cfg.ReqTimeout())
			}
		case <-s.stop:
			return
		}
	}
}

purgeFile

在EtcdServer.Start()方法中会启动两个后台goroutine,其中一个后台goroutine负责定期清理WAL日志文件,另一个后台goroutine负责定期清理快照文件,相应的逻辑位于EtcdServer.purgeFile()方法中,具体实现如下:

func (s *EtcdServer) purgeFile() {
	var dberrc, serrc, werrc <-chan error
	if s.Cfg.MaxSnapFiles > 0 {
	   //这里会启动后台goroutine,定期清理快照文件(默认purgeFileinterval的值为30s)
		dberrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
		serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
	}
	if s.Cfg.MaxWALFiles > 0 {
	    //启动一个后台goroutine,定期清理WAL日志文件(默认purgeFileinterval的位为30s)
		werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
	}
	select {
	case e := <-dberrc:
		plog.Fatalf("failed to purge snap db file %v", e)
	case e := <-serrc:
		plog.Fatalf("failed to purge snap file %v", e)
	case e := <-werrc:
		plog.Fatalf("failed to purge wal file %v", e)
	case <-s.stopping:
		return
	}
}

apply

rungoroutine会监听raftNode.applyc通道,并调用EtcdServer.applyAll()方法处理从中读取到的apply实例。在apply实例中封装了待应用的Entry记录、待应用的快照数据和notifyc通道.在EtcdServer.applyAll()方法中,首先会调用EtcdServer.applySnapshot()方法处理apply实例中的快照数据。 EtcdServer.applySnapshot()方法会先等待raftNode将快照数据持久化到磁盘中,之后根据快照元数据查找BoltDB数据库文件并重建Backend实例, 最后根据重建后的存储更新本地RaftCluster实例

func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
	if raft.IsEmptySnap(apply.snapshot) {//检测待应用的快炜、数据是否为空, 如果为空则直接返回
		return
	}

	plog.Infof("applying snapshot at index %d...", ep.snapi)
	defer plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)

	if apply.snapshot.Metadata.Index <= ep.appliedi {//如采该快照中最后一条Entry的索引小于当前节点己应用Entry索引,则异常结束
		plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
			apply.snapshot.Metadata.Index, ep.appliedi)
	}

	// raftNode在将快照数据写入磁盘文件之后,会向notifyc通道中写入一个空结构体作为信号,这里会阻塞等待该信号
	<-apply.notifyc
    //根据快照信息查找对应的BoltDB数据库文件,并创建新的Backend实例
	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
	if err != nil {
		plog.Panic(err)
	}

	// 因为在store.restore()方法中除了恢复内存索引,还会重新绑定键值对与对应的Lease,所以需先恢复EtcdServer.lessor,再恢复EtcdServer.kv字段
	if s.lessor != nil {
		plog.Info("recovering lessor...")
		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
		plog.Info("finished recovering lessor")
	}

	plog.Info("restoring mvcc store...")

	if err := s.kv.Restore(newbe); err != nil {
		plog.Panicf("restore KV error: %v", err)
	}
	s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())// 重置EtcdServer.consistindex字段

	plog.Info("finished restoring mvcc store")

	// Closing old backend might block until all the txns
	// on the backend are finished.
	// We do not want to wait on closing the old backend.
	s.bemu.Lock()
	oldbe := s.be
	go func() {
		plog.Info("closing old backend...")
		defer plog.Info("finished closing old backend")
        //因为此时可能还有事务在执行,关闭旧Backend实例可能会被阻塞,所以这里启动一个后台goroutine用来关闭Backend实例
		if err := oldbe.Close(); err != nil {
			plog.Panicf("close backend error: %v", err)
		}
	}()

	s.be = newbe
	s.bemu.Unlock()

	plog.Info("recovering alarms...")
	if err := s.restoreAlarms(); err != nil {//恢复EtcdServer中的alarmStore和authStore,它们分别对应BoltDB中的alarm Bucket 和auth Bukcet
		plog.Panicf("restore alarms error: %v", err)
	}
	plog.Info("finished recovering alarms")

	if s.authStore != nil {
		plog.Info("recovering auth store...")
		s.authStore.Recover(newbe)
		plog.Info("finished recovering auth store")
	}

	plog.Info("recovering store v2...")
	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
		plog.Panicf("recovery store error: %v", err)
	}
	plog.Info("finished recovering store v2")

	s.cluster.SetBackend(s.be)
	plog.Info("recovering cluster configuration...")
	s.cluster.Recover(api.UpdateCapability)
	plog.Info("finished recovering cluster configuration")

	plog.Info("removing old peers from network...")
	// recover raft transport
	s.r.transport.RemoveAllPeers()
	plog.Info("finished removing old peers from network")

	plog.Info("adding peers from new cluster configuration into network...")
	for _, m := range s.cluster.Members() {
		if m.ID == s.ID() {
			continue
		}
		s.r.transport.AddPeer(m.ID, m.PeerURLs)
	}
	plog.Info("finished adding peers from new cluster configuration into network...")
    //更新etcdProgress,其中涉及已应用Entry记录的Term值、 Index值和快照相关信息
	ep.appliedt = apply.snapshot.Metadata.Term
	ep.appliedi = apply.snapshot.Metadata.Index
	ep.snapi = ep.appliedi
	ep.confState = apply.snapshot.Metadata.ConfState
}

应用完快照数据之后, run goroutine紧接着会调用EtcdServer.applyEntries()方法处理待应用的Entry记录

func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
	if len(apply.entries) == 0 {//检测是否存在待应用的Entry记录,如果为空直接返回
		return
	}
	firsti := apply.entries[0].Index
	if firsti > ep.appliedi+1 {//检测待应用的第一条Entry记录是否合法
		plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
	}
	var ents []raftpb.Entry
	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {//忽略己应用的Entry记录,只留未应用的Entry记录
		ents = apply.entries[ep.appliedi+1-firsti:]
	}
	if len(ents) == 0 {
		return
	}
	var shouldstop bool
	//调用apply()方法应用ents中的Entry记录
	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
	}
}

在apply()方法中会遍历ents 中的全部Entry记录,并根据Entry的类型进行不同的处理。

func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
	for i := range es {//遥历待应用的Entry记录
		e := es[i]
		switch e.Type {//根据Entry记录的不同类型,进行不同的处理
		case raftpb.EntryNormal:
			s.applyEntryNormal(&e)
		case raftpb.EntryConfChange:
			// set the consistent index of current executing entry
			if e.Index > s.consistIndex.ConsistentIndex() {
				s.consistIndex.setConsistentIndex(e.Index)
			}
			var cc raftpb.ConfChange
			pbutil.MustUnmarshal(&cc, e.Data)
			removedSelf, err := s.applyConfChange(cc, confState)
			s.setAppliedIndex(e.Index)
			shouldStop = shouldStop || removedSelf
			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
		default:
			plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
		}
		atomic.StoreUint64(&s.r.index, e.Index)
		atomic.StoreUint64(&s.r.term, e.Term)
		appliedt = e.Term
		appliedi = e.Index
	}
	return appliedt, appliedi, shouldStop
}

applyEntryNormal()方法处理EntryNormal 记录的具体过程。applyEntryNormal()方法首先会尝试将Entry.Data反序列化成IntemalRaftRequest实例, 如果失败,则将其反序列化成etcdserverpb.Request实例,之后根据反序列化的结果调用EtcdSever的相应方法进行处理, 最后将处理结果写入Entry对应的通道中

func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
	shouldApplyV3 := false
	if e.Index > s.consistIndex.ConsistentIndex() {
		// set the consistent index of current executing entry
		s.consistIndex.setConsistentIndex(e.Index)//更新EtcdServer. consistindex记录的索引位
		shouldApplyV3 = true
	}
	defer s.setAppliedIndex(e.Index)//方法结束时更新EtcdServer.appliedindex字段记录的索引值

	// raft state machine may generate noop entry when leader confirmation.
	// skip it in advance to avoid some potential bug in the future
	if len(e.Data) == 0 {//空的Entry记录只会在Leader选举结束时出现
		select {
		case s.forceVersionC <- struct{}{}:
		default:
		}
		// promote lessor when the local member is leader and finished
		// applying all entries from the last term.
		if s.isLeader() {//如果当前节点为Leader,则晋升其lessor实例
			s.lessor.Promote(s.Cfg.electionTimeout())
		}
		return
	}

	var raftReq pb.InternalRaftRequest
	if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // 尝试将Entry.Data反序列化成InternalRaftRequest实例, InternalRaftRequest中封装了所有类型的Client请求
		var r pb.Request
		rp := &r
		//兼容性处理, 如采上述序列化失败,则将Entry.Date反序列化成pb.Request
		pbutil.MustUnmarshal(rp, e.Data)
		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))//调用EtcdServer.applyV2Request()方法进行处理
		return
	}
	if raftReq.V2 != nil {
		req := (*RequestV2)(raftReq.V2)
		s.w.Trigger(req.ID, s.applyV2Request(req))
		return
	}

	// do not re-apply applied entries.
	if !shouldApplyV3 {
		return
	}
    //下面是对v3版本请求的处理
	id := raftReq.ID
	if id == 0 {
		id = raftReq.Header.ID
	}

	var ar *applyResult
	needResult := s.w.IsRegistered(id)
	if needResult || !noSideEffect(&raftReq) {
		if !needResult && raftReq.Txn != nil {
			removeNeedlessRangeReqs(raftReq.Txn)
		}
		//调用applyV3.Apply ()方法处理该Entry,其中会根据请求的类型选择不同的方法进行处理
		ar = s.applyV3.Apply(&raftReq)
	}

	if ar == nil {
		return
	}
//返回结采ar(applyResult类型)为nil,直接返回,如采返回了ErrNoSpace错误,则表示底层的Backend已经没有足够的空间,如是第一次出现这种情
//况,则在后面立即启动一个后台goroutine,并调用EtcdServer.raftRequest()方法发送AlarmRequest请求,当前其他节点收到该请求时, 会停止后续的PUT操作
	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
		s.w.Trigger(id, ar)//将上述处理结果写入对应的通道中, 然后将对应通道关闭
		return
	}

	plog.Errorf("applying raft message exceeded backend quota")
	s.goAttach(func() {//第一次出现ErrNoSpace错误
		a := &pb.AlarmRequest{//创建AlarmRequest
			MemberID: uint64(s.ID()),
			Action:   pb.AlarmRequest_ACTIVATE,
			Alarm:    pb.AlarmType_NOSPACE,
		}
		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})//将AlarmRequest请求封装成MsgProp消息,发送到集群
		s.w.Trigger(id, ar)//将上述处理结果写入对应的通道中,然后将对应通过关闭
	})
}

EtcdServer.applyAll()方法中首先调用applySnapshot()方法处理apply实例中记录的快照数据,然后调用applyEntries()方法处理apply实例中的En町记录,之后根据apply实例的处理结果检测是否需要生成新的快照文件, 最后处理MsgSnap消息

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
	s.applySnapshot(ep, apply)//调用applySnapshot()方法处理apply实例中记录的快照数据
	s.applyEntries(ep, apply)//调用applyEntries()方法处理apply实例中的Entry记录

	proposalsApplied.Set(float64(ep.appliedi))
	s.applyWait.Trigger(ep.appliedi)//在前面的说到,etcdProgress.appliedi记录了已应用Entry的索引值。 这里通过调用WaitTirne.Trigger()方法, 将id小于etcdProgress.appliedi的Entry对应的通道全部关闭,这样就可以通知其他监听通道的goroutine
	// 回顾前面对raftNode的分析, 当Ready处理基本完成时, 会向notifyc通道中写入一个信号,通知当前goroutine去检测是否需要生成快照
	<-apply.notifyc

	s.triggerSnapshot(ep)//根据当前状态决定是否触发快照的生成
	select {
	// 在raftNode中处理Ready实例时,如果并没有直接发送MsgSnap消息,而是将其写入rnsgSnapC中,这里会读取rnsgSnapC通过,并完成快照数据的发送
	case m := <-s.r.msgSnapC:
	    //将v2存储的快照数据和v3存储的数据合并成完整的快照、数据
		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
		s.sendMergedSnap(merged)//发送快照数据
	default:
	}
}

triggerSnapshot()方法对是否需要生成新快照文件的判定

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
	if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {//连续应用一定量的Entry记录,会触发快照的生成(snapCount默认为100000条)
		return
	}

	plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
	s.snapshot(ep.appliedi, ep.confState)//创建新的快照文件
	ep.snapi = ep.appliedi//更新etcdProgress.snapi
}

snapshot()方法是真正生成快照文件的地方,其中会启动一个单独的后台goroutine来完成新快照文件的生成,主要是序列化v2存储中的数据并持久化到文件中,触发相应的压缩操作

func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
	clone := s.store.Clone()//复制v2存储
	// commit kv to write metadata (for example: consistent index) to disk.
	// KV().commit() updates the consistent index in backend.
	// All operations that update consistent index must be called sequentially
	// from applyAll function.
	// So KV().Commit() cannot run in parallel with apply. It has to be called outside
	// the go routine created below.
	s.KV().Commit()//提交v3存储中当前等待读写事务------即将数据写入db文件

	s.goAttach(func() {
		d, err := clone.SaveNoCopy()//将v2存储序列化成JSON数据
		if err != nil {
			plog.Panicf("store save should never fail: %v", err)
		}
		//将上述快照数据和元数据更新到raft模块中的MemoryStorage中,并且返回Snapshot实例(即MemoryStorage.snapshot字段)
		snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
		if err != nil {
			// the snapshot was done asynchronously with the progress of raft.
			// raft might have already got a newer snapshot.
			if err == raft.ErrSnapOutOfDate {
				return
			}
			plog.Panicf("unexpected create snapshot error %v", err)
		}
		// 将v2存储的快照数据记录到磁盘中,该过程涉及在WAL日志文件中记录快照元数据及写入snap文件等操作, 
		if err = s.r.storage.SaveSnap(snap); err != nil {
			plog.Fatalf("save snapshot error: %v", err)
		}
		plog.Infof("saved snapshot at index %d", snap.Metadata.Index)

		if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
			plog.Infof("skip compaction since there is an inflight snapshot")
			return
		}

		// 为了防止集群中存在比较慢的Follower节点,保留5000条Entry记录不压缩
		compacti := uint64(1)
		if snapi > numberOfCatchUpEntries {
			compacti = snapi - numberOfCatchUpEntries
		}
		//压缩MemoryStorage中的指定位置之前的全部Entry记录
		err = s.r.raftStorage.Compact(compacti)
		if err != nil {
			// the compaction was done asynchronously with the progress of raft.
			// raft log might already been compact.
			if err == raft.ErrCompacted {
				return
			}
			plog.Panicf("unexpected compaction error %v", err)
		}
		plog.Infof("compacted raft log at %d", compacti)
	})
}

msgSnapC 通道

首先分析EtcdServer.createMergedSnapshotMessage()方法,该方法中会将v2版本存储和v3版本存储封装成snap.Message实例

func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
	// get a snapshot of v2 store as []byte
	clone := s.store.Clone()//复制一份v2存储的数据, 并转换成JSON格式
	d, err := clone.SaveNoCopy()
	if err != nil {
		plog.Panicf("store save should never fail: %v", err)
	}

	// commit kv to write metadata(for example: consistent index).
	s.KV().Commit()//提交v3存储中当前的读写事务
	dbsnap := s.be.Snapshot()//获取v3存储快照, 其实就是对BoltDB数据库进行快照, 
	// get a snapshot of v3 KV as readCloser
	rc := newSnapshotReaderCloser(dbsnap)

	// put the []byte snapshot of store into raft snapshot and return the merged snapshot with
	// KV readCloser snapshot.
	snapshot := raftpb.Snapshot{
		Metadata: raftpb.SnapshotMetadata{
			Index:     snapi,
			Term:      snapt,
			ConfState: confState,
		},
		Data: d,
	}
	m.Snapshot = snapshot

	return *snap.NewMessage(m, rc, dbsnap.Size())//将消息MsgSnap消息和v3存储中的数据封装成snap.Message实例返回
}

创建完snap.Message 实例之后会调用EtcdServer.sendMergedSnap()方法将其发送到指定节点

func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
	atomic.AddInt64(&s.inflightSnapshots, 1)//递增inflightSnapshots字段,它表示已发送但未收到响应的快照消息个数

	s.r.transport.SendSnapshot(merged)//发送snap.Message消息, 底层会启动羊独的后台goroutine, 通过snapshotSender完成发送
	s.goAttach(func() {//启动一个后台goroutine监听该快照消息是否发送完成
		select {
		case ok := <-merged.CloseNotify():
			// delay releasing inflight snapshot for another 30 seconds to
			// block log compaction.
			// If the follower still fails to catch up, it is probably just too slow
			// to catch up. We cannot avoid the snapshot cycle anyway.
			if ok {
				select {
				case <-time.After(releaseDelayAfterSnapshot):
				case <-s.stopping:
				}
			}
			//snap.Message,肖息发送完成(或是超时)之后会递减inflightSnapshots,当inflightSnapshots递减到0时, 前面对MernoryStorage的压缩才能执行
			atomic.AddInt64(&s.inflightSnapshots, -1)
		case <-s.stopping:
			return
		}
	})
}

2.3 注册消息Handle

在执行完EtcdServer.NewServer()方法之后,Transport己经启动,我们可以在其上注册多个Handler 实例, 主要函数:func (e *Etcd) servePeers()中,这些Handler 实例主要用于集群内部各节点之间的通信:

func NewPeerHandler(s etcdserver.ServerPeer) http.Handler {
    //s.RaftHandler() 参见前面rafthttp中的说明,这里启动pipelineHandler、streamHandler、snapHandler
	return newPeerHandler(s.Cluster(), s.RaftHandler(), s.LeaseHandler())
}

func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
	mh := &peerMembersHandler{
		cluster: cluster,
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/", http.NotFound)//使用默认的Handler,直接返回404状态码
	
	//注册Transport.Handler()方法返回的Handler
	mux.Handle(rafthttp.RaftPrefix, raftHandler)
	mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
	mux.Handle(peerMembersPrefix, mh)//将上述 peerMemberHandler 实例注册到“/me巾ers”路径上
	if leaseHandler != nil {
		mux.Handle(leasehttp.LeasePrefix, leaseHandler)
		mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
	}
	mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
	return mux
}
发布了48 篇原创文章 · 获赞 9 · 访问量 1万+

猜你喜欢

转载自blog.csdn.net/cyq6239075/article/details/105555899