您的位置:首页 > 大数据 > 人工智能

Mesos源码分析(13): MesosContainerizer运行一个Task

2016-08-06 22:32 549 查看
MesosContainerizer的实现在文件src/slave/containerizer/mesos/containerizer.cpp中

 

Future<bool> MesosContainerizer::launch(

    const ContainerID& containerId,

    const TaskInfo& taskInfo,

    const ExecutorInfo& executorInfo,

    const string& directory,

    const Option<string>& user,

    const SlaveID& slaveId,

    const PID<Slave>& slavePid,

    bool checkpoint)

{

  return dispatch(process.get(),

                  &MesosContainerizerProcess::launch,

                  containerId,

                  taskInfo,

                  executorInfo,

                  directory,

                  user,

                  slaveId,

                  slavePid,

                  checkpoint);

}

 

转而调用MesosContainerizerProcess::launch。从下面的代码可以看出,只有当TaskInfo和ExecutorInfo中的ContainerInfo要么没有设置、要么类型为MESOS时,才由MesosContainerizer来启动;否则launch直接返回false,表示MesosContainerizer不处理这个容器。

// Launching an executor involves the following steps:

// 1. Call prepare on each isolator.

// 2. Fork the executor. The forked child is blocked from exec'ing until it has

// been isolated.

// 3. Isolate the executor. Call isolate with the pid for each isolator.

// 4. Fetch the executor.

// 5. Exec the executor. The forked child is signalled to continue. It will

// first execute any preparation commands from isolators and then exec the

// executor.

Future<bool> MesosContainerizerProcess::launch(

    const ContainerID& containerId,

    const Option<TaskInfo>& taskInfo,

    const ExecutorInfo& _executorInfo,

    const string& directory,

    const Option<string>&
user,

    const SlaveID& slaveId,

    const PID<Slave>& slavePid,

    bool checkpoint)

{

  if (containers_.contains(containerId)) {

    return Failure("Container
already started");

  }

 

  if (taskInfo.isSome() &&

      taskInfo.get().has_container() &&

      taskInfo.get().container().type() != ContainerInfo::MESOS) {

    return false;

  }

 

  // NOTE: We make a copy of the executor info because we may mutate

  // it with default container info.

  ExecutorInfo executorInfo = _executorInfo;

 

  if (executorInfo.has_container() &&

      executorInfo.container().type() != ContainerInfo::MESOS) {

    return false;

  }

 

  // Add the default container info to the executor info.

  // TODO(jieyu): Rename the flag to be default_mesos_container_info.

  if (!executorInfo.has_container() &&

      flags.default_container_info.isSome()) {

    executorInfo.mutable_container()->CopyFrom(

        flags.default_container_info.get());

  }

 

  LOG(INFO) << "Starting container '" << containerId

            << "' for executor '" << executorInfo.executor_id()

            << "' of framework '" << executorInfo.framework_id() << "'";

 

  Container* container = new Container();

  container->directory = directory;

  container->state = PROVISIONING;

  container->resources = executorInfo.resources();

 

  // We need to set the `launchInfos` to be a ready future initially

  // before we starting calling isolator->prepare() because otherwise,

  // the destroy will wait forever trying to wait for this future to

  // be ready , which it never will. See MESOS-4878.

  container->launchInfos = list<Option<ContainerLaunchInfo>>();

 

  containers_.put(containerId, Owned<Container>(container));

 

  if (!executorInfo.has_container()) {

    return prepare(containerId, taskInfo, executorInfo, directory, user, None())

      .then(defer(self(),

                  &Self::__launch,

                  containerId,

                  executorInfo,

                  directory,

                  user,

                  slaveId,

                  slavePid,

                  checkpoint,

                  lambda::_1));

  }

 

  // Provision the root filesystem if needed.

  CHECK_EQ(executorInfo.container().type(), ContainerInfo::MESOS);

 

  if (!executorInfo.container().mesos().has_image()) {

    return _launch(containerId,

                   taskInfo,

                   executorInfo,

                   directory,

                   user,

                   slaveId,

                   slavePid,

                   checkpoint,

                   None());

  }

 

  const Image& image = executorInfo.container().mesos().image();

 

  Future<ProvisionInfo> future =

    provisioner->provision(containerId, image);

 

  container->provisionInfos.push_back(future);

 

  return future

    .then(defer(PID<MesosContainerizerProcess>(this),

                &MesosContainerizerProcess::_launch,

                containerId,

                taskInfo,

                executorInfo,

                directory,

                user,

                slaveId,

                slavePid,

                checkpoint,

                lambda::_1));

}

 

大家注意ExecutorInfo里面的ContainerInfo和TaskInfo里面的ContainerInfo不同。

查看protocol buffer的定义文件include/mesos/mesos.proto可以发现,ExecutorInfo中有一个ContainerInfo字段:

// Excerpt of `message ExecutorInfo` from include/mesos/mesos.proto.
// How the containerizer code quoted in this article uses these fields:
//  - executor_id / framework_id: appear in the "Starting container" log line
//    and in the forked-pid checkpoint path.
//  - command: the executor's CommandInfo. It is the launch command unless an
//    isolator supplies one, and it is handed to fetch().
//  - container: the executor-level ContainerInfo, distinct from
//    TaskInfo.container. MesosContainerizerProcess::launch returns false
//    when it is set to a non-MESOS type.
//  - resources: copied into the container record at launch.
message ExecutorInfo {
  required ExecutorID executor_id = 1;
  optional FrameworkID framework_id = 8; // TODO(benh): Make this required.
  required CommandInfo command = 7;
  // Executor provided with a container will launch the container
  // with the executor's CommandInfo and we expect the container to
  // act as a Mesos executor.
  optional ContainerInfo container = 11;
  repeated Resource resources = 5;
  optional string name = 9;

  // 'source' is an identifier style string used by frameworks to
  // track the source of an executor. This is useful when it's
  // possible for different executor ids to be related semantically.
  //
  // NOTE: 'source' is exposed alongside the resource usage of the
  // executor via JSON on the slave. This allows users to import usage
  // information into a time series database for monitoring.
  optional string source = 10;

  optional bytes data = 4;

  // Service discovery information for the executor. It is not
  // interpreted or acted upon by Mesos. It is up to a service
  // discovery system to use this information as needed and to handle
  // executors without service discovery information.
  optional DiscoveryInfo discovery = 12;
}

 

如果ExecutorInfo的ContainerInfo有值,则executor本身会在这个容器中启动:容器以executor的CommandInfo运行,并被当作一个Mesos executor。

那么Marathon中配置的container信息放在哪里呢?

TaskInfo里面也有一个ContainerInfo

// Excerpt of `message TaskInfo` from include/mesos/mesos.proto.
// `container` (field 9) is the task-level ContainerInfo, separate from
// ExecutorInfo.container. MesosContainerizerProcess::launch returns false
// when it is present and its type is not MESOS.
message TaskInfo {
  required string name = 1;
  required TaskID task_id = 2;
  required SlaveID slave_id = 3;
  repeated Resource resources = 4;
  optional ExecutorInfo executor = 5;
  optional CommandInfo command = 7;
  // Task provided with a container will launch the container as part
  // of this task paired with the task's CommandInfo.
  optional ContainerInfo container = 9;
  optional bytes data = 6;
  // A health check for the task (currently in *alpha* and initial
  // support will only be for TaskInfo's that have a CommandInfo).
  optional HealthCheck health_check = 8;

  // Labels are free-form key value pairs which are exposed through
  // master and slave endpoints. Labels will not be interpreted or
  // acted upon by Mesos itself. As opposed to the data field, labels
  // will be kept in memory on master and slave processes. Therefore,
  // labels should be used to tag tasks with light-weight meta-data.
  // Labels should not contain duplicate key-value pairs.
  optional Labels labels = 10;

  // Service discovery information for the task. It is not interpreted
  // or acted upon by Mesos. It is up to a service discovery system
  // to use this information as needed and to handle tasks without
  // service discovery information.
  optional DiscoveryInfo discovery = 11;
}

 

只有TaskInfo中的ContainerInfo有值时,才会真正为该任务启动一个容器,并在容器中配合任务自身的CommandInfo运行任务。

 

最终会调用MesosContainerizerProcess::__launch。例如ExecutorInfo没有ContainerInfo时,prepare()完成后就直接进入__launch:

Future<bool> MesosContainerizerProcess::__launch(

    const ContainerID& containerId,

    const ExecutorInfo& executorInfo,

    const string& directory,

    const Option<string>&
user,

    const SlaveID& slaveId,

    const PID<Slave>& slavePid,

    bool checkpoint,

    const list<Option<ContainerLaunchInfo>>& launchInfos)

{

……

  // Prepare environment variables for the executor.

  map<string, string> environment = executorEnvironment(

      executorInfo,

      directory,

      slaveId,

      slavePid,

      checkpoint,

      flags);

 

  // Determine the root filesystem for the container. Only one

  // isolator should return the container root filesystem.

  Option<string> rootfs;

 

  // Determine the executor launch command for the container.

  // At most one command can be returned from docker runtime

  // isolator if a docker image is specifed.

  Option<CommandInfo> executorLaunchCommand;

  Option<string> workingDirectory;

 

  foreach (const Option<ContainerLaunchInfo>& launchInfo,
launchInfos) {

    if (launchInfo.isSome() && launchInfo->has_rootfs()) {

      if (rootfs.isSome()) {

        return Failure("Only
one isolator should return the container rootfs");

      } else {

        rootfs = launchInfo->rootfs();

      }

    }

 

    if (launchInfo.isSome() && launchInfo->has_environment()) {

      foreach (const Environment::Variable& variable,

               launchInfo->environment().variables()) {

        const string& name = variable.name();

        const string& value =
variable.value();

 

        if (environment.count(name)) {

          VLOG(1) << "Overwriting environment variable '"

                  << name << "', original: '"

                  << environment[name] << "', new: '"

                  << value << "',
for container "

                  << containerId;

        }

 

        environment[name] = value;

      }

    }

 

    if (launchInfo.isSome() && launchInfo->has_command()) {

      if (executorLaunchCommand.isSome()) {

        return Failure("At most
one command can be returned from isolators");

      } else {

        executorLaunchCommand = launchInfo->command();

      }

    }

 

    if (launchInfo.isSome() && launchInfo->has_working_directory()) {

      if (workingDirectory.isSome()) {

        return Failure(

            "At most one working directory can be returned from isolators");

      } else {

        workingDirectory = launchInfo->working_directory();

      }

    }

  }

 

  // TODO(jieyu): Consider moving this to 'executorEnvironment' and

  // consolidating with docker containerizer.

  environment["MESOS_SANDBOX"] =

    rootfs.isSome() ? flags.sandbox_directory : directory;

 

  // Include any enviroment variables from CommandInfo.

  foreach (const Environment::Variable&
variable,

           executorInfo.command().environment().variables()) {

    environment[variable.name()] = variable.value();

  }

 

  JSON::Array commandArray;

  int namespaces = 0;

  foreach (const Option<ContainerLaunchInfo>& launchInfo,
launchInfos) {

    if (!launchInfo.isSome()) {

      continue;

    }

 

    // Populate the list of additional commands to be run inside the container

    // context.

    foreach (const CommandInfo&
command, launchInfo->commands()) {

      commandArray.values.emplace_back(JSON::protobuf(command));

    }

 

    // Process additional environment variables returned by isolators.

    if (launchInfo->has_environment()) {

      foreach (const Environment::Variable&
variable,

          launchInfo->environment().variables()) {

        environment[variable.name()] = variable.value();

      }

    }

 

    if (launchInfo->has_namespaces()) {

      namespaces |= launchInfo->namespaces();

    }

  }

 

  // TODO(jieyu): Use JSON::Array once we have generic parse support.

  JSON::Object commands;

  commands.values["commands"] = commandArray;

 

  return logger->prepare(executorInfo, directory)

    .then(defer(

        self(),

        [=](const ContainerLogger::SubprocessInfo& subprocessInfo)

          -> Future<bool> {

    // Use a pipe to block the child until it's been isolated.

    int pipes[2];

 

    // We assume this should not fail under reasonable conditions so

    // we use CHECK.

    CHECK(pipe(pipes) == 0);

 

    // Prepare the flags to pass to the launch process.

    MesosContainerizerLaunch::Flags launchFlags;

 

    launchFlags.command = executorLaunchCommand.isSome()

      ? JSON::protobuf(executorLaunchCommand.get())

      : JSON::protobuf(executorInfo.command());

 

    launchFlags.sandbox = rootfs.isSome()

      ? flags.sandbox_directory

      : directory;

 

    // NOTE: If the executor shares the host filesystem, we should not

    // allow them to 'cd' into an arbitrary directory because that'll

    // create security issues.

    if (rootfs.isNone() && workingDirectory.isSome()) {

      LOG(WARNING) << "Ignore working directory '" << workingDirectory.get()

                   << "' specified in container launch info for container "

                   << containerId << " since the executor is using the "

                   << "host filesystem";

    } else {

      launchFlags.working_directory = workingDirectory;

    }

 

    launchFlags.pipe_read = pipes[0];

    launchFlags.pipe_write = pipes[1];

    launchFlags.commands = commands;

 

    // Fork the child using launcher.

    vector<string> argv(2);

    argv[0] = MESOS_CONTAINERIZER;

    argv[1] = MesosContainerizerLaunch::NAME;

 

    Try<pid_t> forked = launcher->fork(

        containerId,

        path::join(flags.launcher_dir, MESOS_CONTAINERIZER),

        argv,

        Subprocess::FD(STDIN_FILENO),

        (local ? Subprocess::FD(STDOUT_FILENO)

               : Subprocess::IO(subprocessInfo.out)),

        (local ? Subprocess::FD(STDERR_FILENO)

               : Subprocess::IO(subprocessInfo.err)),

        launchFlags,

        environment,

        None(),

        namespaces); // 'namespaces' will be ignored by PosixLauncher.

 

    if (forked.isError()) {

      return Failure("Failed
to fork executor: " + forked.error());

    }

    pid_t pid = forked.get();

 

    // Checkpoint the executor's pid if requested.

    if (checkpoint) {

      const string& path
= slave::paths::getForkedPidPath(

          slave::paths::getMetaRootDir(flags.work_dir),

          slaveId,

          executorInfo.framework_id(),

          executorInfo.executor_id(),

          containerId);

 

      LOG(INFO) << "Checkpointing executor's forked pid " << pid

                << " to '" << path << "'";

 

      Try<Nothing> checkpointed =

        slave::state::checkpoint(path, stringify(pid));

 

      if (checkpointed.isError()) {

        LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"

                   << path << "': " << checkpointed.error();

 

        return Failure("Could not checkpoint executor's pid");

      }

    }

 

    // Monitor the executor's pid. We keep the future because we'll

    // refer to it again during container destroy.

    Future<Option<int>> status = process::reap(pid);

    status.onAny(defer(self(), &Self::reaped, containerId));

    containers_[containerId]->status = status;

 

    return isolate(containerId, pid)

      .then(defer(self(),

                  &Self::fetch,

                  containerId,

                  executorInfo.command(),

                  directory,

                  user,

                  slaveId))

      .then(defer(self(), &Self::exec, containerId, pipes[1]))

      .onAny(lambda::bind(&os::close, pipes[0]))

      .onAny(lambda::bind(&os::close, pipes[1]));

  }));

}

 

launcher->fork()最终运行的二进制文件是flags.launcher_dir目录下的mesos-containerizer,它的文件名由常量const char MESOS_CONTAINERIZER[] = "mesos-containerizer";指定。

 

mesos-containerizer是一个独立运行的二进制文件,它的main函数位于src/slave/containerizer/mesos/main.cpp:

int main(int argc, char**
argv)

{

  return Subcommand::dispatch(

      None(),

      argc,

      argv,

      new MesosContainerizerLaunch(),

      new MesosContainerizerMount());

}

 

src/slave/containerizer/mesos/launch.cpp中的MesosContainerizerLaunch::execute()函数最终执行下面的代码

// Excerpt from MesosContainerizerLaunch::execute() in
// src/slave/containerizer/mesos/launch.cpp. `command` is presumably the
// Option<CommandInfo> obtained earlier in that function (not shown here).
if (command.get().shell())
{
  // Execute the command using shell: `sh -c <value>`. The (char*) NULL
  // terminates execlp's variadic argument list.
  execlp("sh", "sh", "-c", command.get().value().c_str(), (char*) NULL);
} else {
  // Exec the command directly with execvp, which searches PATH and keeps
  // the current environment. argv is built NULL-terminated from
  // CommandInfo.arguments, so argv[0] comes from arguments(0), not from
  // value. The strdup'd copies are not freed here; a successful exec
  // replaces the process image anyway.
  char** argv = new char*[command.get().arguments().size() + 1];
  for (int i = 0; i < command.get().arguments().size(); i++) {
    argv[i] = strdup(command.get().arguments(i).c_str());
  }
  argv[command.get().arguments().size()] = NULL;

  // If execvp returns, it failed. Any error handling that follows lies
  // outside this excerpt.
  execvp(command.get().value().c_str(), argv);
}

 

从而运行executor的二进制文件:CommandInfo的shell为true时通过sh -c执行value;否则以value为程序名、arguments为参数直接调用execvp。

 

如果是前面介绍的TestFramework,运行的executor就是TestExecutor,因此要求mesos-slave上相应的路径下存在这个二进制文件。

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: