第二十五章:Go语言使用NSQ消息队列
文章目录
1. 概述
NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件相关描述如下:
NSQ是一个实时分布式消息传递平台,旨在大规模运行,每天处理数十亿条消息。 它促进了没有单点故障的分布式和分散式拓扑,从而实现了容错能力和高可用性,并提供了可靠的消息传递保证。 查看功能和保证。 从操作上讲,NSQ易于配置和部署(所有参数均在命令行上指定,并且编译的二进制文件不具有运行时相关性)。 为了获得最大的灵活性,它与数据格式无关(消息可以是JSON,MsgPack,协议缓冲区或其他任何东西)。 官方提供了Go和Python库(以及许多其他客户端库),并且,如果您有兴趣构建自己的库,则有一个协议规范。NSQ的特点
- 支持水平横向拓展(无缝添加更多节点到集群中)
- 部署配置容易,自带集群管理界面(nsqadmin)
- 提倡分布式拓扑,减少单点故障,提高容错
- 低延迟的消息传递
- 可靠的消交付保障保障 默认中消息都在内存中, nsq 内部机制保证在程序关闭时将队列中的数据持久化到硬盘,重启后就会恢复。
- 消息最少被投递一次
比较知名和常用的消息处理系统还有
2. 基础应用场景
我们知道一般的消息队列(Message Queue) 常用的场景有
系统解耦异步处理流量削峰消息通信等
3. 相关文档
- 项目地址 : https://github.com/nsqio/nsq
- 项目文档 英文: https://nsq.io/overview/design.html
- 下载地址: https://nsq.io/deployment/installing.html
- 客户端下载地址 : https://nsq.io/clients/client_libraries.html
4.安装操作
根据自己的操作平台下载解压即可
- 根据自己的操作系统下载对应的压缩包文件
- 解压压缩文件
- 进入解压后
bin目录中在
bin目录中我们能看到如下文件-rwxr-xr-x 1 captain 197121 5515776 8月 28 13:46 nsq_stat.exe* -rwxr-xr-x 1 captain 197121 5823488 8月 28 13:46 nsq_tail.exe* -rwxr-xr-x 1 captain 197121 5997568 8月 28 13:46 nsq_to_file.exe* -rwxr-xr-x 1 captain 197121 5923840 8月 28 13:46 nsq_to_http.exe* -rwxr-xr-x 1 captain 197121 5903872 8月 28 13:46 nsq_to_nsq.exe* -rwxr-xr-x 1 captain 197121 8787968 8月 28 13:46 nsqadmin.exe* -rwxr-xr-x 1 captain 197121 9108992 8月 28 13:46 nsqd.exe* -rwxr-xr-x 1 captain 197121 8384000 8月 28 13:46 nsqlookupd.exe* -rwxr-xr-x 1 captain 197121 5639680 8月 28 13:46 to_nsq.exe*
5. NSQ服务端基础组件介绍
5.1 nsqd
nsqd是一个守护进程负责
接收,排队,消息传递到客户端。 它可以独立运行,但通常由nsqlookupd实例的群集中配置(在这种情况下,它将能声明topics和发现channel)。 它侦听两个TCP端口,一个侦听客户端,另一个侦听HTTP API。 它可以选择在第三个端口上侦听HTTPS。
5.2 nsqlookupd
nsqlookupd是管理拓扑信息的守护程序。 客户端查询nsqlookupd以发现特定topic的nsqd生产者和nsqd节点广播topic和channel信息。
有两个接口:nsqd用于广播的TCP接口
客户端(nsqadmin)执行发现和管理操作的HTTP接口
5.3 nsqadmin
nsqadmin是一套 WEB管理界面,用来汇集集群的实时统计,并执行不同的管理任务。
重点提示:
NSQ还有许多功能组件,我们只介绍这三个(
nsqdnsqlookupdnsqadmin)最常用和主要的NSQ的所有组件都可以通过参数
-- help查看相关配置nsqd和nsqlookupd都有对应的http API ,需要使用的时候查看文档即可
6.操作NSQ
6.1 安装客户端
根据不同的开发语言选择不同的客户端
我们是使用Golang操作所以采用NSQ的官方提供客户端 go-nsq
go get -u github.com/nsqio/go-nsq
6.1 单机启动nsqd
默认启动的
nsqd监听 HTTP对应的4151端口和TCP对应的4150端口
$ ./nsqd [nsqd] 2019/11/10 13:41:29.575014 INFO: nsqd v1.2.0 (built w/go1.12.9) [nsqd] 2019/11/10 13:41:29.593002 INFO: ID: 825 [nsqd] 2019/11/10 13:41:29.597000 INFO: TOPIC(topic_demo): created [nsqd] 2019/11/10 13:41:29.599998 INFO: TOPIC(topic_demo): new channel(aa) [nsqd] 2019/11/10 13:41:29.599998 INFO: NSQ: persisting topic/channel metadata to nsqd.dat [nsqd] 2019/11/10 13:41:29.644973 INFO: HTTP: listening on [::]:4151 [nsqd] 2019/11/10 13:41:29.644973 INFO: TCP: listening on [::]:4150
我们同样可以指定端口
$ ./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081" [nsqd] 2019/11/10 14:05:40.726849 INFO: nsqd v1.2.0 (built w/go1.12.9) [nsqd] 2019/11/10 14:05:40.745838 INFO: ID: 825 [nsqd] 2019/11/10 14:05:40.747836 INFO: NSQ: persisting topic/channel metadata to nsqd.dat [nsqd] 2019/11/10 14:05:40.788814 INFO: TCP: listening on [::]:8081 [nsqd] 2019/11/10 14:05:40.788814 INFO: HTTP: listening on [::]:8080
这样我们就启动了一个
nsqd的实例
在NSQ中有两个非常重要的概念
topic和Channel我们看一下文档中的描述:
每个nsqd实例旨在一次处理多个数据流。这些数据流称为
“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。
topic和channel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channel。channel是通过订阅指定的channel在第一次使用时创建的。
topic和channel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。
channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:
总而言之,消息是从
topic->channel多播的(每个channel都接收该topic的所有消息的副本),但从channel->消息消费者均匀分发(每个消费者都接收该频道的一部分消息)。
6.1.1 单NSQ的使用
编写一个消息生产者
nsq_single_product.go
package main import ( "fmt" "github.com/nsqio/go-nsq" "time" ) func main() { nsqAddr := "127.0.0.1:8081" conf :=nsq.NewConfig() p ,err := nsq.NewProducer(nsqAddr,conf) if err != nil { fmt.Println(err) return } for { message := "message :"+ time.Now().Format("2006-01-02 15:04:05") fmt.Println(message) // 发送消息 p.Publish("topic-demo1",[]byte(message)) time.Sleep(time.Second) } }
编写一个消息消费者
nsq_single_consumer.go
package main import ( "fmt" "github.com/nsqio/go-nsq" ) type NewHandler struct{} func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) { addr := msg.NSQDAddress message := string(msg.Body) fmt.Println(addr, message) return } func MyConsumers(topic, channel, addr string) { conf := nsq.NewConfig() new_consumer, err := nsq.NewConsumer(topic, channel, conf) if err != nil { } // 接收消息 new_handler := &NewHandler{} new_consumer.AddHandler(new_handler) err = new_consumer.ConnectToNSQD(addr) if err != nil { } } func main() { addr := "127.0.0.1:8081" go MyConsumers("topic-demo1", "channel-aa", addr) // 模拟多个从多个channel去消息 go MyConsumers("topic-demo1", "channel-bb", addr) select {} }
6.1.2 通过nsqadmin查看
启动
nsqadminnsqadmin的web界面默认监听了 4171端口
$ ./nsqadmin --nsqd-http-address="127.0.0.1:8080" [nsqadmin] 2019/11/10 16:06:15.842033 INFO: nsqadmin v1.2.0 (built w/go1.12.9) [nsqadmin] 2019/11/10 16:06:15.858026 INFO: HTTP: listening on [::]:4171
我们在地址栏中输如
http://127.0.0.1:4171/就能看看管理界面
6.1.3 NSQ的单点结构
6.3 NSQ集群
6.3.1 启动NSQ各组件
构建一个NSQ的基础拓扑结构
我们可以简单的说
nsqlookupd是用来管理nsqd实例节点的
第一步
启动nsqlookupd启动的
nsqlookupd采用了默认配置 通过参数--help查看配置项
$ ./nsqlookupd [nsqlookupd] 2019/11/10 16:40:55.968588 INFO: nsqlookupd v1.2.0 (built w/go1.12.9) [nsqlookupd] 2019/11/10 16:40:55.983580 INFO: HTTP: listening on [::]:4161 [nsqlookupd] 2019/11/10 16:40:55.984579 INFO: TCP: listening on [::]:4160
第二步
添加
nsqd实例与前面的启动不同,需要带上参数
-lookupd-tcp-address
添加第一个实例
./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081" -lookupd-tcp-address="127.0.0.1:4160"
添加第二个实例
./nsqd -http-address="0.0.0.0:8090" -tcp-address="0.0.0.0:8091" -lookupd-tcp-address="127.0.0.1:4160"
第三步
启动nsqadmin
与前面的也不同了需要带上参数
-lookupd-http-address
$ ./nsqadmin -lookupd-http-address="127.0.0.1:4161"
在浏览器中访问
nsqadmin
6.3.2 NSQ的拓扑结构
- 在集群模式中,消息生产方发送消息给任意一个
nsqd实例都不影响- 消息的消费者需要通过nsqlookupd 查询nsqd的地址后才能获取消息
- 增加
nsqd节点完全不影响其他的节点
6.3.3 Go语言操作NSQ代码示例
消息生产者
nsq_cluster_product.go
package main import ( "bufio" "fmt" "github.com/nsqio/go-nsq" "log" "os" "strings" ) var pro *nsq.Producer func NewPro(addr string) (err error) { conf := nsq.NewConfig() pro, err = nsq.NewProducer(addr, conf) if err != nil { log.Println(err) return err } return nil } func main() { nsqAddr := "127.0.0.1:8091" err := NewPro(nsqAddr) if err != nil { fmt.Println(err) return }else{ fmt.Println("connect 127.0.0.1:8091 success") } // 读取标准输入 reader := bufio.NewReader(os.Stdin) for { // 读取所有内容直到遇见回车(\n) data, err := reader.ReadString('\n') if err != nil { fmt.Println("read data from stdin is field : ", err) continue } // 当输入q的时候退出 data = strings.TrimSpace(data) if strings.ToUpper(data) == "Q" { break } err = pro.Publish("topic-demo1", []byte(data)) if err != nil { fmt.Println("nsq publish is field ", err) continue } } fmt.Println("exit !") }
消息消费者
nsq_cluster_consumer.go
package main import ( "fmt" "github.com/nsqio/go-nsq" ) type Handler struct{} func (m *Handler) HandleMessage(msg *nsq.Message) (err error) { addr := msg.NSQDAddress message := string(msg.Body) fmt.Println(addr, message) return } func NewConsumers(t string, c string, addr string) error { conf := nsq.NewConfig() nc, err := nsq.NewConsumer(t, c, conf) if err != nil { fmt.Println("create consumer failed err ", err) return err } consumer := &Handler{} nc.AddHandler(consumer) // 连接nsqlookupd if err:= nc.ConnectToNSQLookupd(addr);err!=nil{ fmt.Println("connect nsqlookupd failed ", err) return err } return nil } func main() { // 这是nsqlookupd的地址 addr := "127.0.0.1:4161" err := NewConsumers("topic-demo1", "channel-aa", addr) if err != nil { fmt.Println("new nsq consumer failed", err) return } select {} }
- 点赞
- 收藏
- 分享
- 文章举报
- 基于Python语言使用RabbitMQ消息队列(二)
- 基于Python语言使用RabbitMQ消息队列(六)
- go语言 消息队列NSQ基础安装及使用
- 一起talk C栗子吧(第九十八回:C语言实例--使用消息队列进行进程间通信二)
- go语言使用crc32得到网络消息的校验码
- 基于Python语言使用RabbitMQ消息队列(三)
- 基于Python语言使用RabbitMQ消息队列(四)
- RabbitMQ消息队列跨语言demo(go和python)
- 基于Python语言使用RabbitMQ消息队列(一)
- go使用redis消息队列
- 一起talk C栗子吧(第九十八回:C语言实例--使用消息队列进行进程间通信二)
- 一起talk C栗子吧(第九十七回:C语言实例--使用消息队列进行进程间通信一)
- 基于Python语言使用RabbitMQ消息队列(五)
- WRTOS系统消息队列的使用
- 使用消息队列 异步插入数据,能发送消息,但是无法读取消息
- 使用Application.DoEvents 处理消息队列中的消息
- 消息队列(Message Queue)简介及其使用
- 消息队列(Message Queue)简介及其使用
- 消息队列(Message Queue)简介及其使用
- IPC消息队列使用详细分析