Mesos Source Code Analysis (15): Running the Test Executor
2016-08-02 00:23
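The Test Executor code is in src/examples/test_executor.cpp. Its main function creates the executor, wraps it in a driver, and runs the driver:

    int main(int argc, char** argv)
    {
      TestExecutor executor;
      MesosExecutorDriver driver(&executor);
      return driver.run() == DRIVER_STOPPED ? 0 : 1;
    }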
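An executor runs mainly through MesosExecutorDriver, a wrapper that handles communication with mesos-slave.
MesosExecutorDriver is implemented in src/exec/exec.cpp. Its run() function calls start() and, if the driver is running, blocks in join():

    Status MesosExecutorDriver::run()
    {
      Status status = start();
      return status != DRIVER_RUNNING ? status : join();
    }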
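The run() logic is a small state machine: start() moves the driver from "not started" to "running", and join() is only reached when the start succeeded. The following is a minimal standalone sketch of that control flow, not the Mesos implementation. `ToyDriver`, its `Status` enum, and the `stop()` method are hypothetical stand-ins introduced for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the driver status values used in the article.
enum class Status { NotStarted, Running, Stopped };

// Toy driver that illustrates only the start()/join()/run() relationship.
class ToyDriver {
public:
  // Moves NotStarted -> Running. In any other state it returns the current
  // status unchanged, like the early return in MesosExecutorDriver::start().
  Status start() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (status_ != Status::NotStarted) {
      return status_;
    }
    return status_ = Status::Running;
  }

  // Marks the driver as stopped and wakes any thread blocked in join().
  Status stop() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      status_ = Status::Stopped;
    }
    cv_.notify_all();
    return Status::Stopped;
  }

  // Blocks until the driver is no longer in the Running state.
  Status join() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return status_ != Status::Running; });
    return status_;
  }

  // Same shape as the article's run(): join only if start succeeded.
  Status run() {
    Status status = start();
    return status != Status::Running ? status : join();
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  Status status_ = Status::NotStarted;
};
```

In this sketch, calling run() on a driver that was already stopped returns immediately with the stopped status, because start() refuses to restart it and join() is skipped.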
    Status MesosExecutorDriver::start()
    {
      synchronized (mutex) {
        if (status != DRIVER_NOT_STARTED) {
          return status;
        }

        // Set stream buffering mode to flush on newlines so that we
        // capture logs from user processes even when output is redirected
        // to a file.
        setvbuf(stdout, 0, _IOLBF, 0);
        setvbuf(stderr, 0, _IOLBF, 0);

        bool local;
        UPID slave;
        SlaveID slaveId;
        FrameworkID frameworkId;
        ExecutorID executorId;
        string workDirectory;
        bool checkpoint;

        Option<string> value;
        std::istringstream iss;

        // Check if this is local (for example, for testing).
        local = os::getenv("MESOS_LOCAL").isSome();

        // Get slave PID from environment.
        value = os::getenv("MESOS_SLAVE_PID");
        if (value.isNone()) {
          EXIT(EXIT_FAILURE)
            << "Expecting 'MESOS_SLAVE_PID' to be set in the environment";
        }

        slave = UPID(value.get());
        CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";

        // Get slave ID from environment.
        value = os::getenv("MESOS_SLAVE_ID");
        if (value.isNone()) {
          EXIT(EXIT_FAILURE)
            << "Expecting 'MESOS_SLAVE_ID' to be set in the environment";
        }
        slaveId.set_value(value.get());

        // Get framework ID from environment.
        value = os::getenv("MESOS_FRAMEWORK_ID");
        if (value.isNone()) {
          EXIT(EXIT_FAILURE)
            << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
        }
        frameworkId.set_value(value.get());

        // Get executor ID from environment.
        value = os::getenv("MESOS_EXECUTOR_ID");
        if (value.isNone()) {
          EXIT(EXIT_FAILURE)
            << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
        }
        executorId.set_value(value.get());

        // Get working directory from environment.
        value = os::getenv("MESOS_DIRECTORY");
        if (value.isNone()) {
          EXIT(EXIT_FAILURE)
            << "Expecting 'MESOS_DIRECTORY' to be set in the environment";
        }
        workDirectory = value.get();

        // Get checkpointing status from environment.
        value = os::getenv("MESOS_CHECKPOINT");
        checkpoint = value.isSome() && value.get() == "1";

        Duration recoveryTimeout = RECOVERY_TIMEOUT;

        // Get the recovery timeout if checkpointing is enabled.
        if (checkpoint) {
          value = os::getenv("MESOS_RECOVERY_TIMEOUT");

          if (value.isSome()) {
            Try<Duration> _recoveryTimeout = Duration::parse(value.get());

            if (_recoveryTimeout.isError()) {
              EXIT(EXIT_FAILURE)
                << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get()
                << "': " << _recoveryTimeout.error();
            }

            recoveryTimeout = _recoveryTimeout.get();
          }
        }

        CHECK(process == NULL);

        process = new ExecutorProcess(
            slave,
            this,
            executor,
            slaveId,
            frameworkId,
            executorId,
            local,
            workDirectory,
            checkpoint,
            recoveryTimeout,
            &mutex,
            latch);

        spawn(process);

        return status = DRIVER_RUNNING;
      }
    }
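Most of start() is configuration: mesos-slave passes the executor its identity through environment variables, and start() refuses to continue if a required one is missing. A rough standalone sketch of that pattern is shown here using only the C++ standard library. `requireEnv`, `checkpointEnabled`, `ExecutorEnv`, and `readExecutorEnv` are hypothetical names, and the sketch throws an exception where the real code calls EXIT(EXIT_FAILURE).

```cpp
#include <cassert>
#include <cstdlib>
#include <stdexcept>
#include <string>

// Hypothetical helper: returns the variable's value, or throws if it is
// unset. The real start() calls EXIT(EXIT_FAILURE) instead of throwing.
std::string requireEnv(const std::string& name) {
  const char* value = std::getenv(name.c_str());
  if (value == nullptr) {
    throw std::runtime_error(
        "Expecting '" + name + "' to be set in the environment");
  }
  return std::string(value);
}

// Mirrors the MESOS_CHECKPOINT handling: the variable is optional, and
// checkpointing is enabled only when its value is exactly "1".
bool checkpointEnabled() {
  const char* value = std::getenv("MESOS_CHECKPOINT");
  return value != nullptr && std::string(value) == "1";
}

// Hypothetical aggregate of the required settings that start() reads.
struct ExecutorEnv {
  std::string slavePid;
  std::string slaveId;
  std::string frameworkId;
  std::string executorId;
  std::string workDirectory;
  bool checkpoint;
};

// Reads every required variable in the same order as start().
ExecutorEnv readExecutorEnv() {
  ExecutorEnv env;
  env.slavePid = requireEnv("MESOS_SLAVE_PID");
  env.slaveId = requireEnv("MESOS_SLAVE_ID");
  env.frameworkId = requireEnv("MESOS_FRAMEWORK_ID");
  env.executorId = requireEnv("MESOS_EXECUTOR_ID");
  env.workDirectory = requireEnv("MESOS_DIRECTORY");
  env.checkpoint = checkpointEnabled();
  return env;
}
```

Failing fast here is a reasonable design: an executor that does not know its slave PID or its own ID cannot register, so there is nothing useful it could do by continuing.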
start() then spawns an ExecutorProcess, which runs separately from the calling thread. Its constructor, shown below, installs a handler for each message type the executor can receive.

    ExecutorProcess(const UPID& _slave,
                    MesosExecutorDriver* _driver,
                    Executor* _executor,
                    const SlaveID& _slaveId,
                    const FrameworkID& _frameworkId,
                    const ExecutorID& _executorId,
                    bool _local,
                    const string& _directory,
                    bool _checkpoint,
                    Duration _recoveryTimeout,
                    std::recursive_mutex* _mutex,
                    Latch* _latch)
      : ProcessBase(ID::generate("executor")),
        slave(_slave),
        driver(_driver),
        executor(_executor),
        slaveId(_slaveId),
        frameworkId(_frameworkId),
        executorId(_executorId),
        connected(false),
        connection(UUID::random()),
        local(_local),
        aborted(false),
        mutex(_mutex),
        latch(_latch),
        directory(_directory),
        checkpoint(_checkpoint),
        recoveryTimeout(_recoveryTimeout)
    {
      LOG(INFO) << "Version: " << MESOS_VERSION;

      install<ExecutorRegisteredMessage>(
          &ExecutorProcess::registered,
          &ExecutorRegisteredMessage::executor_info,
          &ExecutorRegisteredMessage::framework_id,
          &ExecutorRegisteredMessage::framework_info,
          &ExecutorRegisteredMessage::slave_id,
          &ExecutorRegisteredMessage::slave_info);

      install<ExecutorReregisteredMessage>(
          &ExecutorProcess::reregistered,
          &ExecutorReregisteredMessage::slave_id,
          &ExecutorReregisteredMessage::slave_info);

      install<ReconnectExecutorMessage>(
          &ExecutorProcess::reconnect,
          &ReconnectExecutorMessage::slave_id);

      install<RunTaskMessage>(
          &ExecutorProcess::runTask,
          &RunTaskMessage::task);

      install<KillTaskMessage>(
          &ExecutorProcess::killTask,
          &KillTaskMessage::task_id);

      install<StatusUpdateAcknowledgementMessage>(
          &ExecutorProcess::statusUpdateAcknowledgement,
          &StatusUpdateAcknowledgementMessage::slave_id,
          &StatusUpdateAcknowledgementMessage::framework_id,
          &StatusUpdateAcknowledgementMessage::task_id,
          &StatusUpdateAcknowledgementMessage::uuid);

      install<FrameworkToExecutorMessage>(
          &ExecutorProcess::frameworkMessage,
          &FrameworkToExecutorMessage::slave_id,
          &FrameworkToExecutorMessage::framework_id,
          &FrameworkToExecutorMessage::executor_id,
          &FrameworkToExecutorMessage::data);

      install<ShutdownExecutorMessage>(
          &ExecutorProcess::shutdown);
    }
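The install calls amount to building a dispatch table: each message type is bound to the member function that should handle it, and incoming messages are routed by type. The idea can be sketched without libprocess as below. `ToyDispatcher` and its string-keyed `install`/`deliver` methods are hypothetical and much simpler than the real templated install<Message>(...), which also unpacks the named protobuf fields into handler arguments.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Toy dispatcher: maps a message type name to a handler. Loosely modelled on
// the install<Message>(...) calls in the constructor; not the libprocess API.
class ToyDispatcher {
public:
  using Handler = std::function<void(const std::string& payload)>;

  // Registers (or replaces) the handler for a message type.
  void install(const std::string& type, Handler handler) {
    handlers_[type] = std::move(handler);
  }

  // Delivers a message to its handler. Returns false if no handler is
  // installed for that type, in which case the message is dropped.
  bool deliver(const std::string& type, const std::string& payload) const {
    auto it = handlers_.find(type);
    if (it == handlers_.end()) {
      return false;
    }
    it->second(payload);
    return true;
  }

private:
  std::map<std::string, Handler> handlers_;
};
```

A caller would install one lambda per message type (for example under the key "RunTaskMessage") and then feed incoming messages to deliver().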
In ExecutorProcess::initialize(), the executor links to mesos-slave and sends it a RegisterExecutorMessage to register itself.

    virtual void initialize()
    {
      VLOG(1) << "Executor started at: " << self()
              << " with pid " << getpid();

      link(slave);

      // Register with slave.
      RegisterExecutorMessage message;
      message.mutable_framework_id()->MergeFrom(frameworkId);
      message.mutable_executor_id()->MergeFrom(executorId);
      send(slave, message);
    }
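Registration is a two-step handshake: the executor announces itself with its framework ID and executor ID, and later receives an ExecutorRegisteredMessage (one of the handlers installed in the constructor). A simplified sketch of the executor side follows. All type names are hypothetical, the "slave" is just an in-memory inbox, and treating the reply as flipping a `connected` flag is an assumption of this sketch rather than something shown in the excerpt.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified registration message. The real one is the
// RegisterExecutorMessage protobuf.
struct ToyRegisterMessage {
  std::string frameworkId;
  std::string executorId;
};

// Stand-in for mesos-slave: records what the executor sends to it.
struct ToySlave {
  std::vector<ToyRegisterMessage> inbox;
};

class ToyExecutorProcess {
public:
  ToyExecutorProcess(ToySlave* slave,
                     std::string frameworkId,
                     std::string executorId)
    : slave_(slave),
      frameworkId_(std::move(frameworkId)),
      executorId_(std::move(executorId)) {}

  // Called once when the process starts: announce ourselves to the slave.
  void initialize() {
    ToyRegisterMessage message{frameworkId_, executorId_};
    slave_->inbox.push_back(message);
  }

  // Called when the slave confirms the registration (assumed behaviour).
  void registered() { connected_ = true; }

  bool connected() const { return connected_; }

private:
  ToySlave* slave_;
  std::string frameworkId_;
  std::string executorId_;
  bool connected_ = false;
};
```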
When mesos-slave sends a RunTaskMessage to the TestExecutor, ExecutorProcess calls its runTask function. The function ignores the request if the driver has been aborted, records the task, and hands it to the executor.

    void runTask(const TaskInfo& task)
    {
      if (aborted.load()) {
        VLOG(1) << "Ignoring run task message for task " << task.task_id()
                << " because the driver is aborted!";
        return;
      }

      CHECK(!tasks.contains(task.task_id()))
        << "Unexpected duplicate task " << task.task_id();

      tasks[task.task_id()] = task;

      VLOG(1) << "Executor asked to run task '" << task.task_id() << "'";

      Stopwatch stopwatch;
      if (FLAGS_v >= 1) {
        stopwatch.start();
      }

      executor->launchTask(driver, task);

      VLOG(1) << "Executor::launchTask took " << stopwatch.elapsed();
    }
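The structure of runTask (guard, bookkeeping, then delegation to the user's executor) can be reproduced in a few lines. The sketch below uses hypothetical `ToyTask`, `ToyExecutor`, and `ToyProcess` types, and it deliberately differs from the real code in one respect: a duplicate task is rejected by returning false, whereas the real runTask treats it as a fatal CHECK failure.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <string>

// Hypothetical, simplified task description (the real type is TaskInfo).
struct ToyTask {
  std::string id;
};

// Minimal executor interface with only the callback discussed here.
class ToyExecutor {
public:
  virtual ~ToyExecutor() {}
  virtual void launchTask(const ToyTask& task) = 0;
};

// Counts launches so the behaviour can be observed from outside.
class CountingExecutor : public ToyExecutor {
public:
  void launchTask(const ToyTask&) override { ++launched; }
  int launched = 0;
};

class ToyProcess {
public:
  explicit ToyProcess(ToyExecutor* executor) : executor_(executor) {}

  void abort() { aborted_.store(true); }

  // Returns true if the task was handed to the executor. An aborted driver
  // ignores the request. A duplicate task ID is rejected here, while the
  // real code aborts the process via CHECK.
  bool runTask(const ToyTask& task) {
    if (aborted_.load()) {
      return false;
    }
    if (tasks_.count(task.id) != 0) {
      return false;
    }
    tasks_[task.id] = task;
    executor_->launchTask(task);
    return true;
  }

private:
  ToyExecutor* executor_;
  std::atomic<bool> aborted_{false};
  std::map<std::string, ToyTask> tasks_;
};
```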
This finally calls the executor's launchTask, defined in src/examples/test_executor.cpp. The test executor does no real work: it reports TASK_RUNNING and then immediately reports TASK_FINISHED.

    virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
    {
      cout << "Starting task " << task.task_id().value() << endl;

      TaskStatus status;
      status.mutable_task_id()->MergeFrom(task.task_id());
      status.set_state(TASK_RUNNING);

      driver->sendStatusUpdate(status);

      // This is where one would perform the requested task.

      cout << "Finishing task " << task.task_id().value() << endl;

      status.mutable_task_id()->MergeFrom(task.task_id());
      status.set_state(TASK_FINISHED);

      driver->sendStatusUpdate(status);
    }
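What matters in launchTask is the order of status updates the driver sees for a task: first running, then finished. A small sketch makes that sequence observable. `RecordingDriver`, `ToyStatus`, and `TaskState` are hypothetical stand-ins for ExecutorDriver, TaskStatus, and the Mesos task state enum.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical subset of task states, enough for this example.
enum class TaskState { Running, Finished };

// Hypothetical, simplified status update (the real type is TaskStatus).
struct ToyStatus {
  std::string taskId;
  TaskState state;
};

// Stand-in for the driver: stores updates instead of sending them.
class RecordingDriver {
public:
  void sendStatusUpdate(const ToyStatus& status) { updates.push_back(status); }
  std::vector<ToyStatus> updates;
};

// Same shape as TestExecutor::launchTask: report running, do the work,
// then report finished.
void launchTask(RecordingDriver* driver, const std::string& taskId) {
  ToyStatus status{taskId, TaskState::Running};
  driver->sendStatusUpdate(status);

  // This is where the requested work would be performed.

  status.state = TaskState::Finished;
  driver->sendStatusUpdate(status);
}
```

A real executor would usually run the work asynchronously and send the final update only once the work completes, so that launchTask itself returns quickly.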