K8S Informer机制分析
一、总体组件介绍
- Reflector: 通过 Kubernetes API 监控 Kubernetes 的资源类型 采用 List/Watch 机制, 可以 Watch 任何资源包括 CRD 添加 object 对象到 DeltaFIFO 队列,然后 Informer 会从队列里面取数据进行处理。Reflector 会和 apiServer 建立长连接,并使用 ListAndWatch 方法获取并监听某一个资源的变化。List 方法将会获取某个资源的所有实例,Watch 方法则监听资源对象的创建、更新以及删除事件,然后将事件放入到DeltaFIFO Queue中;
- DeltaFIFO:DeltaFIFO是一个先进先出的队列,可以保存资源对象的操作类型;
- Informer:controller 机制的基础,循环处理 object 对象 从 Reflector 取出数据,然后将数据给到 Indexer 去缓存,提供对象事件的 handler 接口,只要给 Informer 添加
ResourceEventHandler
实例的回调函数,去实现OnAdd(obj interface{})
、OnUpdate(oldObj, newObj interface{})
和OnDelete(obj interface{})
这三个方法,就可以处理好资源的创建、更新和删除操作了。Informer会不断的从 Delta FIFO Queue 中 pop 增量事件,并根据事件的类型来决定新增、更新或者是删除本地缓存;接着Informer 根据事件类型来触发事先注册好的 Event Handler触发回调函数,然后然后将该事件丢到 WorkQueue 这个工作队列中。 - Indexer:用来存储资源对象并自带索引功能的本地存储,提供 object 对象的索引,是线程安全的,缓存对象信息。
- workqueue:最后轮到Controller从线程安全的Workqueue中取出这个资源的key,进行事件的处理。Controller在处理事件的过程中可能是并行的,有许多个Worker线程不断从Workqueue中取事件并处理。 worker 来业务逻辑通常是计算目前集群的状态和用户希望达到的状态有多大的区别,然后不断的调和处理。Informer/SharedInformer与Worker线程的关系,实际上是一个生产者-消费者关系,利用一个Workqueue将二者分开,既实现了两个部件的解耦,也解决了双方处理速度不一致的问题。
函数接口风格: k8s对象有层级关系的逻辑,函数返回接口不返回具体的类型主要是更关心对象需对外暴露的接口不关心对象的内部构造。
二、reflector机制源码分析
// 代码源自client-go/tools/cache/reflector.go
type Reflector struct {
name string // 名字
metrics *reflectorMetrics // 监控
expectedType reflect.Type // 反射的类型,也就是要监控的对象类型,比如Pod
store Store // 存储,就是DeltaFIFO
listerWatcher ListerWatcher // 这个是用来从apiserver获取资源用的
period time.Duration // 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
// 这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
resyncPeriod time.Duration // 重新同步的周期,很多人肯定认为这个同步周期指的是从apiserver的同步周期
// 其实这里面同步指的是shared_informer使用者需要定期同步全量对象
ShouldResync func() bool // 如果需要同步,调用这个函数问一下,当然前提是该函数指针不为空
clock clock.Clock // 时钟
lastSyncResourceVersion string // 最后一次同步的资源版本
lastSyncResourceVersionMutex sync.RWMutex // 最后一次同步的资源版本锁
}
Reflector的Run()函数,启用listWatch接口:
// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{
}) {
// func Until(f func(), period time.Duration, stopCh <-chan struct{})是下面函数的声明
// 这里面我们不用关心wait.Until是如何实现的,只要知道他调用函数f会被每period周期执行一次
// 意思就是f()函数执行完毕再等period时间后在执行一次,也就是r.ListAndWatch()会被周期性的调用
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
很多存储类的系统都是这样设计的,数据采用版本的方式记录,数据每变化(添加、删除、更新)都会触发版本更新,这样的做法可以避免全量数据访问。以apiserver资源监控为例,只要监控比缓存中资源版本大的对象就可以了,
// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{
}) error {
var resourceVersion string
// 把变化的部分更新到缓存中就可以达到与apiserver一致的效果,一般资源的初始版本为0,从0版本开始列举就是全量的对象了
options := metav1.ListOptions{
ResourceVersion: "0"}
// 列举资源,这部分是apimachery相关的内容,读者感兴趣可以自己了解
list, err := r.listerWatcher.List(options)
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
// 以上部分都是对象实例化的过程,可以称之为反射,也是Reflector这个名字的主要来源,本文不是讲解反射原理的,
// 而是作为SharedInformer的前端,所以我们重点介绍的是对象在SharedInformer中流转过程,所以反射原理部分不做为重点讲解
// 这可是真正从apiserver同步过来的全量对象,所以要同步到DeltaFIFO中
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
// 设置最新的同步的对象版本
r.setLastSyncResourceVersion(resourceVersion)
// 下面要启动一个后台协程实现定期的同步操作,这个同步就是将SharedInformer里面的对象全量以同步事件的方式通知使用者
// 我们暂且称之为“后台同步协程”,Run()函数退出需要后台同步协程退出,所以下面的cancelCh就是干这个用的
// 利用defer close(cancelCh)实现的,而resyncerrc是后台同步协程反向通知Run()函数的报错通道
// 当后台同步协程出错,Run()函数接收到信号就可以退出了
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{
})
defer close(cancelCh)
// 下面这个匿名函数就是后台同步协程的函数了
go func() {
// resyncCh返回的就是一个定时器,如果resyncPeriod这个为0那么就会返回一个永久定时器,cleanup函数是用来清理定时器的
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup()
}()
// 死循环等待各种信号
for {
// 只有定时器有信号才继续处理,其他的都会退出
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
// ShouldResync是个函数地址,创建反射器对象的时候传入,即便时间到了,也要通过函数问问是否需要同步
if r.ShouldResync == nil || r.ShouldResync() {
//store是DeltaFIFO,DeltaFIFO.Resync()Resync机制
//会将Indexer 的本地缓存重新同步到 DeltaFIFO 队列中。
//一般我们会设置一个时间周期,让 Indexer 周期性地将缓存同步到队列中.
//Resync 机制的引入,定时将 Indexer 缓存事件重新同步到 Delta FIFO 队列中,
//在处理 SharedInformer 事件回调时,让处理失败的事件得到重新处理。
//并且通过入队前判断 FIFO 队列中是否已经有了更新版本的 event,来决定是否丢弃 Indexer 缓存
//不进行 Resync 入队。在处理 Delta FIFO 队列中的 Resync 的事件数据时,
//触发 onUpdate 回调来让事件重新处理。
// 就在这里实现了我们前面提到的同步,从这里看所谓的同步就是以全量对象同步事件的方式通知使用者
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
// 清理掉当前的计时器,获取下一个同步时间定时器
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// 前面已经列举了全量对象,接下来就是watch的逻辑了
for {
// 如果有退出信号就立刻返回,否则就会往下走,因为有default.
select {
case <-stopCh:
return nil
default:
}
// 计算watch的超时时间
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
// 设置watch的选项,因为前期列举了全量对象,从这里只要监听最新版本以后的资源就可以了
// 如果没有资源变化总不能一直挂着吧?也不知道是卡死了还是怎么了,所以有一个超时会好一点
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
}
// 开始监控对象
w, err := r.listerWatcher.Watch(options)
// watch产生错误了,大部分错误就要退出函数然后再重新来一遍流程
if err != nil {
switch err {
case io.EOF:
case io.ErrUnexpectedEOF:
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
}
// 类似于网络拒绝连接的错误要等一会儿再试,因为可能网络繁忙
if urlError, ok := err.(*url.Error); ok {
if opError, ok := urlError.Err.(*net.OpError); ok {
if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
time.Sleep(time.Second)
continue
}
}
}
return nil
}
// watch返回是流,apiserver会将变化的资源通过这个流发送出来,client-go最终通过chan实现的
// 所以watchHandler()是一个需要持续从chan读取数据的流程,所以需要传入resyncerrc和stopCh
// 用于异步通知退出或者后台同步协程错误
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
}
}
}
其中调用了两个私有函数,分别为syncWith()和watchHandler()。syncWith()用于实现一次从apiserver全量对象的同步,这里的同步和我们上面提到的同步不是一回事,这里指的是从apiserver的同步。watchHandler是实现监控apiserver资源变化的处理过程,主要就是把apiserver的资源变化转换为DeltaFIFO调用。通过apiserver的client监控(watch)资源,监控的当前资源版本号以后的对象,因为之前的都已经获取到了;一旦有对象发生变化,那么就会根据变化的类型(新增、更新、删除)调用DeltaFIFO的相应接口,产生一个相应的对象Delta,同时更新当前资源的版本;我们接下来就看这两个函数的具体实现
// 代码源自client-go/tools/cache/reflector.go
// 实现apiserver全量对象的同步
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
// 做一次slice类型转换
found := make([]interface{
}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
// 直接调用了DeltaFIFO的Replace()接口,这个接口就是用于同步全量对象的
return r.store.Replace(found, resourceVersion)
}
// 实现从watch返回的chan中持续读取变化的资源,并转换为DeltaFIFO相应的调用
unc (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{
}) error {
...
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
...
// 获取资源版本号
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
//将添加资源事件添加到DeltaFIFO队列中
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
//将更新资源事件添加到DeltaFIFO队列中
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
//将删除资源事件添加到DeltaFIFO队列中
case watch.Deleted:
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
...
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
...
}
while循环一直监听返回watch的回复。
三、ListWatch机制分析
kubernetes系统的采取Level Trigger的设计理念,所以各组件只需要感知数据最新的状态,而不需要担心错过数据的变化过程。而作为kubernentes系统消息通知机制(或者说数据实时通知机制),应该满足下面几点要求:
3.1 实时性(即数据变化时,相关组件越快感知越好)。消息必须是实时的,list-watch
机制下,每当apiserver
的资源产生状态变更事件
,都会将事件及时的推送给客户端,从而保证了消息的实时性
。
list-watch 具体是什么呢,顾名思义,list-watch有两部分组成,分别是list
和 watch
。list
非常好理解,就是调用资源的list API
罗列资源,基于HTTP
短链接实现;watch
则是调用资源的watch API
监听资源变更事件,基于HTTP 长链接
实现。Watch API,往往带上 watch=true
,表示采用 HTTP 长连接
持续监听 pod 相关事件
,每当有事件来临,返回一个 WatchEvent。
HTTP 分块传输编码允许服务器为动态生成的内容维持 HTTP 持久链接。通常,持久链接需要服务器在开始发送消息体前发送Content-Length消息头字段,但是对于动态生成的内容来说,在内容创建完之前是不可知的。使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样服务器可以发送数据而不需要预先知道发送内容的总大小。
当客户端调用 watch API
时,apiserver
在response
的 HTTP Header
中设置 Transfer-Encoding
的值为chunked
,表示采用分块传输
编码,客户端收到该信息后,便和服务端该链接,并等待下一个数据块,即资源的事件信息具体流程如下图所示:
$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/pods?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 02 Jan 2019 20:22:59 GMT
Transfer-Encoding: chunked
{
"type": "ADDED",
"object": {"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "10596", ...}, ...}
}
{
"type": "MODIFIED",
"object": {"kind": "Pod", "apiVersion": "v1", "metadata": {"resourceVersion": "11020", ...}, ...}
}
3.2 消息的顺序性
消息的顺序性也是非常重要的,在并发的场景下,客户端在短时间内可能会收到同一个资源的多个事件,对于关注最终一致性
的 K8S
来说,它需要知道哪个是最近发生的事件,并保证资源的最终状态如同最近事件所表述的状态一样。K8S
在每个资源的事件中都带一个 resourceVersion
的标签,这个标签是递增的数字,所以当客户端并发处理同一个资源的事件时,它就可以对比 resourceVersion
来保证最终的状态和最新的事件所期望的状态保持一致。
3.3 消息的可靠性, 保证消息不丢失或者有可靠的重新获取机制(比如说kubelet和kube-apiserver间网络闪断,需要保证网络恢复后kubelet可以收到网络闪断期间产生的消息)
消息
必须是可靠
的,list
和 watch
一起保证了消息的可靠性,避免因消息丢失而造成状态不一致场景。具体而言,list API
可以查询当前的资源及其对应的状态(即期望的状态),客户端通过拿期望的状态
和实际的状态
进行对比,纠正状态不一致的资源。Watch API
和 apiserver
保持一个长链接
,接收资源的状态变更事件
并做相应处理。如果仅调用 watch API
,若某个时间点连接中断,就有可能导致消息丢失,所以需要通过list API
解决消息丢失
的问题。从另一个角度出发,我们可以认为list API
获取全量数据,watch API
获取增量数据。虽然仅仅通过轮询 list API
,也能达到同步资源状态的效果,但是存在开销大,实时性不足的问题。
kubernetes中结合watch请求增加了list请求
,主要做如下两件事情:
- watch请求开始之前,先发起一次list请求,获取集群中当前所有该类数据(同时得到最新的ResourceVersion),之后基于最新的ResourceVersion发起watch请求。
- 当watch出错时(比如说网络闪断造成客户端和服务端数据不同步),重新发起一次list请求获取所有数据,再重新基于最新ResourceVersion来watch。
ListerWatcher是一个interface类型,定义如下:
// Lister is any object that knows how to perform an initial list.
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister
Watcher
}
// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
// Stops watching. Will close the channel returned by ResultChan(). Releases
// any resources used by the watch.
Stop()
// Returns a chan which will receive all the events. If an error occurs
// or Stop() is called, this channel will be closed, in which case the
// watch should be completely cleaned up.
ResultChan() <-chan Event
}
// Event represents a single event to a watched resource.
// +k8s:deepcopy-gen=true
type Event struct {
Type EventType
// Object is:
// * If Type is Added or Modified: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
// * If Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
Object runtime.Object
}
watch机制分析:Informer
只会调用Kubernetes List
和 Watch
两种类型的 API
。Informer
在初始化的时,先调用Kubernetes List API
获得某种 resource
的全部Object
,缓存在内存
中; 然后,调用 Watch API
去watch
这种resource
,去维护这份缓存.
// pod的watch接口示例
// Watch returns a watch.Interface that watches the requested pods.
func (c *pods) Watch(opts metav1.ListOptions) (watch.Interface, error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch()
}
四、sharedInformer源码分析
control模块非controller-manager管理的各种各样的controller,其目的是用来把Reflector、DeltaFIFO组合起来形成一个相对固定的、标准的处理流程。Controller把多个模块整合起来实现了一套业务逻辑,所以在创建Controller需要提供一些配置:
// 代码源自client-go/tools/cache/controller.go
// 这是一个Controller的抽象
type Controller interface {
Run(stopCh <-chan struct{
}) // 核心流程函数
HasSynced() bool // apiserver中的对象是否已经同步到了Store中
LastSyncResourceVersion() string // 最新的资源版本号
}
// 代码源自client-go/tools/cache/controller.go
type Config struct {
Queue // SharedInformer使用DeltaFIFO
ListerWatcher // 这个用来构造Reflector
Process ProcessFunc // 这个在调用DeltaFIFO.Pop()使用,弹出对象要如何处理
ObjectType runtime.Object // 对象类型,这个肯定是Reflector使用
FullResyncPeriod time.Duration // 全量同步周期,这个在Reflector使用
ShouldResync ShouldResyncFunc // Reflector在全量更新的时候会调用该函数询问
RetryOnError bool // 错误是否需要尝试
}
// 代码源自client-go/tools/cache/controller.go
// controller是Controller的实现类型
type controller struct {
config Config // 配置,上面有讲解
reflector *Reflector // 反射器
reflectorMutex sync.RWMutex // 反射器的锁
clock clock.Clock // 时钟
}
Controller构造Reflector对象,Reflector作为DeltaFIFO生产者持续监控apiserver的资源变化并推送到队列中。Controller的Run()是DeltaFIFO队列的消费者,从队列中弹出对象并调用Process()处理。所以Controller相比于Reflector因为队列的加持表现为每次有资源变化就会调用一次使用者定义的处理函数。
// 核心业务逻辑实现
func (c *controller) Run(stopCh <-chan struct{
}) {
defer utilruntime.HandleCrash()
// 创建一个协程,如果收到系统退出的信号就关闭队列,相当于在这里析构的队列
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 创建Reflector,传入的参数都是我们上一个章节解释过的,这里不赘述
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
// r.ShouldResync的存在就是为了以后使用少些一点代码?否则直接使用c.config.ShouldResync不就完了么?不明白用意
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
// 记录反射器
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
// wait.Group不是本章的讲解内容,只要把它理解为类似barrier就行了
// 被他管理的所有的协程都退出后调用Wait()才会退出,否则就会被阻塞
var wg wait.Group
defer wg.Wait()
// StartWithChannel()会启动协程执行Reflector.Run(),同时接收到stopCh信号就会退出协程
wg.StartWithChannel(stopCh, r.Run)
// wait.Until()在前面的章节讲过了,周期性的调用c.processLoop(),这里来看是1秒
// 不用担心调用频率太高,正常情况下c.processLoop是不会返回的,除非遇到了解决不了的错误,因为他是个循环
wait.Until(c.processLoop, time.Second, stopCh)
}
// 代码源自client-go/tools/cache/controller.go
func (c *controller) processLoop() {
for {
// 从队列中弹出一个对象,然后处理它,这才是最主要的部分,这个c.config.Process是构造Controller的时候通过Config传进来的
// 所以这个读者要特别注意了,这个函数其实是ShareInformer传进来的,所以在分析SharedInformer的时候要重点分析的
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
// 如果FIFO关闭了那就退出
if err == FIFOClosedError {
return
}
// 如果错误可以再试试
if c.config.RetryOnError {
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
这里会循环将DeltaFIFO队列中数据pop出队,然后交给Process方法进行处理,Process方法是在上面调用sharedIndexInformer的Run方法的数据设置,设置的方法是下面sharedIndexInformer的HandleDeltas方法。
首先看下SharedInformer 相关接口
// 代码源自client-go/tools/cache/shared_informer.go
type SharedInformer interface {
// 添加资源事件处理器,关于ResourceEventHandler的定义在下面
// 相当于注册回调函数,当有资源变化就会通过回调通知使用者,是不是能和上面介绍的Controller可以联系上了?
// 为什么是Add不是Reg,说明可以支持多个handler
AddEventHandler(handler ResourceEventHandler)
// 上面添加的是不需要周期同步的处理器,下面的接口添加的是需要周期同步的处理器,周期同步上面提了好多遍了,不赘述
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// Store这个有专门的文章介绍,这个函数就是获取Store的接口,说明SharedInformer内有Store对象
GetStore() Store
// Controller在上面的章节介绍了,说明SharedInformer内有Controller对象
GetController() Controller
// 这个应该是SharedInformer的核心逻辑实现的地方
Run(stopCh <-chan struct{
})
// 因为有Store,这个函数就是告知使用者Store里面是否已经同步了apiserver的资源,这个接口很有用
// 当创建完SharedInformer后,通过Reflector从apiserver同步全量对象,然后在通过DeltaFIFO一个一个的同志到cache
// 这个接口就是告知使用者,全量的对象是不是已经同步到了cache,这样就可以从cache列举或者查询了
HasSynced() bool
// 最新同步资源的版本,这个就不多说了,通过Controller(Controller通过Reflector)实现
LastSyncResourceVersion() string
}
// 扩展了SharedInformer类型,从类型名字上看共享的是Indexer,Indexer也是一种Store的实现
type SharedIndexInformer interface {
// 继承了SharedInformer
SharedInformer
// 扩展了Indexer相关的接口
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
// 代码源自client-go/tools/cache/controller.go,SharedInformer使用者如果需要处理资源的事件
// 那么就要自己实现相应的回调函数
type ResourceEventHandler interface {
// 添加对象回调函数
OnAdd(obj interface{
})
// 更新对象回调函数
OnUpdate(oldObj, newObj interface{
})
// 删除对象回调函数
OnDelete(obj interface{
})
}
接口SharedInformer按照k8s的代码风格有一个同名的实现类(结构体):
// 代码源自client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
// Indexer也是一种Store,这个我们知道的,Controller负责把Reflector和FIFO逻辑串联起来
// 所以这两个变量就涵盖了开篇那张图里面的Reflector、DeltaFIFO和LocalStore(cache)
indexer Indexer
controller Controller
// sharedIndexInformer把上面提到的ResourceEventHandler进行了在层封装,并统一由sharedProcessor管理,后面章节专门介绍
processor *sharedProcessor
// CacheMutationDetector其实没啥用,我理解是开发者自己实现的一个调试工具,用来发现对象突变的
// 实现方法也比较简单,DeltaFIFO弹出的对象在处理前先备份(深度拷贝)一份,然后定期比对两个对象是否相同
// 如果不同那就报警,说明处理过程中有人修改过对象,这个功能默认是关闭,所以我说没啥用
cacheMutationDetector CacheMutationDetector
// 这两个变量是给Reflector用的,我们知道Reflector是在Controller创建的
listerWatcher ListerWatcher
objectType runtime.Object
// 定期同步的周期,因为可能存在多个ResourceEventHandler,就有可能存在多个同步周期,sharedIndexInformer采用最小的周期
// 这个周期值就存储在resyncCheckPeriod中,通过AddEventHandler()添加的处理器都采用defaultEventHandlerResyncPeriod
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
// 时钟
clock clock.Clock
// 启动、停止标记,肯定有人会问为啥用两个变量,一个变量不就可以实现启动和停止了么?
// 其实此处是三个状态,启动前,已启动和已停止,start表示了两个状态,而且为启动标记专门做了个锁
// 说明启动前和启动后有互斥的资源操作
started, stopped bool
startedLock sync.Mutex
// 这个名字起的也是够了,因为DeltaFIFO每次Pop()的时候需要传入一个函数用来处理Deltas
// 处理Deltas也就意味着要把消息通知给处理器,如果此时调用了AddEventHandler()
// 就会存在崩溃的问题,所以要有这个锁,阻塞Deltas....细想名字也没毛病~
blockDeltas sync.Mutex
}
sharedProcessor管理processorListener监听器,一个informer可以包含多个监听器。
// client-go/tools/cache/shared_informer.go
// sharedProcessor是通过数组组织处理器的,只是分了需要定时同步和不需要要同步两类
type sharedProcessor struct {
listenersStarted bool // 所有处理器是否已经启动的标识
listenersLock sync.RWMutex // 读写锁
listeners []*processorListener // 通用的处理器
syncingListeners []*processorListener // 需要定时同步的处理器
clock clock.Clock // 时钟
wg wait.Group // 前面讲过了processorListener每个需要两个协程,
// 用wait.Group来管理所有处理器的携程,保证他们都能退出
}
processorListener 处理监听事件,进行相应监听器的回调。
// 代码源自clien-go/tools/cache/shared_informer.go
type processorListener struct {
// nextCh、addCh、handler、pendingNotifications的用法需细想总结
// 总结这四个变量实现了事件的输入、缓冲、处理,事件就是apiserver资源的变化
nextCh chan interface{
}
addCh chan interface{
}
handler ResourceEventHandler
pendingNotifications buffer.RingGrowing
// 下面四个变量就是跟定时同步相关的了,requestedResyncPeriod是处理器设定的定时同步周期
// resyncPeriod是跟sharedIndexInformer对齐的同步时间,因为sharedIndexInformer管理了多个处理器
// 最终所有的处理器都会对齐到一个周期上,nextResync就是下一次同步的时间点
requestedResyncPeriod time.Duration
resyncPeriod time.Duration
nextResync time.Time
resyncLock sync.Mutex
}
// 代码源自client-go/tools/cache/shared_informer.go
// 对,就这么简单,通过addCh传入,这里面的notification就是我们所谓的事件
func (p *processorListener) add(notification interface{
}) {
p.addCh <- notification
}
// 代码源自client-go/tools/cache/shared_informer.go
// 这个也是sharedProcessor通过wait.Group启动的
func (p *processorListener) run() {
// 因为wait.Until需要传入退出信号的chan
stopCh := make(chan struct{
})
// wait.Until不多说了,我在前期不点的文章中说过了,只要没有收到退出信号就会周期的执行传入的函数
wait.Until(func() {
// wait.ExponentialBackoff()和wait.Until()类似,wait.Until()是无限循环
// wait.ExponentialBackoff()是尝试几次,每次等待时间会以指数上涨
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
// 这也是chan的range用法,可以参看我的《深入浅出golang的chan》了解细节
for next := range p.nextCh {
// 判断事件类型,这里面的handler就是调用SharedInfomer.AddEventHandler()传入的
// 理论上处理的不是Deltas么?怎么变成了其他类型,这是SharedInformer做的二次封装,后面会看到
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
return true, nil
})
// 执行到这里只能是nextCh已经被关闭了,所以关闭stopCh,通知wait.Until()退出
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
pop()函数实现的非常巧妙,利用一个协程就把接收、缓冲、发送全部解决了。它充分的利用了golang的select可以同时操作多个chan的特性,同时从addChd读取数据从nextCh发送数据,这两个chan任何一个完成都可以激活协程。
// 代码源自client-go/tools/cache/shared_informer.go
// 这个函数是通过sharedProcessor利用wait.Group启动的,读者可以自行查看wait.Group
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
// nextCh是在这里,函数退出前析构的
defer close(p.nextCh)
// 临时变量,下面会用到
var nextCh chan<- interface{
}
var notification interface{
}
// 进入死循环啦
for {
select {
// 有两种情况,nextCh还没有初始化,这个语句就会被阻塞,这个我在《深入浅出golang之chan》说过
// nextChan后面会赋值为p.nextCh,因为p.nextCh也是无缓冲的chan,数据不发送成功就阻塞
case nextCh <- notification:
// 如果发送成功了,那就从缓冲中再取一个事件出来
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok {
// 如果没有事件,那就把nextCh再次设置为nil,接下来对于nextCh操作还会被阻塞
nextCh = nil
}
// 从p.addCh读取一个事件出来,这回看到消费p.addCh的地方了
case notificationToAdd, ok := <-p.addCh:
// 说明p.addCh关闭了,只能退出
if !ok {
return
}
// notification为空说明当前还没发送任何事件给处理器
if notification == nil {
// 那就把刚刚获取的事件通过p.nextCh发送个处理器
notification = notificationToAdd
nextCh = p.nextCh
} else {
// 上一个事件还没有发送成功,那就先放到缓存中
// pendingNotifications可以想象为一个slice,这样方便理解,是一个动态的缓存,
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
通过SharedInformer.AddEventHandler()添加的处理器最终就会封装成processorListener。
/ 代码源自client-go/tools/cache/shared_informer.go
// 添加处理器,sharedIndexInformer.AddEventHandler()就会调用这个函数实现处理器的添加
func (p *sharedProcessor) addListener(listener *processorListener) {
// 加锁,这个很好理解
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
// 把处理器添加到数组中
p.addListenerLocked(listener)
// 通过wait.Group启动两个协程,做的事情我们在processorListener说过了,这里就是我们上面提到的启动两个协程的地方
// 这个地方判断了listenersStarted,这说明sharedProcessor在启动前、后都可以添加处理器
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
// 把处理器添加到数组中
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
// 两类(定时同步和不同步)的处理器数组都添加了,这是因为没有定时同步的也会用默认的时间,后面我们会看到
// 那么问题来了,那还用两个数组干什么呢?
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
sharedProcessor会进行事件的分发, 由监听器进行处理:
// 代码源自client-go/tools/cache/shared_informer.go
// 通过函数名称也能感觉到分发的感觉~sync表示obj对象是否为同步事件对象
func (p *sharedProcessor) distribute(obj interface{
}, sync bool) {
// 加锁没毛病
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 无论是否为sync,添加处理器的代码中我们知道两个数组都会被添加,所以判断不判断没啥区别~
// 所以我的猜测是代码以前实现的是明显区分两类的,但随着代码的更新二者的界限已经没那么明显了
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
// 代码源自client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{
}) {
// 启动前、后对于添加处理器的逻辑是不同,启动前的处理器是不会立刻启动连个协程执行处理器的pop()和run()函数的
// 而是在这里统一的启动
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 遍历所有的处理器,然后为处理器启动两个后台协程
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
// 等待退出信号
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 关闭addCh,processorListener.pop()这个协程就会退出,不明白的可以再次回顾代码
// 因为processorListener.pop()会关闭processorListener.nextCh,processorListener.run()就会退出
// 所以这里只要关闭processorListener.addCh就可以自动实现两个协程的退出,不得不说设计的还是挺巧妙的
for _, listener := range p.listeners {
close(listener.addCh)
}
// 等待所有的协程退出,这里指的所有协程就是所有处理器的那两个协程
p.wg.Wait()
}
client-go实现了两个创建SharedInformer的接口,如下所示:
// 代码源自client-go/tools/cache/shared_informer.go
// lw:这个是apiserver客户端相关的,用于Reflector从apiserver获取资源,所以需要外部提供
// objType:这个SharedInformer监控的对象类型
// resyncPeriod:同步周期,SharedInformer需要多长时间给使用者发送一次全量对象的同步时间
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
// 还是用SharedIndexInformer实现的
return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{
})
}
// 创建SharedIndexInformer对象,其中大部分参数再上面的函数已经介绍了
// indexers:需要外部提供计算对象索引键的函数,也就是这里面的对象需要通过什么方式创建索引
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{
}
sharedIndexInformer := &sharedIndexInformer{
// 管理所有处理器用的,这个上面的章节解释了
processor: &sharedProcessor{
clock: realClock},
// 其实就是在构造cache,读者可以自行查看NewIndexer()的实现,
// 在cache中的对象用DeletionHandlingMetaNamespaceKeyFunc计算对象键,用indexers计算索引键
// 可以想象成每个对象键是Namespace/Name,每个索引键是Namespace,即按照Namesapce分类
// 因为objType决定了只有一种类型对象,所以Namesapce是最大的分类
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
// 下面这两主要就是给Controller用,确切的说是给Reflector用的
listerWatcher: lw,
objectType: objType,
// 无论是否需要定时同步,SharedInformer都提供了一个默认的同步时间,当然这个是外部设置的
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
// 默认没有开启的对象突变检测器,没啥用,也不多介绍
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
return sharedIndexInformer
}
创建完ShareInformer对象,就要添加事件处理器了:
// 代码源自client-go/tools/cache/shared_informer.go
// 添加没有指定同步周期的事件处理器
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
// defaultEventHandlerResyncPeriod是默认的同步周期,在创建SharedInformer的时候设置的
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
// 添加需要定期同步的事件处理器
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
// 因为是否已经开始对于添加事件处理器的方式不同,后面会有介绍,所以此处加了锁
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 如果已经结束了,那就可以直接返回了
if s.stopped {
return
}
// 如果有同步周期,==0就是永远不用同步
if resyncPeriod > 0 {
// 同步周期不能太短,太短对于系统来说反而是个负担,大量的无效计算浪费在这上面
if resyncPeriod < minimumResyncPeriod {
resyncPeriod = minimumResyncPeriod
}
// SharedInformer管理了很多处理器,每个处理器都有自己的同步周期,所以此处要统一成一个,称之为对齐
// SharedInformer会选择所有处理器中最小的那个作为所有处理器的同步周期,称为对齐后的同步周期
// 此处就要判断是不是比当前对齐后的同步周期还要小
if resyncPeriod < s.resyncCheckPeriod {
// 如果已经启动了,那么只能用和大家一样的周期
if s.started {
resyncPeriod = s.resyncCheckPeriod
// 如果没启动,那就让大家都用最新的对齐同步周期
} else {
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
// 创建处理器,代码一直用listener,可能想强调没事件就挂起把,我反而想用处理器这个名词
// determineResyncPeriod()这个函数读者自己分析把,非常简单,这里面只要知道创建了处理器就行了
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
// 如果没有启动,那么直接添加处理器就可以了
if !s.started {
s.processor.addListener(listener)
return
}
// 这个锁就是暂停再想所有的处理器分发事件用的,因为这样会遍历所有的处理器,此时添加会有风险
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// 添加处理器
s.processor.addListener(listener)
// 这里有意思啦,遍历缓冲中的所有对象,通知处理器,因为SharedInformer已经启动了,可能很多对象已经让其他的处理器处理过了,
// 所以这些对象就不会再通知新添加的处理器,此处就是解决这个问题的
for _, item := range s.indexer.List() {
listener.add(addNotification{
newObj: item})
}
}
事件处理器添加完了,就要看SharedInformer如何把事件分发给每个处理器的了:
// 代码源自client-go/tools/cache/shared_informer.go
// sharedIndexInformer的核心逻辑函数
func (s *sharedIndexInformer) Run(stopCh <-chan struct{
}) {
defer utilruntime.HandleCrash()
// 在此处构造的DeltaFIFO
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 这里的Config是我们介绍Reflector时介绍的那个Config
cfg := &Config{
// 我前面一直在说Reflector输入到DeltaFIFO,这里算是直接证明了
Queue: fifo,
// 下面这些变量我们在Reflector都说了,这里赘述
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// 这个才是重点,Controller调用DeltaFIFO.Pop()接口传入的就是这个回调函数,也是我们后续重点介绍的
Process: s.HandleDeltas,
}
// 创建Controller,这个不用多说了
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// 这个processorStopCh 是给sharedProcessor和cacheMutationDetector传递退出信号的
// 因为这里要创建两个协程运行sharedProcessor和cacheMutationDetector的核心函数
processorStopCh := make(chan struct{
})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
// Run()函数都退出了,也就应该设置结束的标识了
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true
}()
// 启动Controller,Controller一旦运行,整个流程就开始启动了,所以叫Controller也不为过
// 毕竟Controller是SharedInformer的发动机嘛
s.controller.Run(stopCh)
}
sharedIndexInformer通过Run()函数启动了Controller和sharedProcess(),Controller通过DeltaFIFO.Pop()函数弹出Deltas,并调用函数处理,这个处理函数就是sharedIndexInformer.HandleDeltas(),这个函数是衔接Controller和sharedProcess的关键点,他把Deltas转换为sharedProcess需要的各种Notification类型。下面我们就对这个函数进行代码分析:
// 代码源自client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{
}) error {
// 看到这里就知道为啥起名为blockDeltas了,这是阻塞处理器Deltas啊~因为分发事件到处理器,所以要加锁
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// Deltas里面包含了一个对象的多个增量操作,所以要从最老的Delta到最先的Delta遍历处理
for _, d := range obj.(Deltas) {
// 根据不同的Delta做不同的操作,但是大致分为对象添加、删除两大类操作
// 所有的操作都要先同步到cache在通知处理器,这样保持处理器和cache的状态是一致的
switch d.Type {
// 同步、添加、更新都是对象添加类的造作,至于是否是更新还要看cache是否有这个对象
case Sync, Added, Updated:
// 看看对象是不是有定时同步产生的事件
isSync := d.Type == Sync
// 检测突变,没啥用
s.cacheMutationDetector.AddObject(d.Object)
// 如果cache中有的对象,一律看做是更新事件
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 把对象更新到cache中
if err := s.indexer.Update(d.Object); err != nil {
return err
}
// 通知处理器处理事件
s.processor.distribute(updateNotification{
oldObj: old, newObj: d.Object}, isSync)
// cache中没有的对象,一律看做是新增事件
} else {
// 把对象添加到cache中
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 通知处理器处理器事件
s.processor.distribute(addNotification{
newObj: d.Object}, isSync)
}
// 对象被删除
case Deleted:
// 从cache中删除对象
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 通知所有的处理器对象被删除了
s.processor.distribute(deleteNotification{
oldObj: d.Object}, false)
}
}
return nil
}
五、sharedInformerFactory源码分析
每个SharedInformer其实只负责一种对象,在构造SharedInformer的时候指定了对象类型。SharedInformerFactory可以构造Kubernetes里所有对象的Informer,而且主要用在controller-manager这个服务中。因为controller-manager负责管理绝大部分controller,每类controller不仅需要自己关注的对象的informer,同时也可能需要其他对象的Informer(比如ReplicationController也需要PodInformer,否则他无法感知Pod的启动和关闭,也就达不到监控的目的了),所以一个SharedInformerFactory可以让所有的controller共享使用同一个类对象的Informer。
// 代码源自client-go/informers/factory.go
// SharedInformerFactory是个interfaces,所以肯定有具体的实现类
type SharedInformerFactory interface {
// 在informers这个包中又定义了一个SharedInformerFactory,这个主要是包内抽象,所以此处继承了这个接口
internalinterfaces.SharedInformerFactory
// 这个暂时不知道干啥用,所以我也不介绍他了
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
// 等待所有的Informer都已经同步完成,这里同步其实就是遍历调用SharedInformer.HasSynced()
// 所以函数需要周期性的调用指导所有的Informer都已经同步完毕
WaitForCacheSync(stopCh <-chan struct{
}) map[reflect.Type]bool
Admissionregistration() admissionregistration.Interface // 返回admissionregistration相关的Informer组
Apps() apps.Interface // 返回app相关的Informer组
Autoscaling() autoscaling.Interface // 返回autoscaling相关的Informer组
Batch() batch.Interface // 返回job相关的Informer组
Certificates() certificates.Interface // 返回certificates相关的Informer组
Coordination() coordination.Interface // 返回coordination相关的Informer组
Core() core.Interface // 返回core相关的Informer组
Events() events.Interface // 返回event相关的Informer组
Extensions() extensions.Interface // 返回extension相关的Informer组
Networking() networking.Interface // 返回networking相关的Informer组
Policy() policy.Interface // 返回policy相关的Informer组
Rbac() rbac.Interface // 返回rbac相关的Informer组
Scheduling() scheduling.Interface // 返回scheduling相关的Informer组
Settings() settings.Interface // 返回settings相关的Informer组
Storage() storage.Interface // 返回storage相关的Informer组
}
SharedInformerFactory继承了internalinterfaces.SharedInformerFactory,internalinterfaces.SharedInformerFactory其实只提供了一个能力,就是通过对象类型构造Informer。因为SharedInformerFactory管理的就是SharedIndexInformer对象,SharedIndexInformer存储的对象类型决定了他是什么Informer,致使SharedInformerFactory无需知道具体的Informer如何构造,所以需要外部传入构造函数,这样可以减低耦合性。
// 代码源自client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
// 核心逻辑函数,类似于很多类的Run()函数
Start(stopCh <-chan struct{
})
// 这个很关键,通过对象类型,返回SharedIndexInformer,这个SharedIndexInformer管理的就是指定的对象
// NewInformerFunc用于当SharedInformerFactory没有这个类型的Informer的时候创建使用
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
// 创建Informer的函数定义,这个函数需要apiserver的客户端以及同步周期,这个同步周期在SharedInformers反复提到
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
SharedInformerFactory只是个interface,肯定有一个实现类,按照kubernetes的风格,应该是sharedInformerFactory.
// 代码源自client-go/informers/factory.go
type sharedInformerFactory struct {
// apiserver的客户端,暂时不用关心怎么实现的,只要知道他能列举和监听资源就可以了
client kubernetes.Interface
// 哈哈,这样看来每个namesapce需要一个SharedInformerFactory,那cache用namespace建索引还有啥用呢?
// 并不是所有的使用者都需要指定namesapce,比如kubectl,他就可以列举所有namespace的资源,所以他没有指定namesapce
namespace string
// 这是个函数指针,用来调整列举选项的,这个选项用来client列举对象使用
tweakListOptions internalinterfaces.TweakListOptionsFunc
// 互斥锁
lock sync.Mutex
// 默认的同步周期,这个在SharedInformer需要用
defaultResync time.Duration
// 每个类型的Informer有自己自定义的同步周期
customResync map[reflect.Type]time.Duration
// 每类对象一个Informer,但凡使用SharedInformerFactory构建的Informer同一个类型其实都是同一个Informer
informers map[reflect.Type]cache.SharedIndexInformer
// 各种Informer启动的标记
startedInformers map[reflect.Type]bool
}
client-go里面提供构造SharedInformerFactory的几个接口函数:
// 代码源自client-go/tools/cache/shared_informer.go
// 这是一个通用的构造SharedInformerFactory的接口函数,没有任何其他的选项,只包含了apiserver的client以及同步周期
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
// 最终是调用NewSharedInformerFactoryWithOptions()实现的,无非没有选项而已
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
// 相比于上一个通用的构造函数,这个构造函数增加了namesapce过滤和调整列举选项
func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
// 最终是调用NewSharedInformerFactoryWithOptions()实现的,无非选项是2个
// WithNamespace()和WithTweakListOptions()会在后文讲解
return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}
// 到了构造SharedInformerFactory核心函数了,其实SharedInformerOption是个有意思的东西
// 我们写程序喜欢Option是个结构体,但是这种方式的扩展很麻烦,这里面用的是回调函数,这个让我get到新技能了
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
// 默认只有apiserver的client以及同步周期是需要外部提供的其他的都是可以有默认值的
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
//逐一遍历各个选项函数,opt是选项函数,下面面有详细介绍
for _, opt := range options {
factory = opt(factory)
}
return factory
}
// 代码源自client-go/informers/factory.go
// 这个是SharedInformerFactory构造函数的选项,是一个函数指针,传入的是工厂指针,返回也是工厂指针
// 很明显,选项函数直接修改工厂对象,然后把修改的对象返回就可以了
type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
// 把每个对象类型的同步周期这个参数转换为SharedInformerOption类型
func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
// 这个实现很简单了,我就不多解释了
return func(factory *sharedInformerFactory) *sharedInformerFactory {
for k, v := range resyncConfig {
factory.customResync[reflect.TypeOf(k)] = v
}
return factory
}
}
在sharedInformerFactory中添加SharedIndexInformer
// 代码源自client-go/informers/factory.go
// InformerFor()相当于每个类型Informer的构造函数了,即便具体实现构造的地方是使用者提供的
// 这个函数需要使用者传入对象类型,因为在sharedInformerFactory里面是按照对象类型组织的Informer
// 更有趣的是这些Informer不是sharedInformerFactory创建的,需要使用者传入构造函数
// 这样做既保证了每个类型的Informer只构造一次,同时又保证了具体Informer构造函数的私有化能力
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
// 加锁操作
f.lock.Lock()
defer f.lock.Unlock()
// 通过反射获取obj的类型
informerType := reflect.TypeOf(obj)
// 看看这个类型的Informer是否已经创建了?
informer, exists := f.informers[informerType]
// 如果Informer已经创建,那么就复用这个Informer
if exists {
return informer
}
// 获取这个类型定制的同步周期,如果定制的同步周期那就用统一的默认周期
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 调用使用者提供构造函数,然后把创建的Informer保存起来
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
// 代码源自client-go/informers/internalinterfaces/factory_interfaces.go
// 这个函数定义就是具体类型Informer的构造函数,后面会有地方说明如何使用
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
sharedInformerFactory工厂对象已经创建出来了,运行Start()接口:
// 代码源自client-go/informers/factory.go
// 其实sharedInformerFactory的Start()函数就是启动所有具体类型的Informer的过程
// 因为每个类型的Informer都是SharedIndexInformer,需要需要把每个SharedIndexInformer都要启动起来
func (f *sharedInformerFactory) Start(stopCh <-chan struct{
}) {
// 加锁操作
f.lock.Lock()
defer f.lock.Unlock()
// 遍历informers这个map
for informerType, informer := range f.informers {
// 看看这个Informer是否已经启动过
if !f.startedInformers[informerType] {
// 如果没启动过,那就启动一个协程执行SharedIndexInformer的Run()函数,我们在分析SharedIndexInformer的时候
// 我们知道知道Run()是整个Informer的启动入口点,看了《深入浅出kubernetes之client-go的SharedInformer》
// 的同学应该会想Run()是谁调用的呢?这里面应该给你们答案了吧?
go informer.Run(stopCh)
// 设置Informer已经启动的标记
f.startedInformers[informerType] = true
}
}
}
六、pod Informer示例源码分析
// 代码源自client-go/informers/core/v1/pod.go
// 这个PodInformer是抽象类,Informer()就是获取SharedIndexInformer的接口函数
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// 这个是PodInformer的实现类,看到了没,他需要工厂对象的指针,貌似明细了很多把?
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// 这个就是要传入工厂的构造函数了
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
// 这个是实现Informer()的地方,看到了把,这里面调用了工厂的InformerFor把自己注册进去
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{
}, f.defaultInformer)
}
// 实现了PodInformer.Lister()接口函数
func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())
}
// 真正创建PodInformer的函数
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
// 还有谁记得构造SharedIndexInformer需要写啥?自己温习《深入浅出kubernetes之client-go的SharedInformer》
return cache.NewSharedIndexInformer(
// 需要ListWatch两个函数,就是用apiserver的client实现的,此处不重点解释每个代码什么意思
// 读者应该能够看懂是利用client实现了Pod的List和Watch
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(options)
},
},
// 这个是要传入对象的类型,肯定是Pod了
&corev1.Pod{
},
// 同步周期
resyncPeriod,
// 对象键的计算函数
indexers,
)
}