您的位置:首页 > 其它

Mesos源码分析(15): Test Executor的运行

2016-08-02 00:23 232 查看
Test Executor的代码在src/examples/test_executor.cpp中

 

int main(int argc, char** argv)

{

  TestExecutor executor;

  MesosExecutorDriver driver(&executor);

  return driver.run() == DRIVER_STOPPED ? 0 : 1;

}
 

Executor的运行主要依赖MesosExecutorDriver这一层封装,由它负责和mesos-slave进行通信。

 

MesosExecutorDriver的实现在src/exec/exec.cpp中

 

// Starts the driver and then joins it.
//
// If start() yields anything other than DRIVER_RUNNING, that status is
// returned as-is and join() is never called; otherwise the result of
// join() is returned.
Status MesosExecutorDriver::run()
{
  const Status started = start();

  if (started != DRIVER_RUNNING) {
    return started;
  }

  return join();
}
 

Status MesosExecutorDriver::start()

{

  synchronized (mutex) {

    if (status != DRIVER_NOT_STARTED) {

      return status;

    }

 
    // Set stream buffering mode to flush on newlines so that we

    // capture logs from user processes even when output is redirected

    // to a file.

    setvbuf(stdout, 0, _IOLBF, 0);

    setvbuf(stderr, 0, _IOLBF, 0);

 
    bool local;

 
    UPID slave;

    SlaveID slaveId;

    FrameworkID frameworkId;

    ExecutorID executorId;

    string workDirectory;

    bool checkpoint;

 
    Option<string> value;

    std::istringstream iss;

 
    // Check if this is local (for example, for testing).

    local = os::getenv("MESOS_LOCAL").isSome();

 
    // Get slave PID from environment.

    value = os::getenv("MESOS_SLAVE_PID");

    if (value.isNone()) {

      EXIT(EXIT_FAILURE)

        << "Expecting 'MESOS_SLAVE_PID' to be set in the environment";

    }

 
    slave = UPID(value.get());

    CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";

 
    // Get slave ID from environment.

    value = os::getenv("MESOS_SLAVE_ID");

    if (value.isNone()) {

      EXIT(EXIT_FAILURE)

        << "Expecting 'MESOS_SLAVE_ID' to be set in the environment";

    }

    slaveId.set_value(value.get());

 
    // Get framework ID from environment.

    value = os::getenv("MESOS_FRAMEWORK_ID");

    if (value.isNone()) {

      EXIT(EXIT_FAILURE)

        << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";

    }

    frameworkId.set_value(value.get());

 
    // Get executor ID from environment.

    value = os::getenv("MESOS_EXECUTOR_ID");

    if (value.isNone()) {

      EXIT(EXIT_FAILURE)

        << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";

    }

    executorId.set_value(value.get());

 
    // Get working directory from environment.

    value = os::getenv("MESOS_DIRECTORY");

    if (value.isNone()) {

      EXIT(EXIT_FAILURE)

        << "Expecting 'MESOS_DIRECTORY' to be set in the environment";

    }

    workDirectory = value.get();

 
    // Get checkpointing status from environment.

    value = os::getenv("MESOS_CHECKPOINT");

    checkpoint = value.isSome() && value.get() == "1";

 
    Duration recoveryTimeout = RECOVERY_TIMEOUT;

 
    // Get the recovery timeout if checkpointing is enabled.

    if (checkpoint) {

      value = os::getenv("MESOS_RECOVERY_TIMEOUT");

 
      if (value.isSome()) {

        Try<Duration> _recoveryTimeout = Duration::parse(value.get());

 
        if (_recoveryTimeout.isError()) {

          EXIT(EXIT_FAILURE)

            << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "

            << _recoveryTimeout.error();

        }

 
        recoveryTimeout = _recoveryTimeout.get();

      }

    }

 
    CHECK(process == NULL);

 
    process = new ExecutorProcess(

        slave,

        this,

        executor,

        slaveId,

        frameworkId,

        executorId,

        local,

        workDirectory,

        checkpoint,

        recoveryTimeout,

        &mutex,

        latch);

 
    spawn(process);

 
    return status = DRIVER_RUNNING;

  }

}
 

start()最后通过spawn(process)启动ExecutorProcess(一个继承自ProcessBase的libprocess进程,并非普通意义上的线程),它的构造函数如下,通过install注册了很多消息处理函数。

 

// Constructs the executor-side process.
//
// Every constructor argument is stored in the member of the same name
// (without the leading underscore). The process additionally starts out
// with `connected` == false and `aborted` == false, and with a freshly
// generated random UUID in `connection`. Its process ID is generated from
// the prefix "executor".
//
// The body logs the Mesos version and then installs one handler per
// message type this process handles (see the install<>() calls below).
// NOTE(review): the accessor pointers listed after each handler presumably
// select the message fields that are unpacked and passed to the handler as
// arguments -- confirm against the install<>() declaration.
ExecutorProcess(const UPID& _slave,

                MesosExecutorDriver* _driver,

                Executor* _executor,

                const SlaveID& _slaveId,

                const FrameworkID& _frameworkId,

                const ExecutorID& _executorId,

                bool _local,

                const
string& _directory,

                bool _checkpoint,

                Duration _recoveryTimeout,

                std::recursive_mutex* _mutex,

                Latch* _latch)

  : ProcessBase(ID::generate("executor")),

    slave(_slave),

    driver(_driver),

    executor(_executor),

    slaveId(_slaveId),

    frameworkId(_frameworkId),

    executorId(_executorId),

    connected(false),

    connection(UUID::random()),

    local(_local),

    aborted(false),

    mutex(_mutex),

    latch(_latch),

    directory(_directory),

    checkpoint(_checkpoint),

    recoveryTimeout(_recoveryTimeout)

{

  LOG(INFO) << "Version: " << MESOS_VERSION;

 
  // ExecutorRegisteredMessage -> registered().
  install<ExecutorRegisteredMessage>(

      &ExecutorProcess::registered,

      &ExecutorRegisteredMessage::executor_info,

      &ExecutorRegisteredMessage::framework_id,

      &ExecutorRegisteredMessage::framework_info,

      &ExecutorRegisteredMessage::slave_id,

      &ExecutorRegisteredMessage::slave_info);

 
  // ExecutorReregisteredMessage -> reregistered().
  install<ExecutorReregisteredMessage>(

      &ExecutorProcess::reregistered,

      &ExecutorReregisteredMessage::slave_id,

      &ExecutorReregisteredMessage::slave_info);

 
  // ReconnectExecutorMessage -> reconnect().
  install<ReconnectExecutorMessage>(

      &ExecutorProcess::reconnect,

      &ReconnectExecutorMessage::slave_id);

 
  // RunTaskMessage -> runTask().
  install<RunTaskMessage>(

      &ExecutorProcess::runTask,

      &RunTaskMessage::task);

 
  // KillTaskMessage -> killTask().
  install<KillTaskMessage>(

      &ExecutorProcess::killTask,

      &KillTaskMessage::task_id);

 
  // StatusUpdateAcknowledgementMessage -> statusUpdateAcknowledgement().
  install<StatusUpdateAcknowledgementMessage>(

      &ExecutorProcess::statusUpdateAcknowledgement,

      &StatusUpdateAcknowledgementMessage::slave_id,

      &StatusUpdateAcknowledgementMessage::framework_id,

      &StatusUpdateAcknowledgementMessage::task_id,

      &StatusUpdateAcknowledgementMessage::uuid);

 
  // FrameworkToExecutorMessage -> frameworkMessage().
  install<FrameworkToExecutorMessage>(

      &ExecutorProcess::frameworkMessage,

      &FrameworkToExecutorMessage::slave_id,

      &FrameworkToExecutorMessage::framework_id,

      &FrameworkToExecutorMessage::executor_id,

      &FrameworkToExecutorMessage::data);

 
  // ShutdownExecutorMessage -> shutdown(); no message fields are listed.
  install<ShutdownExecutorMessage>(

      &ExecutorProcess::shutdown);

}
 

在ExecutorProcess的initialize函数中,向mesos-slave发送消息进行注册。

virtual
void initialize()

{

  VLOG(1) << "Executor started at: " << self()

          << " with pid " << getpid();

 
  link(slave);

 
  // Register with slave.

  RegisterExecutorMessage message;

  message.mutable_framework_id()->MergeFrom(frameworkId);

  message.mutable_executor_id()->MergeFrom(executorId);

  send(slave, message);

}
 

当mesos-slave向TestExecutor发送RunTaskMessage消息的时候,ExecutorProcess调用runTask函数。

void runTask(const TaskInfo& task)

{

  if (aborted.load()) {

    VLOG(1) << "Ignoring run task message for task " << task.task_id()

            << " because the driver is aborted!";

    return;

  }

 
  CHECK(!tasks.contains(task.task_id()))

    << "Unexpected duplicate task " << task.task_id();

 
  tasks[task.task_id()] = task;

 
  VLOG(1) << "Executor asked to run task '" << task.task_id() << "'";

 
  Stopwatch stopwatch;

  if (FLAGS_v >= 1) {

    stopwatch.start();

  }

 
  executor->launchTask(driver, task);

 
  VLOG(1) << "Executor::launchTask took " << stopwatch.elapsed();

}
 

最终调用executor的launchTask

在src/examples/test_executor.cpp中

// Pretends to run `task`: reports TASK_RUNNING for it, does no actual
// work, then immediately reports TASK_FINISHED. Both status updates are
// sent through `driver`, and a line is printed to stdout before each one.
virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
  const auto& taskId = task.task_id();

  cout << "Starting task " << taskId.value() << endl;

  TaskStatus update;
  update.mutable_task_id()->MergeFrom(taskId);
  update.set_state(TASK_RUNNING);
  driver->sendStatusUpdate(update);

  // This is where one would perform the requested task.

  cout << "Finishing task " << taskId.value() << endl;

  // Reuse the same status object for the terminal update.
  update.mutable_task_id()->MergeFrom(taskId);
  update.set_state(TASK_FINISHED);
  driver->sendStatusUpdate(update);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: