您的位置:首页 > 编程语言 > Go语言

go 语言实践-goroutine+chan 并不是 CSP 的最佳方式

2017-12-11 15:51 706 查看
go语言的最大两个亮点,一个是协程,一个就是chan了。二者合体的典型应用CSP,基本就是大家认可的并行开发神器,确实简化了并行程序的开发难度,但个人并不是很推荐业务中直接面对这种硬编码。那么,本文的重点就是讨论硬编码面临的问题,以及相关的处理方案。

文中异步队列代码,具体参见:github 异步队列。不排除 github 代码库因为更新,而与文中描述不一致。

一.CSP是什么

CSP模型的全称为Communicating Sequential Processes,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。与函数式编程和actor模型类似,CSP模型也是正在复兴的古董。由于近来Go语言的兴起, CSP模型又流行起来。



由于go的协程,为我们隐藏了多线程面对多任务调度的复杂性,开发者可以非常方便的写出无锁的异步数据调度业务。比较简单的常用写法是:

package main

import (
"fmt"
"time"
)
//生产者
type Produce struct{
myChannel chan interface{}
}
func (p *Produce)send(any interface{}){
p.myChannel<-any
}
//消费者
type Consumer struct{
myChannel chan interface{}
}
func(c *Consumer)receive(){
go func(){
for data:=range c.myChannel{
fmt.Printf("receive: %v \n",data)
}
}()
}

func main(){
var mychan=make(chan interface{})
var consumer=&Consumer{mychan}
consumer.receive()
var pro=&Produce{mychan}
pro.send("Msg")

time.Sleep(time.Second*10)
}


在实际业务中,我们会看到这个myChannel必须是作为代码中的硬编码存在。可能通过以下几个方式被调用:

1.对象属性初始化

比如上述代码,myChanel是消费者和生产者的共享变量,在其初始化的时候注入。

2.函数参数注入

package main

import (
"fmt"
"time"
)

//生产者
type Produce struct {
}

func (p *Produce) send(myChannel chan<- interface{}, any interface{}) {
myChannel <- any
}

//消费者
type Consumer struct {
}

func (c *Consumer) receive(myChannel <-chan interface{}) {
go func() {
for data := range myChannel {
fmt.Printf("receive: %v \n", data)
}
fmt.Println("receive stop!")
}()
}

func main() {
var mychan = make(chan interface{})
var consumer = &Consumer{}
consumer.receive(mychan)
var pro = &Produce{}
pro.send(mychan, "Msg")

close(mychan)
time.Sleep(time.Second * 10)
}


二 业务场景需要考虑的问题

基于前一节两种基础代码,我们很容易实现【M生产者:1消费者】的数据传递。但实际业务远不是这种教科书形式这么简单。CSP在其语法上定义了:STOP、SKIP、INTERRUPT、CONDITION、TIMEOUT等常用规则。如果在业务中来考虑,就非常麻烦,也容易出错。如果我们封装了goroutinue+chan的异步队列来封装,用回调的方式来驱动消费者的函数句柄,好比使用消息中间件一样,就会简化很多。

1.对于STOP

相对于channel的关闭,go的chan是非常成熟的,可以在封装后,通过一个
closeflag int32
标签来规避重复关闭的问题。接受函数只需要正常的
for range channel
即可,没特别的技术含量。

2.对于interrupt

对于立即中断,go1.7之后提供了context包,可以利用其
cancelfunc
cancelContext
的特性,当调用
cancelfunc
后,
cancelContext.Done()
将会有一个消息发出,结合
select
就可以从消息接受中退出

以下代码片段展示了封装后,如何响应stop和interrupt:

