您的位置:首页 > 运维架构 > Docker

Docker自发现注册服务regd研发

2016-05-16 12:55 676 查看

0. 前言

[技术源于艺术, 艺术源于生活]
1) 这是我第一次发布程序相关的技术文章, 10年前发表过很多关于3dsmax和maya的技术文章
2) 有人无端转载我的文章, 所以这里留一个我的联系方式, 欢迎讨论
邮箱: kekuer@gmail.com qq: 5513219

1. 背景

注册服务其实目前已经有很多项目都在做, 比如: https://github.com/gliderlabs/registrator, 为什么我这里还要自己做一套? 很简单, 太重, 太年轻, 太不适应我的需求

2. 准备

1) 首先需要了解Docker Remote API, 比较简单, REST+JSON, 可以浏览一下官方网站:https://docs.docker.com/engine/reference/api/docker_remote_api_v1.23/

2) etcd, 该服务生来就是为服务自发现而做的, 官方介绍说得很简单且清晰: consistent key-value store for shared configuration and service discovery, 也很简单, REST+JSON

可以通过官方API来了解: https://github.com/coreos/etcd/blob/master/Documentation/v2/api.md

本文假设你已经对以上两点有比较清晰的认知

3. 需求分析

要完成该服务, 我们需要分析几个事情:

1) 用什么语言来实现

由于docker和etcd这些应用都是golang写的, 因此我毅然的决定就用golang来做了, 没有多想, 读者可以考虑用其他任何语言来实现, 比如c, c++, node.js等

golang其实在该项目上有几个好处: a) 轻量, b) 内存消耗少, c) 开发速度快, d) 什么系统都可以跑

2) 需要哪些模块

全部都是golang的内构库

a) tcp, http (用于请求)

都是REST的, 为什么不全部使用http, 而要退到tcp呢?

因为docker API中的events接口是用的http的Chunked Encoding (Transfer-Encoding: chunked)

具体的可以自己看一下http的协议定义: https://www.w3.org/Protocols/rfc2616/rfc2616.txt 中的(3.6.1 Chunked Transfer Coding), 都很简单

4. 逻辑分析

1) 监听docker events

a) docker API 中 GET /events, 上面我们已经分析过, 这里是Transfer-Encoding: chunked的一个http协议

b) 由于我不知道golang是否有对chunked encoding支持 (看了源码没有找到, 如果有人知道的话, 提供一下指引, 谢谢), 因此这里只有用tcp来封一个

c) 根据我们业务需求, 我们只需要对docker的start 和disconnect 2个事件进行监听

为什么是start而不是connect? 因为connect做了以后, 业务不一定启动起来了

为什么是disconnect而不是stop? 因为业务需求的是网络自发现, 所以不能等待到stop, 只要网络断了, 必须马上通知相应业务

func monitorDockerEvent() {
log("info", "monitorevent", "connecting to docker API server", &logDetailStruct{
Server: dockerAPIAddr,
})
reqContent := "GET /events?filters={%22event%22:[%22start%22,%22disconnect%22]} HTTP/1.1\n\n"
tcpAddr, err := net.ResolveTCPAddr("tcp", dockerAPIAddr)
if err != nil {
catchErr()
log("error", "monitorevent", "resolve tcp address failed: "+err.Error(), &logDetailStruct{
Server: dockerAPIAddr,
})
return
}

conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
catchErr()
log("error", "monitorevent", "dial tcp failed: "+err.Error(), &logDetailStruct{
Server: dockerAPIAddr,
})
return
}
defer conn.Close()

// write request header to server
_, err = conn.Write([]byte(reqContent))
if err != nil {
catchErr()
log("error", "monitorevent", "write to server failed: "+err.Error(), &logDetailStruct{
Server: dockerAPIAddr,
})
return
}

