【消息队列技术】nsq
2016-10-08 18:25
429 查看
关于nsq的启动:
1. in one shell, start nsqlookupd:
$ nsqlookupd
in another shell, start nsqd:
$ nsqd –lookupd-tcp-address=127.0.0.1:4160
in another shell, start nsqadmin:
$ nsqadmin –lookupd-http-address=127.0.0.1:4161
publish an initial message (creates the topic in the cluster, too):
$ curl -d ‘hello world 1’ ‘http://127.0.0.1:4151/put?topic=test’
finally, in another shell, start nsq_to_file:
$ nsq_to_file –topic=test –output-dir=/tmp –lookupd-http-address=127.0.0.1:4161
publish more messages to nsqd:
$ curl -d ‘hello world 2’ ‘http://127.0.0.1:4151/put?topic=test’
$ curl -d ‘hello world 3’ ‘http://127.0.0.1:4151/put?topic=test’
to verify things worked as expected, in a web browser open http://127.0.0.1:4171/ to view thensqadmin UI and see statistics. Also, check the contents of the log files (test.*.log) written to /tmp.
其中nsqd的http端口为4151,tcp端口为4150,producer和consumer连接的是4150,nsqd负责接收producer产生的消息,并且让consumer从nsqd这里消费消息,nsqlookupd是一个负责管理拓扑信息的守护进程,nsqadmin是一个网页管理页面。
下面用GO实现producer:
下面实现consumer:
1. in one shell, start nsqlookupd:
$ nsqlookupd
in another shell, start nsqd:
$ nsqd –lookupd-tcp-address=127.0.0.1:4160
in another shell, start nsqadmin:
$ nsqadmin –lookupd-http-address=127.0.0.1:4161
publish an initial message (creates the topic in the cluster, too):
$ curl -d ‘hello world 1’ ‘http://127.0.0.1:4151/put?topic=test’
finally, in another shell, start nsq_to_file:
$ nsq_to_file –topic=test –output-dir=/tmp –lookupd-http-address=127.0.0.1:4161
publish more messages to nsqd:
$ curl -d ‘hello world 2’ ‘http://127.0.0.1:4151/put?topic=test’
$ curl -d ‘hello world 3’ ‘http://127.0.0.1:4151/put?topic=test’
to verify things worked as expected, in a web browser open http://127.0.0.1:4171/ to view thensqadmin UI and see statistics. Also, check the contents of the log files (test.*.log) written to /tmp.
其中nsqd的http端口为4151,tcp端口为4150,producer和consumer连接的是4150,nsqd负责接收producer产生的消息,并且让consumer从nsqd这里消费消息,nsqlookupd是一个负责管理拓扑信息的守护进程,nsqadmin是一个网页管理页面。
下面用GO实现producer:
package main import( "os" "github.com/nsqio/go-nsq" ) func main(){ topic := os.Args[1] //指定话题 message := os.Args[2] //指定消息 p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) //第一个参数是nsqd的tcp端口,第二个参数是配置类Config,nsq.NewConfig()产生一个默认的Config if err != nil{ panic(err) } //发布消息时指定topic,消息要转换为字节数组 if err :=p.Publish(topic, []byte(message));err != nil{ panic(err) } }
下面实现consumer:
package main import( "os" "fmt" "github.com/nsqio/go-nsq" ) //定义一个结构体,实现HandleMessage方法,用于实现Handler接口 /* type Handler interface { HandleMessage(message *Message) error } */ type ConsumerT struct{ } func (*ConsumerT)HandleMessage(msg *nsq.Message)error{ fmt.Println(string(msg.Body)) return nil } func main(){ topic := os.Args[1] //指定话题 channel := os.Args[2] //指定通道,同一话题的不同通道会将消息进行分发;同一话题同一通道会将消息按序均分到消费者。 var consumer ConsumerT c, err := nsq.NewConsumer(topic,channel,nsq.NewConfig()) if err != nil{ panic(err) } //添加一个Handler c.AddHandler(&consumer) //连接到nsqd的tcp端口 if err := c.ConnectToNSQD("127.0.0.1:4150");err != nil{ panic(err) } <-make(chan bool) }
相关文章推荐
- 消息队列中间件的技术选型分析
- 消息队列技术和原理
- 消息队列中间件的技术选型分析
- Aliware-MQ消息队列技术架构与最佳实践
- 消息队列MQ技术的介绍和原理
- 使用HTTP发送消息(消息队列技术)
- 消息队列 redis vs nsq
- 消息队列技术终结者(一)—通俗深刻地认识JMS(即Java Message Service)
- 再谈消息队列技术
- 消息队列中间件的技术选型分析
- 消息队列技术之基本概念(转)
- Azure Messaging-ServiceBus Messaging消息队列技术系列4-复杂对象消息是否需要支持序列化和消息持久化
- Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执
- 消息队列技术介绍
- Azure Messaging-ServiceBus Messaging消息队列技术系列8-服务总线配额
- 消息队列技术
- Azure Messaging-ServiceBus Messaging消息队列技术系列1-基本概念和架构
- 消息队列技术的介绍和原理(MQ)
- Azure Messaging-ServiceBus Messaging消息队列技术系列4-复杂对象消息是否需要支持序列化和消息持久化
- 消息队列MQ技术的介绍和原理