说实话,上一篇写的时候压力很大,不知道如何介绍代码流程,才能把存储过程说的清清楚楚?所以后来决定再写一篇,以etcdctl命令行为切入口,层层深入。
一、存储数据结构
Etcd是存储有如下特点:1、采用kv型数据存储,一般情况下比关系型数据库快。
2、支持动态存储(内存)以及静态存储(磁盘)。
3、分布式存储,可集成为多节点集群。
4、存储方式,采用类似目录结构。
1)只有叶子节点才能真正存储数据,相当于文件。
2)叶子节点的父节点一定是目录,目录不能存储数据。
数据结构所在文件:etcd\store\store.go
type store struct { Root *node //树根节点 WatcherHub *watcherHub CurrentIndex uint64 Stats *Stats CurrentVersion int ttlKeyHeap *ttlKeyHeap // need to recovery manually worldLock sync.RWMutex // stop the world lock clock clockwork.Clock readonlySet types.Set }
数据存储的顶层数据结构,挂在数据结构EtcdServer下面。其中Root为根节点。
数据结构所在文件:etcd\store\node.go
// node is the basic element in the store system. // A key-value pair will have a string value // A directory will have a children map type node struct { Path string CreatedIndex uint64 ModifiedIndex uint64 Parent *node `json:"-"` // should not encode this field! avoid circular dependency. ExpireTime time.Time Value string // for key-value pair Children map[string]*node // for directory // A reference to the store this node is attached to. store *store }
node为存储节点,可作为“目录”(父节点),也可作为“文件”(叶子节点)。重要成员说明:
成员名称 |
说明 |
Path |
作为key |
Value |
key-value对中value值 |
Children |
当此节点为目录是,其下面可挂载其他目录 |
角色 |
Value |
Children |
目录(父节节点) |
无效 |
有效 |
文件(叶子节点) |
有效 |
无效 |
二、插入数据
2.1 报文
首先通过报文从感性方向,了解一下数据组织以及传输流程。通过命令行etcdctl set key value,可以进行存储数据,例如:etcdctl set mytest “hello world”,那么报文格式是什么呢?由上图可知:
1)命令行是通过http协议传输数据,目的端口号为2379,方法是PUT。
2)Etcd是nosql存储方式,key为mytest,作为url一部分,value是hello world作为http body。
我们以/v2/keys作为入口,拨开Etcd面纱。
2.2 流程图
2.2.1 由client到raft
func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) { data, err := r.Marshal() //格式化数据 格式化成grpc格式 if err != nil { return Response{}, err } /* 注册并且创建一个channel, 此处ID每次请求都会重新生成*/ ch := a.s.w.Register(r.ID) start := time.Now() a.s.r.Propose(ctx, data) /* 处理请求,进入到raft状态机 */ proposalsPending.Inc() defer proposalsPending.Dec() select { case x := <-ch: //等待响应,强制转成Response,然后调用内部接口发给客户端 resp := x.(Response) return resp, resp.err case <-ctx.Done(): proposalsFailed.Inc() a.s.w.Trigger(r.ID, nil) // GC wait return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start) case <-a.s.stopping: } return Response{}, ErrStopped }
这里的select会出现阻塞,直到从一个channel中读取出一个数据。默认场景会从ch中读取出Response,那么在什么地方写入呢?applyEntryNormal方法中调用s.w.Trigger方法,后面会分析到。
2.2.2 GDB获取堆栈信息
这里告诉一个技巧,如何快速定位流程是什么?GDB。不错就是调试,首先接收到一个新数据,必然要创建一个新叶子节点,创建节点的方法是:etcd\store\store.go:internalCreate通过gdb打断点,并且输入命令行,可以获得堆栈信息:
#0 github.com/coreos/etcd/cmd/vendor/github.com/coreos/etcd/store.(*store).internalCreate (s=0xc4202843f0, nodePath=..., dir=false, value=..., unique=false, replace=true, expireTime=..., action=..., ~r7=0xc4206c6774, ~r8=0xa) at /root/etcd/src/github.com/coreos/etcd/gopath/src/github.com/coreos/etcd/cmd/vendor/github.com/coreos/etcd/store/store.go:564 #1 0x0000000000935316 in github.com/coreos/etcd/cmd/vendor/github.com/coreos/etcd/store.(*store).Set (s=0xc4202843f0, nodePath=..., dir=false, value=..., expireOpts=..., ~r4=0x0, ~r5=...) at /root/etcd/src/github.com/coreos/etcd/gopath/src/github.com/coreos/etcd/cmd/vendor/github.com/coreos/etcd/store/store.go:225
由于篇幅原因,这里把堆栈总结如下:
internalCreate<- Set <- Put <- applyV2Request <- applyEntryNormal <- apply<- applyEntries <- applyAll
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { ... var raftReq pb.InternalRaftRequest if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible var r pb.Request pbutil.MustUnmarshal(&r, e.Data) //反序列化 将grpc格式转成可读模式数据 s.w.Trigger(r.ID, s.applyV2Request(&r)) return } if raftReq.V2 != nil { req := raftReq.V2 s.w.Trigger(req.ID, s.applyV2Request(req)) return } ... }此处需要把e.Data进行反序列化(之前为了数据同步,序列化了)。applyV2Request主要有两项工作:
1)将数据写到store中即创建树节点(put操作)
2)生成Response消息,以便于Trigger能将其写入到channel中。
通过堆栈信息可知,会调用的internalCreate方法:
func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool, expireTime time.Time, action string) (*Event, *etcdErr.Error) { currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1 if unique { // append unique item under the node path nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10)) } nodePath = path.Clean(path.Join("/", nodePath)) // we do not allow the user to change "/" if s.readonlySet.Contains(nodePath) { return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex) } // Assume expire times that are way in the past are // This can occur when the time is serialized to JS if expireTime.Before(minExpireTime) { expireTime = Permanent } dirName, nodeName := path.Split(nodePath // walk through the nodePath, create dirs and get the last directory node d, err := s.walk(dirName, s.checkDir) if err != nil { s.Stats.Inc(SetFail) reportWriteFailure(action) err.Index = currIndex return nil, err } e := newEvent(action, nodePath, nextIndex, nextIndex) eNode := e.Node n, _ := d.GetChild(nodeName) // force will try to replace an existing file if n != nil { if replace { if n.IsDir() { return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex) } e.PrevNode = n.Repr(false, false, s.clock) n.Remove(false, false, nil) } else { return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex) } } if !dir { // create file // copy the value for safety valueCopy := value eNode.Value = &valueCopy //生成新的树节点node,作为叶子节点 n = newKV(s, nodePath, value, nextIndex, d, expireTime) } else { // create directory eNode.Dir = true n = newDir(s, nodePath, nextIndex, d, expireTime) } // we are sure d is a directory and does not have the children with name n.Name d.Add(n) //添加父节点中,即挂到map中 // node with TTL if !n.IsPermanent() { s.ttlKeyHeap.push(n) eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock) } s.CurrentIndex = nextIndex return e, nil }
最后介绍一下Trigger,比较简单:
etcd\pkg\wait\wait.go func (w *list) Trigger(id uint64, x interface{}) { w.l.Lock() ch := w.m[id] delete(w.m, id) w.l.Unlock() if ch != nil { ch <- x //将数据写到channel中,并且关闭channel close(ch) } }
三、查询数据
对于查询比较简单,Etcd接收到客户端查询消息,会根据key在树节点中遍历,如果找到则返回数据,否则返回查询失败。请求会经过store.go中Get方法:// Get returns a get event. // If recursive is true, it will return all the content under the node path. // If sorted is true, it will sort the content by keys. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) { var err *etcdErr.Error s.worldLock.RLock() defer s.worldLock.RUnlock() defer func() { if err == nil { s.Stats.Inc(GetSuccess) if recursive { reportReadSuccess(GetRecursive) } else { reportReadSuccess(Get) } return } s.Stats.Inc(GetFail) if recursive { reportReadFailure(GetRecursive) } else { reportReadFailure(Get) } }() n, err := s.internalGet(nodePath) //根据key,获取node节点 if err != nil { return nil, err } e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex) e.EtcdIndex = s.CurrentIndex e.Node.loadInternalNode(n, recursive, sorted, s.clock) //加载node,主要是获取node中数据 return e, nil } // InternalGet gets the node of the given nodePath. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) { nodePath = path.Clean(path.Join("/", nodePath)) //walkFunc是用于递归调用查找节点 walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) { if !parent.IsDir() { err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex) return nil, err } child, ok := parent.Children[name] //从map中获取节点 if ok { return child, nil } return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex) } f, err := s.walk(nodePath, walkFunc) if err != nil { return nil, err } return f, nil } func (eNode *NodeExtern) loadInternalNode(n *node, recursive, sorted bool, clock clockwork.Clock) { if n.IsDir() { // node is a directory eNode.Dir = true children, _ := n.List() eNode.Nodes = make(NodeExterns, len(children)) // we do not use the index in the children slice directly // we need to skip the hidden one i := 0 for _, child := range children { if child.IsHidden() { // get will not return hidden nodes continue } eNode.Nodes[i] = child.Repr(recursive, sorted, clock) i++ } // eliminate hidden nodes eNode.Nodes = eNode.Nodes[:i] if sorted { sort.Sort(eNode.Nodes) } } else { // node is a file value, _ := n.Read() //从node节点中获取数据存到eNode.Value中 eNode.Value = &value } eNode.Expiration, eNode.TTL = n.expirationAndTTL(clock) }
四、总结
本篇只把大体流程介绍了一下,更加入的流程,仍需要深入研究,但是已经能起到抛砖引玉的作用,哈哈。到这里基本上把数据写入和读取介绍完了,但是仍然有一块内容没有介绍--集群数据同步, 下一篇介绍数据同步。