您的位置:首页 > 运维架构

hadoop的源码分析之RPC(Remote Procedure Call Protocol)

2013-01-12 16:49 471 查看
理解这个RPC是不是的先去理解哈动态代理 好多invoke,还有Socket网络编程

先来张eclipse下IPC源码图:




先来看看RPC.java,既然是动态代理,自然会想到Invoke()方法了,先来看看RPC中的Invoker中的invoke()方法

private static class Invoker implements InvocationHandler {

private InetSocketAddress address;

private UserGroupInformation ticket;

private Client client;

private boolean isClosed = false;

public Invoker(InetSocketAddress address, UserGroupInformation ticket,

Configuration conf, SocketFactory factory) {

this.address = address;

this.ticket = ticket;

this.client = CLIENTS.getClient(conf, factory);

}

public Object invoke(Object proxy, Method method, Object[] args)

throws Throwable {

final boolean logDebug = LOG.isDebugEnabled();

long startTime = 0;

if (logDebug) {

startTime = System.currentTimeMillis();

}

ObjectWritable value = (ObjectWritable)

client.call(new Invocation(method, args), address,

method.getDeclaringClass(), ticket);


if (logDebug) {

long callTime = System.currentTimeMillis() - startTime;

LOG.debug("Call: " + method.getName() + " " + callTime);

}

return value.get();

}



/* close the IPC client that's responsible for this invoker's RPCs */

synchronized private void close() {

if (!isClosed) {

isClosed = true;

CLIENTS.stopClient(client);

}

}

}

可以看出动态代理里的invoke()方法其实是RPC里的invocation对方法名,参数做了封装,可以看invocation类的

封装后给client调用,这时就转向client调用了

private static class Invocation implements Writable, Configurable {

private String methodName;

private Class[] parameterClasses;

private Object[] parameters;

private Configuration conf;

public Invocation() {}

public Invocation(Method method, Object[] parameters) {

this.methodName = method.getName();

this.parameterClasses = method.getParameterTypes();

this.parameters = parameters;

}

/** The name of the method invoked. */

public String getMethodName() { return methodName; }

/** The parameter classes. */

public Class[] getParameterClasses() { return parameterClasses; }

/** The parameter instances. */

public Object[] getParameters() { return parameters; }

public void readFields(DataInput in) throws IOException {

methodName = UTF8.readString(in);

parameters = new Object[in.readInt()];

parameterClasses = new Class[parameters.length];

ObjectWritable objectWritable = new ObjectWritable();

for (int i = 0; i < parameters.length; i++) {

parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);

parameterClasses[i] = objectWritable.getDeclaredClass();

}

}

public void write(DataOutput out) throws IOException {

UTF8.writeString(out, methodName);

out.writeInt(parameterClasses.length);

for (int i = 0; i < parameterClasses.length; i++) {

ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],

conf);

}

}

public String toString() {

StringBuffer buffer = new StringBuffer();

buffer.append(methodName);

buffer.append("(");

for (int i = 0; i < parameters.length; i++) {

if (i != 0)

buffer.append(", ");

buffer.append(parameters[i]);

}

buffer.append(")");

return buffer.toString();

}

public void setConf(Configuration conf) {

this.conf = conf;

}

public Configuration getConf() {

return this.conf;

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: