golang实时消息平台NSQ的使用
2017-02-13 19:54
176 查看
NSQ是什么
(本文作者 changjixiong,以下是正文)NSQ是一个实时消息平台,引用一段InfoQ上的介绍:
“NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。”
如何开始使用
这里有一个例子用来说明如何安装、启动以及发送与接收消息:An Example of Using NSQ From Go(地址:http://tleyden.github.io/blog/2014/11/12/an-example-of-using-nsq-from-go/)
构建消息的响应函数
如果单是用一个匿名函数来处理收到的消息显然是不够的,下面用代码来演示一下如果根据收到的消息来使用相应的处理函数。生产者
首先我们来创建生产者config := nsq.NewConfig() w, _ := nsq.NewProducer("127.0.0.1:4150", config) jsonData := []string{} jsonData = append(jsonData, ` { "func_name":"BarFuncAdd", "params":[0.5,0.51] }`) jsonData = append(jsonData, ` { "func_name":"FooFuncSwap", "params":["a","b"] }`) for _, j := range jsonData { w.Publish("Topic_json", []byte(j)) }
上面的代码向NSQ发送了2个json格式的消息,从字面上不难看出其目的是调用2个函数,分别是BarFuncAdd和FooFuncSwap。
消费者
现在我们来创建消费者config := nsq.NewConfig() config.DefaultRequeueDelay = 0 config.MaxBackoffDuration = 20 * time.Millisecond config.LookupdPollInterval = 1000 * time.Millisecond config.RDYRedistributeInterval = 1000 * time.Millisecond config.MaxInFlight = 2500 MakeConsumer("Topic_json", "ch", config, HandleJsonMessage)
MakeConsumer的定义如下:
func MakeConsumer(topic, channel string, config *nsq.Config, handle func(message *nsq.Message) error) { consumer, _ := nsq.NewConsumer(topic, channel, config) consumer.AddHandler(nsq.HandlerFunc(handle)) err := consumer.ConnectToNSQD("127.0.0.1:4150") if err != nil { log.Panic("Could not connect") } }
处理器函数
NSQ消息的处理器函数定义如下:func HandleJsonMessage(message *nsq.Message) error { resultJson := reflectinvoke.InvokeByJson([]byte(message.Body)) result := reflectinvoke.Response{} err := json.Unmarshal(resultJson, &result) if err != nil { return err } info := "HandleJsonMessage get a result\n" info += "raw:\n" + string(resultJson) + "\n" info += "function: " + result.FuncName + " \n" info += fmt.Sprintf("result: %v\n", result.Data) info += fmt.Sprintf("error: %d,%s\n\n", result.ErrorCode, reflectinvoke.ErrorMsg(result.ErrorCode)) fmt.Println(info) return nil }
功能函数
处理器函数根据收到的json数据通过反射最终调用了Foo的FooFuncSwap方法及Bar的BarFuncAdd方法。type Foo struct { } type Bar struct { } func (b *Bar) BarFuncAdd(argOne, argTwo float64) float64 { return argOne + argTwo } func (f *Foo) FooFuncSwap(argOne, argTwo string) (string, string) { return argTwo, argOne }
怎么调用的
reflectinvoke.InvokeByJson是如何根据形如:{ "func_name":"BarFuncAdd", "params":[0.5,0.51] }
的 json数据调用Bar.BarFuncAdd的?
请参考《golang通过反射使用json字符串调用struct的指定方法及返回json结果》(如果前面这段没有连接地址,那肯定是文章被爬虫干掉了连接,请找本文的原文阅读)
文中代码的完整内容在https://github.com/changjixiong/goNotes/tree/master/nsqNotes以及https://github.com/changjixiong/goNotes/tree/master/reflectinvoke中。
注意事项
同一个消息channel如果有多个消费者则消费者收到的消息是不确定的。例如,如果将文中的生产者运行一个实例,将消费者运行两个实例(命名为A,B),则会出现A收到2个消息或者B收到2个消息或者AB各收到一个消息。相关文章推荐
- 实时消息平台NSQ的特性
- 去中心化分布式服务实时消息平台-NSQ
- NSQ:分布式的实时消息平台
- NSQ:分布式的实时消息平台
- windows平台下使用LiteIDE交叉编译golang
- 使用Mkfifo和Script命令实现在Linux平台上实时演示
- iOS不使用第三方平台,发送推送消息
- [转载]使用node.js+socket.io搭建实时消息系统
- Golang 如何在windows平台下使用LiteIDE交叉编译linux执行程序
- 使用tomcat8.0.36实现的websocket技术,实现向单个以及全部用户实时推送消息的功能
- 平台实时弹框消息提醒SQL
- 消息称亚马逊明年推实时广告交易平台
- 实时CORBA平台TAO的安装和使用
- 58到家通用实时消息平台架构细节(Qcon2016)
- 58到家通用实时消息平台架构细节
- 58到家通用实时消息平台架构细节
- 使用jQuery Uploadify借助Dorado平台实现批量上传文件动态显示实时上传文件状态
- wince下钩子的使用-实时捕获按键消息
- golang:高性能消息队列moonmq的简单使用
- unity3d移动平台优化专题(2):不要使用实时光照