您的位置:首页 > 数据库 > SQL

nsq源码阅读 nsqlookupd源码四 lookup_protocol_v1.go

2017-05-03 17:36 751 查看
阅读nsqlookupd/tcp.go源码时,有一段代码:

err = prot.IOLoop(clientConn)

这行代码是处理TCP数据的核心逻辑,其实现位于nsqlookupd/lookup_protocol_v1.go,下面来阅读这个文件的代码:

package nsqlookupd

import (
"bufio"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"sync/atomic"
"time"

"github.com/nsqio/nsq/internal/protocol"
"github.com/nsqio/nsq/internal/version"
)

// LookupProtocolV1 carries the handlers for the nsqlookupd TCP commands
// (PING, IDENTIFY, REGISTER, UNREGISTER) dispatched by Exec.
type LookupProtocolV1 struct {
	// ctx gives the handlers access to the nsqlookupd instance (its
	// registration DB, options and logger are reached through ctx.nsqlookupd).
	ctx *Context
}

// IOLoop implements the IOLoop method of the Protocol interface declared in
// internal/protocol/protocol.go.
//
// It wraps conn in a ClientV1, then repeatedly reads one newline-terminated
// command line, splits it on single spaces and hands it to Exec. Exec's
// response (if any) is written back to the client; an Exec error is logged
// and its text is sent to the client instead.
//
// The loop ends when reading fails, when a response cannot be written, or
// when Exec returns a *protocol.FatalClientErr. On exit the connection is
// closed and, if the client had identified itself, every registration held
// for its peer id is removed from the registration DB.
//
// The returned error is the value of err when the loop terminated.
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
	var err error
	var line string

	// Create the client instance; NewClientV1 is defined in
	// nsqlookupd/client_v1.go.
	client := NewClientV1(conn)
	reader := bufio.NewReader(client)
	for {
		// ReadString returns everything up to and including the delimiter,
		// i.e. one full line of input.
		line, err = reader.ReadString('\n')
		if err != nil {
			break
		}

		// Strip surrounding whitespace, then split on single spaces.
		// strings.Split always yields at least one element, so params[0]
		// exists (it may be the empty string).
		line = strings.TrimSpace(line)
		params := strings.Split(line, " ")

		var response []byte
		// params[0] is the command name; Exec recognises PING, IDENTIFY,
		// REGISTER and UNREGISTER. REGISTER and UNREGISTER are rejected
		// until the client has sent IDENTIFY.
		response, err = p.Exec(client, reader, params)
		if err != nil {
			ctx := ""
			// Use a checked assertion: an error that does not implement
			// protocol.ChildErr must not panic the connection handler.
			if childErr, ok := err.(protocol.ChildErr); ok {
				if parentErr := childErr.Parent(); parentErr != nil {
					ctx = " - " + parentErr.Error()
				}
			}
			p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx)

			// Report the error text to the client.
			_, sendErr := protocol.SendResponse(client, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		// Exec produced response data; send it to the client.
		if response != nil {
			_, err = protocol.SendResponse(client, response)
			if err != nil {
				break
			}
		}
	}

	// The loop has ended: close the connection and drop every registration
	// this client produced.
	conn.Close()
	p.ctx.nsqlookupd.logf("CLIENT(%s): closing", client)
	if client.peerInfo != nil {
		registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id)
		for _, r := range registrations {
			if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
				p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
					client, r.Category, r.Key, r.SubKey)
			}
		}
	}
	return err
}

// Exec dispatches one parsed command line to its handler.
//
// params[0] is the command name (PING, IDENTIFY, REGISTER or UNREGISTER);
// the remaining elements are passed to the handler as its arguments (PING
// receives the full slice). reader is handed through so a handler can read
// any command body that follows the line.
//
// An empty params slice or an unrecognised command yields a fatal client
// error with code E_INVALID.
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	// Guard the params[0] access below; this method is exported and may be
	// called with an empty slice.
	if len(params) == 0 {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "missing command")
	}

	switch params[0] {
	case "PING":
		return p.PING(client, params)
	case "IDENTIFY":
		return p.IDENTIFY(client, reader, params[1:])
	case "REGISTER":
		return p.REGISTER(client, reader, params[1:])
	case "UNREGISTER":
		return p.UNREGISTER(client, reader, params[1:])
	}
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

// getTopicChan extracts and validates the topic name and optional channel
// name from a command's arguments.
//
// params[0] is the topic and params[1], when present, is the channel; any
// further elements are ignored. command is only used to prefix error
// messages. The topic is validated first; the channel is validated only
// when it is non-empty. All failures are fatal client errors (E_INVALID,
// E_BAD_TOPIC or E_BAD_CHANNEL).
func getTopicChan(command string, params []string) (string, string, error) {
	var topic, channel string

	switch len(params) {
	case 0:
		msg := fmt.Sprintf("%s insufficient number of params", command)
		return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", msg)
	case 1:
		topic = params[0]
	default:
		topic, channel = params[0], params[1]
	}

	if !protocol.IsValidTopicName(topic) {
		msg := fmt.Sprintf("%s topic name '%s' is not valid", command, topic)
		return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", msg)
	}

	// An absent channel needs no validation.
	if channel == "" {
		return topic, channel, nil
	}

	if !protocol.IsValidChannelName(channel) {
		msg := fmt.Sprintf("%s channel name '%s' is not valid", command, channel)
		return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", msg)
	}

	return topic, channel, nil
}

// REGISTER records the calling client as a producer of a topic and,
// optionally, of a channel on that topic.
//
// The client must already have sent IDENTIFY, otherwise a fatal E_INVALID
// error is returned. params is parsed by getTopicChan (topic name followed
// by an optional channel name). When a channel is given, a "channel"
// registration is added first; a "topic" registration is then added in
// every case. A log line is written for each registration AddProducer
// reports as added. On success the response is "OK".
//
// TODO(original author): whether clients also send REGISTER when a channel
// is created is unconfirmed.
func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	// IDENTIFY must have been performed first.
	if client.peerInfo == nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
	}

	topic, channel, err := getTopicChan("REGISTER", params)
	if err != nil {
		return nil, err
	}

	// addProducer stores one registration for this client in the
	// registration DB and logs it when AddProducer returns true.
	addProducer := func(category, subkey string) {
		reg := Registration{category, topic, subkey}
		producer := &Producer{peerInfo: client.peerInfo}
		if !p.ctx.nsqlookupd.DB.AddProducer(reg, producer) {
			return
		}
		p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s",
			client, category, topic, subkey)
	}

	if channel != "" {
		addProducer("channel", channel)
	}
	addProducer("topic", "")

	return []byte("OK"), nil
}

// UNREGISTER removes the calling client as a producer of a channel or of a
// whole topic.
//
// The client must already have sent IDENTIFY, otherwise a fatal E_INVALID
// error is returned. params is parsed by getTopicChan.
//
// With a channel: the client is removed from that "channel" registration,
// and if no producers are left and the channel name ends in "#ephemeral"
// the registration itself is removed.
//
// Without a channel: the client is removed from every "channel"
// registration of the topic (logged as a warning when something is actually
// removed) and then from the "topic" registration.
//
// On success the response is "OK".
func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	if client.peerInfo == nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
	}

	topic, channel, err := getTopicChan("UNREGISTER", params)
	if err != nil {
		return nil, err
	}

	if channel == "" {
		// no channel was specified so this is a topic unregistration
		// remove all of the channel registrations...
		// normally this shouldn't happen which is why we print a warning message
		// if anything is actually removed
		for _, reg := range p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") {
			removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(reg, client.peerInfo.id)
			if !removed {
				continue
			}
			p.ctx.nsqlookupd.logf("WARNING: client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",
				client, "channel", topic, reg.SubKey)
		}

		topicKey := Registration{"topic", topic, ""}
		removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(topicKey, client.peerInfo.id)
		if removed {
			p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
				client, "topic", topic, "")
		}

		return []byte("OK"), nil
	}

	channelKey := Registration{"channel", topic, channel}
	removed, remaining := p.ctx.nsqlookupd.DB.RemoveProducer(channelKey, client.peerInfo.id)
	if removed {
		p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
			client, "channel", topic, channel)
	}
	// for ephemeral channels, remove the channel as well if it has no producers
	if remaining == 0 && strings.HasSuffix(channel, "#ephemeral") {
		p.ctx.nsqlookupd.DB.RemoveRegistration(channelKey)
	}

	return []byte("OK"), nil
}

//客户端初始化后,先进行IDENTIFY操作,验证身份,初始化peerInfo
//IDENTIFY操作在四个操作中最先执行,且只执行一次
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
var err error

//已经验证过身份了
if client.peerInfo != nil {
return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
}

//读取数据body的长度到bodyLen变量中
var bodyLen int32
err = binary.Read(reader, binary.BigEndian, &bodyLen)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")
}

body := make([]byte, bodyLen)
//读取bodyLen长度的数据
_, err = io.ReadFull(reader, body)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body")
}

// body is a json structure with producer information
//将客户端的地址作为peerInfo的唯一标识符
peerInfo := PeerInfo{id: client.RemoteAddr().String()}
//将json格式的数据解析到peerInfo
err = json.Unmarshal(body, &peerInfo)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
}

peerInfo.RemoteAddress = client.RemoteAddr().String()

// require all fields
//验证数据完整性
if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")
}

//修改peerInfo的lastUpdate为当前时间,不理解为什么这样做
//老版本是这样做的 peerInfo.lastUpdate = time.Now().UnixNano()
//atomic.StoreInt64() 赋予变量新值,而不管它原来是什么值。
atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())

p.ctx.nsqlookupd.logf("CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",
client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)

//将当前client注册到RegistrationDB里,Registration的Category是"client", Key和SubKey都为空
client.peerInfo = &peerInfo
if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
p.ctx.nsqlookupd.logf("DB: client(%s) REGIS
becd
TER category:%s key:%s subkey:%s", client, "client", "", "")
}

// build a response
//构建一个响应数据给client
data := make(map[string]interface{})
//nsqlookupd监听的TCP端口
data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port
//nsqlookupd监听的HTTP端口
data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port
//当前版本
data["version"] = version.Binary
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err)
}
//nsqlookupd的广播地址
data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress
//hostname
data["hostname"] = hostname

//转化为json
response, err := json.Marshal(data)
if err != nil {
p.ctx.nsqlookupd.logf("ERROR: marshaling %v", data)
return []byte("OK"), nil
}
return response, nil
}

// PING answers "OK" and, for a client that has already identified itself,
// logs the time elapsed since the peer's lastUpdate and then sets
// lastUpdate to the current time.
//
// params is not used.
func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) {
	// we could get a PING before other commands on the same client connection
	if client.peerInfo == nil {
		return []byte("OK"), nil
	}

	// lastUpdate is read and written atomically.
	previous := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate))
	current := time.Now()
	elapsed := current.Sub(previous)
	p.ctx.nsqlookupd.logf("CLIENT(%s): pinged (last ping %s)", client.peerInfo.id,
		elapsed)

	// Overwrite the stored timestamp regardless of its previous value.
	atomic.StoreInt64(&client.peerInfo.lastUpdate, current.UnixNano())

	return []byte("OK"), nil
}

channel的新增和删除是否也会触发REGISTER和UNREGISTER这两个操作,目前还不清楚,留待后续确认。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  golang 源码 阅读