Apache Mesos 底层网络通信库 libprocess 分析
2016-07-27 22:07
288 查看
背景
LibProcess 是一套基于 Socket 实现的通信协议库,它支持 Protocal Buffer,通过两者结合,可实现一套很高效的基于消息传递的通信协议库,而 Mesos 底层通信协议正是采用了该库。
简单剖析
有两个服务 Master 和 Slave,Slave 周期性向 Master 汇报自己的进度,而 Master 则不定期地向 Slave 下达任务,采用 LibProcess 实现如下:
(1)Master 类设计
步骤 1 继承 ProtobufProcess 类。让 Master 类继承 LibProcess 中的 ProtobufProcess 类,该类实际上维护了一个 socket server,该 server 可以处理实现注册好的各种 Protocal Buffer 定义的 Message
从该类中可以看出 Socket 的影子,原生的 Socket 的一些基本的通讯函数模型都可以找到。
步骤 2 初始化资源分配模块 allocator->initialize 如下:
步骤 3 注册消息处理器。 在初始化函数 initialize() 中(使用 install 函数)会注册各种类型的 Message 类型(比如:SubmitSchedulerRequest, RegisterFrameworkMessage 等等)消息(需使用 Protocal buffer 定义)的消息处理器, 各种消息处理器对应的响应函数也不一样, 一般一一对应。 这样, Master 内部的 socket server 会监听来自外部的各种消息包, 一旦发现相应 Message 类型的消息包,则会调用相应的函数进行处理。
步骤 4 安装所有的 HTTP router,如下例举部分。
步骤 5 编写 main 函数启动 Master。
(2) Slave 类设计
Slave 设计与 Master 类似,具体如下:
LibProcess 事件驱动模型
LibProcess 采用了基于事件驱动的编程模型,每一个服务(进程)内部实际上运行了一个 socket server,而不同服务之间通过消息(事件)进行通信。在一个服务内部,注册了很多消息以及每个消息对应的处理器,一旦它收到某种类型的消息,则会调用相应的处理器进行处理,在处理过程中,可能会产生另外一种消息发送给另一个服务。整个过程如下图所示:
)
Libprocess 用法
(1)Process 类
可通过继承该类,实现一个服务。该类主要包含以下几个方法:
1) Install 函数
注册名为 name 的消息处理器。
2) Send 函数
向参数 to 标识的服务发送名为 name 的消息,该消息中包含数据 data,长度为 length。
(2)使用 ProtobufProcess 类
ProtobufProcess 类实现了 Process 类,与 Protocal buffer 紧密结合,该类主要有以下几个方法:
1) Install 函数
使用 ProtobufProcess 类,可以很容易创建一个处理 Protocal buffer 类型消息的消息处理器,可通过 install 函数注册各种消息处理器,比如如果一个新消息中有 2 个字段(x1,x2),则可这样注册该消息:
2) send 函数
定义如下:
它的功能是向参数 to 标识的服务发送 message 消息,其中 to 是 UPID 类型,它包含了服务 ip 和端口号。
3) reply 函数
定义如下:
功能:向最近发送消息的服务返回 message。
(3)全局函数
1) dispatch
存在多种定义方式,一种方式如下:
将函数分配给 pid 进程(服务)执行。注意,函数分配给进程后,不一定会马上执行完后,需要需要等待执行完成,可以结合 wait 函数使用:
2) spawn
定义如下:
启动进程(服务)process,参数 manage 表示是否启用垃圾收集机制。
LibProcess 是一套基于 Socket 实现的通信协议库,它支持 Protocal Buffer,通过两者结合,可实现一套很高效的基于消息传递的通信协议库,而 Mesos 底层通信协议正是采用了该库。
简单剖析
有两个服务 Master 和 Slave,Slave 周期性向 Master 汇报自己的进度,而 Master 则不定期地向 Slave 下达任务,采用 LibProcess 实现如下:
(1)Master 类设计
步骤 1 继承 ProtobufProcess 类。让 Master 类继承 LibProcess 中的 ProtobufProcess 类,该类实际上维护了一个 socket server,该 server 可以处理实现注册好的各种 Protocal Buffer 定义的 Message
class Master : public ProtobufProcess<Master> { //…… }
从该类中可以看出 Socket 的影子,原生的 Socket 的一些基本的通讯函数模型都可以找到。
步骤 2 初始化资源分配模块 allocator->initialize 如下:
allocator->initialize( flags.allocation_interval, defer(self(), &Master::offer, lambda::_1, lambda::_2), defer(self(), &Master::inverseOffer, lambda::_1, lambda::_2), weights);
步骤 3 注册消息处理器。 在初始化函数 initialize() 中(使用 install 函数)会注册各种类型的 Message 类型(比如:SubmitSchedulerRequest, RegisterFrameworkMessage 等等)消息(需使用 Protocal buffer 定义)的消息处理器, 各种消息处理器对应的响应函数也不一样, 一般一一对应。 这样, Master 内部的 socket server 会监听来自外部的各种消息包, 一旦发现相应 Message 类型的消息包,则会调用相应的函数进行处理。
// Install handler functions for certain messages. install<SubmitSchedulerRequest>( &Master::submitScheduler, &SubmitSchedulerRequest::name); install<RegisterFrameworkMessage>( &Master::registerFramework, &RegisterFrameworkMessage::framework); install<ReregisterFrameworkMessage>( &Master::reregisterFramework, &ReregisterFrameworkMessage::framework, &ReregisterFrameworkMessage::failover);
步骤 4 安装所有的 HTTP router,如下例举部分。
route("/create-volumes", DEFAULT_HTTP_AUTHENTICATION_REALM, Http::CREATE_VOLUMES_HELP(), [this](const process::http::Request& request, const Option<string>& principal) { Http::log(request); return http.createVolumes(request, principal); }); route("/destroy-volumes", DEFAULT_HTTP_AUTHENTICATION_REALM, Http::DESTROY_VOLUMES_HELP(), [this](const process::http::Request& request, const Option<string>& principal) { Http::log(request); return http.destroyVolumes(request, principal); });
步骤 5 编写 main 函数启动 Master。
int main(int argc, char** argv) { process::initialize(“master”); // 初始化一个名为 master 的进程 Master* master = new Master(); process::spawn(master); // 启动 master,实际上是一个 socket server process::wait(master->self()); delete master; return 0; }
(2) Slave 类设计
Slave 设计与 Master 类似,具体如下:
class Slave : public ProtobufProcess<Slave> { Slave(): ProcessBase(“slave”) {} void initialize() [ install<LaunchTasksMessage>( &Master::launchTasks, &LaunchTasksMessage::id, &LaunchTasksMessage::tasks); } void launchTasks(const int id, const vector<TaskInfo>& tasks) { for (int i = 0; i < tasks.size(); i++) { //launch tasks[i]; } } }
LibProcess 事件驱动模型
LibProcess 采用了基于事件驱动的编程模型,每一个服务(进程)内部实际上运行了一个 socket server,而不同服务之间通过消息(事件)进行通信。在一个服务内部,注册了很多消息以及每个消息对应的处理器,一旦它收到某种类型的消息,则会调用相应的处理器进行处理,在处理过程中,可能会产生另外一种消息发送给另一个服务。整个过程如下图所示:
)
Libprocess 用法
(1)Process 类
可通过继承该类,实现一个服务。该类主要包含以下几个方法:
1) Install 函数
void install( const std::string& name, const MessageHandler& handler)
注册名为 name 的消息处理器。
2) Send 函数
void send( const UPID& to, const std::string& name, const char* data = NULL, size_t length = 0)
向参数 to 标识的服务发送名为 name 的消息,该消息中包含数据 data,长度为 length。
(2)使用 ProtobufProcess 类
ProtobufProcess 类实现了 Process 类,与 Protocal buffer 紧密结合,该类主要有以下几个方法:
1) Install 函数
使用 ProtobufProcess 类,可以很容易创建一个处理 Protocal buffer 类型消息的消息处理器,可通过 install 函数注册各种消息处理器,比如如果一个新消息中有 2 个字段(x1,x2),则可这样注册该消息:
install<XXXMessage>( // 使用 install 函数进行注册 &messageHandler, &x1,&x2); } XXXMessage 的 protocal buffer 定义如下: message XXXMessage { required X x1 = 1; required Y x2 = 2; }
2) send 函数
定义如下:
void send(const process::UPID& to, const google::protobuf::Message& message);
它的功能是向参数 to 标识的服务发送 message 消息,其中 to 是 UPID 类型,它包含了服务 ip 和端口号。
3) reply 函数
定义如下:
void reply(const google::protobuf::Message& message);
功能:向最近发送消息的服务返回 message。
(3)全局函数
1) dispatch
存在多种定义方式,一种方式如下:
template <typename R, typename T> Future<R> dispatch( const PID<T>& pid, Future<R> (T::*method)(void))
将函数分配给 pid 进程(服务)执行。注意,函数分配给进程后,不一定会马上执行完后,需要需要等待执行完成,可以结合 wait 函数使用:
Future<bool> added = dispatch(slavesManager, add); added.await(); if (!added.isReady() || !added.get()) { ….. }
2) spawn
定义如下:
UPID spawn(ProcessBase* process, bool manage=false);
启动进程(服务)process,参数 manage 表示是否启用垃圾收集机制。
相关文章推荐
- mesos + marathon + docker部署
- 【转】Singularity:基于Apache Mesos构建的服务部署和作业调度平台
- Mesos 配置(译)
- 在生产环境中使用Apache Mesos和Docker
- 译 | 像使用一台主机一样管理集群
- 持续交付的Mesos与Docker导入篇
- Mesos Framework Failover
- Mesos Task Killed by OOM Killer
- 基于Mesos和Docker的分布式计算平台
- Apache Mesos make errors
- mesos/apache aurora实战-安装手记
- Mesos+Kubernetes集成安装部署
- Mesosphere DCOS快速部署手册
- 安装mesos,报错:cannot find libsasl2【解决】
- 在MacOs上配置Mesos+Zookeeper
- 在CentOs7上配置Mesos+Zookeeper
- Ubuntu Server14.04下MESOS部署
- Spark的三种分布式部署模式:Standalone, Mesos,Yarn
- mesos+marathon环境简单搭建
- Mesos实战总结