k8s informer

以nodecontroller为例:

一、先创建sharedInformers供所有controler使用,是一个工厂Informer.(sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()))。其成员有[]sharedIndexInformer切片。

二、每次创建一个controler如node controler、rccontroler等,都会生成一个相应的sharedIndexInformer(namespaceInformer、podInformer等),每种sharedindexInformer中有对应的listwatch函数。

如pod index informer:

func newPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
   sharedIndexInformer := cache.NewSharedIndexInformer(
      &cache.ListWatch{
         ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
            return client.CoreV1().Pods(meta_v1.NamespaceAll).List(options)
         },
         WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
            return client.CoreV1().Pods(meta_v1.NamespaceAll).Watch(options)
         },
      },
      &api_v1.Pod{},
      resyncPeriod,
      cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
   )
   return sharedIndexInformer
}

三、podInformer通过向nodecontroler的taintManager进行更新、添加、删除。(所有的controler都差不多)

shareInformerFactory种类很多:大(组)类分为APP、autoScaling、core、Extensions、Storage等,每个组里再细分。

分为{group,version,resource}.(一下部分{,,}代表g,v,r,其中*代表下面的资源)所有资源的IndexInformer中都有Lister()(不同版本的lister()函数不一样,informer一样)、Informer()函数。不同版本成员lister watcher函数调用的不一样,主要是调用不同版本的函数。

lister()函数就是返回shareIndexInformer的indexer

informer()函数就是将自己将生成的podindexinformer加入到shareinformer的informer数组中,并返回podindexinformer

(1)如{core,v1,*}组里ComponentStatuses、ConfigMaps、Endpoints、Events、Namespaces、LimitRanges、Nodes、PersistentVolumes、PersistentVolumeClaims、Pods、PodTemplates、ReplicationControllers、ResourceQuotas、Secrets、Services、ServiceAccounts

(2){Extensions,v1beta1,*}组里有PodSecurityPolicies、DaemonSets、Deployments、Ingresses、ReplicaSets、ThirdPartyResources

(3){Storage,v1,*}组里有StorageClasses

         {Storage,V1beta1,*}组里有StorageClasses

(4){Apps,V1beta1,*}里面有Deployments、StatefulSets

(5){Autoscaling、V1、*}里面有HorizontalPodAutoscalers

         {Autoscaling、V2alpha1、*}里面有HorizontalPodAutoscalers

(6){Batch、V1、*}里面有Jobs

          {Batch,V2alpha1,*}里面有CronJobs

(7){Certificates、V1beta1、*}里面有CertificateSigningRequests

(8){Policy、V1beta1、*}里面有PodDisruptionBudgets

(9){Rbac、V1alpha1 | V1beta1、*}里面有ClusterRoles、ClusterRoleBindings、Roles、RoleBindings

(10){Settings、V1alpha1、*}里面有PodPresets

(11){Storage、V1|V1beta1、*}里面有StorageClasses

代码如core,一下军返回制定了group,version的resource indexInformer.

