您的位置:首页 > 运维架构 > Docker

Mesos源码分析(14): DockerContainerier运行一个Task

2016-08-06 22:33 357 查看
DockerContainerizer的实现在文件src/slave/containerizer/docker.cpp中

 

Future<bool> DockerContainerizer::launch(

    const ContainerID& containerId,

    const ExecutorInfo& executorInfo,

    const string& directory,

    const Option<string>&
user,

    const SlaveID& slaveId,

    const PID<Slave>& slavePid,

    bool checkpoint)

{

  return dispatch(

      process.get(),

      &DockerContainerizerProcess::launch,

      containerId,

      None(),

      executorInfo,

      directory,

      user,

      slaveId,

      slavePid,

      checkpoint);

}

 

转而调用DockerContainerizerProcess::launch,无论是TaskInfo里面有ContainerInfo,还是ExecutorInfo里面有ContainerInfo,都由这个函数处理,只不过分支不同。

Future<bool> DockerContainerizerProcess::launch(

    const ContainerID& containerId,

    const Option<TaskInfo>& taskInfo,

    const ExecutorInfo& executorInfo,

    const string& directory,

    const Option<string>& user,

    const SlaveID& slaveId,

    const PID<Slave>& slavePid,

    bool checkpoint)

{

  Option<ContainerInfo> containerInfo;

 

  if (taskInfo.isSome() && taskInfo.get().has_container())
{

    containerInfo = taskInfo.get().container();

  } else if (executorInfo.has_container())
{

    containerInfo = executorInfo.container();

  }

 

  if (containerInfo.isNone()) {

    LOG(INFO) << "No container info found, skipping launch";

    return false;

  }

 

  if (containerInfo.get().type() != ContainerInfo::DOCKER)
{

    LOG(INFO) << "Skipping non-docker container";

    return false;

  }

 

  Try<Container*> container = Container::create(

      containerId,

      taskInfo,

      executorInfo,

      directory,

      user,

      slaveId,

      slavePid,

      checkpoint,

      flags);

 

  if (container.isError()) {

    return Failure("Failed to
create container: " + container.error());

  }

 

  containers_[containerId] = container.get();

 

  if (taskInfo.isSome()) {

    LOG(INFO) << "Starting container '" << containerId

              << "' for task '" << taskInfo.get().task_id()

              << "' (and executor '" << executorInfo.executor_id()

              << "') of framework '" << executorInfo.framework_id() << "'";

  } else {

    LOG(INFO) << "Starting container '" << containerId

              << "' for executor '" << executorInfo.executor_id()

              << "' and framework '" << executorInfo.framework_id() << "'";

  }

 

  if (HookManager::hooksAvailable()) {

    HookManager::slavePreLaunchDockerHook(

        container.get()->container,

        container.get()->command,

        taskInfo,

        executorInfo,

        container.get()->name(),

        container.get()->directory,

        flags.sandbox_directory,

        container.get()->resources,

        container.get()->environment);

  }

 

  if (taskInfo.isSome() && flags.docker_mesos_image.isNone()) {

    // Launching task by forking a subprocess to run docker executor.

    return container.get()->launch = fetch(containerId,
slaveId)

      .then(defer(self(), [=]() { return pull(containerId); }))

      .then(defer(self(), [=]() {

        return mountPersistentVolumes(containerId);

      }))

      .then(defer(self(), [=]() { return launchExecutorProcess(containerId); }))

      .then(defer(self(), [=](pid_t pid) {

        return reapExecutor(containerId, pid);

      }));

  }

 

  string containerName = container.get()->name();

 

  if (container.get()->executorName().isSome())
{

    // Launch the container with the executor name as we expect the

    // executor will launch the docker container.

    containerName = container.get()->executorName().get();

  }

 

  // Launching task or executor by launching a seperate docker

  // container to run the executor.

  // We need to do so for launching a task because as the slave

  // is running in a container (via docker_mesos_image flag)

  // we want the executor to keep running when the slave container

  // dies.

  return container.get()->launch
= fetch(containerId, slaveId)

    .then(defer(self(), [=]() { return pull(containerId); }))

    .then(defer(self(), [=]() {

      return mountPersistentVolumes(containerId);

    }))

    .then(defer(self(), [=]() {

      return launchExecutorContainer(containerId, containerName);

    }))

    .then(defer(self(), [=](const Docker::Container& dockerContainer) {

      return checkpointExecutor(containerId, dockerContainer);

    }))

    .then(defer(self(), [=](pid_t pid) {

      return reapExecutor(containerId, pid);

    }));

}

 

如果是TaskInfo里面的ContainerInfo,则调用launchExecutorProcess(containerId)。

如果是ExecutorInfo里面的ContainerInfo,则调用launchExecutorContainer(containerId, containerName)。

 

DockerContainerizerProcess::launchExecutorProcess实现如下:

Future<pid_t> DockerContainerizerProcess::launchExecutorProcess(

    const ContainerID& containerId)

{

  Container* container = containers_[containerId];

  container->state = Container::RUNNING;

 

  // Prepare environment variables for the executor.

  map<string, string> environment = executorEnvironment(

      container->executor,

      container->directory,

      container->slaveId,

      container->slavePid,

      container->checkpoint,

      flags,

      false);

 

  // Include any enviroment variables from ExecutorInfo.

  foreach (const Environment::Variable& variable,

           container->executor.command().environment().variables()) {

    environment[variable.name()] = variable.value();

  }

 

  // Pass GLOG flag to the executor.

  const Option<string> glog = os::getenv("GLOG_v");

  if (glog.isSome()) {

    environment["GLOG_v"] = glog.get();

  }

 

  vector<string> argv;

  argv.push_back("mesos-docker-executor");

 

  return logger->prepare(container->executor, container->directory)

    .then(defer(

        self(),

        [=](const ContainerLogger::SubprocessInfo& subprocessInfo)

          -> Future<pid_t> {

    // If we are on systemd, then extend the life of the executor. Any

    // grandchildren's lives will also be extended.

    std::vector<Subprocess::Hook> parentHooks;

    // Construct the mesos-docker-executor using the "name" we gave the

    // container (to distinguish it from Docker containers not created

    // by Mesos).

    Try<Subprocess> s = subprocess(

        path::join(flags.launcher_dir, "mesos-docker-executor"),

        argv,

        Subprocess::PIPE(),

        subprocessInfo.out,

        subprocessInfo.err,

        dockerFlags(flags, container->name(), container->directory),

        environment,

        lambda::bind(&setup, container->directory),

        None(),

        parentHooks);

 

    if (s.isError()) {

      return Failure("Failed
to fork executor: " + s.error());

    }

 

    // Checkpoint the executor's pid (if necessary).

    Try<Nothing> checkpointed = checkpoint(containerId, s.get().pid());

 

    if (checkpointed.isError()) {

      return Failure(

          "Failed to checkpoint executor's pid: " + checkpointed.error());

    }

 

    // Checkpoing complete, now synchronize with the process so that it

    // can continue to execute.

    CHECK_SOME(s.get().in());

    char c;

    ssize_t length;

    while ((length = write(s.get().in().get(),
&c, sizeof(c))) == -1 &&

           errno == EINTR);

 

    if (length != sizeof(c)) {

      return Failure("Failed
to synchronize with child process: " +

                     os::strerror(errno));

    }

 

    return s.get().pid();

  }));

}

 

这个函数最终运行一个名为mesos-docker-executor的子进程,这是一个独立的二进制进程。这也是大多数使用mesos运行Docker的方式。

[root@a061f582-9be2-45a8-bda5-2280926f825c ~]# ps aux | grep mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04
| grep -v grep

root 13538 0.2 0.1 802432 18120 ? Ssl Jul27 16:03 mesos-docker-executor --container=mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04 --docker=docker
--docker_socket=/var/run/docker.sock --help=false --launcher_dir=/usr/libexec/mesos --mapped_directory=/mnt/mesos/sandbox --sandbox_directory=/tmp/mesos/slaves/13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2/frameworks/13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-0000/executors/linkerdcos_cluster_mongodb.a50b6205-53a3-11e6-a67a-024214d517fa/runs/838a1dc0-cc13-4bd0-9380-77809f95ad04
--stop_timeout=0ns

root 13548 0.0 0.0 126860 14916 ? Sl Jul27 0:00 docker -H unix:///var/run/docker.sock
run --privileged --cpu-shares 102 --memory 536870912 -e MARATHON_APP_VERSION=2016-07-06T10:44:54.554Z -e HOST=10.25.161.248 -e MARATHON_APP_RESOURCE_CPUS=0.1 -e MONGODB_NODES=10.25.161.248 -e MARATHON_APP_DOCKER_IMAGE=linkerrepository/linkerdcos_mongodb_repl:1.0.1
-e PORT_10001=31166 -e MESOS_TASK_ID=linkerdcos_cluster_mongodb.a50b6205-53a3-11e6-a67a-024214d517fa -e PORT=31166 -e MARATHON_APP_RESOURCE_MEM=512.0 -e ENNAME=eth0 -e PORTS=31166 -e MARATHON_APP_RESOURCE_DISK=0.0 -e MARATHON_APP_LABELS= -e MARATHON_APP_ID=/linkerdcos/cluster/mongodb
-e PORT0=31166 -e MESOS_SANDBOX=/mnt/mesos/sandbox -e MESOS_CONTAINER_NAME=mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04 -v /opt:/data:rw -v /tmp/mesos/slaves/13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2/frameworks/13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-0000/executors/linkerdcos_cluster_mongodb.a50b6205-53a3-11e6-a67a-024214d517fa/runs/838a1dc0-cc13-4bd0-9380-77809f95ad04:/mnt/mesos/sandbox
--net host --name mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04 linkerrepository/linkerdcos_mongodb_repl:1.0.1

[root@a061f582-9be2-45a8-bda5-2280926f825c ~]# docker ps | grep mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04

43a45be25f37 linkerrepository/linkerdcos_mongodb_repl:1.0.1 "/scripts/run.sh "
5 days ago Up 5 days mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04

 

DockerContainerizerProcess::launchExecutorContainer实现如下:

Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer(

    const ContainerID& containerId,

    const string& containerName)

{

  if (!containers_.contains(containerId)) {

    return Failure("Container is already destroyed");

  }

 

  Container* container = containers_[containerId];

  container->state = Container::RUNNING;

 

  return logger->prepare(container->executor, container->directory)

    .then(defer(

        self(),

        [=](const ContainerLogger::SubprocessInfo& subprocessInfo)

          -> Future<Docker::Container> {

    // Start the executor in a Docker container.

    // This executor could either be a custom executor specified by an

    // ExecutorInfo, or the docker executor.

    Future<Nothing> run = docker->run(

        container->container,

        container->command,

        containerName,

        container->directory,

        flags.sandbox_directory,

        container->resources,

        container->environment,

        subprocessInfo.out,

        subprocessInfo.err);

 

    Owned<Promise<Docker::Container>> promise(newPromise<Docker::Container>());

    // We like to propogate the run failure when run fails so slave can

    // send this failure back to the scheduler. Otherwise we return

    // inspect's result or its failure, which should not fail when

    // the container isn't launched.

    Future<Docker::Container> inspect =

      docker->inspect(containerName, slave::DOCKER_INSPECT_DELAY)

        .onAny([=](Future<Docker::Container> f) {

            // We cannot associate the promise outside of the callback

            // because we like to propagate run's failure when

            // available.

            promise->associate(f);

        });

 

    run.onFailed([=](const string&
failure) mutable {

      inspect.discard();

      promise->fail(failure);

    });

 

    return promise->future();

  }));

}

 

运行一个Docker,在Docker里面运行Executor
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: