kubernetes源码之watch包streamwatcher.go阅读理解五
2017-08-07 10:15
956 查看
这是watcher包中最后一个文件了
streamwatcher.go先看数据结构
type Decoder interface { Decode() (action EventType, object runtime.Object, err error) Close() } type StreamWatcher struct { sync.Mutex source Decoder result chan Event stopped bool }
再看看方法
StreamWatcher 有两 ResultChan()个stop()方法,实现Interface func (sw *StreamWatcher) ResultChan() <-chan Event { return sw.result } // Stop implements Interface. func (sw *StreamWatcher) Stop() { // Call Close() exactly once by locking and setting a flag. sw.Lock() defer sw.Unlock() if !sw.stopped { sw.stopped = true sw.source.Close() } } 实例化一个StreamWatcher并且有一个goroutine func NewStreamWatcher(d Decoder) *StreamWatcher { sw := &StreamWatcher{ source: d, // It's easy for a consumer to add buffering via an extra // goroutine/channel, but impossible for them to remove it, // so nonbuffered is better. result: make(chan Event), } go sw.receive() return sw } func (sw *StreamWatcher) stopping() bool { sw.Lock() defer sw.Unlock() return sw.stopped } // receive reads result from the decoder in a loop and sends down the result channel. receive方法循环读取decoder的结果,然后发送到结果通道。 receive方法从decoder中使用Decode方法获取(action EventType, object runtime.Object, err error) func (sw *StreamWatcher) receive() { defer close(sw.result) defer sw.Stop() defer utilruntime.HandleCrash() for { action, obj, err := sw.source.Decode() if err != nil { // Ignore expected error. if sw.stopping() { return } switch err { case io.EOF: // watch closed normally case io.ErrUnexpectedEOF: glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err) default: msg := "Unable to decode an event from the watch stream: %v" if net.IsProbableEOF(err) { glog.V(5).Infof(msg, err) } else { glog.Errorf(msg, err) 4000 } } return } sw.result <- Event{ Type: action, Object: obj, } } }
总结
说白了就是从Decoder读取数据然后发送给result type StreamWatcher struct { sync.Mutex source Decoder result chan Event stopped bool
拓展,看看使用了NewStreamWatcher的文件
k8s.io\client-go\rest\request.go 中694行使用了这个函数 回头在仔细阅读以下这个文件 return watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, r.serializers.Decoder)), nil
相关文章推荐
- kubernetes源码之watch包filter.go阅读理解三
- kubernetes源码之watch包until.go阅读理解四
- FutureTask源码阅读与理解
- Go的http源码阅读笔记
- 简便使用jQuery-源码阅读全局架构设计的理解
- go-ethereum源码阅读环境
- nsq源码阅读 nsqd源码三 tcp.go
- [Chrome源码阅读] 理解ObserverList类的实现技巧
- [SDS阅读理解/9]源码中的函数/6
- 理解ThreadPoolExecutor源码(二)execute函数的巧妙设计和阅读心得
- Go语言Http Server源码阅读
- 黑马程序员【阅读源码理解String对象的不变性】
- Deep Compression阅读理解及Caffe源码修改
- Android源码阅读与理解(一):开篇
- nsq源码阅读 nsqlookupd源码一 nsqlookupd.go
- StreamCQL源码阅读(3) 拆分组合算子
- Open vSwitch源码阅读【转】及自己的理解【稍后更新】
- kubernetes源码阅读及编译
- Deep Compression阅读理解及Caffe源码修改
- Android源码阅读与理解(二):Android系统Build流程详解