reply := make([]byte, 1024)
for {
_, err = conn.Read(reply)
if err != nil {
catchErr()
log("error", "monitorevent", "read from server failed: "+err.Error(), &logDetailStruct{
Server: dockerAPIAddr,
})
break
}

// header is received
if strings.HasPrefix(string(reply), "HTTP") {
log("info", "monitorevent", "docker API server is connected", &logDetailStruct{
Server: dockerAPIAddr,
})
continue
}

res := strings.Split(string(reply), "\n")
// chunk is received, first line is length
if len(res) > 1 {
body := res[1]

jsonMap, err := jsonutil.ParseJsonObject(body)
if err != nil {
catchErr()
log("error", "monitorevent", "body json decode failed: "+err.Error(), &logDetailStruct{
Server: dockerAPIAddr,
})
break
}

action, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Action")
if err != nil {
catchErr()
log("error", "monitorevent", "get container event action failed: "+err.Error(), &logDetailStruct{
Server: dockerAPIAddr,
})
break
}

if action == "disconnect" {
id, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Actor", "Attributes", "container")
if err != nil {
catchErr()
log("error", "monitorevent", "get disconnect container id failed: "+err.Error(), &logDetailStruct{
Server: dockerAPIAddr,
})
break
}
deregisterContainer(id)
} else if action == "start" {
id, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Actor", "ID")
if err != nil {
catchErr()
log("error", "monitorevent", "get start container id failed: "+err.Error(), &logDetailStruct{
Server: dockerAPIAddr,
})
break
}
if info, err := inspectContainer(id); err == nil {
registerContainer(id, info)
}
}
}
}
}

如上, 我自己封了一个超简单的log方法, 就是把对象转成json string (我比较喜欢json log, 后期会写一些日志收集分析的文章)

type logDetailStruct struct {
Server    string `json:"server,omitempty"`
Container string `json:"id,omitempty"`
IP        string `json:"ip,omitempty"`
}

type logStruct struct {
Level  string           `json:"level"`
Action string           `json:"action"`
Msg    string           `json:"msg"`
Detail *logDetailStruct `json:"detail,omitempty"`
}

func log(level, action, msg string, detail *logDetailStruct) {
log := &logStruct{
Level:  level,
Action: action,
Msg:    msg,
Detail: detail,
}
logString, _ := json.Marshal(log)
fmt.Printf("%s\n", logString)
}


2) 对docker events返回JSON进行分析

a) 这里其实就是json的parse(decode, unmarshal), 反正这个方法名字随便你怎么叫了, 就是从string搞成object的意思

b) 为了简单, 我这里抽象了几个json的方法来完成重复调用和获取对应路径的值

func ParseJsonObject(jsonString string) (map[string]interface{}, error) {
var jsonMap map[string]interface{}
decoder := json.NewDecoder(bytes.NewBuffer([]byte(jsonString)))
decoder.UseNumber()

err := decoder.Decode(&jsonMap)
if err != nil {
return nil, err
}

return jsonMap, nil
}

func GetJsonObjectValueMap(jsonMap *map[string]interface{}, key string) (interface{}, error) {
jsonObjectValueMap := (*jsonMap)[key]
if jsonObjectValueMap == nil {
return nil, errors.New("key is not exists")
}

return jsonObjectValueMap, nil
}

func GetJsonValue(jsonMap *map[string]interface{}, key string) (interface{}, error) {
value := (*jsonMap)[key]
if value == nil {
return "", errors.New("key is not exists")
}

return value, nil
}

func GetJsonStringValue(jsonMap *map[string]interface{}, key string) (string, error) {
value, err := GetJsonValue(jsonMap, key)
if err != nil {
return "", nil
}

ret, ok := value.(string)

if !ok {
return "", errors.New("value type is not string")
}

return ret, nil
}

func GetJsonValueViaPath(jsonMap *map[string]interface{}, keys ...string) (interface{}, error) {
jsonObjectValueMap := jsonMap
for index, key := range keys {
if index == len(keys)-1 {
break
}
jsonObjectMap, err := GetJsonObjectValueMap(jsonObjectValueMap, key)
if err != nil {
return nil, err
}
a := jsonObjectMap.(map[string]interface{})
jsonObjectValueMap = &a
}

if jsonObjectValueMap == nil {
return nil, errors.New("key is not exists")
}

return GetJsonValue(jsonObjectValueMap, keys[len(keys)-1])
}

func GetJsonStringValueViaPath(jsonMap *map[string]interface{}, keys ...string) (string, error) {
value, err := GetJsonValueViaPath(jsonMap, keys...)
if err != nil {
return "", err
}

ret, ok := value.(string)

if !ok {
return "", errors.New("value type is not string")
}

return ret, nil
}

c) 为了后期使用数据的方便, 我把host的ip和host的gateway放入了docker inspect返回的json中, 因此需要先获取ip和gateway

func GetGateway() (string, error) {
dat, err := ioutil.ReadFile("/proc/net/route")
if err != nil {
return "", err
}
routes := strings.Split(string(dat), "\n")
for index, route := range routes {
if index == 0 {
continue
}
fields := strings.Split(route, "\t")
if len(fields) > 3 {
gateway := fields[2]
if gateway != "00000000" {
ipSegs, _ := hex.DecodeString(gateway)
return fmt.Sprintf("%v.%v.%v.%v", ipSegs[3], ipSegs[2], ipSegs[1], ipSegs[0]), nil
}
}
}
return "", errors.New("can't get gateway")
}

func GetIps() ([]string, error) {
ifaces, err := net.Interfaces()
if err != nil {
return nil, err
}

ips := []string{}
// handle err
for _, i := range ifaces {
// fmt.Println(ifaces)
addrs, err := i.Addrs()
if err != nil {
return nil, err
}
// handle err
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
ips = append(ips, ipnet.IP.String())
}
}
}
}

return ips, nil
}


3) 注册docker container信息到etcd服务

b) etcd API 中 PUT http://127.0.0.1:2379/v2/keys/xxxx, form body是需要set的key和value

etcd的地址根据你自己服务来定, 一般都是一个host一个etcd, 然后注册到cluster中去;

xxxx为key, 这里你可以根据自己业务来定义一个详细路径; 在这里我使用docker container id来作为key, 然后前面加上prefix, 例如: abcd.com/docker/container/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

http请求中, header肯定需要该条: Content-Type: application/x-www-form-urlencoded

b) golang是默认包含http put方法的, 因此我们可以偷懒直接用net/http模块来实现, 在这里我简单封装了一个etcd Set的方法

func Set(address, key, value string) error {
client := http.Client{
Timeout: time.Duration(5 * time.Second),
}
req, err := http.NewRequest("PUT",
fmt.Sprintf("%s/v2/keys/%s", address, key),
strings.NewReader("value="+value))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if err != nil {
return err
}
res, err := client.Do(req)
if err != nil {
return err
}

defer res.Body.Close()
content, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}

var jsonMap map[string]interface{}
err = json.Unmarshal(content, &jsonMap)
if err != nil {
return err
}

errorCode := jsonMap["errorCode"]
message := jsonMap["message"]
if errorCode != nil {
return fmt.Errorf("set failed: [%v] %v", errorCode, message)
}

return nil
}

c) 注册方法

func registerContainer(id, info string) error {
detailJsonMap, err := jsonutil.ParseJsonObject(info)
if err != nil {
catchErr()
log("error", "register", "unmarshal info failed: "+err.Error(), &logDetailStruct{
Container: id,
})
return err
}

ip, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "NetworkSettings", "IPAddress")
if err != nil {
catchErr()
log("error", "register", "get ip address failed: "+err.Error(), &logDetailStruct{
Container: id,
})
return errors.New("network settings is missing in docker info")
}

gateway, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "NetworkSettings", "Gateway")
if err != nil {
catchErr()
log("error", "register", "get gateway failed: "+err.Error(), &logDetailStruct{
Container: id,
})
return errors.New("network settings is missing in docker info")
}

image, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "Config", "Image")
if err != nil {
catchErr()
log("error", "register", "get image failed: "+err.Error(), &logDetailStruct{
Container: id,
})
return errors.New("network settings is missing in docker info")
}

newInfo, err := json.Marshal(struct {
HostGateway string
HostIP      []string
IP          string
Gateway     string
Image       string
Detail      map[string]interface{}
}{
HostGateway: _gateway,
HostIP:      _hostIP,
IP:          ip,
Gateway:     gateway,
Image:       image,
Detail:      detailJsonMap,
})

if err != nil {
catchErr()
log("error", "register", "add addon info failed: "+err.Error(), &logDetailStruct{
Container: id,
})
return err
}