func (f *sharedInformerFactory) Apps() apps.Interface {
   return apps.New(f)
}
func (f *sharedInformerFactory) Autoscaling() autoscaling.Interface {
   return autoscaling.New(f)
}
func (f *sharedInformerFactory) Batch() batch.Interface {
   return batch.New(f)
}
func (f *sharedInformerFactory) Certificates() certificates.Interface {
   return certificates.New(f)
}
func (f *sharedInformerFactory) Core() core.Interface {
   return core.New(f)
}
func (f *sharedInformerFactory) Extensions() extensions.Interface {
   return extensions.New(f)
}
func (f *sharedInformerFactory) Policy() policy.Interface {
   return policy.New(f)
}
func (f *sharedInformerFactory) Rbac() rbac.Interface {
   return rbac.New(f)
}
func (f *sharedInformerFactory) Settings() settings.Interface {
   return settings.New(f)
}
func (f *sharedInformerFactory) Storage() storage.Interface {
   return storage.New(f)

podInformer怎么工作的?(or 各种informer怎么工作的):

    首先创建好podInformer后,调用podeindexinformer(shareindexinform)的addHandler后,函数里创建listener,把listener放入sharedIndexInformer.processor的listener切片中,然后run。所以每添加一个handler就创建一个listener,然后run和pop函数。

    listener.run函数一个for{}循环监听事件chan,添加就调用add函数,update就调用等。

    listener.pop函数一个for{}循环监听pendingnotifications切片,pop出第一个发送个chan,让run去处理。

    listener中的noticications是distribute函数分发到所有listener中。

下文红色部分为各类informer公共部分,都一样,只有处理函数不一样,需要我们创建controler自定义。

podInformer则config run创建reflector,将watch到的obj存入fifo队列,然后processloop读取队列,调用HandleDeltas函数将obj存储到podinformer的indexer和cacheMutationDetetor对象(用于deepcopy obj和cache中的obj是否相等检测)中,然后存储到调用上面distribute接口存入listener中。process run则调用所有的listener run和pop,读取indexer中的值,处理存储到tainmanager(podinformer.addeventhandler()中指定,初始化listener)中.。创建CacheMutationDetector对象用于判断cache的obj和copy的obj(通过在添加obj时deepcopy函数copy的)是否一样,他会一直到用deepEqual函数进行比较,不一样要发报警信息和pannic

indexer的处理:是一个threadSafeMap类型,items就是name:obj这样存储的,indexer存放的是key:func用于处理obj,存入indices中;indice会为每个func生成的值当做键,key当做值;举例子,key是“namespace”,func是获取pod的namespace的函数(pod可以在多个namespace下,所以返回值为多个),那么indices就是indices["namespace"]={index},index = map[string]Index {"default":set("namespace"), "kube-system":set("namespace")}

indexer的作用是用来筛选,根据关心的关键字删选item中的obj返回。比如:podinformer中的indexer中的item是全部的pod,我关心pod,而item中存储的是全部的pod,我想要default namespace下的pod,则就可以通过ByIndex(“namespace”,“default”)函数返回相应的pod列表。

type threadSafeMap struct {

    lock sync.RWMutex

    items map[string]interface{}

    // indexers maps a name to an IndexFunc

    indexers Indexers

    // indices maps a name to an Index

    indices Indices

}

所以podinformer(各类informer)启动了三个server,

一个controller,用于处理创建reflector,watch资源,向indexer中添加obj,并发送给各种listener,

一个是processor,用于各类listener监听队列,处理监听到的obj,调用资源处理函数,如node controller的pod informer把pod添加到tainmanager.  此部分为我们创建自己controller要实现的

一个是cacheMutationDetector是用来是进行cache和copy的obj是否一样,进行不停检测,报警和pannic.

shareIndexInformer的run函数:

创建fifo用于存储object,而其podeinformer 的indexer则就为fifo的knowedobject队列。     

   启动config,新建reflector,不停过一段时间就调用podInformer的listandwatch函数,将结果写入fifo(threadSafeMap)队列,存入item,key为func计算出来,value为obj,并把key写入queue。启动processLOOP函数不停地pop queue中的obj,调用HandleDeltas处理obj.

podInformer的indexer通过HandleDeltas函数进行添加,update,删除等操作。

type threadSafeMap struct {
   lock  sync.RWMutex
   items map[string]interface{}

   // indexers maps a name to an IndexFunc
   indexers Indexers
   // indices maps a name to an Index
   indices Indices
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
   cfg := &Config{
      Queue:            fifo,
      ListerWatcher:    s.listerWatcher,
      ObjectType:       s.objectType,
      FullResyncPeriod: s.resyncCheckPeriod,
      RetryOnError:     false,
      ShouldResync:     s.processor.shouldResync,
      Process: s.HandleDeltas,
   }
   func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()

      s.controller = New(cfg)
      s.controller.(*controller).clock = s.clock
      s.started = true
   }()
   s.stopCh = stopCh
   s.cacheMutationDetector.Run(stopCh)
   s.processor.run(stopCh)
   s.controller.Run(stopCh)
}

猜你喜欢

转载自blog.csdn.net/wasd12121212/article/details/82903288
k8s