Docker自发现注册服务regd研发
2016-05-16 12:55
676 查看
0. 前言
[技术源于艺术, 艺术源于生活]1) 这是我第一次发布程序相关的技术文章, 10年前发表过很多关于3dsmax和maya的技术文章
2) 有人无端转载我的文章, 所以这里留一个我的联系方式, 欢迎讨论
邮箱: kekuer@gmail.com qq: 5513219
1. 背景
注册服务其实目前已经有很多项目都在做, 比如: https://github.com/gliderlabs/registrator, 为什么我这里还要自己做一套? 很简单, 太重, 太年轻, 太不适应我的需求2. 准备
1) 首先需要了解Docker Remote API, 比较简单, REST+JSON, 可以浏览一下官方网站:https://docs.docker.com/engine/reference/api/docker_remote_api_v1.23/2) etcd, 该服务生来就是为服务自发现而做的, 官方介绍说得很简单且清晰: consistent key-value store for shared configuration and service discovery, 也很简单, REST+JSON
可以通过官方API来了解: https://github.com/coreos/etcd/blob/master/Documentation/v2/api.md
本文假设你已经对以上两点有比较清晰的认知
3. 需求分析
要完成该服务, 我们需要分析几个事情:1) 用什么语言来实现
由于docker和etcd这些应用都是golang写的, 因此我毅然的决定就用golang来做了, 没有多想, 读者可以考虑用其他任何语言来实现, 比如c, c++, node.js等golang其实在该项目上有几个好处: a) 轻量, b) 内存消耗少, c) 开发速度快, d) 什么系统都可以跑
2) 需要哪些模块
全部都是golang的内构库a) tcp, http (用于请求)
都是REST的, 为什么不全部使用http, 而要退到tcp呢?
因为docker API中的events接口是用的http的Chunked Encoding (Transfer-Encoding: chunked)
具体的可以自己看一下http的协议定义: https://www.w3.org/Protocols/rfc2616/rfc2616.txt 中的(3.6.1 Chunked Transfer Coding), 都很简单
4. 逻辑分析
1) 监听docker events
a) docker API 中 GET /events, 上面我们已经分析过, 这里是Transfer-Encoding: chunked的一个http协议b) 由于我不知道golang是否有对chunked encoding支持 (看了源码没有找到, 如果有人知道的话, 提供一下指引, 谢谢), 因此这里只有用tcp来封一个
c) 根据我们业务需求, 我们只需要对docker的start 和disconnect 2个事件进行监听
为什么是start而不是connect? 因为connect做了以后, 业务不一定启动起来了
为什么是disconnect而不是stop? 因为业务需求的是网络自发现, 所以不能等待到stop, 只要网络断了, 必须马上通知相应业务
func monitorDockerEvent() { log("info", "monitorevent", "connecting to docker API server", &logDetailStruct{ Server: dockerAPIAddr, }) reqContent := "GET /events?filters={%22event%22:[%22start%22,%22disconnect%22]} HTTP/1.1\n\n" tcpAddr, err := net.ResolveTCPAddr("tcp", dockerAPIAddr) if err != nil { catchErr() log("error", "monitorevent", "resolve tcp address failed: "+err.Error(), &logDetailStruct{ Server: dockerAPIAddr, }) return } conn, err := net.DialTCP("tcp", nil, tcpAddr) if err != nil { catchErr() log("error", "monitorevent", "dial tcp failed: "+err.Error(), &logDetailStruct{ Server: dockerAPIAddr, }) return } defer conn.Close() // write request header to server _, err = conn.Write([]byte(reqContent)) if err != nil { catchErr() log("error", "monitorevent", "write to server failed: "+err.Error(), &logDetailStruct{ Server: dockerAPIAddr, }) return } reply := make([]byte, 1024) for { _, err = conn.Read(reply) if err != nil { catchErr() log("error", "monitorevent", "read from server failed: "+err.Error(), &logDetailStruct{ Server: dockerAPIAddr, }) break } // header is received if strings.HasPrefix(string(reply), "HTTP") { log("info", "monitorevent", "docker API server is connected", &logDetailStruct{ Server: dockerAPIAddr, }) continue } res := strings.Split(string(reply), "\n") // chunk is received, first line is length if len(res) > 1 { body := res[1] jsonMap, err := jsonutil.ParseJsonObject(body) if err != nil { catchErr() log("error", "monitorevent", "body json decode failed: "+err.Error(), &logDetailStruct{ Server: dockerAPIAddr, }) break } action, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Action") if err != nil { catchErr() log("error", "monitorevent", "get container event action failed: "+err.Error(), &logDetailStruct{ Server: dockerAPIAddr, }) break } if action == "disconnect" { id, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Actor", "Attributes", "container") if err != nil { catchErr() log("error", "monitorevent", "get disconnect container id failed: "+err.Error(), &logDetailStruct{ Server: dockerAPIAddr, }) break } deregisterContainer(id) } else if action == "start" { id, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Actor", "ID") if err != nil { catchErr() log("error", "monitorevent", "get start container id failed: "+err.Error(), &logDetailStruct{ Server: dockerAPIAddr, }) break } if info, err := inspectContainer(id); err == nil { registerContainer(id, info) } } } } }
如上, 我自己封了一个超简单的log方法, 就是把对象转成json string (我比较喜欢json log, 后期会写一些日志收集分析的文章)
type logDetailStruct struct { Server string `json:"server,omitempty"` Container string `json:"id,omitempty"` IP string `json:"ip,omitempty"` } type logStruct struct { Level string `json:"level"` Action string `json:"action"` Msg string `json:"msg"` Detail *logDetailStruct `json:"detail,omitempty"` } func log(level, action, msg string, detail *logDetailStruct) { log := &logStruct{ Level: level, Action: action, Msg: msg, Detail: detail, } logString, _ := json.Marshal(log) fmt.Printf("%s\n", logString) }
2) 对docker events返回JSON进行分析
a) 这里其实就是json的parse(decode, unmarshal), 反正这个方法名字随便你怎么叫了, 就是从string搞成object的意思b) 为了简单, 我这里抽象了几个json的方法来完成重复调用和获取对应路径的值
func ParseJsonObject(jsonString string) (map[string]interface{}, error) { var jsonMap map[string]interface{} decoder := json.NewDecoder(bytes.NewBuffer([]byte(jsonString))) decoder.UseNumber() err := decoder.Decode(&jsonMap) if err != nil { return nil, err } return jsonMap, nil } func GetJsonObjectValueMap(jsonMap *map[string]interface{}, key string) (interface{}, error) { jsonObjectValueMap := (*jsonMap)[key] if jsonObjectValueMap == nil { return nil, errors.New("key is not exists") } return jsonObjectValueMap, nil } func GetJsonValue(jsonMap *map[string]interface{}, key string) (interface{}, error) { value := (*jsonMap)[key] if value == nil { return "", errors.New("key is not exists") } return value, nil } func GetJsonStringValue(jsonMap *map[string]interface{}, key string) (string, error) { value, err := GetJsonValue(jsonMap, key) if err != nil { return "", nil } ret, ok := value.(string) if !ok { return "", errors.New("value type is not string") } return ret, nil } func GetJsonValueViaPath(jsonMap *map[string]interface{}, keys ...string) (interface{}, error) { jsonObjectValueMap := jsonMap for index, key := range keys { if index == len(keys)-1 { break } jsonObjectMap, err := GetJsonObjectValueMap(jsonObjectValueMap, key) if err != nil { return nil, err } a := jsonObjectMap.(map[string]interface{}) jsonObjectValueMap = &a } if jsonObjectValueMap == nil { return nil, errors.New("key is not exists") } return GetJsonValue(jsonObjectValueMap, keys[len(keys)-1]) } func GetJsonStringValueViaPath(jsonMap *map[string]interface{}, keys ...string) (string, error) { value, err := GetJsonValueViaPath(jsonMap, keys...) if err != nil { return "", err } ret, ok := value.(string) if !ok { return "", errors.New("value type is not string") } return ret, nil }
c) 为了后期使用数据的方便, 我把host的ip和host的gateway放入了docker inspect返回的json中, 因此需要先获取ip和gateway
func GetGateway() (string, error) { dat, err := ioutil.ReadFile("/proc/net/route") if err != nil { return "", err } routes := strings.Split(string(dat), "\n") for index, route := range routes { if index == 0 { continue } fields := strings.Split(route, "\t") if len(fields) > 3 { gateway := fields[2] if gateway != "00000000" { ipSegs, _ := hex.DecodeString(gateway) return fmt.Sprintf("%v.%v.%v.%v", ipSegs[3], ipSegs[2], ipSegs[1], ipSegs[0]), nil } } } return "", errors.New("can't get gateway") } func GetIps() ([]string, error) { ifaces, err := net.Interfaces() if err != nil { return nil, err } ips := []string{} // handle err for _, i := range ifaces { // fmt.Println(ifaces) addrs, err := i.Addrs() if err != nil { return nil, err } // handle err for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { if ipnet.IP.To4() != nil { ips = append(ips, ipnet.IP.String()) } } } } return ips, nil }
3) 注册docker container信息到etcd服务
b) etcd API 中 PUT http://127.0.0.1:2379/v2/keys/xxxx, form body是需要set的key和valueetcd的地址根据你自己服务来定, 一般都是一个host一个etcd, 然后注册到cluster中去;
xxxx为key, 这里你可以根据自己业务来定义一个详细路径; 在这里我使用docker container id来作为key, 然后前面加上prefix, 例如: abcd.com/docker/container/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
http请求中, header肯定需要该条: Content-Type: application/x-www-form-urlencoded
b) golang是默认包含http put方法的, 因此我们可以偷懒直接用net/http模块来实现, 在这里我简单封装了一个etcd Set的方法
func Set(address, key, value string) error { client := http.Client{ Timeout: time.Duration(5 * time.Second), } req, err := http.NewRequest("PUT", fmt.Sprintf("%s/v2/keys/%s", address, key), strings.NewReader("value="+value)) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") if err != nil { return err } res, err := client.Do(req) if err != nil { return err } defer res.Body.Close() content, err := ioutil.ReadAll(res.Body) if err != nil { return err } var jsonMap map[string]interface{} err = json.Unmarshal(content, &jsonMap) if err != nil { return err } errorCode := jsonMap["errorCode"] message := jsonMap["message"] if errorCode != nil { return fmt.Errorf("set failed: [%v] %v", errorCode, message) } return nil }
c) 注册方法
func registerContainer(id, info string) error { detailJsonMap, err := jsonutil.ParseJsonObject(info) if err != nil { catchErr() log("error", "register", "unmarshal info failed: "+err.Error(), &logDetailStruct{ Container: id, }) return err } ip, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "NetworkSettings", "IPAddress") if err != nil { catchErr() log("error", "register", "get ip address failed: "+err.Error(), &logDetailStruct{ Container: id, }) return errors.New("network settings is missing in docker info") } gateway, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "NetworkSettings", "Gateway") if err != nil { catchErr() log("error", "register", "get gateway failed: "+err.Error(), &logDetailStruct{ Container: id, }) return errors.New("network settings is missing in docker info") } image, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "Config", "Image") if err != nil { catchErr() log("error", "register", "get image failed: "+err.Error(), &logDetailStruct{ Container: id, }) return errors.New("network settings is missing in docker info") } newInfo, err := json.Marshal(struct { HostGateway string HostIP []string IP string Gateway string Image string Detail map[string]interface{} }{ HostGateway: _gateway, HostIP: _hostIP, IP: ip, Gateway: gateway, Image: image, Detail: detailJsonMap, }) if err != nil { catchErr() log("error", "register", "add addon info failed: "+err.Error(), &logDetailStruct{ Container: id, }) return err } if err := etcd.Set(etcdAPIUrl, "gs.io/docker/containers/"+id, string(newInfo)); err != nil { catchErr() log("error", "register", "etcd set failed: "+err.Error(), &logDetailStruct{ Container: id, }) return err } log("info", "register", "register container success", &logDetailStruct{ Container: id, IP: ip, }) return nil }
4) 取消注册docker container
a) etcd API 中 DELETE http://127.0.0.1:2379/v2/keys/xxxxb) golang是默认包含http delete方法的, 因此在这里我也简单封装了一个etcd Del的方法
func Del(address string, key string) error { client := http.Client{ Timeout: time.Duration(5 * time.Second), } req, err := http.NewRequest("DELETE", fmt.Sprintf("%s/v2/keys/%s", address, key), nil) if err != nil { return err } res, err := client.Do(req) if err != nil { return err } defer res.Body.Close() content, err := ioutil.ReadAll(res.Body) if err != nil { return err } var jsonMap map[string]interface{} err = json.Unmarshal(content, &jsonMap) if err != nil { return err } errorCode := jsonMap["errorCode"] message := jsonMap["message"] if errorCode != nil { return fmt.Errorf("delete failed: [%v] %v", errorCode, message) } return nil }
c) 取消注册方法
func deregisterContainer(id string) error { if err := etcd.Del(etcdAPIUrl, "gs.io/docker/containers/"+id); err != nil { // Key not found is valid if !strings.Contains(err.Error(), "100") { catchErr() log("error", "register", "etcd delete failed: "+err.Error(), &logDetailStruct{ Container: id, }) return err } } log("info", "deregister", "deregister container success", &logDetailStruct{ Container: id, }) return nil }
5. 源码
整理之后会发布源码到码云6. 构造
1) 先做一个docker文件
FROM alpine:3.3 MAINTAINER Docker Containers Registrator Maintainers "kekuer@gmail.com" ADD ./regd /usr/bin RUN chmod +x /usr/bin/regd ENTRYPOINT ["/usr/bin/regd"]
2) 做一个build.sh
这里使用了一个我做的go-build的镜像FROM alpine:3.3 MAINTAINER Docker Golang build Maintainers "kekuer@gmail.com" RUN apk --update add curl git mercurial bzr go && rm -rf /var/cache/apk/* WORKDIR "/app" ENV GOROOT /usr/lib/go ENV GOPATH /gopath ENV GOBIN /gopath/bin ENV PATH $PATH:$GOROOT/bin:$GOPATH/bin
通过shell构建注册镜像
#!/bin/bash VER=1.1 docker run --rm -v $(pwd):/app funwun.io/go-build:1.0 go build regd.go docker build -t funwun.io/regd:${VER} -f ${VER}.Dockerfile .
7. 结语
此项目, 我们牵涉到了几个点, 1) docker api, 2) etcd, 3) golang, 4) http client, 当然都是基础下的基础还有就是提供一个bash的方案, 比较轻, 功能也比较弱, 但是基本能完成:
#!/bin/bash /usr/bin/docker events -f event=disconnect -f event=connect | while IFS= read -r line do ID=$(echo $line | grep -o "container=[0-9a-f]\{64\}" | sed -e "s/container=//g") REMOVE=false if [[ $line == *"disconnect"* ]]; then REMOVE=true fi if [ ! -z "$ID" -a "$ID" != " " ]; then ETCD_KEY=funwun.io/docker/containers/$ID if [ $REMOVE = true ]; then RET=$(etcdctl rm $ETCD_KEY) if [[ $RET == "Error*" ]]; then echo $RET else echo "[DOWN] /funwun.io/docker/containers/$ID" fi else GATEWAY=$(route -n | grep '^0\.0\.0\.0' | awk '{print $2}') INFO=$(docker inspect $ID) RET=$(etcdctl set $ETCD_KEY "{\"Info\": ${INFO}, \"Gateway\": \"$GATEWAY\"}") if [[ $RET == "Error*" ]]; then echo $RET else echo "[UP ] /funwun.io/docker/containers/$ID" fi fi fi done
相关文章推荐
- centos7 docker使用https_proxy 代理配置
- 配置Docker beta for Mac访问私有镜像仓库
- centos7安装docker最新版
- 基于Docker的分布式服务研发实践
- 有容云——窥探Docker中的Volume Plugin内幕
- (OK) scratch/manet-docker.cc
- docker toolbox 安装哪些坑坑洼洼
- 搭建一个私有registry服务(基础版本)
- Docker的另外15个命令
- Docker 安全
- Dockerfile文件指令
- Docker学习系列(五):Dockerfile文件
- Docker学习系列(四):Docker下安装Jupyter
- Docker学习一(安装docker并构建一个可ssh登录的镜像)
- Docker学习系列(三):Ubuntu下使用Docker的基本指令记录及一些注意事项
- Docker学习系列(二):Ubuntu下安装Docker
- docker命令总结
- docker 概述
- CentOS7上安装和使用Docker
- Docker中一些常用的命令