您的位置:首页 > 编程语言 > Go语言

第二十五章:Go语言使用NSQ消息队列

2020-01-11 14:34 50 查看

文章目录

  • 6.操作NSQ
  • 6.3 NSQ集群
  • 1. 概述

    NSQ
    是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件

    相关描述如下:

    NSQ是一个实时分布式消息传递平台,旨在大规模运行,每天处理数十亿条消息。 它促进了没有单点故障的分布式和分散式拓扑,从而实现了容错能力和高可用性,并提供了可靠的消息传递保证。 查看功能和保证。 从操作上讲,NSQ易于配置和部署(所有参数均在命令行上指定,并且编译的二进制文件不具有运行时相关性)。 为了获得最大的灵活性,它与数据格式无关(消息可以是JSON,MsgPack,协议缓冲区或其他任何东西)。 官方提供了Go和Python库(以及许多其他客户端库),并且,如果您有兴趣构建自己的库,则有一个协议规范。

    NSQ的特点

    • 支持水平横向拓展(无缝添加更多节点到集群中)
    • 部署配置容易,自带集群管理界面(nsqadmin)
    • 提倡分布式拓扑,减少单点故障,提高容错
    • 低延迟的消息传递
    • 可靠的消交付保障保障 默认中消息都在内存中, nsq 内部机制保证在程序关闭时将队列中的数据持久化到硬盘,重启后就会恢复。
    • 消息最少被投递一次

    比较知名和常用的消息处理系统还有

    RabbitMQ

    KafKa

    2. 基础应用场景

    我们知道一般的消息队列(Message Queue) 常用的场景有

    系统解耦
    异步处理
    流量削峰
    消息通信

    3. 相关文档

    1. 项目地址 : https://github.com/nsqio/nsq
    2. 项目文档 英文: https://nsq.io/overview/design.html
    3. 下载地址: https://nsq.io/deployment/installing.html
    4. 客户端下载地址 : https://nsq.io/clients/client_libraries.html

    4.安装操作

    根据自己的操作平台下载解压即可

    • 根据自己的操作系统下载对应的压缩包文件
    • 解压压缩文件
    • 进入解压后
      bin
      目录中

    bin
    目录中我们能看到如下文件

    -rwxr-xr-x 1 captain 197121 5515776 8月  28 13:46 nsq_stat.exe*
    -rwxr-xr-x 1 captain 197121 5823488 8月  28 13:46 nsq_tail.exe*
    -rwxr-xr-x 1 captain 197121 5997568 8月  28 13:46 nsq_to_file.exe*
    -rwxr-xr-x 1 captain 197121 5923840 8月  28 13:46 nsq_to_http.exe*
    -rwxr-xr-x 1 captain 197121 5903872 8月  28 13:46 nsq_to_nsq.exe*
    -rwxr-xr-x 1 captain 197121 8787968 8月  28 13:46 nsqadmin.exe*
    -rwxr-xr-x 1 captain 197121 9108992 8月  28 13:46 nsqd.exe*
    -rwxr-xr-x 1 captain 197121 8384000 8月  28 13:46 nsqlookupd.exe*
    -rwxr-xr-x 1 captain 197121 5639680 8月  28 13:46 to_nsq.exe*

    5. NSQ服务端基础组件介绍

    5.1 nsqd

    nsqd是一个守护进程负责

    接收
    ,
    排队
    ,
    消息传递
    到客户端。 它可以独立运行,但通常由nsqlookupd实例的群集中配置(在这种情况下,它将能声明
    topics
    和发现
    channel
    )。 它侦听两个TCP端口,一个侦听客户端,另一个侦听HTTP API。 它可以选择在第三个端口上侦听HTTPS。

    5.2 nsqlookupd

    nsqlookupd
    是管理拓扑信息的守护程序。 客户端查询nsqlookupd以发现特定
    topic
    nsqd
    生产者和
    nsqd
    节点广播
    topic
    channel
    信息。
    有两个接口:

    nsqd用于广播的TCP接口

    客户端(nsqadmin)执行发现和管理操作的HTTP接口

    5.3 nsqadmin

    nsqadmin
    是一套 WEB管理界面,用来汇集集群的实时统计,并执行不同的管理任务。

    重点提示:

    NSQ还有许多功能组件,我们只介绍这三个(

    nsqd
    nsqlookupd
    nsqadmin
    )最常用和主要的

    NSQ的所有组件都可以通过参数

    -- help
    查看相关配置

    nsqd
    nsqlookupd
    都有对应的http API ,需要使用的时候查看文档即可

    6.操作NSQ

    6.1 安装客户端

    根据不同的开发语言选择不同的客户端

    我们是使用Golang操作所以采用NSQ的官方提供客户端 go-nsq

    go get -u github.com/nsqio/go-nsq

    6.1 单机启动nsqd

    默认启动的

    nsqd
    监听 HTTP对应的4151端口和TCP对应的4150端口

    $ ./nsqd
    [nsqd] 2019/11/10 13:41:29.575014 INFO: nsqd v1.2.0 (built w/go1.12.9)
    [nsqd] 2019/11/10 13:41:29.593002 INFO: ID: 825
    [nsqd] 2019/11/10 13:41:29.597000 INFO: TOPIC(topic_demo): created
    [nsqd] 2019/11/10 13:41:29.599998 INFO: TOPIC(topic_demo): new channel(aa)
    [nsqd] 2019/11/10 13:41:29.599998 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
    [nsqd] 2019/11/10 13:41:29.644973 INFO: HTTP: listening on [::]:4151
    [nsqd] 2019/11/10 13:41:29.644973 INFO: TCP: listening on [::]:4150

    我们同样可以指定端口

    $ ./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081"
    [nsqd] 2019/11/10 14:05:40.726849 INFO: nsqd v1.2.0 (built w/go1.12.9)
    [nsqd] 2019/11/10 14:05:40.745838 INFO: ID: 825
    [nsqd] 2019/11/10 14:05:40.747836 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
    [nsqd] 2019/11/10 14:05:40.788814 INFO: TCP: listening on [::]:8081
    [nsqd] 2019/11/10 14:05:40.788814 INFO: HTTP: listening on [::]:8080

    这样我们就启动了一个

    nsqd
    的实例

    在NSQ中有两个非常重要的概念

    topic
    Channel

    我们看一下文档中的描述:

    每个nsqd实例旨在一次处理多个数据流。这些数据流称为

    “topics”
    ,一个
    topic
    具有1个或多个
    “channels”
    。每个
    channel
    都会收到
    topic
    所有消息的副本,实际上下游的服务是通过对应的
    channel
    来消费
    topic
    消息。

    topic
    channel
    不是预先配置的。
    topic
    在首次使用时创建,方法是将其发布到指定
    topic
    ,或者订阅指定
    topic
    上的
    channel
    channel
    是通过订阅指定的
    channel
    在第一次使用时创建的。

    topic
    channel
    都相互独立地缓冲数据,防止缓慢的消费者导致其他
    chennel
    的积压(同样适用于
    topic
    级别)。

    channel
    可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:

    总而言之,消息是从

    topic
    ->
    channel
    多播的(每个
    channel
    都接收该
    topic
    的所有消息的副本),但从
    channel
    ->
    消息消费者
    均匀分发(每个消费者都接收该频道的一部分消息)。

    6.1.1 单NSQ的使用

    编写一个消息生产者

    nsq_single_product.go

    package main
    
    import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "time"
    )
    func main() {
    nsqAddr := "127.0.0.1:8081"
    conf :=nsq.NewConfig()
    p ,err := nsq.NewProducer(nsqAddr,conf)
    if err != nil {
    fmt.Println(err)
    return
    }
    for  {
    message := "message :"+ time.Now().Format("2006-01-02 15:04:05")
    fmt.Println(message)
    // 发送消息
    p.Publish("topic-demo1",[]byte(message))
    time.Sleep(time.Second)
    }
    
    }

    编写一个消息消费者

    nsq_single_consumer.go

    package main
    
    import (
    "fmt"
    "github.com/nsqio/go-nsq"
    )
    
    type NewHandler struct{}
    
    func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    fmt.Println(addr, message)
    return
    }
    func MyConsumers(topic, channel, addr string) {
    conf := nsq.NewConfig()
    new_consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {
    
    }
    // 接收消息
    new_handler := &NewHandler{}
    new_consumer.AddHandler(new_handler)
    err = new_consumer.ConnectToNSQD(addr)
    if err != nil {
    
    }
    }
    func main() {
    addr := "127.0.0.1:8081"
    go MyConsumers("topic-demo1", "channel-aa", addr)
    // 模拟多个从多个channel去消息
    go MyConsumers("topic-demo1", "channel-bb", addr)
    select {}
    }
    6.1.2 通过nsqadmin查看

    启动

    nsqadmin

    nsqadmin
    的web界面默认监听了 4171端口

    $ ./nsqadmin --nsqd-http-address="127.0.0.1:8080"
    [nsqadmin] 2019/11/10 16:06:15.842033 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
    [nsqadmin] 2019/11/10 16:06:15.858026 INFO: HTTP: listening on [::]:4171

    我们在地址栏中输如

    http://127.0.0.1:4171/

    就能看看管理界面


    6.1.3 NSQ的单点结构

    6.3 NSQ集群

    6.3.1 启动NSQ各组件

    构建一个NSQ的基础拓扑结构

    我们可以简单的说

    nsqlookupd
    是用来管理
    nsqd
    实例节点的

    第一步
    启动

    nsqlookupd

    启动的

    nsqlookupd
    采用了默认配置 通过参数
    --help
    查看配置项

    $ ./nsqlookupd
    [nsqlookupd] 2019/11/10 16:40:55.968588 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
    [nsqlookupd] 2019/11/10 16:40:55.983580 INFO: HTTP: listening on [::]:4161
    [nsqlookupd] 2019/11/10 16:40:55.984579 INFO: TCP: listening on [::]:4160

    第二步

    添加

    nsqd
    实例

    与前面的启动不同,需要带上参数

    -lookupd-tcp-address

    添加第一个实例

    ./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081" -lookupd-tcp-address="127.0.0.1:4160"

    添加第二个实例

    ./nsqd -http-address="0.0.0.0:8090" -tcp-address="0.0.0.0:8091" -lookupd-tcp-address="127.0.0.1:4160"

    第三步

    启动nsqadmin

    与前面的也不同了需要带上参数

    -lookupd-http-address

    $ ./nsqadmin -lookupd-http-address="127.0.0.1:4161"

    在浏览器中访问

    nsqadmin

    6.3.2 NSQ的拓扑结构

    1. 在集群模式中,消息生产方发送消息给任意一个
      nsqd
      实例都不影响
    2. 消息的消费者需要通过nsqlookupd 查询nsqd的地址后才能获取消息
    3. 增加
      nsqd
      节点完全不影响其他的节点
    6.3.3 Go语言操作NSQ代码示例

    消息生产者

    nsq_cluster_product.go

    package main
    
    import (
    "bufio"
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
    "strings"
    )
    
    var pro *nsq.Producer
    
    func NewPro(addr string) (err error) {
    conf := nsq.NewConfig()
    pro, err = nsq.NewProducer(addr, conf)
    if err != nil {
    log.Println(err)
    return err
    }
    return nil
    }
    func main() {
    nsqAddr := "127.0.0.1:8091"
    err := NewPro(nsqAddr)
    if err != nil {
    fmt.Println(err)
    return
    }else{
    fmt.Println("connect 127.0.0.1:8091 success")
    }
    // 读取标准输入
    reader := bufio.NewReader(os.Stdin)
    for {
    // 读取所有内容直到遇见回车(\n)
    data, err := reader.ReadString('\n')
    if err != nil {
    fmt.Println("read data from stdin is field : ", err)
    continue
    }
    // 当输入q的时候退出
    data = strings.TrimSpace(data)
    if strings.ToUpper(data) == "Q" {
    break
    }
    err = pro.Publish("topic-demo1", []byte(data))
    if err != nil {
    fmt.Println("nsq publish is field ", err)
    continue
    }
    }
    fmt.Println("exit !")
    }

    消息消费者

    nsq_cluster_consumer.go

    package main
    
    import (
    "fmt"
    "github.com/nsqio/go-nsq"
    )
    
    type Handler struct{}
    
    func (m *Handler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    fmt.Println(addr, message)
    return
    }
    func NewConsumers(t string, c string, addr string) error {
    conf := nsq.NewConfig()
    nc, err := nsq.NewConsumer(t, c, conf)
    if err != nil {
    fmt.Println("create consumer failed err ", err)
    return err
    }
    consumer := &Handler{}
    nc.AddHandler(consumer)
    // 连接nsqlookupd
    if err:= nc.ConnectToNSQLookupd(addr);err!=nil{
    fmt.Println("connect nsqlookupd failed ", err)
    return err
    }
    return nil
    }
    func main() {
    // 这是nsqlookupd的地址
    addr := "127.0.0.1:4161"
    err := NewConsumers("topic-demo1", "channel-aa", addr)
    if err != nil {
    fmt.Println("new nsq consumer failed", err)
    return
    }
    select {}
    }
    • 点赞
    • 收藏
    • 分享
    • 文章举报
    Gcaptain 发布了28 篇原创文章 · 获赞 1 · 访问量 357 私信 关注
    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: