您的位置:首页 > 运维架构 > Docker

docker读取容器日志关键代码分析

2017-03-19 17:24 645 查看
通过api获取jsonfilelog格式的日志信息时,时不时的发现读取速度比较慢,于是试着去理解docker内部的实现原理,
Container logs的api的使用方法:
https://docs.docker.com/engine/api/v1.26/#operation/ContainerLogs
https://docs.docker.com/engine/admin/logging/overview/#options
Docker daemon会把容器日志所在路径下的所有日志文件拼接成一个ReadSeeker,然后从中按顺序读取解析。
由于日志文件都是一个是由一条一条的json格式的日志记录组成,每条日志都得先被解析,然后才能发给客户端;
当指定since参数的值时,会把每一条日志解析完之后与since参数做对比,只有满足条件才会发给客户端
但不指定tail参数时,默认是读取所有的日志;
当tail指定一个值时,如果这个值相对比较大时,比如100000, docker daemon的响应速度会比较慢,可以通过增大代码里的blockSize来提高响应速度。
blockSize的默认值是1024字节,自己曾经试着把这个值改成32K,发现增速效果明显,下面是自己的一组测试数据:
blockSize=1k
[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 ab8e412c37eb5d > test.log 2>&1;date
Fri Mar 17 17:32:30 CST 2017
Fri Mar 17 17:34:23 CST 2017
 
 blockSize=10k
[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 c76b9481028cdf0b > test.log 2>&1;date
Fri Mar 17 17:53:07 CST 2017
Fri Mar 17 17:53:19 CST 2017
 
 blockSize=32K
[root@kube-node80 ~]# date;docker logs -t=true --tail=100000 3e82f16be274443b > test.log 2>&1;date
Fri Mar 17 18:07:40 CST 2017
Fri Mar 17 18:07:45 CST 2017
 
  
 下面是关键代码的分析:
container/
container.go
func (container *Container) startLogging() error {
    if container.HostConfig.LogConfig.Type == "none" {
        return nil // do not start logging routines
    }
    l, err := container.StartLogger()
    if err != nil {
        return fmt.Errorf("failed to initialize logging driver: %v", err)
    }
    copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
    container.LogCopier = copier
    copier.Run()
    container.LogDriver = l
    // set LogPath field only for json-file logdriver
    if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
        container.LogPath = jl.LogPath()
    }
    return nil
}
 
 // initRoutes initializes the routes in container router
func (r *containerRouter) initRoutes() {
    r.routes = []router.Route{
        // HEAD
        router.NewHeadRoute("/containers/{name:.*}/archive", r.headContainersArchive),
        // GET
        router.NewGetRoute("/containers/json", r.getContainersJSON),
        router.NewGetRoute("/containers/{name:.*}/export", r.getContainersExport),
        router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),
        router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),
        router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),
        router.Cancellable(router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs)),
        router.Cancellable(router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats)),
        router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),
        router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),
        router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),
        // POST
        router.NewPostRoute("/containers/create", r.postContainersCreate),
        router.NewPostRoute("/containers/{name:.*}/kill", r.postContainersKill),
        router.NewPostRoute("/containers/{name:.*}/pause", r.postContainersPause),
        router.NewPostRoute("/containers/{name:.*}/unpause", r.postContainersUnpause),
        router.NewPostRoute("/containers/{name:.*}/restart", r.postContainersRestart),
        router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),
        router.NewPostRoute("/containers/{name:.*}/stop", r.postContainersStop),
        router.NewPostRoute("/containers/{name:.*}/wait", r.postContainersWait),
        router.NewPostRoute("/containers/{name:.*}/resize", r.postContainersResize),
        router.NewPostRoute("/containers/{name:.*}/attach", r.postContainersAttach),
        router.NewPostRoute("/containers/{name:.*}/copy", r.postContainersCopy), // Deprecated since 1.8, Errors out since 1.12
        router.NewPostRoute(
12b6c
"/containers/{name:.*}/exec", r.postContainerExecCreate),
        router.NewPostRoute("/exec/{name:.*}/start", r.postContainerExecStart),
        router.NewPostRoute("/exec/{name:.*}/resize", r.postContainerExecResize),
        router.NewPostRoute("/containers/{name:.*}/rename", r.postContainerRename),
        router.NewPostRoute("/containers/{name:.*}/update", r.postContainerUpdate),
        router.NewPostRoute("/containers/prune", r.postContainersPrune),
        // PUT
        router.NewPutRoute("/containers/{name:.*}/archive", r.putContainersArchive),
        // DELETE
        router.NewDeleteRoute("/containers/{name:.*}", r.deleteContainers),
    }
}
api/
server/
router/
container/
container_routes.go
func (s *containerRouter) getContainersLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
    if err := httputils.ParseForm(r); err != nil {
        return err
    }
    // Args are validated before the stream starts because when it starts we're
    // sending HTTP 200 by writing an empty chunk of data to tell the client that
    // daemon is going to stream. By sending this initial HTTP 200 we can't report
    // any error after the stream starts (i.e. container not found, wrong parameters)
    // with the appropriate status code.
    stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr")
    if !(stdout || stderr) {
        return fmt.Errorf("Bad parameters: you must choose at least one stream")
    }
    containerName := vars["name"]
    logsConfig := &backend.ContainerLogsConfig{
        ContainerLogsOptions: types.ContainerLogsOptions{
            Follow: httputils.BoolValue(r, "follow"),
            Timestamps: httputils.BoolValue(r, "timestamps"),
            Since: r.Form.Get("since"),
            Tail: r.Form.Get("tail"),
            ShowStdout: stdout,
            ShowStderr: stderr,
            Details: httputils.BoolValue(r, "details"),
        },
        OutStream: w,
    }
    chStarted := make(chan struct{})
    if err := s.backend.ContainerLogs(ctx, containerName, logsConfig, chStarted); err != nil {
        select {
        case <-chStarted:
            // The client may be expecting all of the data we're sending to
            // be multiplexed, so mux it through the Systemerr stream, which
            // will cause the client to throw an error when demuxing
            stdwriter := stdcopy.NewStdWriter(logsConfig.OutStream, stdcopy.Systemerr)
            fmt.Fprintf(stdwriter, "Error running logs job: %v\n", err)
        default:
            return err
        }
    }
    return nil
}
 
  
 daemon/
logs.go
// ContainerLogs hooks up a container's stdout and stderr streams
// configured with the given struct.
func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
    if !(config.ShowStdout || config.ShowStderr) {
        return errors.New("You must choose at least one stream")
    }
    container, err := daemon.GetContainer(containerName)
    if err != nil {
        return err
    }
………..
    logs := logReader.ReadLogs(readConfig)
    // Close logWatcher on exit
    defer func() {
        logs.Close()
        if cLog != container.LogDriver {
            // Since the logger isn't cached in the container, which
            // occurs if it is running, it must get explicitly closed
            // here to avoid leaking it and any file handles it has.
            if err := cLog.Close(); err != nil {
                logrus.Errorf("Error closing logger: %v", err)
            }
        }
    }()
}
logger/
jsonfilelog/
read.go
// ReadLogs implements the logger's LogReader interface for the logs
// created by this driver.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
    logWatcher := logger.NewLogWatcher()
    go l.readLogs(logWatcher, config)
    return logWatcher
}
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
    defer close(logWatcher.Msg)
……..

followLogs()
}
 
 func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
    dec := json.NewDecoder(f)
    l := &jsonlog.JSONLog{}
    // main loop
    for {
        msg, err := decodeLogLine(dec, l)
        if err != nil {
            if err := handleDecodeErr(err); err != nil {
                if err == errDone {
                    return
                }
                // we got an unrecoverable error, so return
                logWatcher.Err <- err
                return
            }
            // ready to try again
            continue
        }
}
 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: