以nodecontroller为例:
一、先创建sharedInformers供所有controler使用,是一个工厂Informer.(sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()))。其成员有[]sharedIndexInformer切片。
二、每次创建一个controler如node controler、rccontroler等,都会生成一个相应的sharedIndexInformer(namespaceInformer、podInformer等),每种sharedindexInformer中有对应的listwatch函数。
如pod index informer:
func newPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Pods(meta_v1.NamespaceAll).List(options)
},
WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Pods(meta_v1.NamespaceAll).Watch(options)
},
},
&api_v1.Pod{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return sharedIndexInformer
}
三、podInformer通过向nodecontroler的taintManager进行更新、添加、删除。(所有的controler都差不多)
shareInformerFactory种类很多:大(组)类分为APP、autoScaling、core、Extensions、Storage等,每个组里再细分。
分为{group,version,resource}.(一下部分{,,}代表g,v,r,其中*代表下面的资源)所有资源的IndexInformer中都有Lister()(不同版本的lister()函数不一样,informer一样)、Informer()函数。不同版本成员lister watcher函数调用的不一样,主要是调用不同版本的函数。
lister()函数就是返回shareIndexInformer的indexer
informer()函数就是将自己将生成的podindexinformer加入到shareinformer的informer数组中,并返回podindexinformer
(1)如{core,v1,*}组里ComponentStatuses、ConfigMaps、Endpoints、Events、Namespaces、LimitRanges、Nodes、PersistentVolumes、PersistentVolumeClaims、Pods、PodTemplates、ReplicationControllers、ResourceQuotas、Secrets、Services、ServiceAccounts
(2){Extensions,v1beta1,*}组里有PodSecurityPolicies、DaemonSets、Deployments、Ingresses、ReplicaSets、ThirdPartyResources
(3){Storage,v1,*}组里有StorageClasses
{Storage,V1beta1,*}组里有StorageClasses
(4){Apps,V1beta1,*}里面有Deployments、StatefulSets
(5){Autoscaling、V1、*}里面有HorizontalPodAutoscalers
{Autoscaling、V2alpha1、*}里面有HorizontalPodAutoscalers
(6){Batch、V1、*}里面有Jobs
{Batch,V2alpha1,*}里面有CronJobs
(7){Certificates、V1beta1、*}里面有CertificateSigningRequests
(8){Policy、V1beta1、*}里面有PodDisruptionBudgets
(9){Rbac、V1alpha1 | V1beta1、*}里面有ClusterRoles、ClusterRoleBindings、Roles、RoleBindings
(10){Settings、V1alpha1、*}里面有PodPresets
(11){Storage、V1|V1beta1、*}里面有StorageClasses
代码如core,一下军返回制定了group,version的resource indexInformer.
func (f *sharedInformerFactory) Apps() apps.Interface {
return apps.New(f)
}
func (f *sharedInformerFactory) Autoscaling() autoscaling.Interface {
return autoscaling.New(f)
}
func (f *sharedInformerFactory) Batch() batch.Interface {
return batch.New(f)
}
func (f *sharedInformerFactory) Certificates() certificates.Interface {
return certificates.New(f)
}
func (f *sharedInformerFactory) Core() core.Interface {
return core.New(f)
}
func (f *sharedInformerFactory) Extensions() extensions.Interface {
return extensions.New(f)
}
func (f *sharedInformerFactory) Policy() policy.Interface {
return policy.New(f)
}
func (f *sharedInformerFactory) Rbac() rbac.Interface {
return rbac.New(f)
}
func (f *sharedInformerFactory) Settings() settings.Interface {
return settings.New(f)
}
func (f *sharedInformerFactory) Storage() storage.Interface {
return storage.New(f)
podInformer怎么工作的?(or 各种informer怎么工作的):
首先创建好podInformer后,调用podeindexinformer(shareindexinform)的addHandler后,函数里创建listener,把listener放入sharedIndexInformer.processor的listener切片中,然后run。所以每添加一个handler就创建一个listener,然后run和pop函数。
listener.run函数一个for{}循环监听事件chan,添加就调用add函数,update就调用等。
listener.pop函数一个for{}循环监听pendingnotifications切片,pop出第一个发送个chan,让run去处理。
listener中的noticications是distribute函数分发到所有listener中。
下文红色部分为各类informer公共部分,都一样,只有处理函数不一样,需要我们创建controler自定义。
podInformer则config run创建reflector,将watch到的obj存入fifo队列,然后processloop读取队列,调用HandleDeltas函数将obj存储到podinformer的indexer和cacheMutationDetetor对象(用于deepcopy obj和cache中的obj是否相等检测)中,然后存储到调用上面distribute接口存入listener中。process run则调用所有的listener run和pop,读取indexer中的值,处理存储到tainmanager(podinformer.addeventhandler()中指定,初始化listener)中.。创建CacheMutationDetector对象用于判断cache的obj和copy的obj(通过在添加obj时deepcopy函数copy的)是否一样,他会一直到用deepEqual函数进行比较,不一样要发报警信息和pannic
indexer的处理:是一个threadSafeMap类型,items就是name:obj这样存储的,indexer存放的是key:func用于处理obj,存入indices中;indice会为每个func生成的值当做键,key当做值;举例子,key是“namespace”,func是获取pod的namespace的函数(pod可以在多个namespace下,所以返回值为多个),那么indices就是indices["namespace"]={index},index = map[string]Index {"default":set("namespace"), "kube-system":set("namespace")}
indexer的作用是用来筛选,根据关心的关键字删选item中的obj返回。比如:podinformer中的indexer中的item是全部的pod,我关心pod,而item中存储的是全部的pod,我想要default namespace下的pod,则就可以通过ByIndex(“namespace”,“default”)函数返回相应的pod列表。
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
所以podinformer(各类informer)启动了三个server,
一个controller,用于处理创建reflector,watch资源,向indexer中添加obj,并发送给各种listener,
一个是processor,用于各类listener监听队列,处理监听到的obj,调用资源处理函数,如node controller的pod informer把pod添加到tainmanager. 此部分为我们创建自己controller要实现的
一个是cacheMutationDetector是用来是进行cache和copy的obj是否一样,进行不停检测,报警和pannic.
shareIndexInformer的run函数:
创建fifo用于存储object,而其podeinformer 的indexer则就为fifo的knowedobject队列。
启动config,新建reflector,不停过一段时间就调用podInformer的listandwatch函数,将结果写入fifo(threadSafeMap)队列,存入item,key为func计算出来,value为obj,并把key写入queue。启动processLOOP函数不停地pop queue中的obj,调用HandleDeltas处理obj.
podInformer的indexer通过HandleDeltas函数进行添加,update,删除等操作。
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
s.stopCh = stopCh
s.cacheMutationDetector.Run(stopCh)
s.processor.run(stopCh)
s.controller.Run(stopCh)
}