hadoop的源码分析之RPC(Remote Procedure Call Protocol)
2013-01-12 16:49
471 查看
理解这个RPC是不是的先去理解哈动态代理 好多invoke,还有Socket网络编程
先来张eclipse下IPC源码图:
先来看看RPC.java,既然是动态代理,自然会想到Invoke()方法了,先来看看RPC中的Invoker中的invoke()方法
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private UserGroupInformation ticket;
private Client client;
private boolean isClosed = false;
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) {
this.address = address;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean logDebug = LOG.isDebugEnabled();
long startTime = 0;
if (logDebug) {
startTime = System.currentTimeMillis();
}
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), address,
method.getDeclaringClass(), ticket);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
synchronized private void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
}
可以看出动态代理里的invoke()方法其实是RPC里的invocation对方法名,参数做了封装,可以看invocation类的
封装后给client调用,这时就转向client调用了
private static class Invocation implements Writable, Configurable {
private String methodName;
private Class[] parameterClasses;
private Object[] parameters;
private Configuration conf;
public Invocation() {}
public Invocation(Method method, Object[] parameters) {
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
}
/** The name of the method invoked. */
public String getMethodName() { return methodName; }
/** The parameter classes. */
public Class[] getParameterClasses() { return parameterClasses; }
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
public void readFields(DataInput in) throws IOException {
methodName = UTF8.readString(in);
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
conf);
}
}
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append(methodName);
buffer.append("(");
for (int i = 0; i < parameters.length; i++) {
if (i != 0)
buffer.append(", ");
buffer.append(parameters[i]);
}
buffer.append(")");
return buffer.toString();
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Configuration getConf() {
return this.conf;
}
}
先来张eclipse下IPC源码图:
先来看看RPC.java,既然是动态代理,自然会想到Invoke()方法了,先来看看RPC中的Invoker中的invoke()方法
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private UserGroupInformation ticket;
private Client client;
private boolean isClosed = false;
public Invoker(InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) {
this.address = address;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final boolean logDebug = LOG.isDebugEnabled();
long startTime = 0;
if (logDebug) {
startTime = System.currentTimeMillis();
}
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), address,
method.getDeclaringClass(), ticket);
if (logDebug) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
synchronized private void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
}
可以看出动态代理里的invoke()方法其实是RPC里的invocation对方法名,参数做了封装,可以看invocation类的
封装后给client调用,这时就转向client调用了
private static class Invocation implements Writable, Configurable {
private String methodName;
private Class[] parameterClasses;
private Object[] parameters;
private Configuration conf;
public Invocation() {}
public Invocation(Method method, Object[] parameters) {
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
}
/** The name of the method invoked. */
public String getMethodName() { return methodName; }
/** The parameter classes. */
public Class[] getParameterClasses() { return parameterClasses; }
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
public void readFields(DataInput in) throws IOException {
methodName = UTF8.readString(in);
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
conf);
}
}
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append(methodName);
buffer.append("(");
for (int i = 0; i < parameters.length; i++) {
if (i != 0)
buffer.append(", ");
buffer.append(parameters[i]);
}
buffer.append(")");
return buffer.toString();
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Configuration getConf() {
return this.conf;
}
}
相关文章推荐
- RPC(Remote Procedure Call Protocol,远程过程调用协议),是什么?不就是webservice吗?
- Hadoop源码分析之二(RPC机制之Call处理)
- Hadoop源码分析之二(RPC机制之Call处理)
- RPC:远程过程调用协议(Remote Procedure Call protocol)
- rpc(Remote Procedure Call Protocol)简易代码实现
- RPC调用流程(Remote Procedure Call Protocol)
- RPC(Remote Procedure Call Protocol)——远程过程调用协议
- RPC(Remote Procedure Call Protocol)——远程过程调用协议 学习总结
- RPC(Remote Procedure Call Protocol)——远程过程调用协议
- RPC(Remote Procedure Call Protocol)
- RPC(RemoteProcedureCallProtocol)
- RPC:远程过程调用协议(Remote Procedure Call protocol)
- RPC(Remote Procedure Call Protocol)——远程过程调用协议
- 远程过程调用协议(Remote Procedure Call Protocol,RPC)
- Hadoop RPC的机制分析和源码解读
- 远程过程调用(Remote Procedure Call,RPC)
- 细水长流Hadoop源码分析(3)RPC Server初始化启动过程
- hadoop的RPC机制源码分析
- Hadoop源码分析之一(RPC机制之Server)