kubernetes源码之watch包watch.go阅读理解一

watch.go文件阅读

emptyWatch,FakeWatcher,RaceFreeFakeWatcher三个对象
后两个对象均实现了写入对象和读取对象中Event事件(event事件包括add,modfiled,delete、err等类型)
只有最后一个设置了缓冲区DefaultChanSize200个。
FakeWatcher的方法:
NewFakeWithChanSize(size int, blocking bool) *FakeWatcher 设置缓冲区的大小
func (f *FakeWatcher) Stop()
func (f *FakeWatcher) IsStopped() bool
func (f *FakeWatcher) Reset()
func (f *FakeWatcher) ResultChan() <-chan Event
func (f *FakeWatcher) Add(obj runtime.Object)
func (f *FakeWatcher) Modify(obj runtime.Object)
func (f *FakeWatcher) Delete(lastValue runtime.Object)
func (f *FakeWatcher) Error(errValue runtime.Object)
func (f *FakeWatcher) Action(action EventType, obj runtime.Object)
RaceFreeFakeWatcher的方法:
func (f *RaceFreeFakeWatcher) Stop()
func (f *RaceFreeFakeWatcher) IsStopped() bool
func (f *RaceFreeFakeWatcher) Reset()
func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event
func (f *RaceFreeFakeWatcher) Add(obj runtime.Object)
func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object)
func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object)
func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object)
func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object)

接口

type Interface interface {
    Stop()
    ResultChan() <-chan Event
}

事件类型

type EventType string
const (
    Added    EventType = "ADDED"
    Modified EventType = "MODIFIED"
    Deleted  EventType = "DELETED"
    Error    EventType = "ERROR"
    DefaultChanSize int32 = 100
)

event 事件对象

event数据结构
type Event struct {
    //事件类型
    Type EventType
    //事件对象,rc,pod,svc,deploy等
    Object runtime.Object
}

type Object interface {
    GetObjectKind() schema.ObjectKind
}

type ObjectKind interface {
    // SetGroupVersionKind sets or clears the intended serialized kind of an object. Passing kind nil
    // should clear the current setting.
    SetGroupVersionKind(kind GroupVersionKind)
    // GroupVersionKind returns the stored group, version, and kind of an object, or nil if the object does
    // not expose or provide these fields.
    GroupVersionKind() GroupVersionKind
}

type GroupVersionKind struct {
    Group   string
    Version string
    Kind    string
}

第一个interface的实现emptyWatch

type emptyWatch chan Event
//emptyWatch 对Interface的实现
-------------------------------------
func (w emptyWatch) Stop() {}
func (w emptyWatch) ResultChan() <-chan Event {
    return chan Event(w)
}
-------------------------------------
创建一个空的watch
func NewEmptyWatch() Interface {
ch := make(chan Event)
close(ch)
return emptyWatch(ch)
}

func (w emptyWatch) Stop() {
}

第二个interface的实现FakeWatcher

不知道FakeWatcher该如何翻译,这个对象中有三个字段,一个管道类型中存放了result,一个字段的意思为是否停止为bool类型,还有一个锁
实现了对象的安全操作。对象有很多种方法,其中两个方法是对Interface的实现,stop方法关闭通道,FakeWatcher的状态变为停止状态。

type FakeWatcher struct {
    result  chan Event
    Stopped bool
    sync.Mutex
}
创建一个FakeWatcher, 一个管道,Stoped状态为false
func NewFake() *FakeWatcher {
    return &FakeWatcher{
        result: make(chan Event),
    }
}
设置管道的缓冲区大小
func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
    return &FakeWatcher{
        result: make(chan Event, size),
    }
}

// Stop implements Interface.Stop().
实现Interface的stop方法,首先锁定对象,然后关闭通道,修改stoped状态为true。
func (f *FakeWatcher) Stop() {
    f.Lock()
    defer f.Unlock()
    if !f.Stopped {
        glog.V(4).Infof("Stopping fake watcher.")
        close(f.result)
        f.Stopped = true
    }
}
判断FakeWatcher的状态是否是停止
func (f *FakeWatcher) IsStopped() bool {
    f.Lock()
    defer f.Unlock()
    return f.Stopped
}

// Reset prepares the watcher to be reused.
重置,准备让watcher使用。
func (f *FakeWatcher) Reset() {
    f.Lock()
    defer f.Unlock()
    f.Stopped = false
    f.result = make(chan Event)
}
从FakeWatcher中的管道读取数据,实现了Interface的ResultChan方法
func (f *FakeWatcher) ResultChan() <-chan Event {
    return f.result
}


// Add sends an add event.
添加一个Add事件。
func (f *FakeWatcher) Add(obj runtime.Object) {
    f.result <- Event{Added, obj}
}

// Modify sends a modify event.
添加一个修改事件
func (f *FakeWatcher) Modify(obj runtime.Object) {
    f.result <- Event{Modified, obj}
}
添加一个删除事件
// Delete sends a delete event.
func (f *FakeWatcher) Delete(lastValue runtime.Object) {
    f.result <- Event{Deleted, lastValue}
}

// Error sends an Error event.
添加一个错误事件
func (f *FakeWatcher) Error(errValue runtime.Object) {
    f.result <- Event{Error, errValue}
}
Action 可以替换以方法的使用
// Action sends an event of the requested type, for table-based testing.
func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
    f.result <- Event{action, obj}
}

第三个interface的实现RaceFreeFakeWatcher

RaceFreeFakeWatcher类似于FakeWatcher,只是在做任何操作的时候判断对象的状态(stopped)是否停止,如果不能写入的话则判断管道full
type RaceFreeFakeWatcher struct {
    result  chan Event
    Stopped bool
    sync.Mutex
}
func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
    f.Lock()
    defer f.Unlock()
    if !f.Stopped {
        select {
        case f.result <- Event{action, obj}:
            return
        default:
            panic(fmt.Errorf("channel full"))
        }
    }
}

在此感谢http://my.csdn.net/u013812710博主,从他的博客中学习到很多k8s的知识。

猜你喜欢

转载自blog.csdn.net/mofiu/article/details/76247917