nsq源码分析(2):nsqlookup之tcp服务
2017-06-07 16:13
281 查看
nsq源码分析(2):nsqlookup之tcp服务
本章涉及db的读写操作,请参考 nsqlookup之RegistrationDB数据库通信协议
本章内容涉及tcp协议的封包解包内容,请参考 nsq tcp协议规范[ ][ ][V][1]
client连接后,客户端必须发送一个4 字节的 “magic” 标识码来选择通讯协议的版本。
启动tcp服务
// nsqlookupd/nsqlookupd.go
//
// Fragment of the nsqlookupd startup code (the enclosing function is not
// shown here): open the TCP listener and start serving it.

// Start the TCP service: bind the configured address, exiting the process if
// the listen call fails.
tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
if err != nil {
	l.logf("FATAL: listen (%s) failed - %s", l.opts.TCPAddress, err)
	os.Exit(1)
}
// Publish the listener on l while holding l's lock.
l.Lock()
l.tcpListener = tcpListener
l.Unlock()
tcpServer := &tcpServer{ctx: ctx}
// waitGroup.Wrap is a wait-group wrapper; per the original article it runs
// the function in its own goroutine and tracks it until it exits
// (implementation not visible here — confirm against the wrapper's source).
l.waitGroup.Wrap(func() {
	protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger)
})
接收client连接并回调handle方法
internal/protocol/tcp_server.gotype TCPHandler interface { Handle(net.Conn) } func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) { l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr())) // 接收client连接并回调handle方法 for { clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err)) // runtime.Gosched让出cpu时间片,让另一时间片处理 runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err)) } break } // 这里的handle方法虽然是TCPHandler的接口类型,实际回调的是nsqlookupd/tcp.go中Handle方法 go handler.Handle(clientConn) } l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr())) }
解析协议版本和命令
解析协议版本(nsqlookupd/tcp.go)
// 回调handle:解析协议版本 func (p *tcpServer) Handle(clientConn net.Conn) { p.ctx.nsqlookupd.logf("TCP: new client(%s)", clientConn.RemoteAddr()) // The client should initialize itself by sending a 4 byte sequence indicating // the version of the protocol that it intends to communicate, this will allow us // to gracefully upgrade the protocol away from text/line oriented to whatever... // 读取tcp流协议的前四个字节,用于选择使用哪种协议 buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { p.ctx.nsqlookupd.logf("ERROR: failed to read protocol version - %s", err) return } protocolMagic := string(buf) p.ctx.nsqlookupd.logf("CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) // 作者为了nsq的可扩展性,可支持多种协议,目前只支持V1版本 var prot protocol.Protocol switch protocolMagic { case " V1": prot = &LookupProtocolV1{ctx: p.ctx} default: // 其他协议则发送E_BAD_PROTOCOL并关闭连接并 protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqlookupd.logf("ERROR: client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } // 使用对应的协议处理client请求 err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqlookupd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err) return } }
解析命令
nsqlookupd/lookup_protocol_v1.go
// IOLoop中不断的for循环等待用户的消息 func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { var err error var line string // 实例化V1协议 client := NewClientV1(conn) reader := bufio.NewReader(client) for { // 每次读取一行 line, err = reader.ReadString('\n') if err != nil { break } // 解析出数据中的params line = strings.TrimSpace(line) params := strings.Split(line, " ") var response []byte response, err = p.Exec(client, reader, params) if err != nil { // Exec过程中出现err则将err发送给client ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx) _, sendErr := protocol.SendResponse(client, []byte(err.Error())) if sendErr != nil { p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection // 如果err为Fatal类型的错误则break后强制关闭该连接 if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } // 将Exec结果返回给client if response != nil { _, err = protocol.SendResponse(client, response) if err != nil { break } } } // for循环退出后则关闭client连接,并从db中删除client信息 conn.Close() p.ctx.nsqlookupd.logf("CLIENT(%s): closing", client) if client.peerInfo != nil { registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, r.Category, r.Key, r.SubKey) } } } return err }
处理client指令
nsqlookupd/lookup_protocol_v1.go:V1 协议支持如下命令
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case "PING": return p.PING(client, params) case "IDENTIFY": return p.IDENTIFY(client, reader, params[1:]) case "REGISTER": return p.REGISTER(client, reader, params[1:]) case "UNREGISTER": return p.UNREGISTER(client, reader, params[1:]) } // 非以上命令则返回E_INVALID错误 return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }
PING命令
// PING:nsqd每隔一段时间都会向nsqlookupd发送心跳 // nsqlookup接收到心跳,原子操作更新nsqd的lastUpdate字段并返回OK func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) { if client.peerInfo != nil { // we could get a PING before other commands on the same client connection cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate)) now := time.Now() p.ctx.nsqlookupd.logf("CLIENT(%s): pinged (last ping %s)", client.peerInfo.id, now.Sub(cur)) atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) } return []byte("OK"), nil }
IDENTIFY命令
// IDENTITY: 当nsqd第一次连接nsqlookupd时,发送IDENTITY验证自己身份 // nsqd将自身的相关信息(ip、hostname等)提交给nsqlookup写入到RegistrationDB func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { var err error if client.peerInfo != nil { return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again") } var bodyLen int32 err = binary.Read(reader, binary.BigEndian, &bodyLen) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } body := make([]byte, bodyLen) _, err = io.ReadFull(reader, body) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body") } // body is a json structure with producer information peerInfo := PeerInfo{id: client.RemoteAddr().String()} err = json.Unmarshal(body, &peerInfo) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } peerInfo.RemoteAddress = client.RemoteAddr().String() // require all fields if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields") } atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano()) p.ctx.nsqlookupd.logf("CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s", client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version) client.peerInfo = &peerInfo if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "") } // build a response data := make(map[string]interface{}) data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port data["version"] = version.Binary hostname, err := os.Hostname() if err != nil { log.Fatalf("ERROR: unable to 
get hostname %s", err) } data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress data["hostname"] = hostname response, err := json.Marshal(data) if err != nil { p.ctx.nsqlookupd.logf("ERROR: marshaling %v", data) return []byte("OK"), nil } return response, nil }
REGISTER命令
// REGISTER:当nsqd创建一个topic或者channel时,向nsqlookupd发送REGISTER请求 // nsqlookupd更新当前nsqd的topic或者channel信息 func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } // 获取topci和channel topic, channel, err := getTopicChan("REGISTER", params) if err != nil { return nil, err } // 更新channel信息 if channel != "" { key := Registration{"channel", topic, channel} if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } } // 更新topic信息 key := Registration{"topic", topic, ""} if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } return []byte("OK"), nil }
UNREGISTER命令
// UNREGISTER:当nsqd删除一个topic或者channel时,向nsqlookupd发送UNREGISTER请求 // nsqlookupd更新当前nsqd的topic或者channel信息 func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } // 获取topci和channel topic, channel, err := getTopicChan("UNREGISTER", params) if err != nil { return nil, err } if channel != "" { // 删除channel信息 key := Registration{"channel", topic, channel} removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) if removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } // for ephemeral channels, remove the channel as well if it has no producers // 如果存在临时channel同样删除 // channel名称前缀带#ephemeral则表示临时channel if left == 0 && strings.HasSuffix(channel, "#ephemeral") { p.ctx.nsqlookupd.DB.RemoveRegistration(key) } } else { // 删除topic,并删除topic下的所有channel // no channel was specified so this is a topic unregistration // remove all of the channel registrations... // normally this shouldn't happen which is why we print a warning message // if anything is actually removed // 查找出topci下的所有channel并删除 registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("WARNING: client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, r.SubKey) } } // 最后删除topic key := Registration{"topic", topic, ""} if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } } return []byte("OK"), nil }
相关文章推荐
- nsq源码分析(2):nsqlookup之http服务
- nsq源码分析(2):nsqlookup之RegistrationDB数据库
- 应用框架的设计与实现——.NET平台(7.事件通知服务.源码分析)
- Tcpreplay3.x的安装、使用和源码分析
- Glusterfs之rpc模块源码分析(下)之RDMA over TCP的协议栈工作过程浅析
- 【原创】OpenStack Swift源码分析(六)object服务
- android包管理服务(PackageManagerService)源码分析
- Nmap源码分析(服务与版本扫描)
- 微软AJax.net源码初步分析(2)--服务执行流程
- Glusterfs之rpc模块源码分析(下)之RDMA over TCP的协议栈工作过程浅析
- Glusterfs之rpc模块源码分析(下)之RDMA over TCP的协议栈工作过程浅析
- 【原创】OpenStack Swift源码分析(三)proxy服务启动
- 消息中间件 activeMQ的源码分析 之 TCP通讯机制
- LwIP(V1.0.0) RAW API函数源码分析1----tcp_new()函数
- lwIP(V1.3.0) RAW API函数源码分析4----tcp_accept()函数
- Linux TCP/IP 协议栈源码分析
- 应用框架的设计与实现——.NET平台(10 授权服务.源码分析)
- lwIP(V1.3.0) RAW API函数源码分析3----tcp_listen()函数
- lwIP(V1.0.0) RAW API函数源码分析2----tcp_bind()函数
- Linux TCP/IP 协议栈源码分析