您的位置:首页 > 理论基础 > 计算机网络

nsq源码阅读 nsqlookupd源码五 http.go http_server.go

2017-05-16 14:41 746 查看

httpServer := newHTTPServer(ctx)


package nsqlookupd

import (


type httpServer struct {
ctx *Context
router http.Handler

func newHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqlookupd.opts.Logger)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.opts.Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.opts.Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.opts.Logger)
s := &httpServer{
ctx: ctx,
router: router,

router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

// v1 negotiate
router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.V1))

// only v1
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
router.Handle("POST", "/topic/tombstone", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.V1))

// debug
router.HandlerFunc("GET", "/debug/pprof", pprof.Index)
router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))

return s

func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.router.ServeHTTP(w, req)

//GET /ping
func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
return "OK", nil

//GET /info
func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {

* type version struct {
* Version string `json:"version"`
* }
* v := &version {
* Version: version.Binary,
* }
* return v, nil
return struct {
Version string `json:"version"`
Version: version.Binary,
}, nil

//GET /topics
func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
topics := s.ctx.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys()
return map[string]interface{}{
"topics": topics,
}, nil

//GET /channels
//@param topic - the topic to list channels for
func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"}

topicName, err := reqParams.Get("topic")
if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}

channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
return map[string]interface{}{
"channels": channels,
}, nil

//GET /lookup
//@param topic - the topic to list producers for
func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"}

topicName, err := reqParams.Get("topic")
if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}

registration := s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
if len(registration) == 0 {
return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}

channels := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "")
producers = producers.FilterByActive(s.ctx.nsqlookupd.opts.InactiveProducerTimeout,
return map[string]interface{}{
"channels": channels,
"producers": producers.PeerInfo(),
}, nil

//POST /topic/create
//@param topic - the topic to created
func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"}

topicName, err := reqParams.Get("topic")
if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}

if !protocol.IsValidTopicName(topicName) {
return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}

s.ctx.nsqlookupd.logf("DB: adding topic(%s)", topicName)
key := Registration{"topic", topicName, ""}

return nil, nil

//POST /topic/delete
//@param topic - the existing topic to delete
func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"}

topicName, err := reqParams.Get("topic")
if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}

registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
for _, registration := range registrations {
s.ctx.nsqlookupd.logf("DB: removing channel(%s) from topic(%s)", registration.SubKey, topicName)

registrations = s.ctx.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
for _, registration := range registrations {
s.ctx.nsqlookupd.logf("DB: removing topic(%s)", topicName)

return nil, nil

//POST /topic/tombstone
//官方解释Tombstones a specific producer of an existing topic
//@param topic - the existing topic
//@param node - the producer (nsqd) to tombstone (identified by <broadcast_address>:<http_port>)
func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{
}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"}

topicName, err := reqParams.Get("topic")
if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}

//node格式 <broadcast_address>:<http_port>
node, err := reqParams.Get("node")
if err != nil {
return nil, http_api.Err{400, "MISSING_ARG_NODE"}

s.ctx.nsqlookupd.logf("DB: setting tombstone for producer@%s of topic(%s)", node, topicName)
producers := s.ctx.nsqlookupd.DB.FindProducers("topic", topicName, "")
for _, p := range producers {
thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.peerInfo.HTTPPort)
if thisNode == node {

return nil, nil

//POST /channel/create
//@param topic - the topic to created
//@param channel - the channel to created
func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"}

topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil {
return nil, http_api.Err{400, err.Error()}

s.ctx.nsqlookupd.logf("DB: adding channel(%s) in topic(%s)", channelName, topicName)
key := Registration{"channel", topicName, channelName}

s.ctx.nsqlookupd.logf("DB: adding topic(%s)", topicName)
key = Registration{"topic", topicName, ""}

return nil, nil

//POST /channel/delete
//@param topic - the existing topic
//@param channel - the existing channel to delete
func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
reqParams, err := http_api.NewReqParams(req)
if err != nil {
return nil, http_api.Err{400, "INVALID_REQUEST"}

topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil {
return nil, http_api.Err{400, err.Error()}

registrations := s.ctx.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName)
if len(registrations) == 0 {
return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}

s.ctx.nsqlookupd.logf("DB: removing channel(%s) from topic(%s)", channelName, topicName)
for _, registration := range registrations {

return nil, nil

// node is the JSON shape of one producer (nsqd) in the GET /nodes response.
// Tombstones and Topics are parallel slices: Tombstones[i] reports the
// tombstone state of this producer for Topics[i].
type node struct {
	RemoteAddress    string   `json:"remote_address"`
	Hostname         string   `json:"hostname"`
	BroadcastAddress string   `json:"broadcast_address"`
	TCPPort          int      `json:"tcp_port"`
	HTTPPort         int      `json:"http_port"`
	Version          string   `json:"version"`
	Tombstones       []bool   `json:"tombstones"`
	Topics           []string `json:"topics"`
}

//GET /nodes
func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
// don't filter out tombstoned nodes
producers := s.ctx.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive(
s.ctx.nsqlookupd.opts.InactiveProducerTimeout, 0)
nodes := make([]*node, len(producers))
for i, p := range producers {
topics := s.ctx.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filter("topic", "*", "").Keys()

// for each topic find the producer that matches this peer
// to add tombstone information
tombstones := make([]bool, len(topics))
for j, t := range topics {
topicProducers := s.ctx.nsqlookupd.DB.FindProducers("topic", t, "")
for _, tp := range topicProducers {
if tp.peerInfo == p.peerInfo {
tombstones[j] = tp.IsTombstoned(s.ctx.nsqlookupd.opts.TombstoneLifetime)

nodes[i] = &node{
RemoteAddress: p.peerInfo.RemoteAddress,
Hostname: p.peerInfo.Hostname,
BroadcastAddress: p.peerInfo.BroadcastAddress,
TCPPort: p.peerInfo.TCPPort,
HTTPPort: p.peerInfo.HTTPPort,
Version: p.peerInfo.Version,
Tombstones: tombstones,
Topics: topics,

return map[string]interface{}{
"producers": nodes,
}, nil

//GET /debug
func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
defer s.ctx.nsqlookupd.DB.RUnlock()

data := make(map[string][]map[string]interface{})
for r, producers := range s.ctx.nsqlookupd.DB.registrationMap {
key := r.Category + ":" + r.Key + ":" + r.SubKey
for _, p := range producers {
m := map[string]interface{}{
"id": p.peerInfo.id,
"hostname": p.peerInfo.Hostname,
"broadcast_address": p.peerInfo.BroadcastAddress,
"tcp_port": p.peerInfo.TCPPort,
"http_port": p.peerInfo.HTTPPort,
"version": p.peerInfo.Version,
"last_update": atomic.LoadInt64(&p.peerInfo.lastUpdate),
"tombstoned": p.tombstoned,
"tombstoned_at": p.tombstonedAt.UnixNano(),
data[key] = append(data[key], m)

return data, nil
http.go 定义了 nsqlookupd 的一系列 HTTP handler(请求处理函数)。


http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger)

创建HTTP server,代码位于internal/http_api/http_server.go里:

package http_api

import (


type logWriter struct {

func (l logWriter) Write(p []byte) (int, error) {
l.Logger.Output(2, string(p))
return len(p), nil

//创建HTTP Serve,并注册handler,handler在nsqlookupd/http.go中定义
func Serve(listener net.Listener, handler http.Handler, proto string, l app.Logger) {
l.Output(2, fmt.Sprintf("%s: listening on %s", proto, listener.Addr()))

server := &http.Server{
Handler: handler,
ErrorLog: log.New(logWriter{l}, "", 0),
err := server.Serve(listener)
// theres no direct way to detect this error because it is not exposed
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: http.Serve() - %s", err))

l.Output(2, fmt.Sprintf("%s: closing %s", proto, listener.Addr()))

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  golang 源码 阅读