您的位置:首页 > 其它

【消息队列技术】nsq

2016-10-08 18:25 429 查看
关于nsq的启动:

1. in one shell, start nsqlookupd:

$ nsqlookupd

in another shell, start nsqd:

$ nsqd –lookupd-tcp-address=127.0.0.1:4160

in another shell, start nsqadmin:

$ nsqadmin –lookupd-http-address=127.0.0.1:4161

publish an initial message (creates the topic in the cluster, too):

$ curl -d ‘hello world 1’ ‘http://127.0.0.1:4151/put?topic=test

finally, in another shell, start nsq_to_file:

$ nsq_to_file –topic=test –output-dir=/tmp –lookupd-http-address=127.0.0.1:4161

publish more messages to nsqd:

$ curl -d ‘hello world 2’ ‘http://127.0.0.1:4151/put?topic=test

$ curl -d ‘hello world 3’ ‘http://127.0.0.1:4151/put?topic=test

to verify things worked as expected, in a web browser open http://127.0.0.1:4171/ to view thensqadmin UI and see statistics. Also, check the contents of the log files (test.*.log) written to /tmp.

其中nsqd的http端口为4151,tcp端口为4150,producer和consumer连接的是4150,nsqd负责接收producer产生的消息,并且让consumer从nsqd这里消费消息,nsqlookupd是一个负责管理拓扑信息的守护进程,nsqadmin是一个网页管理页面。

下面用GO实现producer:

package main

import(
"os"
"github.com/nsqio/go-nsq"
)

func main(){
topic := os.Args[1]  //指定话题
message := os.Args[2]   //指定消息
p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())  //第一个参数是nsqd的tcp端口,第二个参数是配置类Config,nsq.NewConfig()产生一个默认的Config
if err != nil{
panic(err)
}
//发布消息时指定topic,消息要转换为字节数组
if err :=p.Publish(topic, []byte(message));err != nil{
panic(err)
}
}


下面实现consumer:

package main

import(
"os"
"fmt"
"github.com/nsqio/go-nsq"
)
//定义一个结构体,实现HandleMessage方法,用于实现Handler接口
/*
type Handler interface {
HandleMessage(message *Message) error
}
*/
type ConsumerT struct{
}
func (*ConsumerT)HandleMessage(msg *nsq.Message)error{
fmt.Println(string(msg.Body))
return nil
}

func main(){
topic := os.Args[1]  //指定话题
channel := os.Args[2]  //指定通道,同一话题的不同通道会将消息进行分发;同一话题同一通道会将消息按序均分到消费者。
var consumer ConsumerT
c, err := nsq.NewConsumer(topic,channel,nsq.NewConfig())
if err != nil{
panic(err)
}
//添加一个Handler
c.AddHandler(&consumer)
//连接到nsqd的tcp端口
if err := c.ConnectToNSQD("127.0.0.1:4150");err != nil{
panic(err)
}
<-make(chan bool)
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: