Mesos Source Code Analysis (13): MesosContainerizer Runs a Task
2016-08-06 22:32
MesosContainerizer is implemented in the file src/slave/containerizer/mesos/containerizer.cpp.

    Future<bool> MesosContainerizer::launch(
        const ContainerID& containerId,
        const TaskInfo& taskInfo,
        const ExecutorInfo& executorInfo,
        const string& directory,
        const Option<string>& user,
        const SlaveID& slaveId,
        const PID<Slave>& slavePid,
        bool checkpoint)
    {
      return dispatch(process.get(),
                      &MesosContainerizerProcess::launch,
                      containerId,
                      taskInfo,
                      executorInfo,
                      directory,
                      user,
                      slaveId,
                      slavePid,
                      checkpoint);
    }

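This dispatches to MesosContainerizerProcess::launch. MesosContainerizer takes the launch only if every ContainerInfo that is present has type MESOS: when either the TaskInfo or the ExecutorInfo carries a ContainerInfo of another type, launch returns false and MesosContainerizer is not used.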

    // Launching an executor involves the following steps:
    // 1. Call prepare on each isolator.
    // 2. Fork the executor. The forked child is blocked from exec'ing until it has
    //    been isolated.
    // 3. Isolate the executor. Call isolate with the pid for each isolator.
    // 4. Fetch the executor.
    // 5. Exec the executor. The forked child is signalled to continue. It will
    //    first execute any preparation commands from isolators and then exec the
    //    executor.
    Future<bool> MesosContainerizerProcess::launch(
        const ContainerID& containerId,
        const Option<TaskInfo>& taskInfo,
        const ExecutorInfo& _executorInfo,
        const string& directory,
        const Option<string>& user,
        const SlaveID& slaveId,
        const PID<Slave>& slavePid,
        bool checkpoint)
    {
      if (containers_.contains(containerId)) {
        return Failure("Container already started");
      }

      if (taskInfo.isSome() &&
          taskInfo.get().has_container() &&
          taskInfo.get().container().type() != ContainerInfo::MESOS) {
        return false;
      }

      // NOTE: We make a copy of the executor info because we may mutate
      // it with default container info.
      ExecutorInfo executorInfo = _executorInfo;

      if (executorInfo.has_container() &&
          executorInfo.container().type() != ContainerInfo::MESOS) {
        return false;
      }

      // Add the default container info to the executor info.
      // TODO(jieyu): Rename the flag to be default_mesos_container_info.
      if (!executorInfo.has_container() &&
          flags.default_container_info.isSome()) {
        executorInfo.mutable_container()->CopyFrom(
            flags.default_container_info.get());
      }

      LOG(INFO) << "Starting container '" << containerId
                << "' for executor '" << executorInfo.executor_id()
                << "' of framework '" << executorInfo.framework_id() << "'";

      Container* container = new Container();
      container->directory = directory;
      container->state = PROVISIONING;
      container->resources = executorInfo.resources();

      // We need to set the `launchInfos` to be a ready future initially
      // before we starting calling isolator->prepare() because otherwise,
      // the destroy will wait forever trying to wait for this future to
      // be ready , which it never will. See MESOS-4878.
      container->launchInfos = list<Option<ContainerLaunchInfo>>();

      containers_.put(containerId, Owned<Container>(container));

      if (!executorInfo.has_container()) {
        return prepare(containerId, taskInfo, executorInfo, directory, user, None())
          .then(defer(self(),
                      &Self::__launch,
                      containerId,
                      executorInfo,
                      directory,
                      user,
                      slaveId,
                      slavePid,
                      checkpoint,
                      lambda::_1));
      }

      // Provision the root filesystem if needed.
      CHECK_EQ(executorInfo.container().type(), ContainerInfo::MESOS);

      if (!executorInfo.container().mesos().has_image()) {
        return _launch(containerId,
                       taskInfo,
                       executorInfo,
                       directory,
                       user,
                       slaveId,
                       slavePid,
                       checkpoint,
                       None());
      }

      const Image& image = executorInfo.container().mesos().image();

      Future<ProvisionInfo> future = provisioner->provision(containerId, image);
      container->provisionInfos.push_back(future);

      return future
        .then(defer(PID<MesosContainerizerProcess>(this),
                    &MesosContainerizerProcess::_launch,
                    containerId,
                    taskInfo,
                    executorInfo,
                    directory,
                    user,
                    slaveId,
                    slavePid,
                    checkpoint,
                    lambda::_1));
    }

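The two early `return false` checks at the top of launch are what decide whether MesosContainerizer takes the container at all. A minimal standalone sketch of that decision follows. `Info`, `ContainerType` and `acceptsLaunch` are hypothetical stand-ins invented for illustration, not the Mesos protobuf types, and a null pointer plays the role of an unset Option<TaskInfo>.

```cpp
#include <cassert>

// Hypothetical stand-in for ContainerInfo::Type.
enum ContainerType { DOCKER = 1, MESOS = 2 };

// Hypothetical stand-in for TaskInfo / ExecutorInfo: only the optional
// container field matters for this decision.
struct Info {
  bool hasContainer;   // mirrors has_container()
  ContainerType type;  // meaningful only when hasContainer is true
};

// Returns true when a MesosContainerizer-like component would take the
// launch, false when it would decline it.
// `task` may be null (no TaskInfo); `executor` is mandatory.
bool acceptsLaunch(const Info* task, const Info* executor)
{
  assert(executor != nullptr);

  // A task that asks for a non-MESOS container is declined.
  if (task != nullptr && task->hasContainer && task->type != MESOS) {
    return false;
  }

  // So is an executor that asks for a non-MESOS container.
  if (executor->hasContainer && executor->type != MESOS) {
    return false;
  }

  // No container info at all, or MESOS container info: accepted.
  return true;
}
```

Note that the absence of any ContainerInfo counts as acceptance. In the quoted code that case goes straight to prepare and then __launch.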
Note that the ContainerInfo in ExecutorInfo and the ContainerInfo in TaskInfo serve different purposes.
In the protocol buffer definition file include/mesos/mesos.proto, ExecutorInfo has a ContainerInfo field:

    message ExecutorInfo {
      required ExecutorID executor_id = 1;
      optional FrameworkID framework_id = 8; // TODO(benh): Make this required.
      required CommandInfo command = 7;

      // Executor provided with a container will launch the container
      // with the executor's CommandInfo and we expect the container to
      // act as a Mesos executor.
      optional ContainerInfo container = 11;

      repeated Resource resources = 5;
      optional string name = 9;

      // 'source' is an identifier style string used by frameworks to
      // track the source of an executor. This is useful when it's
      // possible for different executor ids to be related semantically.
      //
      // NOTE: 'source' is exposed alongside the resource usage of the
      // executor via JSON on the slave. This allows users to import usage
      // information into a time series database for monitoring.
      optional string source = 10;

      optional bytes data = 4;

      // Service discovery information for the executor. It is not
      // interpreted or acted upon by Mesos. It is up to a service
      // discovery system to use this information as needed and to handle
      // executors without service discovery information.
      optional DiscoveryInfo discovery = 12;
    }

If the ContainerInfo in ExecutorInfo is set, the executor is launched inside that container.
So where does the container info configured in Marathon go?
TaskInfo also has a ContainerInfo field:

    message TaskInfo {
      required string name = 1;
      required TaskID task_id = 2;
      required SlaveID slave_id = 3;
      repeated Resource resources = 4;
      optional ExecutorInfo executor = 5;
      optional CommandInfo command = 7;

      // Task provided with a container will launch the container as part
      // of this task paired with the task's CommandInfo.
      optional ContainerInfo container = 9;

      optional bytes data = 6;

      // A health check for the task (currently in *alpha* and initial
      // support will only be for TaskInfo's that have a CommandInfo).
      optional HealthCheck health_check = 8;

      // Labels are free-form key value pairs which are exposed through
      // master and slave endpoints. Labels will not be interpreted or
      // acted upon by Mesos itself. As opposed to the data field, labels
      // will be kept in memory on master and slave processes. Therefore,
      // labels should be used to tag tasks with light-weight meta-data.
      // Labels should not contain duplicate key-value pairs.
      optional Labels labels = 10;

      // Service discovery information for the task. It is not interpreted
      // or acted upon by Mesos. It is up to a service discovery system
      // to use this information as needed and to handle tasks without
      // service discovery information.
      optional DiscoveryInfo discovery = 11;
    }

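Only when the ContainerInfo in TaskInfo is set does the task itself run in a container: the container is launched as part of the task, and the task runs inside it.
Eventually MesosContainerizerProcess::__launch is called: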

    Future<bool> MesosContainerizerProcess::__launch(
        const ContainerID& containerId,
        const ExecutorInfo& executorInfo,
        const string& directory,
        const Option<string>& user,
        const SlaveID& slaveId,
        const PID<Slave>& slavePid,
        bool checkpoint,
        const list<Option<ContainerLaunchInfo>>& launchInfos)
    {
      ……

      // Prepare environment variables for the executor.
      map<string, string> environment = executorEnvironment(
          executorInfo,
          directory,
          slaveId,
          slavePid,
          checkpoint,
          flags);

      // Determine the root filesystem for the container. Only one
      // isolator should return the container root filesystem.
      Option<string> rootfs;

      // Determine the executor launch command for the container.
      // At most one command can be returned from docker runtime
      // isolator if a docker image is specifed.
      Option<CommandInfo> executorLaunchCommand;
      Option<string> workingDirectory;

      foreach (const Option<ContainerLaunchInfo>& launchInfo, launchInfos) {
        if (launchInfo.isSome() && launchInfo->has_rootfs()) {
          if (rootfs.isSome()) {
            return Failure("Only one isolator should return the container rootfs");
          } else {
            rootfs = launchInfo->rootfs();
          }
        }

        if (launchInfo.isSome() && launchInfo->has_environment()) {
          foreach (const Environment::Variable& variable,
                   launchInfo->environment().variables()) {
            const string& name = variable.name();
            const string& value = variable.value();

            if (environment.count(name)) {
              VLOG(1) << "Overwriting environment variable '"
                      << name << "', original: '"
                      << environment[name] << "', new: '"
                      << value << "', for container "
                      << containerId;
            }

            environment[name] = value;
          }
        }

        if (launchInfo.isSome() && launchInfo->has_command()) {
          if (executorLaunchCommand.isSome()) {
            return Failure("At most one command can be returned from isolators");
          } else {
            executorLaunchCommand = launchInfo->command();
          }
        }

        if (launchInfo.isSome() && launchInfo->has_working_directory()) {
          if (workingDirectory.isSome()) {
            return Failure(
                "At most one working directory can be returned from isolators");
          } else {
            workingDirectory = launchInfo->working_directory();
          }
        }
      }

      // TODO(jieyu): Consider moving this to 'executorEnvironment' and
      // consolidating with docker containerizer.
      environment["MESOS_SANDBOX"] =
        rootfs.isSome() ? flags.sandbox_directory : directory;

      // Include any enviroment variables from CommandInfo.
      foreach (const Environment::Variable& variable,
               executorInfo.command().environment().variables()) {
        environment[variable.name()] = variable.value();
      }

      JSON::Array commandArray;
      int namespaces = 0;
      foreach (const Option<ContainerLaunchInfo>& launchInfo, launchInfos) {
        if (!launchInfo.isSome()) {
          continue;
        }

        // Populate the list of additional commands to be run inside the container
        // context.
        foreach (const CommandInfo& command, launchInfo->commands()) {
          commandArray.values.emplace_back(JSON::protobuf(command));
        }

        // Process additional environment variables returned by isolators.
        if (launchInfo->has_environment()) {
          foreach (const Environment::Variable& variable,
                   launchInfo->environment().variables()) {
            environment[variable.name()] = variable.value();
          }
        }

        if (launchInfo->has_namespaces()) {
          namespaces |= launchInfo->namespaces();
        }
      }

      // TODO(jieyu): Use JSON::Array once we have generic parse support.
      JSON::Object commands;
      commands.values["commands"] = commandArray;

      return logger->prepare(executorInfo, directory)
        .then(defer(
            self(),
            [=](const ContainerLogger::SubprocessInfo& subprocessInfo)
              -> Future<bool> {
          // Use a pipe to block the child until it's been isolated.
          int pipes[2];

          // We assume this should not fail under reasonable conditions so
          // we use CHECK.
          CHECK(pipe(pipes) == 0);

          // Prepare the flags to pass to the launch process.
          MesosContainerizerLaunch::Flags launchFlags;

          launchFlags.command = executorLaunchCommand.isSome()
            ? JSON::protobuf(executorLaunchCommand.get())
            : JSON::protobuf(executorInfo.command());

          launchFlags.sandbox = rootfs.isSome()
            ? flags.sandbox_directory
            : directory;

          // NOTE: If the executor shares the host filesystem, we should not
          // allow them to 'cd' into an arbitrary directory because that'll
          // create security issues.
          if (rootfs.isNone() && workingDirectory.isSome()) {
            LOG(WARNING) << "Ignore working directory '" << workingDirectory.get()
                         << "' specified in container launch info for container "
                         << containerId << " since the executor is using the "
                         << "host filesystem";
          } else {
            launchFlags.working_directory = workingDirectory;
          }

          launchFlags.pipe_read = pipes[0];
          launchFlags.pipe_write = pipes[1];
          launchFlags.commands = commands;

          // Fork the child using launcher.
          vector<string> argv(2);
          argv[0] = MESOS_CONTAINERIZER;
          argv[1] = MesosContainerizerLaunch::NAME;

          Try<pid_t> forked = launcher->fork(
              containerId,
              path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
              argv,
              Subprocess::FD(STDIN_FILENO),
              (local ? Subprocess::FD(STDOUT_FILENO)
                     : Subprocess::IO(subprocessInfo.out)),
              (local ? Subprocess::FD(STDERR_FILENO)
                     : Subprocess::IO(subprocessInfo.err)),
              launchFlags,
              environment,
              None(),
              namespaces); // 'namespaces' will be ignored by PosixLauncher.

          if (forked.isError()) {
            return Failure("Failed to fork executor: " + forked.error());
          }
          pid_t pid = forked.get();

          // Checkpoint the executor's pid if requested.
          if (checkpoint) {
            const string& path = slave::paths::getForkedPidPath(
                slave::paths::getMetaRootDir(flags.work_dir),
                slaveId,
                executorInfo.framework_id(),
                executorInfo.executor_id(),
                containerId);

            LOG(INFO) << "Checkpointing executor's forked pid " << pid
                      << " to '" << path << "'";

            Try<Nothing> checkpointed =
              slave::state::checkpoint(path, stringify(pid));

            if (checkpointed.isError()) {
              LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
                         << path << "': " << checkpointed.error();

              return Failure("Could not checkpoint executor's pid");
            }
          }

          // Monitor the executor's pid. We keep the future because we'll
          // refer to it again during container destroy.
          Future<Option<int>> status = process::reap(pid);
          status.onAny(defer(self(), &Self::reaped, containerId));
          containers_[containerId]->status = status;

          return isolate(containerId, pid)
            .then(defer(self(),
                        &Self::fetch,
                        containerId,
                        executorInfo.command(),
                        directory,
                        user,
                        slaveId))
            .then(defer(self(), &Self::exec, containerId, pipes[1]))
            .onAny(lambda::bind(&os::close, pipes[0]))
            .onAny(lambda::bind(&os::close, pipes[1]));
        }));
    }

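The environment map in __launch is written in several passes, and a later write replaces an earlier one. Tracing the assignments in the quoted code, the effective precedence appears to be, from lowest to highest: the base executor environment, MESOS_SANDBOX, the CommandInfo variables, and finally the isolator variables, because the second loop over launchInfos re-applies them after CommandInfo. The sketch below reproduces only that ordering with plain std::map. `Env` and `mergeEnvironment` are hypothetical names and not part of Mesos.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

typedef std::map<std::string, std::string> Env;

// Builds the final environment by applying each source in order of
// increasing precedence, so that later sources overwrite earlier ones.
Env mergeEnvironment(
    const Env& base,                    // stand-in for executorEnvironment(...)
    const std::string& sandbox,         // value chosen for MESOS_SANDBOX
    const Env& command,                 // variables from the CommandInfo
    const std::vector<Env>& isolators)  // variables returned by isolators
{
  assert(!sandbox.empty());

  Env result = base;
  result["MESOS_SANDBOX"] = sandbox;

  for (const auto& variable : command) {
    result[variable.first] = variable.second;
  }

  // Isolators are applied last, so they win over everything else.
  for (const Env& isolator : isolators) {
    for (const auto& variable : isolator) {
      result[variable.first] = variable.second;
    }
  }

  return result;
}
```

Under this reading, a variable set by both the framework's CommandInfo and an isolator ends up with the isolator's value.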
The binary that is finally run is named by const char MESOS_CONTAINERIZER[] = "mesos-containerizer";
mesos-containerizer is a standalone binary. Its main function is in src/slave/containerizer/mesos/main.cpp:

    int main(int argc, char** argv)
    {
      return Subcommand::dispatch(
          None(),
          argc,
          argv,
          new MesosContainerizerLaunch(),
          new MesosContainerizerMount());
    }

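Since __launch builds argv as { MESOS_CONTAINERIZER, MesosContainerizerLaunch::NAME }, the second argument is what selects the subcommand inside this binary. The following is a rough sketch of that kind of name-based dispatch. It is not the Mesos Subcommand class: `Handler` and `dispatch` are hypothetical, and the names "launch" and "mount" used in the usage note are assumed values, since the quoted source does not show them.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

// A subcommand receives the arguments that follow its name and returns
// a process exit code.
typedef std::function<int(const std::vector<std::string>&)> Handler;

// args[0] is the program name and args[1] selects the subcommand.
// Returns the handler's exit code, or 1 if no subcommand matches.
int dispatch(
    const std::map<std::string, Handler>& subcommands,
    const std::vector<std::string>& args)
{
  if (args.size() < 2) {
    return 1;  // no subcommand given
  }

  auto it = subcommands.find(args[1]);
  if (it == subcommands.end()) {
    return 1;  // unknown subcommand
  }

  assert(it->second);  // registered handlers must be callable

  std::vector<std::string> rest(args.begin() + 2, args.end());
  return it->second(rest);
}
```

With handlers registered under "launch" and "mount", calling dispatch with { "mesos-containerizer", "launch" } would run the launch handler, which is the path the forked child takes here.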
In src/slave/containerizer/mesos/launch.cpp, the MesosContainerizerLaunch::execute() function finally calls

    if (command.get().shell()) {
      // Execute the command using shell.
      execlp("sh", "sh", "-c", command.get().value().c_str(), (char*) NULL);
    } else {
      // Use os::execvpe to launch the command.
      char** argv = new char*[command.get().arguments().size() + 1];
      for (int i = 0; i < command.get().arguments().size(); i++) {
        argv[i] = strdup(command.get().arguments(i).c_str());
      }
      argv[command.get().arguments().size()] = NULL;
      execvp(command.get().value().c_str(), argv);
    }

to run the executor binary.
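Putting the two halves together: the parent forks a child that is blocked from exec'ing until isolation is done, then signals it through the pipe, and only then does the child exec the command. Below is a minimal POSIX sketch of that handshake, assuming a single byte written to the pipe serves as the signal. `launchBlocked` is a hypothetical helper; the isolate and fetch steps are reduced to a comment, and unlike Mesos the parent here simply waits for the child to exit.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Forks a child that waits on a pipe, then execs `sh -c command` once the
// parent signals it. Returns the child's exit code, or -1 on failure.
int launchBlocked(const char* command)
{
  assert(command != NULL);

  int pipes[2];
  if (pipe(pipes) != 0) {
    return -1;
  }

  pid_t pid = fork();
  if (pid < 0) {
    close(pipes[0]);
    close(pipes[1]);
    return -1;
  }

  if (pid == 0) {
    // Child: block until the parent writes one byte.
    close(pipes[1]);
    char dummy;
    ssize_t length;
    do {
      length = read(pipes[0], &dummy, sizeof(dummy));
    } while (length == -1 && errno == EINTR);
    close(pipes[0]);

    if (length != 1) {
      _exit(126);  // the parent went away without signalling
    }

    execlp("sh", "sh", "-c", command, (char*) NULL);
    _exit(127);  // only reached if exec failed
  }

  // Parent: this is where isolation and fetching would happen, while the
  // child is still blocked in read().
  close(pipes[0]);

  char dummy = 0;
  ssize_t written;
  do {
    written = write(pipes[1], &dummy, sizeof(dummy));
  } while (written == -1 && errno == EINTR);
  close(pipes[1]);

  int status = 0;
  pid_t waited;
  do {
    waited = waitpid(pid, &status, 0);
  } while (waited == -1 && errno == EINTR);

  if (waited != pid || !WIFEXITED(status)) {
    return -1;
  }
  return WEXITSTATUS(status);
}
```

Because the child's pid already exists before the exec, the parent can apply per-pid isolation to a process that has not yet started running the executor's own code.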
For the TestFramework described earlier, the executor that gets run is TestExecutor, which means that binary must be present in the corresponding directory on the mesos-slave.