您的位置:首页 > 编程语言 > Go语言

kubernetes源码之watch包streamwatcher.go阅读理解五

2017-08-07 10:15 956 查看

这是watcher包中最后一个文件了

streamwatcher.go先看数据结构

type Decoder interface {
Decode() (action EventType, object runtime.Object, err error)
Close()
}

type StreamWatcher struct {
sync.Mutex
source  Decoder
result  chan Event
stopped bool
}


再看看方法

StreamWatcher 有两 ResultChan()个stop()方法,实现Interface
func (sw *StreamWatcher) ResultChan() <-chan Event {
return sw.result
}
// Stop implements Interface.
func (sw *StreamWatcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
sw.Lock()
defer sw.Unlock()
if !sw.stopped {
sw.stopped = true
sw.source.Close()
}
}
实例化一个StreamWatcher并且有一个goroutine
func NewStreamWatcher(d Decoder) *StreamWatcher {
sw := &StreamWatcher{
source: d,
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
}
go sw.receive()
return sw
}

func (sw *StreamWatcher) stopping() bool {
sw.Lock()
defer sw.Unlock()
return sw.stopped
}

// receive reads result from the decoder in a loop and sends down the result channel.
receive方法循环读取decoder的结果,然后发送到结果通道。
receive方法从decoder中使用Decode方法获取(action EventType, object runtime.Object, err error)
func (sw *StreamWatcher) receive() {
defer close(sw.result)
defer sw.Stop()
defer utilruntime.HandleCrash()
for {
action, obj, err := sw.source.Decode()
if err != nil {
// Ignore expected error.
if sw.stopping() {
return
}
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
default:
msg := "Unable to decode an event from the watch stream: %v"
if net.IsProbableEOF(err) {
glog.V(5).Infof(msg, err)
} else {
glog.Errorf(msg, err)

4000
}
}
return
}
sw.result <- Event{
Type:   action,
Object: obj,
}
}
}


总结

说白了就是从Decoder读取数据然后发送给result
type StreamWatcher struct {
sync.Mutex
source  Decoder
result  chan Event
stopped bool


拓展,看看使用了NewStreamWatcher的文件

k8s.io\client-go\rest\request.go 中694行使用了这个函数 回头在仔细阅读以下这个文件
return watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, r.serializers.Decoder)), nil
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息