Hadoop的前世今生(3)——ipc中的Client(2)
2012-05-01 17:31
162 查看
接上一回,接下来我们分析ConnectionCuller类,再上一回我接触了一下这个类,它的任务是回收Connection的:
ConnectionCuller
这个类的内容显然少得可怜,这里直接贴出它唯一的一个方法:
关于这个异步的过程我就不多说了,用一个步骤列表展示:
(1)ConnectionCuller调用c.setCloseConnection方法
(2)在Connection在run方法中进入了waitForWork方法(还记得么),然后还回了false,导致代码跳出了死循环,进入finally的回收
(3)回收过程调用了close方法
最后说一下这个类的生命周期,它发生在Client创建的时候,那个时候它就作为一个守护线程开始运行了……然后没有然后了,一直运行到天荒地老……
考虑完了这个类,实际上对于单输入单输出的问题就解决了,于是我们来看多输入的问题:
ParallelCall
这个类继承自Call,所以刚才对Call的东西基本上都可以针对它,同时由于Call中的变量是package权限的,所以它们在这个类中都是可以直接使用的。所以下面直接看ParallelResults:
ParallelResults
它的成员变量如下所示:
是不是以下次觉得豁然开朗?这个就是这两个并行类的需求啊!接下来看代码:
首先定义一个结果。
这样,这个方法也完了,于是,整个的代码分析完了。这一版代码全篇不到600行,可是说清楚它真得不容易。它为我们展示了一个很好的客户端框架,最后我们总结一下其中的一些值得深思的地方:
(1)考虑网络的“不靠谱”。我们必须承认网络的不靠谱,所以所有有关网络的操作我们必须抱着最坏的打算去执行。这个代码里的典型例子就是一些初始化和释放的地方,由于我们将网络考虑成不靠谱,所以一些本来同步的方法改成了异步,一些需要同步的地方把它写成了不同步。
(2)push与pull的位置。最典型的例子就是Connection里面的read和write,这里read是Pull,write是push。push我们采用了多线程同步并行写的方法,pull我们采用了轮询法同步读的方法,实际上两个方法都保证了IO同步,但是它们适应于不同的角色。
(3)回调与异步。这段代码使用了大量的异步。在分析中已经说得很清楚了。
下一步我们将关注Server端的代码,很多新的问题将出现。
ConnectionCuller
这个类的内容显然少得可怜,这里直接贴出它唯一的一个方法:
public void run() { while (running) { try { Thread.sleep(MIN_SLEEP_TIME); } catch (InterruptedException ie) { } synchronized (connections) { Iterator i = connections.values().iterator(); while (i.hasNext()) { Connection c = (Connection) i.next(); if (c.isIdle()) { // We don't actually close the socket here (i.e., // don't invoke // the close() method). We leave that work to the // response receiver // thread. The reason for that is since we have // taken a lock on the // connections table object, we don't want to slow // down the entire // system if we happen to talk to a slow server. i.remove(); synchronized (c) { c.setCloseConnection(); c.notify(); } } } } } }这个类通过一个定时器控制自己的行为,每次它都去调用每一个Connection的isIdle方法,(前面也提到了属于Idle的可能:(1)inUse数为0,一定时间内没有响应)。然后一个比较重点的东西就是它的那段注释,说实话这帮大牛肯在代码里啰啰嗦嗦地写这么多话,无非就是怕自己独具匠心的考虑与设计大家看不懂或者没有注意到,所以大家一定不能错过。这段注释主要说代码将采取异步的方法关闭连接,而不是同步的原因。
关于这个异步的过程我就不多说了,用一个步骤列表展示:
(1)ConnectionCuller调用c.setCloseConnection方法
(2)在Connection在run方法中进入了waitForWork方法(还记得么),然后还回了false,导致代码跳出了死循环,进入finally的回收
(3)回收过程调用了close方法
最后说一下这个类的生命周期,它发生在Client创建的时候,那个时候它就作为一个守护线程开始运行了……然后没有然后了,一直运行到天荒地老……
考虑完了这个类,实际上对于单输入单输出的问题就解决了,于是我们来看多输入的问题:
ParallelCall
这个类继承自Call,所以刚才对Call的东西基本上都可以针对它,同时由于Call中的变量是package权限的,所以它们在这个类中都是可以直接使用的。所以下面直接看ParallelResults:
ParallelResults
它的成员变量如下所示:
private Writable[] values; private int size; private int count;其实这个类写得也很简单,关于callComplete的代码也看得人一知半解,所以我们还是从上层开始,分析它们的生命周期吧。前面我们贴过了一个Client的call代码,下面我们分段把另一个call的代码贴出:
/** * Makes a set of calls in parallel. Each parameter is sent to the * corresponding address. When all values are available, or have timed out * or errored, the collected results are returned in an array. The array * contains nulls for calls that timed out or errored. */[译]并行调用一系列call。每一个参数都发送到指定的地址。不论所有的结果正确得到(或者说从Server端正确返回),或者超时,或者错误,收集到的结果都将以数组形式正常返回。对于超时或错误的call返回null。
是不是以下次觉得豁然开朗?这个就是这两个并行类的需求啊!接下来看代码:
ParallelResults results = new ParallelResults(params.length);
首先定义一个结果。
synchronized (results) { for (int i = 0; i < params.length; i++) { ParallelCall call = new ParallelCall(params[i], results, i); try { Connection connection = getConnection(addresses[i]); connection.sendParam(call); // send each parameter } catch (IOException e) { LOG.info("Calling " + addresses[i] + " caught: " + StringUtils.stringifyException(e)); // log errors results.size--; // wait for one fewer result } }这一段是创建的过程,我们似乎明白了他们模式。那就是每一个ParallelCall作为一个Call,其实和普通的call没有功能上的区别,但是它要将结果统一返回给result。
try { results.wait(timeout); // wait for all results } catch (InterruptedException e) { } if (results.count == 0) { throw new IOException("no responses"); } else { return results.values; }这一段代码的wait方法和上面的方法不一样。我认为这个应该是这一版代码的一个问题,因为在前面的wait方法的doc翻译中,我们知道了wait可能会意外地被唤醒,正确地编程方法应该是while的循环方法,而这里我们看不出它的巧妙。这个问题我们会进一步跟踪,因为还会关注后面版本的代码。
这样,这个方法也完了,于是,整个的代码分析完了。这一版代码全篇不到600行,可是说清楚它真得不容易。它为我们展示了一个很好的客户端框架,最后我们总结一下其中的一些值得深思的地方:
(1)考虑网络的“不靠谱”。我们必须承认网络的不靠谱,所以所有有关网络的操作我们必须抱着最坏的打算去执行。这个代码里的典型例子就是一些初始化和释放的地方,由于我们将网络考虑成不靠谱,所以一些本来同步的方法改成了异步,一些需要同步的地方把它写成了不同步。
(2)push与pull的位置。最典型的例子就是Connection里面的read和write,这里read是Pull,write是push。push我们采用了多线程同步并行写的方法,pull我们采用了轮询法同步读的方法,实际上两个方法都保证了IO同步,但是它们适应于不同的角色。
(3)回调与异步。这段代码使用了大量的异步。在分析中已经说得很清楚了。
下一步我们将关注Server端的代码,很多新的问题将出现。
相关文章推荐
- Hadoop的前世今生(2)——ipc的Client(1)
- hadoop INFO ipc.Client: Retrying connect to server: master/192.168.0.45:54310. Already tried 0 time
- hadoop错误INFO ipc.Client: Retrying connect to server: localhost/127.0.0.1
- org.apache.hadoop.ipc.Client - Retrying connect to server: 0.0.0.0/0.0.0.0:10020. Already tried 0 ti
- hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException:
- hadoop遇到的问题: org.apache.hadoop.ipc.Client: Retrying connect to server异常的解决
- hadoop HA模式重新格式化出现错误:INFO ipc.Client: Retrying connect to server:***:8485. Already
- Hadoop异常 hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException
- hadoop中master能够启动datanode,但是datanode无法连接namenode 报 17/11/16 03:49:13 WARN ipc.Client: Failed to conn
- org.apache.hadoop.ipc.Client: Retrying connect to server异常的解决
- hadoop无法启动 INFO ipc.Client: Retrying connect to server
- Hadoop— ipc.Client: Retrying connect to server: localhost/127.0.0.1:8020. Already tried 5 time(s).
- org.apache.hadoop.ipc.Client: Retrying connect to server
- Hadoop Eclipse Server IPC version 5 cannot communicate with client version 3
- org.apache.hadoop.ipc.Client: Retrying connect to server异常的解决
- 向HDFS上传文件时报错16/07/23 01:13:30 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.Rem
- [sqoop][mysql导入到hadoop]ipc.Client: Retrying connect to server: spark002/10.211.55.12:60587. Already
- 踏着前人的脚印学hadoop——ipc中的Client
- org.apache.hadoop.ipc.Client: Retrying connect to server: localhost/127.0.0.1:7359. Already tried 7
- hadoop数据节点通信异常【启动hadoop集群遇到错误org.apache.hadoop.ipc.Client: Retrying connect to server】