您的位置:首页 > 编程语言 > Go语言

Go语言goroutine并发处理

2017-02-18 20:07 · 477 次查看
模拟并发事务处理:

package main
 
import (
   
"fmt"
   
"math/rand"
   
"time"
)
 
// job describes one unit of simulated work.
//
// jobID is the number printed in the progress messages; load is the number
// of seconds needed to finish the job.
type job struct {
	jobID, load int
}
 
// Sizing of the simulation: how many jobs are produced and how many workers
// are started.
const (
	jobCount    int = 20
	workerCount int = 5
)

var (
	// jobQueue carries jobs from main to the workers; its buffer has room
	// for jobCount jobs.
	jobQueue = make(chan job, jobCount)
	// waitQueue carries signals from the workers back to main; its buffer
	// has one slot per worker.
	waitQueue = make(chan bool, workerCount)
)
 
// worker is a job consumer; id is the number shown in its log lines.
type worker struct{ id int }
 
func (w worker) doTheJob() {
   
time.Sleep(time.Second * 5)
   
for {
       
select {
           
case j := <- jobQueue:
               
fmt.Println("Worker [", w.id, "] started doing job [", j.jobID, "], job load : ", j.load)
               
time.Sleep(time.Second * time.Duration(j.load))
               
fmt.Println("Worker [", w.id, "] finished job [", j.jobID, "]")
           
default:
               
waitQueue <- true
       
}
   
}
}
 
func main() {
   
var workers [workerCount]worker
   
for i := 0; i < workerCount; i++ {
       
workers[i] = worker{id: i + 1}
       
go workers[i].doTheJob()
   
}
   
for i := 0; i < jobCount; i++ {
       
j := job{jobID: i + 1, load: rand.Intn(10)}
       
jobQueue <- j
       
time.Sleep(time.Second)
   
}
   
for i := 0; i < workerCount; i++ {
       
<- waitQueue
   
}
 
   
close(jobQueue)
   
close(waitQueue)
}
 

使用 sync.WaitGroup 等待所有 goroutine 返回:

package main
 
import (
   
"fmt"
   
"math/rand"
   
"time"

"sync"
)
 
// main starts total goroutines, each of which prints a start line, sleeps a
// random 0-9 seconds and prints a stop line, and waits for all of them with
// a sync.WaitGroup.
func main() {
	const total int = 10

	var wg sync.WaitGroup
	wg.Add(total)
	// The loop bound must be the same value that was passed to Add: fewer
	// goroutines would make Wait block forever, more would drive the counter
	// negative and panic.
	for i := 0; i < total; i++ {
		go func(i int) {
			defer wg.Done()
			fmt.Println("Start job", i)
			time.Sleep(time.Second * time.Duration(rand.Intn(10)))
			fmt.Println("Stop job", i)
		}(i)
	}

	wg.Wait()
}
 

使用 Timer 加入等待超时机制,避免因资源竞争导致死锁:

package main
 
import (
   
"fmt"
   
"time"
   
"math/rand"
   
"sync"
)
 
// Timeouts, in seconds, for the customer, frontend and backend loops. Each
// value is multiplied by time.Second where it is used.
const (
	CUSTOMER_REQUEST_TIMEOUT int = 30
	FRONTEND_REQUEST_TIMEOUT int = 60
	BACKEND_RESPONSE_TIMEOUT int = 60
)
 
// customerRequest is the message a customer puts on the customer request
// queue.
type customerRequest struct {
	message  string // request message
	id, load int    // customer id; seconds the frontend sleeps while processing it
}
 
// frontendRequest is the message a frontend worker forwards to the backend
// after it has processed a customer request.
type frontendRequest struct {
	message  string // modified message
	id, load int    // customer id; seconds the backend sleeps while processing it
}
 
// backendResponse is the message a backend worker sends back once it has
// processed a frontend request.
type backendResponse struct {
	id      int    // customer id
	message string // response msg
}
 
// customer is a request producer, identified by id.
type customer struct{ id int }
 
// frontendWorker is a frontend goroutine's identity; id appears in its log
// lines.
type frontendWorker struct{ id int }
 
// backendWorker is a backend goroutine's identity; id appears in its log
// lines.
type backendWorker struct{ id int }
 
func (c customer) sendRequests(customerRequestQueue chan customerRequest) {
   
t := time.NewTimer(time.Second * time.Duration(CUSTOMER_REQUEST_TIMEOUT))
   
for {
       
r := customerRequest { id: c.id, message: "[Customer][" + fmt.Sprintf("%d", c.id) + "] request", load: rand.Intn(5) + 1 }
       
select {
           
case customerRequestQueue <- r:
               
fmt.Println("Customer", c.id, "initialized a request")
           
case <- t.C:
               
fmt.Println("Customer request timeout, customer id:", c.id)
               
t.Reset(time.Second * time.Duration(CUSTOMER_REQUEST_TIMEOUT))
       
}
       
time.Sleep(time.Second * time.Duration(rand.Intn(3)+1))   
// every 1~4 second a customer will raise a request
   
}
}
 
