您的位置:首页 > 运维架构 > 网站架构

Cloudera Hadoop架构及Hadoop Common实现原理

2016-06-11 18:16 531 查看
目录:

1.Cloudera Hadoop架构

2.Hadoop RPC

        2.1Hadoop IPC源码解析

        2.2Hadoop IPC调用过程详解

1.Cloudera Hadoop架构



cloudera整体架构图



cloudera端口占用图
默认情况下,Agent每隔15秒向Server发送一次心跳交换。Agent向Server报告本机器上的活动及状态,Server向Agent响应其应该执行的动作。
Cloudera Manager Server and Agent使用一个开源的进程管理工具supervisord来进行进程的启动、停止、监控。
Cloudera包括CDH(Cloudera Hadoop安装包)、Cloudera Mamager(管理所有服务)、Cloudera Navigator(管理和加密HDFS数据)。
Cloudera安装步骤:JDK,Cloudera Manager Server和Agent,数据库相关软件,CDH,Managed servicce。
2.Hadoop RPC
RPC原理:

Client将参数和被调用过程的名字(哪台服务器上的哪个过程)打包并序列化(Hadoop自己的序列化机制)成为一个消息,发送到Server端,然后阻塞自身。
Server接收并解包,然后调用相应过程,最后将返回值打包并序列化发送给Client,Client继续往下执行。
2.1Hadoop IPC源码解析

Hadoop没有使用Java RMI API,而是自己实现了一套节点进程间通信机制IPC(Inter-Process Communication)。
Hadoop IPC核心:NIO,动态代理。

1.服务端接口

package com.baidu;

import org.apache.hadoop.ipc.VersionedProtocol;

//服务端接口需要继承VersionedProtocol
public interface IPCQueryStatus extends VersionedProtocol{
public String getFileStatus(String name);
}
2.服务端接口实现类(NameNode)
package com.baidu;

import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;

public class IPCQueryStatusImpl implements IPCQueryStatus{
/**
* protocol:协议接口对应的接口名称
* clientVersion:客户端期望的协议接口服务的版本号
* 方法返回服务器端的接口实现的版本号。该方法主要用于通信前的版本检查。
*/
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return IPCQueryServer.IPC_VER;
}

@Override
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
return null;
}

@Override
public String getFileStatus(String name) {
return new String("FileStatusOK");
}
}
注:NameNode接口服务类实现了多个IPC接口,所以需要根据IPC客户端发过来的接口名称protocol,确定并返回相应的服务器端版本号。
3.IPC服务器

package com.baidu;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class IPCQueryServer {
public static final long IPC_VER = 123L;
public static final int IPC_PORT = 8899;
public static void main(String[] args) {
IPCQueryStatusImpl queryService = new IPCQueryStatusImpl();
//获得依一个提供IPCQueryStatusImpl接口功能的IPC服务器,IPCQueryStatusImpl可以实现多个接口,如NameNode类,客户端可以通过接口名称protocol调用相应的过程
//第四个参数1表示 Server端的Handler实例(线程)的个数为 1,第五个参数false表示 不 打开调用方法日志
Server server = RPC.getServer(queryService,"127.0.0.1",IPC_PORT,1,false,new Configuration());
//启动服务
server.start();
System.out.println("Server ready, press any key to stop");
System.in.read();
//停止服务
server.stop();
System.out.println("Server stopped");
}
}



4.IPC客户端
package com.baidu;

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class IPCQueryClient {
public static void main(String[] args) throws IOException {
InetSocketAddress addr = new InetSocketAddress("localhost",IPCQueryServer.IPC_PORT);
//动态代理,获得IPCQueryStatus的一个代理类对象,过程中会执行getProtocolVersion方法检查版本号
IPCQueryStatus query = RPC.getProxy(
IPCQueryStatus.class,//需要调用服务端的具体的协议接口
IPCQueryServer.IPC_VER,//期望的版本号
addr,new Configuration());
//通过代理对象调用远程方法
String status = query.getFileStatus("Z:\\temp\\7c64984cf5c3410fbe28037865d010a3.pdf");
System.out.println(status);
RPC.stopProxy(query);
}
}

2.2Hadoop IPC调用过程详解

IPC客户端通过动态代理对象调用远程方法时,会生成一个RPC.Invocation(实现了Writable接口)实例,该实例封装了被调用方法名和参数列表。
通过NIO与IPC Server建立TCP连接,并将序列化后的数据发送给IPC Server。
IPC Server端的Listener工作线程运行NIO选择器循环,监听OP_READ和OP_ACCESS事件,并在Listener.doRead()方法中读取数据,并反序列化,通过得到的原始消息,构造服务端call对象,并放入CallQueue阻塞队列中。
IPC Server端的Handler工作线程循环从阻塞队列CallQueue中消费,并执行此调用请求(通过在RPC.getServer方法中配置Handler处理线程数),最后将返回结果序列化。
IPC Server端的Responser工作线程运行NIO选择器循环,监听OP_WRITE事件,并将Handler线程的序列化结果发送给IPC Client。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Cloudera Hadoop