您的位置:首页 > 大数据 > 人工智能

Mesos源码分析(13): MesosContainerizer运行一个Task

2016-08-01 00:07 471 查看


Future<bool> MesosContainerizer::launch(

    const ContainerID& containerId,

    const TaskInfo& taskInfo,

    const ExecutorInfo& executorInfo,

string& directory,

    const Option<string>& user,

    const SlaveID& slaveId,

    const PID<Slave>& slavePid,

    bool checkpoint)


  return dispatch(process.get(),












// Launching an executor involves the following steps:

// 1. Call prepare on each isolator.

// 2. Fork the executor. The forked child is blocked from exec'ing until it has

// been isolated.

// 3. Isolate the executor. Call isolate with the pid for each isolator.

// 4. Fetch the executor.

// 5. Exec the executor. The forked child is signalled to continue. It will

// first execute any preparation commands from isolators and then exec the

// executor.

Future<bool> MesosContainerizerProcess::launch(

    const ContainerID& containerId,

    const Option<TaskInfo>& taskInfo,

    const ExecutorInfo& _executorInfo,

string& directory,

    const Option<string>& user,

    const SlaveID& slaveId,

    const PID<Slave>& slavePid,

    bool checkpoint)


  if (containers_.contains(containerId)) {

    return Failure("Container already started");


  if (taskInfo.isSome() &&

      taskInfo.get().has_container() &&

      taskInfo.get().container().type() != ContainerInfo::MESOS) {



  // NOTE: We make a copy of the executor info because we may mutate

  // it with default container info.

  ExecutorInfo executorInfo = _executorInfo;

  if (executorInfo.has_container() &&

      executorInfo.container().type() != ContainerInfo::MESOS) {



  // Add the default container info to the executor info.

  // TODO(jieyu): Rename the flag to be default_mesos_container_info.

  if (!executorInfo.has_container() &&

      flags.default_container_info.isSome()) {




  LOG(INFO) << "Starting container '" << containerId

            << "' for executor '" << executorInfo.executor_id()

            << "' of framework '" << executorInfo.framework_id() << "'";

  Container* container = new Container();

  container->directory = directory;

  container->state = PROVISIONING;

  container->resources = executorInfo.resources();

  // We need to set the `launchInfos` to be a ready future initially

  // before we start calling isolator->prepare() because otherwise,

  // the destroy will wait forever trying to wait for this future to

  // be ready, which it never will. See MESOS-4878.

  container->launchInfos = list<Option<ContainerLaunchInfo>>();

  containers_.put(containerId, Owned<Container>(container));

  if (!executorInfo.has_container()) {

    return prepare(containerId, taskInfo, executorInfo, directory, user, None())












  // Provision the root filesystem if needed.

  CHECK_EQ(executorInfo.container().type(), ContainerInfo::MESOS);

  if (!executorInfo.container().mesos().has_image()) {

    return _launch(containerId,










  const Image& image = executorInfo.container().mesos().image();

  Future<ProvisionInfo> future =

    provisioner->provision(containerId, image);


  return future














如果查看protocol buffer的定义文件include/mesos/mesos.proto,会发现ExecutorInfo里面有一个可选的ContainerInfo字段:

message ExecutorInfo {

  required ExecutorID executor_id = 1;

  optional FrameworkID framework_id = 8; // TODO(benh): Make this required.

  required CommandInfo command = 7;

  // Executor provided with a container will launch the container

  // with the executor's CommandInfo and we expect the container to

  // act as a Mesos executor.

  optional ContainerInfo container = 11;

  repeated Resource resources = 5;

  optional string name = 9;

  // 'source' is an identifier style string used by frameworks to

  // track the source of an executor. This is useful when it's

  // possible for different executor ids to be related semantically.


  // NOTE: 'source' is exposed alongside the resource usage of the

  // executor via JSON on the slave. This allows users to import usage

  // information into a time series database for monitoring.

  optional string source = 10;

  optional bytes data = 4;

  // Service discovery information for the executor. It is not

  // interpreted or acted upon by Mesos. It is up to a service

  // discovery system to use this information as needed and to handle

  // executors without service discovery information.

  optional DiscoveryInfo discovery = 12;



那Marathon里面的container info放在哪里呢?从下面的定义可以看到,TaskInfo里面同样有一个可选的ContainerInfo字段:


message TaskInfo {

  required string name = 1;

  required TaskID task_id = 2;

  required SlaveID slave_id = 3;

  repeated Resource resources = 4;

  optional ExecutorInfo executor = 5;

  optional CommandInfo command = 7;

  // Task provided with a container will launch the container as part

  // of this task paired with the task's CommandInfo.

  optional ContainerInfo container = 9;

  optional bytes data = 6;

  // A health check for the task (currently in *alpha* and initial

  // support will only be for TaskInfo's that have a CommandInfo).

  optional HealthCheck health_check = 8;

  // Labels are free-form key value pairs which are exposed through

  // master and slave endpoints. Labels will not be interpreted or

  // acted upon by Mesos itself. As opposed to the data field, labels

  // will be kept in memory on master and slave processes. Therefore,

  // labels should be used to tag tasks with light-weight meta-data.

  // Labels should not contain duplicate key-value pairs.

  optional Labels labels = 10;

  // Service discovery information for the task. It is not interpreted

  // or acted upon by Mesos. It is up to a service discovery system

  // to use this information as needed and to handle tasks without

  // service discovery information.

  optional DiscoveryInfo discovery = 11;





Future<bool> MesosContainerizerProcess::__launch(

    const ContainerID& containerId,

    const ExecutorInfo& executorInfo,

string& directory,

    const Option<string>& user,

    const SlaveID& slaveId,

    const PID<Slave>& slavePid,

    bool checkpoint,

    const list<Option<ContainerLaunchInfo>>& launchInfos)



  // Prepare environment variables for the executor.

  map<string, string> environment = executorEnvironment(







  // Determine the root filesystem for the container. Only one

  // isolator should return the container root filesystem.

  Option<string> rootfs;

  // Determine the executor launch command for the container.

  // At most one command can be returned from docker runtime

  // isolator if a docker image is specified.

  Option<CommandInfo> executorLaunchCommand;

  Option<string> workingDirectory;

  foreach (const Option<ContainerLaunchInfo>& launchInfo, launchInfos) {

    if (launchInfo.isSome() && launchInfo->has_rootfs()) {

      if (rootfs.isSome()) {

        return Failure("Only one isolator should return the container rootfs");

      } else {

        rootfs = launchInfo->rootfs();



    if (launchInfo.isSome() && launchInfo->has_environment()) {

      foreach (const Environment::Variable& variable,

               launchInfo->environment().variables()) {

string& name = variable.name();

string& value = variable.value();

        if (environment.count(name)) {

          VLOG(1) << "Overwriting environment variable '"

                  << name << "', original: '"

                  << environment[name] << "', new: '"

                  << value << "', for container "

                  << containerId;


        environment[name] = value;



    if (launchInfo.isSome() && launchInfo->has_command()) {

      if (executorLaunchCommand.isSome()) {

        return Failure("At most one command can be returned from isolators");

      } else {

        executorLaunchCommand = launchInfo->command();



    if (launchInfo.isSome() && launchInfo->has_working_directory()) {

      if (workingDirectory.isSome()) {

        return Failure(

            "At most one working directory can be returned from isolators");

      } else {

        workingDirectory = launchInfo->working_directory();




  // TODO(jieyu): Consider moving this to 'executorEnvironment' and

  // consolidating with docker containerizer.

  environment["MESOS_SANDBOX"] =

    rootfs.isSome() ? flags.sandbox_directory : directory;

  // Include any environment variables from CommandInfo.

  foreach (const Environment::Variable& variable,

           executorInfo.command().environment().variables()) {

    environment[variable.name()] = variable.value();


  JSON::Array commandArray;

  int namespaces = 0;

  foreach (const Option<ContainerLaunchInfo>& launchInfo, launchInfos) {

    if (!launchInfo.isSome()) {



    // Populate the list of additional commands to be run inside the container

    // context.

    foreach (const CommandInfo& command, launchInfo->commands()) {



    // Process additional environment variables returned by isolators.

    if (launchInfo->has_environment()) {

      foreach (const Environment::Variable& variable,

          launchInfo->environment().variables()) {

        environment[variable.name()] = variable.value();



    if (launchInfo->has_namespaces()) {

      namespaces |= launchInfo->namespaces();



  // TODO(jieyu): Use JSON::Array once we have generic parse support.

  JSON::Object commands;

  commands.values["commands"] = commandArray;

  return logger->prepare(executorInfo, directory)



        [=](const ContainerLogger::SubprocessInfo& subprocessInfo)

          -> Future<bool> {

    // Use a pipe to block the child until it's been isolated.

    int pipes[2];

    // We assume this should not fail under reasonable conditions so

    // we use CHECK.

    CHECK(pipe(pipes) == 0);

    // Prepare the flags to pass to the launch process.

    MesosContainerizerLaunch::Flags launchFlags;

    launchFlags.command = executorLaunchCommand.isSome()

      ? JSON::protobuf(executorLaunchCommand.get())

      : JSON::protobuf(executorInfo.command());

    launchFlags.sandbox = rootfs.isSome()

      ? flags.sandbox_directory

      : directory;

    // NOTE: If the executor shares the host filesystem, we should not

    // allow them to 'cd' into an arbitrary directory because that'll

    // create security issues.

    if (rootfs.isNone() && workingDirectory.isSome()) {

      LOG(WARNING) << "Ignore working directory '" << workingDirectory.get()

                   << "' specified in container launch info for container "

                   << containerId << " since the executor is using the "

                   << "host filesystem";

    } else {

      launchFlags.working_directory = workingDirectory;


    launchFlags.pipe_read = pipes[0];

    launchFlags.pipe_write = pipes[1];

    launchFlags.commands = commands;

    // Fork the child using launcher.

    vector<string> argv(2);


    argv[1] = MesosContainerizerLaunch::NAME;

    Try<pid_t> forked = launcher->fork(


        path::join(flags.launcher_dir, MESOS_CONTAINERIZER),



        (local ? Subprocess::FD(STDOUT_FILENO)

               : Subprocess::IO(subprocessInfo.out)),

        (local ? Subprocess::FD(STDERR_FILENO)

               : Subprocess::IO(subprocessInfo.err)),




        namespaces); // 'namespaces' will be ignored by PosixLauncher.

    if (forked.isError()) {

      return Failure("Failed to fork executor: " + forked.error());


    pid_t pid = forked.get();

    // Checkpoint the executor's pid if requested.

    if (checkpoint) {

string& path = slave::paths::getForkedPidPath(






      LOG(INFO) << "Checkpointing executor's forked pid " << pid

                << " to '" << path << "'";

      Try<Nothing> checkpointed =

        slave::state::checkpoint(path, stringify(pid));

      if (checkpointed.isError()) {

        LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"

                   << path << "': " << checkpointed.error();

        return Failure("Could not checkpoint executor's pid");



    // Monitor the executor's pid. We keep the future because we'll

    // refer to it again during container destroy.

    Future<Option<int>> status = process::reap(pid);

    status.onAny(defer(self(), &Self::reaped, containerId));

    containers_[containerId]->status = status;

    return isolate(containerId, pid)








      .then(defer(self(), &Self::exec, containerId, pipes[1]))

      .onAny(lambda::bind(&os::close, pipes[0]))

      .onAny(lambda::bind(&os::close, pipes[1]));



最终运行的二进制文件为mesos-containerizer,对应的常量定义为:const char MESOS_CONTAINERIZER[] = "mesos-containerizer"; 它的main函数如下:



int main(int argc, char** argv)


  return Subcommand::dispatch(




      new MesosContainerizerLaunch(),

      new MesosContainerizerMount());



if (command.get().shell()) {

  // Execute the command using shell.

  execlp("sh", "sh", "-c", command.get().value().c_str(), (char*) NULL);

} else {

  // Use os::execvpe to launch the command.

  char** argv = new
char*[command.get().arguments().size() + 1];

  for (int i = 0; i < command.get().arguments().size(); i++) {

    argv[i] = strdup(command.get().arguments(i).c_str());


  argv[command.get().arguments().size()] = NULL;

  execvp(command.get().value().c_str(), argv);






内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息