go 语言实践-goroutine+chan 并不是 CSP 的最佳方式
2017-12-11 15:51
706 查看
go语言的最大两个亮点,一个是协程,一个就是chan了。二者合体的典型应用CSP,基本就是大家认可的并行开发神器,确实简化了并行程序的开发难度,但个人并不是很推荐业务中直接面对这种硬编码。那么,本文的重点就是讨论硬编码面临的问题,以及相关的处理方案。
文中异步队列代码,具体参见:github 异步队列。不排除 github 代码库因为更新,而与文中描述不一致。
由于go的协程,为我们隐藏了多线程面对多任务调度的复杂性,开发者可以非常方便的写出无锁的异步数据调度业务。比较简单的常用写法是:
在实际业务中,我们会看到这个myChannel必须是作为代码中的硬编码存在。可能通过以下几个方式被调用:
以下代码片段展示了封装后,如何响应stop和interrupt:
1. 引入锁确保推送安全,导致推送速度较慢。
2. 引入了context包来处理即使退出信号,但channel存在未处理消息时,依然有很大几率继续执行几次业务操作,这是 select-case 的随机性造成的(还记得 select-case 生成随机数吗)。
3. goroutinue的内存开销大。
那么,go的goroutine+chan 这种模式之前,异步队列通信又是如何处理呢?比较常见的是通过bufferRing来提供一定缓冲数量的队列,而MPSC则提供无数量限制的方式。MPSC的全称是Multi-Produce Single-Consumer,是非常高效的无锁并发的多线程并发解决方案,很多时候用 MPSC 来做控制指令队列通道。go语言利用原子操作,实现非常简单:
如果我们的 Queue 封装的是mpsc,那么就是在 for 循环中先判断是否关闭,然后操作Pop,如果有数据,则调用接受者。这样,系统的退出响应就会非常简单:
尽管这段代码没有考虑处理大量业务时的cpu 调度(runtime.Gosched()),但基本上是业务可用的代码。我们可以看到:
1. 不需要select协程,整体上开销更小
2. 控制更加灵活,处理业务消息的同时,还能非常方便的响应控制消息(停止、暂停等)。
- chan更适合单生产者+单/多消费者。一旦是多个生产者,就存在重复关闭的隐患。
- 通过函数回调,异步队列回调,是更安全的基础组件的编码方式。
- 采用 CSP实现的队列,在进行系统控制时,比如 stop,及时性不如 mpsc或传统的bufferRing/List等数据结构。
- 取消协程,减少额外的内存开销。
- 消费者如果带状态,且需要进行独立于业务消息的系统信息响应时,mpsc是更为普遍的控制指令的结构。
简而言之,goroutine+channel 作为一两个人的小规模、短期代码问题不大。但作为团队的长期业务进行持续开发,还是建议原始的基础库,尽管没有高大上的名称,无法技术 show,但更能解决问题!
文中异步队列代码,具体参见:github 异步队列。不排除 github 代码库因为更新,而与文中描述不一致。
一.CSP是什么
CSP模型的全称为Communicating Sequential Processes,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。与函数式编程和actor模型类似,CSP模型也是正在复兴的古董。由于近来Go语言的兴起, CSP模型又流行起来。由于go的协程,为我们隐藏了多线程面对多任务调度的复杂性,开发者可以非常方便的写出无锁的异步数据调度业务。比较简单的常用写法是:
package main import ( "fmt" "time" ) //生产者 type Produce struct{ myChannel chan interface{} } func (p *Produce)send(any interface{}){ p.myChannel<-any } //消费者 type Consumer struct{ myChannel chan interface{} } func(c *Consumer)receive(){ go func(){ for data:=range c.myChannel{ fmt.Printf("receive: %v \n",data) } }() } func main(){ var mychan=make(chan interface{}) var consumer=&Consumer{mychan} consumer.receive() var pro=&Produce{mychan} pro.send("Msg") time.Sleep(time.Second*10) }
在实际业务中,我们会看到这个myChannel必须是作为代码中的硬编码存在。可能通过以下几个方式被调用:
1.对象属性初始化
比如上述代码,myChanel是消费者和生产者的共享变量,在其初始化的时候注入。2.函数参数注入
package main import ( "fmt" "time" ) //生产者 type Produce struct { } func (p *Produce) send(myChannel chan<- interface{}, any interface{}) { myChannel <- any } //消费者 type Consumer struct { } func (c *Consumer) receive(myChannel <-chan interface{}) { go func() { for data := range myChannel { fmt.Printf("receive: %v \n", data) } fmt.Println("receive stop!") }() } func main() { var mychan = make(chan interface{}) var consumer = &Consumer{} consumer.receive(mychan) var pro = &Produce{} pro.send(mychan, "Msg") close(mychan) time.Sleep(time.Second * 10) }
二 业务场景需要考虑的问题
基于前一节两种基础代码,我们很容易实现【M生产者:1消费者】的数据传递。但实际业务远不是这种教科书形式这么简单。CSP在其语法上定义了:STOP、SKIP、INTERRUPT、CONDITION、TIMEOUT等常用规则。如果在业务中来考虑,就非常麻烦,也容易出错。如果我们封装了goroutinue+chan的异步队列来封装,用回调的方式来驱动消费者的函数句柄,好比使用消息中间件一样,就会简化很多。1.对于STOP
相对于channel的关闭,go的chan是非常成熟的,可以在封装后,通过一个closeflag int32标签来规避重复关闭的问题。接受函数只需要正常的
for range channel即可,没特别的技术含量。
2.对于interrupt
对于立即中断,go1.7之后提供了context包,可以利用其cancelfunc和
cancelContext的特性,当调用
cancelfunc后,
cancelContext.Done()将会有一个消息发出,结合
select就可以从消息接受中退出
以下代码片段展示了封装后,如何响应stop和interrupt:
func (queue *queueCSP) receive(channel chan interface{}, cancelCtx context.Context, invoker ReceiveFunc) { //... for !stoppedByChannel && !stoppedByContext { //Notice:由于 select 的语法特点,即使 cancelCtx.Done的消息已经发出,但只要channel有值,则很可能无法立即退出循环 select { case msg := <-channel: if msg != nil { invoker(msg) } else { stoppedByChannel = true } case <-cancelCtx.Done(): stoppedByContext = true } } //release resource //... //...
3.TIMEOUT
参考上面的代码片段,如果加入time.Ticker,就可以轻易实现超时的控制,但发现超时后,将消息汇报、停止回调函数,则不在本文的范围。有兴趣的读者可以看看context的相关资料。
4.内存占用
通常情况下,一个channel+goroutinue要占用2k-8k的栈空间,100万个csp就是2-8G。很多业务中,这些都是闲置的。三、channel带来的额外问题及替换方案
前一节,尽管我们利用QueueCSP封装goroutinue+chan以杜绝业务层重复关闭的隐患,但也存在一些不尽如人意之处:1. 引入锁确保推送安全,导致推送速度较慢。
2. 引入了context包来处理即使退出信号,但channel存在未处理消息时,依然有很大几率继续执行几次业务操作,这是 select-case 的随机性造成的(还记得 select-case 生成随机数吗)。
3. goroutinue的内存开销大。
那么,go的goroutine+chan 这种模式之前,异步队列通信又是如何处理呢?比较常见的是通过bufferRing来提供一定缓冲数量的队列,而MPSC则提供无数量限制的方式。MPSC的全称是Multi-Produce Single-Consumer,是非常高效的无锁并发的多线程并发解决方案,很多时候用 MPSC 来做控制指令队列通道。go语言利用原子操作,实现非常简单:
type node struct { next *node val interface{} } type Queue struct { head, tail *node } func New() *Queue { q := &Queue{} stub := &node{} q.head = stub q.tail = stub return q } // 添加一条新的消息到队列的末尾 func (q *Queue) Push(x interface{}) { n := &node{val: x} prev := (*node)(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)), unsafe.Pointer(n))) prev.next = n } // 从队列中提取一条消息交付给消费者 func (q *Queue) Pop() interface{} { tail := q.tail next := (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)))) // acquire if next != nil { q.tail = next v := next.val next.val = nil return v } return nil } // 清空队列 func (q *Queue) Empty() bool { tail := q.tail next := (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)))) return next == nil }
如果我们的 Queue 封装的是mpsc,那么就是在 for 循环中先判断是否关闭,然后操作Pop,如果有数据,则调用接受者。这样,系统的退出响应就会非常简单:
func (queue *queueMPSC) run() { var msg interface{} defer func() { defer func() { if err := recover(); err != nil { log.Printf("[queue_mpsc] recovering reason is %+v. More detail:", err) log.Println(string(debug.Stack())) } }() }() for { //是否立即关闭 if atomic.LoadInt32(&queue.closed) == _CLOSED { return } //当完成消息处理再关闭时,系统发出poisonPill if msg = queue.userQueue.Pop(); msg != nil && msg != poisonPill { queue.invoker(msg) } else { return } } }
尽管这段代码没有考虑处理大量业务时的cpu 调度(runtime.Gosched()),但基本上是业务可用的代码。我们可以看到:
1. 不需要select协程,整体上开销更小
2. 控制更加灵活,处理业务消息的同时,还能非常方便的响应控制消息(停止、暂停等)。
四、总结
客观的说,go语言的goroutine+chan来构建 CSP 可以非常直观的理解业务之间的数据传送关系。但并不是唯一解,尤其是在中型应用中,考虑利用 atomic+基础数据结构的工具库,更是成熟的工程化选择,其原因是:- chan更适合单生产者+单/多消费者。一旦是多个生产者,就存在重复关闭的隐患。
- 通过函数回调,异步队列回调,是更安全的基础组件的编码方式。
- 采用 CSP实现的队列,在进行系统控制时,比如 stop,及时性不如 mpsc或传统的bufferRing/List等数据结构。
- 取消协程,减少额外的内存开销。
- 消费者如果带状态,且需要进行独立于业务消息的系统信息响应时,mpsc是更为普遍的控制指令的结构。
简而言之,goroutine+channel 作为一两个人的小规模、短期代码问题不大。但作为团队的长期业务进行持续开发,还是建议原始的基础库,尽管没有高大上的名称,无法技术 show,但更能解决问题!
相关文章推荐
- Go语言最佳实践—— 字符串
- 寻找多语言的最佳实践方式
- Go语言最佳实践——面向对象
- 探寻ASP.NET MVC鲜为人知的奥秘(3):寻找多语言的最佳实践方式
- 产品环境中 Go 语言的最佳实践
- 产品环境中 Go 语言的最佳实践
- golang实战使用gin+xorm搭建go语言web框架restgo详解6.4 推荐编程方式
- go语言实践-protoactor使用小结
- go语言快速入门:项目构建实践(21)
- go context专题(四)- context 最佳实践和相关争议
- go语言int类型转化成string类型的方式
- Go语言goroutine并发处理
- Go语言设计模式实践:迭代器(Iterator)
- Go语言关于chan理解的实验
- Web前端开发最佳实践(11):使用更严格的JavaScript编码方式,提高代码质量
- 海量数据处理的最佳语言是C,而不是C++,更不是JAVA
- Go语言的实时GC原理和实践
- GO语言的进程管理工具-实践
- 营销最佳的语言是自己的语言,而不是套用别人的话。(这需要自己平时的博客积累和思考)(在多做中多思考)
- Go语言创建、初始化数组的各种方式