您的位置:首页 > 运维架构

Hadoop的前世今生(3)——ipc中的Client(2)

2012-05-01 17:31 162 查看
接上一回,接下来我们分析ConnectionCuller类,再上一回我接触了一下这个类,它的任务是回收Connection的:

ConnectionCuller

这个类的内容显然少得可怜,这里直接贴出它唯一的一个方法:

public void run() {
while (running) {
try {
Thread.sleep(MIN_SLEEP_TIME);
} catch (InterruptedException ie) {
}
synchronized (connections) {
Iterator i = connections.values().iterator();
while (i.hasNext()) {
Connection c = (Connection) i.next();
if (c.isIdle()) {
// We don't actually close the socket here (i.e.,
// don't invoke
// the close() method). We leave that work to the
// response receiver
// thread. The reason for that is since we have
// taken a lock on the
// connections table object, we don't want to slow
// down the entire
// system if we happen to talk to a slow server.
i.remove();
synchronized (c) {
c.setCloseConnection();
c.notify();
}
}
}
}
}
}
这个类通过一个定时器控制自己的行为,每次它都去调用每一个Connection的isIdle方法,(前面也提到了属于Idle的可能:(1)inUse数为0,一定时间内没有响应)。然后一个比较重点的东西就是它的那段注释,说实话这帮大牛肯在代码里啰啰嗦嗦地写这么多话,无非就是怕自己独具匠心的考虑与设计大家看不懂或者没有注意到,所以大家一定不能错过。这段注释主要说代码将采取异步的方法关闭连接,而不是同步的原因。

关于这个异步的过程我就不多说了,用一个步骤列表展示:

(1)ConnectionCuller调用c.setCloseConnection方法

(2)在Connection在run方法中进入了waitForWork方法(还记得么),然后还回了false,导致代码跳出了死循环,进入finally的回收

(3)回收过程调用了close方法

最后说一下这个类的生命周期,它发生在Client创建的时候,那个时候它就作为一个守护线程开始运行了……然后没有然后了,一直运行到天荒地老……

考虑完了这个类,实际上对于单输入单输出的问题就解决了,于是我们来看多输入的问题:

ParallelCall

这个类继承自Call,所以刚才对Call的东西基本上都可以针对它,同时由于Call中的变量是package权限的,所以它们在这个类中都是可以直接使用的。所以下面直接看ParallelResults:

ParallelResults

它的成员变量如下所示:

private Writable[] values;
private int size;
private int count;
其实这个类写得也很简单,关于callComplete的代码也看得人一知半解,所以我们还是从上层开始,分析它们的生命周期吧。前面我们贴过了一个Client的call代码,下面我们分段把另一个call的代码贴出:

/**
* Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored.
*/
[译]并行调用一系列call。每一个参数都发送到指定的地址。不论所有的结果正确得到(或者说从Server端正确返回),或者超时,或者错误,收集到的结果都将以数组形式正常返回。对于超时或错误的call返回null。

是不是以下次觉得豁然开朗?这个就是这两个并行类的需求啊!接下来看代码:

ParallelResults results = new ParallelResults(params.length);


首先定义一个结果。

synchronized (results) {
for (int i = 0; i < params.length; i++) {
ParallelCall call = new ParallelCall(params[i], results, i);
try {
Connection connection = getConnection(addresses[i]);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
LOG.info("Calling " + addresses[i] + " caught: "
+ StringUtils.stringifyException(e)); // log errors
results.size--; // wait for one fewer result
}
}
这一段是创建的过程,我们似乎明白了他们模式。那就是每一个ParallelCall作为一个Call,其实和普通的call没有功能上的区别,但是它要将结果统一返回给result。

try {
results.wait(timeout); // wait for all results
} catch (InterruptedException e) {
}

if (results.count == 0) {
throw new IOException("no responses");
} else {
return results.values;
}
这一段代码的wait方法和上面的方法不一样。我认为这个应该是这一版代码的一个问题,因为在前面的wait方法的doc翻译中,我们知道了wait可能会意外地被唤醒,正确地编程方法应该是while的循环方法,而这里我们看不出它的巧妙。这个问题我们会进一步跟踪,因为还会关注后面版本的代码。

这样,这个方法也完了,于是,整个的代码分析完了。这一版代码全篇不到600行,可是说清楚它真得不容易。它为我们展示了一个很好的客户端框架,最后我们总结一下其中的一些值得深思的地方:

(1)考虑网络的“不靠谱”。我们必须承认网络的不靠谱,所以所有有关网络的操作我们必须抱着最坏的打算去执行。这个代码里的典型例子就是一些初始化和释放的地方,由于我们将网络考虑成不靠谱,所以一些本来同步的方法改成了异步,一些需要同步的地方把它写成了不同步。

(2)push与pull的位置。最典型的例子就是Connection里面的read和write,这里read是Pull,write是push。push我们采用了多线程同步并行写的方法,pull我们采用了轮询法同步读的方法,实际上两个方法都保证了IO同步,但是它们适应于不同的角色。

(3)回调与异步。这段代码使用了大量的异步。在分析中已经说得很清楚了。

下一步我们将关注Server端的代码,很多新的问题将出现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