Go语言goroutine并发处理
2017-02-18 20:07
477 查看
模拟并发事务处理:
使用waitGroup等待所有goroutine返回:
使用Timer加入等待超时的机制避免资源竞争导致死锁:
通过close方法向所有“sub goroutine”发送“关闭消息”:
package main

// Simulated concurrent job processing: main produces jobCount jobs, a fixed
// pool of workerCount workers consumes them, and main waits until every worker
// has finished before exiting.

import (
	"fmt"
	"math/rand"
	"time"
)

// job is one unit of work handed to a worker.
type job struct {
	jobID int
	load  int // seconds needed to finish the job
}

const (
	jobCount    int = 20 // number of jobs produced by main
	workerCount int = 5  // number of worker goroutines
)

// startDelay is how long a worker waits before taking its first job. It is a
// variable rather than a constant so that it can be shortened (e.g. in tests).
var startDelay = 5 * time.Second

var (
	// jobQueue carries jobs from main to the workers. main closes it once the
	// last job has been queued; that close is the workers' signal to stop.
	jobQueue = make(chan job, jobCount)

	// waitQueue receives exactly one value from each worker when it exits.
	waitQueue = make(chan bool, workerCount)
)

// worker is a member of the pool, identified by id in the log output.
type worker struct {
	id int
}

// doTheJob processes jobs from jobQueue until the queue is closed and drained,
// then reports completion on waitQueue and returns.
//
// Completion is signalled once, on exit, rather than every time the queue is
// momentarily empty: an "idle" signal says nothing about jobs that are still
// to be produced, so main could otherwise stop waiting too early.
func (w worker) doTheJob() {
	time.Sleep(startDelay)

	// range ends only after jobQueue has been closed AND emptied, so no job is
	// skipped and no zero-value job is ever observed.
	for j := range jobQueue {
		fmt.Println("Worker [", w.id, "] started doing job [", j.jobID, "], job load : ", j.load)
		time.Sleep(time.Second * time.Duration(j.load))
		fmt.Println("Worker [", w.id, "] finished job [", j.jobID, "]")
	}

	waitQueue <- true
}

func main() {
	for i := 0; i < workerCount; i++ {
		w := worker{id: i + 1}
		go w.doTheJob()
	}

	// Produce one job per second.
	for i := 0; i < jobCount; i++ {
		jobQueue <- job{jobID: i + 1, load: rand.Intn(10)}
		time.Sleep(time.Second)
	}

	// Only the sender closes the channel; workers finish what is queued and
	// then leave their range loops.
	close(jobQueue)

	// One completion signal per worker.
	for i := 0; i < workerCount; i++ {
		<-waitQueue
	}
}
使用waitGroup等待所有goroutine返回:
package main

// Waiting for a group of goroutines to return with sync.WaitGroup.

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// runJobs starts total goroutines, calls work(id) in each with id running from
// 0 to total-1, and blocks until every call has returned.
//
// The WaitGroup counter and the loop bound are both derived from total, so
// they cannot drift apart (a mismatch would either block Wait forever or
// panic with a negative counter).
func runJobs(total int, work func(id int)) {
	var wg sync.WaitGroup
	wg.Add(total)
	for i := 0; i < total; i++ {
		// Pass i as an argument so each goroutine gets its own copy.
		go func(id int) {
			defer wg.Done()
			work(id)
		}(i)
	}
	wg.Wait()
}

func main() {
	const total = 10

	runJobs(total, func(id int) {
		fmt.Println("Start job", id)
		time.Sleep(time.Second * time.Duration(rand.Intn(10)))
		fmt.Println("Stop job", id)
	})
}
使用Timer加入等待超时的机制避免资源竞争导致死锁:
package main

// Customers -> frontend workers -> backend workers -> frontend workers.
// Every blocking channel operation is bounded by a timer so that a full queue
// somewhere in the cycle cannot block a goroutine forever.

import (
	"fmt"
	"math/rand"
	"time"
)

// Timeouts, in seconds, for a single wait on a queue.
const (
	CUSTOMER_REQUEST_TIMEOUT int = 30
	FRONTEND_REQUEST_TIMEOUT int = 60
	BACKEND_RESPONSE_TIMEOUT int = 60
)

// customerRequest is what a customer puts on the customer request queue.
type customerRequest struct {
	message string // request message
	id      int    // customer id
	load    int    // seconds the frontend spends processing the request
}

// frontendRequest is what a frontend worker forwards to the backend.
type frontendRequest struct {
	message string // modified message
	id      int    // customer id
	load    int    // seconds the backend spends processing the request
}

// backendResponse is what a backend worker sends back to the frontend.
type backendResponse struct {
	id      int    // customer id
	message string // response msg
}

type customer struct {
	id int
}

type frontendWorker struct {
	id int
}

type backendWorker struct {
	id int
}

// seconds converts a whole number of seconds to a time.Duration.
func seconds(n int) time.Duration {
	return time.Duration(n) * time.Second
}

// sendRequests raises a new request every 1-3 seconds, forever.
func (c customer) sendRequests(customerRequestQueue chan customerRequest) {
	timeout := seconds(CUSTOMER_REQUEST_TIMEOUT)
	for {
		c.sendRequest(customerRequestQueue, timeout)
		time.Sleep(seconds(rand.Intn(3) + 1)) // rand.Intn(3)+1 is 1, 2 or 3
	}
}

// sendRequest builds one request and tries to queue it, waiting at most
// timeout. It reports whether the request was queued; on timeout the request
// is dropped.
//
// The timer is created per call so that the timeout measures this wait only,
// not the time elapsed since the customer started.
func (c customer) sendRequest(customerRequestQueue chan<- customerRequest, timeout time.Duration) bool {
	r := customerRequest{
		id:      c.id,
		message: fmt.Sprintf("[Customer][%d] request", c.id),
		load:    rand.Intn(5) + 1,
	}

	t := time.NewTimer(timeout)
	defer t.Stop()

	select {
	case customerRequestQueue <- r:
		fmt.Println("Customer", c.id, "initialized a request")
		return true
	case <-t.C:
		fmt.Println("Customer request timeout, customer id:", c.id)
		return false
	}
}

// process handles customer requests and backend responses, forever.
func (w frontendWorker) process(customerRequestQueue chan customerRequest, frontendRequestQueue chan frontendRequest, backendResponseQueue chan backendResponse) {
	timeout := seconds(FRONTEND_REQUEST_TIMEOUT)
	for {
		w.handleOne(customerRequestQueue, frontendRequestQueue, backendResponseQueue, timeout)
	}
}

// handleOne waits at most timeout for one event and handles it:
//   - a customer request is processed and forwarded to the backend;
//   - a backend response is logged as handled;
//   - if nothing arrives in time, the idle timeout is logged.
func (w frontendWorker) handleOne(customerRequestQueue <-chan customerRequest, frontendRequestQueue chan<- frontendRequest, backendResponseQueue <-chan backendResponse, timeout time.Duration) {
	idle := time.NewTimer(timeout)
	defer idle.Stop()

	select {
	case x := <-customerRequestQueue:
		fmt.Println("[Frontend]", w.id, ": Customer request received, customer id:", x.id)
		time.Sleep(seconds(x.load))
		fmt.Println("[Frontend]", w.id, ": Customer request processed, customer id:", x.id)
		r := frontendRequest{
			id:      x.id,
			message: "[Frontend]" + x.message,
			load:    x.load + 2,
		}
		w.forward(r, frontendRequestQueue, backendResponseQueue, timeout)
	case x := <-backendResponseQueue:
		fmt.Println(x.message, "handled")
	case <-idle.C:
		fmt.Println("Frontend request timeout, frontend worker id:", w.id)
	}
}

// forward queues r for the backend, waiting at most timeout, and reports
// whether it was queued.
//
// While waiting it keeps consuming backend responses. Frontend workers are the
// only consumers of the response queue; if they all blocked on a full request
// queue while the backends blocked on a full response queue, neither side
// could ever make progress.
func (w frontendWorker) forward(r frontendRequest, frontendRequestQueue chan<- frontendRequest, backendResponseQueue <-chan backendResponse, timeout time.Duration) bool {
	t := time.NewTimer(timeout)
	defer t.Stop()

	for {
		select {
		case frontendRequestQueue <- r:
			return true
		case x := <-backendResponseQueue:
			fmt.Println(x.message, "handled")
		case <-t.C:
			fmt.Println("Frontend forward timeout, frontend worker id:", w.id, "customer id:", r.id)
			return false
		}
	}
}

// process handles frontend requests, forever.
func (w backendWorker) process(frontendRequestQueue chan frontendRequest, backendResponseQueue chan backendResponse) {
	timeout := seconds(BACKEND_RESPONSE_TIMEOUT)
	for {
		w.handleOne(frontendRequestQueue, backendResponseQueue, timeout)
	}
}

// handleOne waits at most timeout for one frontend request, processes it and
// sends the response back. If nothing arrives in time, the idle timeout is
// logged.
func (w backendWorker) handleOne(frontendRequestQueue <-chan frontendRequest, backendResponseQueue chan<- backendResponse, timeout time.Duration) {
	idle := time.NewTimer(timeout)
	defer idle.Stop()

	select {
	case x := <-frontendRequestQueue:
		fmt.Println("[Backend]", w.id, ": Frontend request received, customer id:", x.id)
		time.Sleep(seconds(x.load))
		fmt.Println("[Backend]", w.id, ": Frontend request processed, customer id:", x.id)
		r := backendResponse{
			id:      x.id,
			message: "[Backend]" + x.message,
		}
		w.respond(r, backendResponseQueue, timeout)
	case <-idle.C:
		fmt.Println("Backend response timeout, backend worker id:", w.id)
	}
}

