在raft协议中,snapshot主要用来压缩raft日志、减少raft日志的数量,一旦正确产生并持久化了一个snapshot,那么在这个snapshot之前的日志全部都可以直接丢掉。
snapshot定义
etcd中对raft snapshot的定义如下(在文件raft.pb.go):
type Snapshot struct {
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
Metadata SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
XXX_unrecognized []byte `json:"-"`
}
其中的Data为snapshot的数据部分,这部分通常就是应用状态机数据,而Metadata则是snaoshot的元信息(包括集群当前的配置状态、日志索引、该条索引日志对应的任期号),定义如下:
// snapshot的元数据
type SnapshotMetadata struct {
ConfState ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"` // 最后一次的配置状态
Index uint64 `protobuf:"varint,2,opt,name=index" json:"index"` // 被快照取代的最后的条目在日志中的索引值(appliedIndex)
Term uint64 `protobuf:"varint,3,opt,name=term" json:"term"` // 该条目的任期号
XXX_unrecognized []byte `json:"-"`
}
集群配置状态定义如下:
type ConfState struct {
Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"`
Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
snapshot新建
想要在指定目录下新创建一个snapshot,可以使用如下方法(snapshotter.go):
type Snapshotter struct {
dir string
}
func New(dir string) *Snapshotter {
return &Snapshotter{
dir: dir,
}
}
snapshot持久化
// 对外暴露的接口,存储并持久化一个snapshot
func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error {
if raft.IsEmptySnap(snapshot) {
return nil
}
return s.save(&snapshot)
}
// 将raft snapshot序列化后持久化到磁盘
func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
// 产生snapshot的时间
start := time.Now()
// snapshot的文件名Term-Index.snap
fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)
// 将raft snapshot序列化
b := pbutil.MustMarshal(snapshot)
// 算CRC
crc := crc32.Update(0, crcTable, b)
// 将数据和crc一起打包
snap := snappb.Snapshot{Crc: crc, Data: b}
// 使用pb方式序列化
d, err := snap.Marshal()
if err != nil {
return err
} else {
marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second))
}
// 持久化(必须刷盘)
err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666)
if err == nil {
saveDurations.Observe(float64(time.Since(start)) / float64(time.Second))
} else {
err1 := os.Remove(filepath.Join(s.dir, fname))
if err1 != nil {
plog.Errorf("failed to remove broken snapshot file %s", filepath.Join(s.dir, fname))
}
}
return err
}
由上面的代码可以看出,raft snapshot最终会封装成定义在snap.pb.go的Snapshot进行持久化存储:
type Snapshot struct {
Crc uint32 `protobuf:"varint,1,opt,name=crc" json:"crc"`
Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
snapshot加载
加载snapshot对外暴露的api是Load(),定义如下:
// 加载raft快照
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
// returns the filename of the snapshots in logical time order (from newest to oldest).
// If there is no available snapshots, an ErrNoSnapshot will be returned.
names, err := s.snapNames()
if err != nil {
return nil, err
}
var snap *raftpb.Snapshot
// 从时间最近到最旧来遍历所有snapshot文件
for _, name := range names {
// 加载snapshot
if snap, err = loadSnap(s.dir, name); err == nil {
// 只要成功加载了snapshot就跳出
break
}
}
if err != nil {
return nil, ErrNoSnapshot
}
return snap, nil
}
其中snapNames用于返回所有的snapshot文件的文件名,并且是按照时间排序好的,其实现如下:
// snapNames returns the filename of the snapshots in logical time order (from newest to oldest).
// If there is no available snapshots, an ErrNoSnapshot will be returned.
func (s *Snapshotter) snapNames() ([]string, error) {
dir, err := os.Open(s.dir) // 打开snapshot保存目录
if err != nil {
return nil, err
}
defer dir.Close()
names, err := dir.Readdirnames(-1)
if err != nil {
return nil, err
}
snaps := checkSuffix(names) // 检查文件名后缀是否正确
if len(snaps) == 0 {
return nil, ErrNoSnapshot
}
sort.Sort(sort.Reverse(sort.StringSlice(snaps))) // 对目录下所有snapshot文件名进行排序
return snaps, nil
}
checkSuffix用于对文件名后缀的合法性检查,如下:
func checkSuffix(names []string) []string {
snaps := []string{}
for i := range names {
// 必须以 ".snap"结尾
if strings.HasSuffix(names[i], snapSuffix) { // snapSuffix 为 ".snap"
snaps = append(snaps, names[i])
} else {
// If we find a file which is not a snapshot then check if it's
// a vaild file. If not throw out a warning.
if _, ok := validFiles[names[i]]; !ok {
plog.Warningf("skipped unexpected non snapshot file %v", names[i])
}
}
}
return snaps
}
拿到所有文件名之后(按照新旧排序),接下来便从最新的文件开始进行loadSnap:
// 参数为snapshot保存目录和snapshot文件名,返回值为加载到的raft snapshot
func loadSnap(dir, name string) (*raftpb.Snapshot, error) {
fpath := filepath.Join(dir, name) // 构造文件路径
snap, err := Read(fpath) // 读snapshot
if err != nil {
renameBroken(fpath) // 如果读遇到错误,就将该snapshot标记为损坏
}
return snap, err
}
Read本质就是Save的逆过程,因此逻辑也比较简单,根绝Save的书序进行相应的反序列化就行了,定义如下:
// Read reads the snapshot named by snapname and returns the snapshot.
func Read(snapname string) (*raftpb.Snapshot, error) {
// 读snapshot文件内容
b, err := ioutil.ReadFile(snapname)
if err != nil {
plog.Errorf("cannot read file %v: %v", snapname, err)
return nil, err
}
if len(b) == 0 {
plog.Errorf("unexpected empty snapshot")
return nil, ErrEmptySnapshot
}
var serializedSnap snappb.Snapshot
// 反序列化
if err = serializedSnap.Unmarshal(b); err != nil {
plog.Errorf("corrupted snapshot file %v: %v", snapname, err)
return nil, err
}
if len(serializedSnap.Data) == 0 || serializedSnap.Crc == 0 {
plog.Errorf("unexpected empty snapshot")
return nil, ErrEmptySnapshot
}
// 校验CRC
crc := crc32.Update(0, crcTable, serializedSnap.Data)
if crc != serializedSnap.Crc {
plog.Errorf("corrupted snapshot file %v: crc mismatch", snapname)
return nil, ErrCRCMismatch
}
// 反序列化,还原raft snapshot
var snap raftpb.Snapshot
if err = snap.Unmarshal(serializedSnap.Data); err != nil {
plog.Errorf("corrupted snapshot file %v: %v", snapname, err)
return nil, err
}
return &snap, nil
}
etcd对raft snapshot的实现主要在snapshotter.go中,整个逻辑比较简单,在后面的文章中,将会多出看到snapshot接口的使用身影。
本系列文章
2、etcd raft模块分析--raft snapshot
3、etcd raft模块分析--raft wal日志
4、etcd raft模块分析--raft node
5、etcd raft模块分析--raft 协议
6、etcd raft模块分析--raft transport
7、etcd raft模块分析--raft storage
8、etcd raft模块分析--raft progress