func (w frontendWorker) process(customerRequestQueue chan customerRequest, frontendRequestQueue chan frontendRequest, backendResponseQueue chan backendResponse) {
   
t := time.NewTimer(time.Second * time.Duration(FRONTEND_REQUEST_TIMEOUT))
   
for {
       
select {
           
case x := <- customerRequestQueue:
               
fmt.Println("[Frontend]", w.id, ": Customer request received, customer id:", x.id)
               
time.Sleep(time.Second * time.Duration(x.load))
               
fmt.Println("[Frontend]", w.id, ": Customer request processed, customer id:", x.id)
               
r := frontendRequest { id: x.id, message: "[Frontend]" + x.message, load: x.load + 2 }
               
frontendRequestQueue <- r
           
case x := <- backendResponseQueue:
               
fmt.Println(x.message, "handled")
           
case <- t.C:
               
fmt.Println("Frontend request timeout, frontend worker id:", w.id)
               
t.Reset(time.Second * time.Duration(FRONTEND_REQUEST_TIMEOUT))
       
}
   
}
}
 
func (w backendWorker) process(frontendRequestQueue chan frontendRequest, backendResponseQueue chan backendResponse) {
   
t := time.NewTimer(time.Second * time.Duration(BACKEND_RESPONSE_TIMEOUT))
   
for {
       
select {
           
case x := <- frontendRequestQueue:
               
fmt.Println("[Backend]", w.id, ": Frontend request received, customer id:", x.id)
               
time.Sleep(time.Second * time.Duration(x.load))
               
fmt.Println("[Backend]", w.id, ": Frontend request processed, customer id:", x.id)
               
r := backendResponse { id: x.id, message: "[Backend]" + x.message }
               
backendResponseQueue <- r
           
case <- t.C:
               
fmt.Println("Backend response timeout, backend worker id:", w.id)
               
t.Reset(time.Second * time.Duration(BACKEND_RESPONSE_TIMEOUT))
       
}
   
}
}
 
func main() {
   
var wg sync.WaitGroup
   
wg.Add(1)
   
customerRequestQueue := make(chan customerRequest, 10)
   
frontendRequestQueue := make(chan frontendRequest, 20)
   
backendResponseQueue := make(chan backendResponse, 20)
   
for i := 0; i < 100; i++ {
       
c := customer { id: i+1 }
       
go c.sendRequests(customerRequestQueue)
   
}
   
for i := 0; i < 10; i++ {
       
w := frontendWorker { id: i+1 }
       
go w.process(customerRequestQueue, frontendRequestQueue, backendResponseQueue)
   
}
   
for i := 0; i < 20; i++ {
       
w := backendWorker { id: i+1 }
       
go w.process(frontendRequestQueue, backendResponseQueue)
   
}
   
wg.Wait()    // wait forever
}
 

通过内置函数 close 关闭 channel,向所有子 goroutine 广播“关闭消息”:

package main
 
import (
   
"fmt"
   
"time"
   
"math/rand"

"sync"
)
 
// TASKTIME is the number of seconds a parentProcess lets its children run
// before it closes them down.
const TASKTIME int = 10
 
// parentProcess is a supervising goroutine's identity; id prefixes every
// line it prints.
type parentProcess struct{ id int }
 
// childProcess is a producing goroutine's identity; id is embedded in every
// message it produces.
type childProcess struct{ id int }
 
func (p parentProcess) process(childCount int, w *sync.WaitGroup) {
   
outQueue := make(chan string, 20)
   
closeQueue := make(chan bool)
   
t := time.NewTimer(time.Second * time.Duration(TASKTIME))
   
var wg sync.WaitGroup
   
wg.Add(childCount)
   
for i := 0; i < childCount; i++ {
       
var c childProcess
       
c = childProcess{id: i+1}
       
go c.process(outQueue, closeQueue, &wg)
   
}
   
for {
       
select {
           
case x := <- outQueue:
               
fmt.Println(p.id, "=>", x)
           
case <- t.C:
               
t.Stop()
               
close(closeQueue)
               
fmt.Println(p.id, "waiting for all children to quit")
               
wg.Wait()
               
close(outQueue)
               
w.Done()
               
fmt.Println(p.id, "quit")
               
return
       
}
   
}
}
 
func (c childProcess) process(outQueue chan string, closeQueue chan bool, wg *sync.WaitGroup) {
   
for {
       
select {
           
case outQueue <- fmt.Sprintf("%d produced %d", c.id, rand.Intn(1000)):
               
time.Sleep(time.Second)
           
case <-closeQueue:
               
wg.Done()
               
return
       
}
   
}
}
 
func main() {
   
var wg sync.WaitGroup
   
for i := 0; i < 5; i++ {
       
wg.Add(1)
       
var p parentProcess
       
p = parentProcess{id: i+100}    // parent Id starts from 100
       
go p.process(rand.Intn(5)+5, &wg)
   
}
   
wg.Wait()
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息