// respond queues r for the frontend, waiting at most timeout, and reports
// whether it was queued; on timeout the response is dropped.
func (w backendWorker) respond(r backendResponse, backendResponseQueue chan<- backendResponse, timeout time.Duration) bool {
	t := time.NewTimer(timeout)
	defer t.Stop()

	select {
	case backendResponseQueue <- r:
		return true
	case <-t.C:
		fmt.Println("Backend respond timeout, backend worker id:", w.id, "customer id:", r.id)
		return false
	}
}

func main() {
	customerRequestQueue := make(chan customerRequest, 10)
	frontendRequestQueue := make(chan frontendRequest, 20)
	backendResponseQueue := make(chan backendResponse, 20)

	for i := 0; i < 100; i++ {
		c := customer{id: i + 1}
		go c.sendRequests(customerRequestQueue)
	}

	for i := 0; i < 10; i++ {
		w := frontendWorker{id: i + 1}
		go w.process(customerRequestQueue, frontendRequestQueue, backendResponseQueue)
	}

	for i := 0; i < 20; i++ {
		w := backendWorker{id: i + 1}
		go w.process(frontendRequestQueue, backendResponseQueue)
	}

	select {} // the simulation runs until the process is killed
}
通过close方法向所有“sub goroutine”发送“关闭消息”:
package main

// Broadcasting a "shut down" message to many goroutines by closing a channel:
// a receive on a closed channel succeeds immediately in every goroutine that
// is selecting on it.

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// TASKTIME is how long, in seconds, a parent lets its children run.
const TASKTIME int = 10

type parentProcess struct {
	id int
}

type childProcess struct {
	id int
}

// process starts childCount children, prints everything they produce for
// TASKTIME seconds, then tells them to stop, waits for all of them, prints any
// output still buffered, and finally marks itself done on w.
func (p parentProcess) process(childCount int, w *sync.WaitGroup) {
	// Deferred so that it runs after the final "quit" line has been printed;
	// main may exit as soon as the last parent is done.
	defer w.Done()

	outQueue := make(chan string, 20)
	closeQueue := make(chan bool)
	t := time.NewTimer(time.Second * time.Duration(TASKTIME))
	defer t.Stop()

	var wg sync.WaitGroup
	wg.Add(childCount)
	for i := 0; i < childCount; i++ {
		c := childProcess{id: i + 1}
		go c.process(outQueue, closeQueue, &wg)
	}

	for {
		select {
		case x := <-outQueue:
			fmt.Println(p.id, "=>", x)
		case <-t.C:
			// Closing (rather than sending) reaches every child at once.
			close(closeQueue)
			fmt.Println(p.id, "waiting for all children to quit")
			wg.Wait()

			// All senders have returned, so closing is safe; range then
			// prints whatever was still buffered instead of dropping it.
			close(outQueue)
			for x := range outQueue {
				fmt.Println(p.id, "=>", x)
			}
			fmt.Println(p.id, "quit")
			return
		}
	}
}

// process produces values on outQueue, about one per second, until closeQueue
// is closed, then marks itself done on wg.
func (c childProcess) process(outQueue chan string, closeQueue chan bool, wg *sync.WaitGroup) {
	defer wg.Done()
	c.produce(outQueue, closeQueue)
}

// produce is the child's work loop; it returns once closeQueue is closed.
func (c childProcess) produce(outQueue chan<- string, closeQueue <-chan bool) {
	for {
		// Check for shutdown first. When both a send and the close are ready,
		// a single select would pick one at random and could keep producing.
		select {
		case <-closeQueue:
			return
		default:
		}

		select {
		case outQueue <- fmt.Sprintf("%d produced %d", c.id, rand.Intn(1000)):
			// Pause between values, but wake up immediately on shutdown.
			select {
			case <-time.After(time.Second):
			case <-closeQueue:
				return
			}
		case <-closeQueue:
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		p := parentProcess{id: i + 100} // parent ids start from 100
		go p.process(rand.Intn(5)+5, &wg)
	}
	wg.Wait()
}
相关文章推荐
- go语言中的并发处理
- Go语言如何并发超时处理详解
- GO语言的进阶之路-goroutine(并发)
- go 语言并发机制 goroutine 初探
- Go语言并发与并行学习笔记(三)
- Go语言_并发篇
- Go 语言的错误处理机制引发争议
- Go语言Windows程序设计(4)--处理子窗体消息之按钮点击事件
- Go语言并发与并行学习笔记(一)
- go语言中的defer、panic、recover处理异常
- Go语言学习笔记---并发
- Go语言并发与并行学习笔记(二)
- go语言学习笔记之并发编程
- Go语言并发与并行学习笔记(三)
- Go语言并发例子
- Go语言并发之美
- go语言中goroutine的使用
- Go语言_并发篇
- Go语言的并发
- Go语言并发与并行学习笔记(一)