您的位置:首页 > 理论基础 > 计算机网络

Apache Mesos 底层网络通信库 libprocess 分析

2016-07-27 22:07 288 查看
背景

LibProcess 是一套基于 Socket 实现的通信协议库,它支持 Protocal Buffer,通过两者结合,可实现一套很高效的基于消息传递的通信协议库,而 Mesos 底层通信协议正是采用了该库。

简单剖析

有两个服务 Master 和 Slave,Slave 周期性向 Master 汇报自己的进度,而 Master 则不定期地向 Slave 下达任务,采用 LibProcess 实现如下:

(1)Master 类设计

步骤 1 继承 ProtobufProcess 类。让 Master 类继承 LibProcess 中的 ProtobufProcess 类,该类实际上维护了一个 socket server,该 server 可以处理实现注册好的各种 Protocal Buffer 定义的 Message

class Master : public ProtobufProcess<Master> {

//……

}


从该类中可以看出 Socket 的影子,原生的 Socket 的一些基本的通讯函数模型都可以找到。

步骤 2 初始化资源分配模块 allocator->initialize 如下:

allocator->initialize(
flags.allocation_interval,
defer(self(), &Master::offer, lambda::_1, lambda::_2),
defer(self(), &Master::inverseOffer, lambda::_1, lambda::_2),
weights);


步骤 3 注册消息处理器。 在初始化函数 initialize() 中(使用 install 函数)会注册各种类型的 Message 类型(比如:SubmitSchedulerRequest, RegisterFrameworkMessage 等等)消息(需使用 Protocal buffer 定义)的消息处理器, 各种消息处理器对应的响应函数也不一样, 一般一一对应。 这样, Master 内部的 socket server 会监听来自外部的各种消息包, 一旦发现相应 Message 类型的消息包,则会调用相应的函数进行处理。

// Install handler functions for certain messages.
install<SubmitSchedulerRequest>(
&Master::submitScheduler,
&SubmitSchedulerRequest::name);

install<RegisterFrameworkMessage>(
&Master::registerFramework,
&RegisterFrameworkMessage::framework);

install<ReregisterFrameworkMessage>(
&Master::reregisterFramework,
&ReregisterFrameworkMessage::framework,
&ReregisterFrameworkMessage::failover);


步骤 4 安装所有的 HTTP router,如下例举部分。

route("/create-volumes",
DEFAULT_HTTP_AUTHENTICATION_REALM,
Http::CREATE_VOLUMES_HELP(),
[this](const process::http::Request& request,
const Option<string>& principal) {
Http::log(request);
return http.createVolumes(request, principal);
});
route("/destroy-volumes",
DEFAULT_HTTP_AUTHENTICATION_REALM,
Http::DESTROY_VOLUMES_HELP(),
[this](const process::http::Request& request,
const Option<string>& principal) {
Http::log(request);
return http.destroyVolumes(request, principal);
});


步骤 5 编写 main 函数启动 Master。

int main(int argc, char** argv)

{

process::initialize(“master”); // 初始化一个名为 master 的进程

Master* master = new Master();

process::spawn(master); // 启动 master,实际上是一个 socket server

process::wait(master->self());

delete master;

return 0;

}


(2) Slave 类设计

Slave 设计与 Master 类似,具体如下:

class Slave : public ProtobufProcess<Slave>

{

Slave(): ProcessBase(“slave”) {}

void initialize() [

install<LaunchTasksMessage>(

&Master::launchTasks,

&LaunchTasksMessage::id,

&LaunchTasksMessage::tasks);

}

void launchTasks(const int id,

const vector<TaskInfo>& tasks) {

for (int i = 0; i < tasks.size(); i++) {

//launch tasks[i];

}

}

}


LibProcess 事件驱动模型

LibProcess 采用了基于事件驱动的编程模型,每一个服务(进程)内部实际上运行了一个 socket server,而不同服务之间通过消息(事件)进行通信。在一个服务内部,注册了很多消息以及每个消息对应的处理器,一旦它收到某种类型的消息,则会调用相应的处理器进行处理,在处理过程中,可能会产生另外一种消息发送给另一个服务。整个过程如下图所示:


)

Libprocess 用法

(1)Process 类

可通过继承该类,实现一个服务。该类主要包含以下几个方法:

1) Install 函数

void install(

const std::string& name,

const MessageHandler& handler)


注册名为 name 的消息处理器。

2) Send 函数

void send(

const UPID& to,

const std::string& name,

const char* data = NULL,

size_t length = 0)


向参数 to 标识的服务发送名为 name 的消息,该消息中包含数据 data,长度为 length。

(2)使用 ProtobufProcess 类

ProtobufProcess 类实现了 Process 类,与 Protocal buffer 紧密结合,该类主要有以下几个方法:

1) Install 函数

使用 ProtobufProcess 类,可以很容易创建一个处理 Protocal buffer 类型消息的消息处理器,可通过 install 函数注册各种消息处理器,比如如果一个新消息中有 2 个字段(x1,x2),则可这样注册该消息:

install<XXXMessage>(  // 使用 install 函数进行注册

&messageHandler,

&x1,&x2);

}

XXXMessage 的 protocal buffer 定义如下:

message XXXMessage {

required X x1 = 1;

required Y x2 = 2;

}


2) send 函数

定义如下:

void send(const process::UPID& to,

const google::protobuf::Message& message);


它的功能是向参数 to 标识的服务发送 message 消息,其中 to 是 UPID 类型,它包含了服务 ip 和端口号。

3) reply 函数

定义如下:

void reply(const google::protobuf::Message& message);


功能:向最近发送消息的服务返回 message。

(3)全局函数

1) dispatch

存在多种定义方式,一种方式如下:

template <typename R, typename T>

Future<R> dispatch(

const PID<T>& pid,

Future<R> (T::*method)(void))


将函数分配给 pid 进程(服务)执行。注意,函数分配给进程后,不一定会马上执行完后,需要需要等待执行完成,可以结合 wait 函数使用:

Future<bool> added = dispatch(slavesManager, add);

added.await();

if (!added.isReady() || !added.get()) {

…..

}


2) spawn

定义如下:

UPID spawn(ProcessBase* process, bool manage=false);


启动进程(服务)process,参数 manage 表示是否启用垃圾收集机制。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mesos libprocess