您的位置:首页 > 其它

dubbo源码学习笔记----RPC

2018-01-13 00:00 411 查看

RpcContext

整个RpcContext通过ThreadLocal维持。

/**
 * Context object holding call-related data: URL(s), method name, parameter types, arguments,
 * local/remote addresses, string attachments and arbitrary values.
 *
 * Instances are kept per thread through the {@code LOCAL} ThreadLocal below.
 * This is an excerpt: only the field declarations are shown here.
 */
public class RpcContext {

// One RpcContext per thread: the first read on a thread creates a fresh instance via initialValue().
private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
// String-to-String entries attached to this context.
private final Map<String, String> attachments = new HashMap<String, String>();
// String-to-Object entries stored on this context.
private final Map<String, Object> values = new HashMap<String, Object>();
private Future<?> future;

private List<URL> urls;

private URL url;

private String methodName;

private Class<?>[] parameterTypes;

private Object[] arguments;

private InetSocketAddress localAddress;

private InetSocketAddress remoteAddress;
// The three fields below are marked @Deprecated in the original source.
@Deprecated
private List<Invoker<?>> invokers;
@Deprecated
private Invoker<?> invoker;
@Deprecated
private Invocation invocation;

通过isProviderSide、isConsumerSide判断当前Context属于服务提供方还是服务消费方。

通过Future从Context中获取异步调用结果:

/**
 * Executes {@code callable} with the {@code Constants.ASYNC_KEY} attachment set to "true" and
 * returns a {@link Future} for its outcome.
 *
 * <ul>
 *   <li>A non-null return value from the callable is wrapped in an already-completed future.</li>
 *   <li>Any exception raised while running the callable is wrapped in an {@code RpcException};
 *       the returned future is then already done and its {@code get()} throws an
 *       {@link ExecutionException} built from that RpcException's cause.</li>
 *   <li>Otherwise (the callable returned {@code null}) the result of
 *       {@code getContext().getFuture()} is returned.</li>
 * </ul>
 *
 * The attachment is always removed again before this method returns.
 *
 * @param callable the call to execute
 * @param <T>      type of the call result
 * @return a future giving access to the result or the failure
 */
public <T> Future<T> asyncCall(Callable<T> callable) {
    try {
        try {
            setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            final T immediate = callable.call();
            if (immediate != null) {
                // local invoke will return directly
                return alreadyCompleted(immediate);
            }
        } catch (Exception cause) {
            throw new RpcException(cause);
        } finally {
            removeAttachment(Constants.ASYNC_KEY);
        }
    } catch (final RpcException failure) {
        return alreadyFailed(failure);
    }
    return ((Future<T>) getContext().getFuture());
}

/** Wraps a value that is already available in a FutureTask that has been run to completion. */
private static <T> Future<T> alreadyCompleted(final T value) {
    FutureTask<T> task = new FutureTask<T>(new Callable<T>() {
        public T call() throws Exception {
            return value;
        }
    });
    task.run();
    return task;
}

/** Builds a finished, non-cancellable future whose get() reports the cause of the given failure. */
private static <T> Future<T> alreadyFailed(final RpcException failure) {
    return new Future<T>() {
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        public boolean isCancelled() {
            return false;
        }

        public boolean isDone() {
            return true;
        }

        public T get() throws InterruptedException, ExecutionException {
            throw new ExecutionException(failure.getCause());
        }

        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            // Nothing to wait for: the failure is already known.
            return get();
        }
    };
}

服务调用

/**
 * Serializable {@link Invocation} implementation (excerpt: only the fields are shown).
 * It carries a method name, parameter types, arguments and String-to-String attachments.
 */
public class RpcInvocation implements Invocation, Serializable {

private static final long serialVersionUID = -4355285085441097045L;

private String methodName;

private Class<?>[] parameterTypes;

private Object[] arguments;

private Map<String, String> attachments;

// transient: left out of Java serialization, so the invoker is not written with the invocation.
private transient Invoker<?> invoker;

代理调用

/**
 * Dispatches the invocation to {@code doInvoke} on the wrapped proxy and packages the outcome.
 *
 * @param invocation supplies the method name, parameter types and arguments
 * @return an RpcResult holding the returned value, or holding the target exception when
 *         {@code doInvoke} fails with an {@link InvocationTargetException}
 * @throws RpcException for any other Throwable raised during the call (kept as the cause)
 */
public Result invoke(Invocation invocation) throws RpcException {
    try {
        final String method = invocation.getMethodName();
        final Class<?>[] signature = invocation.getParameterTypes();
        final Object[] args = invocation.getArguments();
        final Object value = doInvoke(proxy, method, signature, args);
        return new RpcResult(value);
    } catch (InvocationTargetException wrapped) {
        // The target method itself threw: hand its exception back inside the result.
        return new RpcResult(wrapped.getTargetException());
    } catch (Throwable failure) {
        final String message = "Failed to invoke remote proxy method " + invocation.getMethodName()
                + " to " + getUrl()
                + ", cause: " + failure.getMessage();
        throw new RpcException(message, failure);
    }
}

/**
 * Performs the actual call on the target object; subclasses supply the mechanism.
 *
 * @param proxy          the object the method is invoked on
 * @param methodName     name of the method to call
 * @param parameterTypes parameter types of the method to call
 * @param arguments      arguments to pass to the method
 * @return the value produced by the call
 * @throws Throwable anything raised while performing the call
 */
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

// The concrete doInvoke logic is left to each implementing class.

/**
 * Javassist implementation of the proxy factory: proxies come from {@code Proxy.getProxy} and
 * invokers delegate to a {@code Wrapper}.
 */
public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * Creates a proxy for the given interfaces whose handler is an InvokerInvocationHandler
     * built around {@code invoker}.
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy
                .getProxy(interfaces)
                .newInstance(new InvokerInvocationHandler(invoker));
    }

    /**
     * Creates an invoker whose {@code doInvoke} forwards to {@code Wrapper.invokeMethod}.
     * The wrapper is built for the runtime class of {@code proxy}, unless that class name
     * contains '$', in which case it is built for {@code type} instead.
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Class<?> wrappedClass;
        if (proxy.getClass().getName().indexOf('$') < 0) {
            wrappedClass = proxy.getClass();
        } else {
            wrappedClass = type;
        }
        final Wrapper wrapper = Wrapper.getWrapper(wrappedClass);

        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T target, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(target, methodName, parameterTypes, arguments);
            }
        };
    }

}

/**
 * JDK implementation of the proxy factory: proxies are created with
 * {@code Proxy.newProxyInstance} and invokers call the target through reflection.
 */
public class JdkProxyFactory extends AbstractProxyFactory {

    /**
     * Creates a proxy for the given interfaces, loaded with the current thread's context class
     * loader and handled by an InvokerInvocationHandler built around {@code invoker}.
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        InvokerInvocationHandler handler = new InvokerInvocationHandler(invoker);
        return (T) Proxy.newProxyInstance(loader, interfaces, handler);
    }

    /**
     * Creates an invoker whose {@code doInvoke} looks up the public method by name and
     * parameter types on the target's runtime class and invokes it reflectively.
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T target, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return target.getClass()
                        .getMethod(methodName, parameterTypes)
                        .invoke(target, arguments);
            }
        };
    }

}

RedisProtocol

比较有意思:它把invocation的第一个参数(getArguments()[0])作为key到Redis中取值,再将取到的值反序列化后作为调用结果返回。

// Obtain a resource from the Jedis pool.
resource = jedisPool.getResource();

// Branch taken when the invoked method name equals the value of `get`.
if (get.equals(invocation.getMethodName())) {
// Exactly one argument is accepted.
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// The key is the string form of the first argument; getBytes() without a charset uses the platform default encoding.
byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes());
// Nothing came back for the key: return an empty result.
if (value == null) {
return new RpcResult();
}
// Deserialize the stored bytes using the serialization from getSerialization(url) and return the object as the result.
ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
return new RpcResult(oin.readObject());

Filter

每次Invoker代理执行调用时,都可以注入自定义的Filter,对该调用进行拦截。

// Test excerpt: calls MonitorFilter.invoke() directly (the remainder of the method is not shown).
public void testFilter() throws Exception {
MonitorFilter monitorFilter = new MonitorFilter();
monitorFilter.setMonitorFactory(monitorFactory);
// Built from "aaa" plus empty parameter-type and argument arrays; presumably "aaa" is the method name.
Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);
// Set addresses on the current thread's RpcContext: remote = local host, port 20880; local = local host, port 2345.
RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
// Run the filter around serviceInvoker.
monitorFilter.invoke(serviceInvoker, invocation);

/**
 * Intercepts an invocation. When the invoker's URL has the {@code Constants.MONITOR_KEY}
 * parameter, the call is bracketed by a concurrency counter and its outcome is passed to
 * {@code collect}; otherwise the invocation is simply forwarded.
 *
 * @param invoker    next element of the invocation chain
 * @param invocation the call being made
 * @return the result produced by {@code invoker}
 * @throws RpcException rethrown unchanged after being reported to {@code collect} as an error
 */
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if (!invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
        // No monitor parameter on this URL: pass straight through.
        return invoker.invoke(invocation);
    }

    // provider must fetch context before invoke() gets called
    final String remoteHost = RpcContext.getContext().getRemoteHost();
    final long startedAt = System.currentTimeMillis(); // record start timestamp
    getConcurrent(invoker, invocation).incrementAndGet(); // count up
    try {
        final Result outcome = invoker.invoke(invocation); // proceed invocation chain
        collect(invoker, invocation, outcome, remoteHost, startedAt, false);
        return outcome;
    } catch (RpcException failure) {
        collect(invoker, invocation, null, remoteHost, startedAt, true);
        throw failure;
    } finally {
        getConcurrent(invoker, invocation).decrementAndGet(); // count down
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Dubbo