func (queue *queueCSP) receive(channel chan interface{}, cancelCtx context.Context, invoker ReceiveFunc) {
//...
for !stoppedByChannel && !stoppedByContext {
//Notice:由于 select 的语法特点,即使 cancelCtx.Done的消息已经发出,但只要channel有值,则很可能无法立即退出循环
select {
case msg := <-channel:
if msg != nil {
invoker(msg)
} else {
stoppedByChannel = true
}
case <-cancelCtx.Done():
stoppedByContext = true
}
}
//release resource
//...
//...


3.TIMEOUT

参考上面的代码片段,如果加入
time.Ticker
,就可以轻易实现超时的控制,但发现超时后,将消息汇报、停止回调函数,则不在本文的范围。有兴趣的读者可以看看context的相关资料。

4.内存占用

通常情况下,一个channel+goroutinue要占用2k-8k的栈空间,100万个csp就是2-8G。很多业务中,这些都是闲置的。

三、channel带来的额外问题及替换方案

前一节,尽管我们利用QueueCSP封装goroutinue+chan以杜绝业务层重复关闭的隐患,但也存在一些不尽如人意之处:

1. 引入锁确保推送安全,导致推送速度较慢。

2. 引入了context包来处理即使退出信号,但channel存在未处理消息时,依然有很大几率继续执行几次业务操作,这是 select-case 的随机性造成的(还记得 select-case 生成随机数吗)。

3. goroutinue的内存开销大。

那么,go的goroutine+chan 这种模式之前,异步队列通信又是如何处理呢?比较常见的是通过bufferRing来提供一定缓冲数量的队列,而MPSC则提供无数量限制的方式。MPSC的全称是Multi-Produce Single-Consumer,是非常高效的无锁并发的多线程并发解决方案,很多时候用 MPSC 来做控制指令队列通道。go语言利用原子操作,实现非常简单:

type node struct {
next *node
val  interface{}
}

type Queue struct {
head, tail *node
}

func New() *Queue {
q := &Queue{}
stub := &node{}
q.head = stub
q.tail = stub
return q
}

// 添加一条新的消息到队列的末尾
func (q *Queue) Push(x interface{}) {
n := &node{val: x}
prev := (*node)(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)), unsafe.Pointer(n)))
prev.next = n
}

// 从队列中提取一条消息交付给消费者
func (q *Queue) Pop() interface{} {
tail := q.tail
next := (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)))) // acquire
if next != nil {
q.tail = next
v := next.val
next.val = nil
return v
}
return nil
}

// 清空队列
func (q *Queue) Empty() bool {
tail := q.tail
next := (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next))))
return next == nil
}


如果我们的 Queue 封装的是mpsc,那么就是在 for 循环中先判断是否关闭,然后操作Pop,如果有数据,则调用接受者。这样,系统的退出响应就会非常简单:

func (queue *queueMPSC) run() {
var msg interface{}

defer func() {
defer func() {
if err := recover(); err != nil {
log.Printf("[queue_mpsc] recovering reason is %+v. More detail:", err)
log.Println(string(debug.Stack()))
}
}()
}()
for  {
//是否立即关闭
if atomic.LoadInt32(&queue.closed) == _CLOSED {
return
}
//当完成消息处理再关闭时,系统发出poisonPill
if msg = queue.userQueue.Pop(); msg != nil && msg != poisonPill {
queue.invoker(msg)
} else {
return
}
}
}


尽管这段代码没有考虑处理大量业务时的cpu 调度(runtime.Gosched()),但基本上是业务可用的代码。我们可以看到:

1. 不需要select协程,整体上开销更小

2. 控制更加灵活,处理业务消息的同时,还能非常方便的响应控制消息(停止、暂停等)。

四、总结

客观的说,go语言的goroutine+chan来构建 CSP 可以非常直观的理解业务之间的数据传送关系。但并不是唯一解,尤其是在中型应用中,考虑利用 atomic+基础数据结构的工具库,更是成熟的工程化选择,其原因是:

- chan更适合单生产者+单/多消费者。一旦是多个生产者,就存在重复关闭的隐患。

- 通过函数回调,异步队列回调,是更安全的基础组件的编码方式。

- 采用 CSP实现的队列,在进行系统控制时,比如 stop,及时性不如 mpsc或传统的bufferRing/List等数据结构。

- 取消协程,减少额外的内存开销。

- 消费者如果带状态,且需要进行独立于业务消息的系统信息响应时,mpsc是更为普遍的控制指令的结构。

简而言之,goroutine+channel 作为一两个人的小规模、短期代码问题不大。但作为团队的长期业务进行持续开发,还是建议原始的基础库,尽管没有高大上的名称,无法技术 show,但更能解决问题!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: