您的位置:首页 > 编程语言 > Go语言

Beanstalkd的使用(Golang)

2015-08-16 20:25 666 查看
          最近需要引入一种新的消息队列,这个队列最好有专业、简单、消息不丢失等特性,但又不会引入过多的复杂性,

 特别是在目前单枪匹马的情况下。然后发现Beanstalkd看起来是我所需要的.

        Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 

 同时支持binlog.最后速度还可以。

 看了下源码,c语言代码量小而清晰.作者从07年维护到14年,Star也很高,质量应当是有保障的。

 队列作者提供了Go客户端,从作者项目列表中可以看到已经写了不少Go相关的东西,看来Go很受后台开发的欢迎。
 Beanstalkd 主页在这:  http://kr.github.io/beanstalkd

      写了个调用例子如下. 

   

/*
xcl (2015-8-15)
多TubeName 多消费者
*/

package main

import (
"fmt"
"github.com/kr/beanstalk"
"runtime"
"strings"
"time"
)

var (
TubeName1 string = "channel1"
TubeName2 string = "channel2"
)

func Producer(fname, tubeName string) {
if fname == "" || tubeName == "" {
return
}

c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
if err != nil {
panic(err)
}
defer c.Close()

c.Tube.Name = tubeName
c.TubeSet.Name[tubeName] = true
fmt.Println(fname, " [Producer] tubeName:", tubeName, " c.Tube.Name:", c.Tube.Name)

for i := 0; i < 5; i++ {
msg := fmt.Sprintf("for %s %d", tubeName, i)
c.Put([]byte(msg), 30, 0, 120*time.Second)
fmt.Println(fname, " [Producer] beanstalk put body:", msg)
//time.Sleep(1 * time.Second)
}

c.Close()
fmt.Println("Producer() end.")
}

func Consumer(fname, tubeName string) {
if fname == "" || tubeName == "" {
return
}

c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
if err != nil {
panic(err)
}
defer c.Close()

c.Tube.Name = tubeName
c.TubeSet.Name[tubeName] = true

fmt.Println(fname, " [Consumer] tubeName:", tubeName, " c.Tube.Name:", c.Tube.Name)

substr := "timeout"
for {
fmt.Println(fname, " [Consumer]///////////////////////// ")
//从队列中取出
id, body, err := c.Reserve(1 * time.Second)
if err != nil {
if !strings.Contains(err.Error(), substr) {
fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] err:", err, " id:", id)
}
continue
}
fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] job:", id, " body:", string(body))

//从队列中清掉
err = c.Delete(id)
if err != nil {
fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] Delete err:", err, " id:", id)
} else {
fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] Successfully deleted. id:", id)
}
fmt.Println(fname, " [Consumer]/////////////////////////")
//time.Sleep(1 * time.Second)
}
fmt.Println("Consumer() end. ")
}

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())

go Producer("PA", TubeName1)
go Producer("PB", TubeName2)

go Consumer("CA", TubeName1)
go Consumer("CB", TubeName2)

time.Sleep(10 * time.Second)
}

/*
运行结果:

XCLdeiMac:src xcl$ clear
XCLdeiMac:src xcl$ go run testmq.go
CB  [Consumer] tubeName: channel2  c.Tube.Name: channel2
CA  [Consumer] tubeName: channel1  c.Tube.Name: channel1
CB  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
PB  [Producer] tubeName: channel2  c.Tube.Name: channel2
PA  [Producer] tubeName: channel1  c.Tube.Name: channel1
PB  [Producer] beanstalk put body: for channel2 0
PA  [Producer] beanstalk put body: for channel1 0
CA  [Consumer] [ channel1 ] job: 47027  body: for channel1 0
CB  [Consumer] [ channel2 ] job: 47026  body: for channel2 0
PB  [Producer] beanstalk put body: for channel2 1
PA  [Producer] beanstalk put body: for channel1 1
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47026
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47027
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] job: 47028  body: for channel1 1
PA  [Producer] beanstalk put body: for channel1 2
CB  [Consumer] [ channel2 ] job: 47029  body: for channel2 1
PB  [Producer] beanstalk put body: for channel2 2
PA  [Producer] beanstalk put body: for channel1 3
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47028
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47029
PB  [Producer] beanstalk put body: for channel2 3
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
PB  [Producer] beanstalk put body: for channel2 4
CB  [Consumer] [ channel2 ] job: 47030  body: for channel2 2
CA  [Consumer] [ channel1 ] job: 47031  body: for channel1 2
PA  [Producer] beanstalk put body: for channel1 4
Producer() end.
Producer() end.
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47031
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47030
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] job: 47033  body: for channel2 3
CA  [Consumer] [ channel1 ] job: 47032  body: for channel1 3
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47033
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47032
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] job: 47034  body: for channel1 4
CB  [Consumer] [ channel2 ] job: 47035  body: for channel2 4
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47035
CB  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47034
CB  [Consumer]/////////////////////////
XCLdeiMac:src xcl$
*/
 可用beanstool来查看队列状态

 


也可以参考我写下面两段,来查。


ar, er := c.ListTubes()
if er != nil {
fmt.Println("[Example]  er:", er)
} else {
for i, v := range ar {
fmt.Println("[Example] ListTubes  i:", i, " v:", v)
c.Tube.Name = v
id, body, err := c.Reserve(5 * time.Second)
if err != nil {
fmt.Println("[Example] err:", err, " name:", c.Tube.Name)
continue
} else {
fmt.Println("[Example] job:", id)
fmt.Println("[Example] body:", string(body))
}

}
}

func tubeStatus(c *beanstalk.Conn) {
fmt.Println("[tubeStatus]/////////////////////////")
fmt.Println("Tube(", c.Tube.Name, ") Stats:")
m, er := c.Tube.Stats()
if er != nil {
fmt.Println("[tubeStatus] err:", er)
} else {
for k, v := range m {
fmt.Println(k, " : ", v)
}
}
fmt.Println("[tubeStatus]/////////////////////////")
}
  从测试看,Beanstalkd 足以满足我现在的需求了.

BLOG: http://blog.csdn.net/xcl168
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息