if err := etcd.Set(etcdAPIUrl, "gs.io/docker/containers/"+id, string(newInfo)); err != nil {
catchErr()
log("error", "register", "etcd set failed: "+err.Error(), &logDetailStruct{
Container: id,
})
return err
}

log("info", "register", "register container success", &logDetailStruct{
Container: id,
IP:        ip,
})

return nil
}


4) 取消注册docker container

a) etcd API 中 DELETE http://127.0.0.1:2379/v2/keys/xxxx
b) golang是默认包含http delete方法的, 因此在这里我也简单封装了一个etcd Del的方法
func Del(address string, key string) error {
client := http.Client{
Timeout: time.Duration(5 * time.Second),
}
req, err := http.NewRequest("DELETE",
fmt.Sprintf("%s/v2/keys/%s", address, key),
nil)
if err != nil {
return err
}
res, err := client.Do(req)
if err != nil {
return err
}

defer res.Body.Close()
content, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}

var jsonMap map[string]interface{}
err = json.Unmarshal(content, &jsonMap)
if err != nil {
return err
}

errorCode := jsonMap["errorCode"]
message := jsonMap["message"]
if errorCode != nil {
return fmt.Errorf("delete failed: [%v] %v", errorCode, message)
}

return nil
}

c) 取消注册方法

func deregisterContainer(id string) error {
if err := etcd.Del(etcdAPIUrl, "gs.io/docker/containers/"+id); err != nil {
// Key not found is valid
if !strings.Contains(err.Error(), "100") {
catchErr()
log("error", "register", "etcd delete failed: "+err.Error(), &logDetailStruct{
Container: id,
})
return err
}
}

log("info", "deregister", "deregister container success", &logDetailStruct{
Container: id,
})

return nil
}


5. 源码

整理之后会发布源码到码云

6. 构造

1) 先做一个docker文件

FROM alpine:3.3
MAINTAINER Docker Containers Registrator Maintainers "kekuer@gmail.com"

ADD ./regd /usr/bin
RUN chmod +x /usr/bin/regd
ENTRYPOINT ["/usr/bin/regd"]


2) 做一个build.sh

这里使用了一个我做的go-build的镜像

FROM alpine:3.3
MAINTAINER Docker Golang build Maintainers "kekuer@gmail.com"

RUN apk --update add curl git mercurial bzr go && rm -rf /var/cache/apk/*

WORKDIR "/app"

ENV GOROOT /usr/lib/go
ENV GOPATH /gopath
ENV GOBIN /gopath/bin
ENV PATH $PATH:$GOROOT/bin:$GOPATH/bin

通过shell构建注册镜像

#!/bin/bash

VER=1.1

docker run --rm -v $(pwd):/app funwun.io/go-build:1.0  go build regd.go
docker build -t funwun.io/regd:${VER} -f ${VER}.Dockerfile .


7. 结语

此项目, 我们牵涉到了几个点, 1) docker api, 2) etcd, 3) golang, 4) http client, 当然都是基础下的基础

还有就是提供一个bash的方案, 比较轻, 功能也比较弱, 但是基本能完成:

#!/bin/bash

/usr/bin/docker events -f event=disconnect -f event=connect |
while IFS= read -r line
do
ID=$(echo $line | grep -o "container=[0-9a-f]\{64\}" | sed -e "s/container=//g")
REMOVE=false
if [[ $line == *"disconnect"* ]]; then
REMOVE=true
fi
if [ ! -z "$ID" -a "$ID" != " " ]; then
ETCD_KEY=funwun.io/docker/containers/$ID
if [ $REMOVE = true ]; then
RET=$(etcdctl rm $ETCD_KEY)
if [[ $RET == "Error*" ]]; then
echo $RET
else
echo "[DOWN] /funwun.io/docker/containers/$ID"
fi
else
GATEWAY=$(route -n | grep '^0\.0\.0\.0' | awk '{print $2}')
INFO=$(docker inspect $ID)
RET=$(etcdctl set $ETCD_KEY "{\"Info\": ${INFO}, \"Gateway\": \"$GATEWAY\"}")
if [[ $RET == "Error*" ]]; then
echo $RET
else
echo "[UP  ] /funwun.io/docker/containers/$ID"
fi
fi
fi
done
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: