
An Analysis of the HDFS Read Process

2014-05-21 12:57
We continue our analysis in the FileSystem class. Reading data uses the open(...) method; a typical call site looks like this:

FSDataInputStream in = fileSystem.open(new Path("/d100/f1"));
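
For context, here is a minimal sketch of the client program around that call. The NameNode URI is hypothetical; /d100/f1 is the example path from above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical NameNode address
    FileSystem fileSystem = FileSystem.get(URI.create("hdfs://namenode:9000/"), conf);
    FSDataInputStream in = fileSystem.open(new Path("/d100/f1"));
    try {
      IOUtils.copyBytes(in, System.out, 4096, false); // stream the file to stdout
    } finally {
      IOUtils.closeStream(in);
    }
  }
}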
Click into the open(...) call:

/**
 * Opens an FSDataInputStream at the indicated Path.
 * @param f the file to open
 */
public FSDataInputStream open(Path f) throws IOException {
  return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
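
The 4096 here is only a default: a client can raise io.file.buffer.size in its Configuration, or pass a buffer size explicitly through the two-argument overload. A quick sketch (the values are illustrative):

Configuration conf = new Configuration();
conf.setInt("io.file.buffer.size", 8192);       // copy buffer of 8 KB instead of 4 KB
FileSystem fs = FileSystem.get(conf);
FSDataInputStream in1 = fs.open(new Path("/d100/f1"));         // picks up 8192 from the config
FSDataInputStream in2 = fs.open(new Path("/d100/f1"), 16384);  // explicit buffer size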


Back in open(Path), place the cursor on the open call in the return statement, press Ctrl+T, and choose the first implementation, DistributedFileSystem:

public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  statistics.incrementReadOps(1);
  return new DFSClient.DFSDataInputStream(
      dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}
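
Whatever the concrete subclass, the caller holds an FSDataInputStream, which layers seek() and positioned reads on top of a plain InputStream. A short usage sketch (the offsets are illustrative):

FSDataInputStream in = fs.open(new Path("/d100/f1"));
in.seek(1024);                  // Seekable: jump to byte offset 1024
int b = in.read();              // continue reading from there

byte[] buf = new byte[512];
in.readFully(2048, buf);        // PositionedReadable: read 512 bytes at offset 2048
                                // without moving the current stream position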


What is returned is the DFSDataInputStream class from DFSClient; obviously, this is an inner class. Its constructor is handed the object created by dfs.open(...). Let's look at that method's source:

/**
 * Create an input stream that obtains a nodelist from the
 * namenode, and then reads from all the right places.  Creates
 * inner subclass of InputStream that does the right out-of-band
 * work.
 */
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
    FileSystem.Statistics stats) throws IOException {
  checkOpen();
  // Get block info from namenode
  return new DFSInputStream(src, buffersize, verifyChecksum);
}

This returns a DFSInputStream object, which holds the block information obtained from the NameNode. Let's look at this class's constructor:

DFSInputStream(String src, int buffersize, boolean verifyChecksum) throws IOException {
  this.verifyChecksum = verifyChecksum;
  this.buffersize = buffersize;
  this.src = src;
  // How many bytes' worth of block locations to ask for per NameNode call
  prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
  openInfo();
}
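
Note the dfs.read.prefetch.size property: it controls how many bytes' worth of block locations are requested from the NameNode in one call (the default in this version is derived from the block size). A client could tune it like this (the value is illustrative):

Configuration conf = new Configuration();
// Ask for roughly 20 blocks' worth of locations per NameNode round trip,
// assuming a 64 MB block size.
conf.setLong("dfs.read.prefetch.size", 20L * 64 * 1024 * 1024);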


The constructor ends by calling openInfo(), whose source code is what fetches the block information. Let's look at its source:

/**
 * Grab the open-file info from namenode
 */
synchronized void openInfo() throws IOException {
  for (int retries = 3; retries > 0; retries--) {
    if (fetchLocatedBlocks()) {
      // fetch block success
      return;
    } else {
      // Last block location unavailable. When a cluster restarts,
      // DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length.
      // Lets retry a few times to get the length.
      DFSClient.LOG.warn("Last block locations unavailable. "
          + "Datanodes might not have reported blocks completely."
          + " Will retry for " + retries + " times");
      waitFor(4000);
    }
  }
  throw new IOException("Could not obtain the last block locations.");
}
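
This is a plain bounded-retry pattern: try up to three times, sleep 4 seconds between attempts, and give up with an IOException once the attempts are exhausted. The same shape, stripped of the HDFS specifics (a generic sketch; tryFetch() is a hypothetical single attempt):

boolean fetchWithRetries(int attempts, long waitMs) throws IOException {
  for (int retries = attempts; retries > 0; retries--) {
    if (tryFetch()) {            // one attempt; returns true on success
      return true;
    }
    try {
      Thread.sleep(waitMs);      // back off before retrying
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      break;
    }
  }
  throw new IOException("Gave up after " + attempts + " attempts");
}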


Step into fetchLocatedBlocks():

private boolean fetchLocatedBlocks() throws IOException, FileNotFoundException {
  // Ask the NameNode for the first prefetchSize bytes' worth of block locations
  LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
  if (newInfo == null) {
    throw new FileNotFoundException("File does not exist: " + src);
  }

  // For a file that is not under construction, the block list
  // must not change between two fetches
  if (locatedBlocks != null && !locatedBlocks.isUnderConstruction()
      && !newInfo.isUnderConstruction()) {
    Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) {
      if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
  this.locatedBlocks = newInfo;
  this.currentNode = null;
  return isBlkInfoUpdated;
}

Step into callGetBlockLocations(namenode, src, 0, prefetchSize):

static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length) throws IOException {
  try {
    // An RPC to the NameNode through the ClientProtocol proxy
    return namenode.getBlockLocations(src, start, length);
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class);
  }
}


As you can see, the block information is likewise obtained by calling the namenode. The namenode field here still lives in DFSClient, and, as the earlier analysis showed, that field in the DFSClient class is of type ClientProtocol.
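
Although getBlockLocations is an internal RPC, the same information is reachable from the public API. A sketch (assuming the usual org.apache.hadoop.fs imports; the path is the hypothetical /d100/f1):

void printBlockLocations(FileSystem fs) throws IOException {
  FileStatus stat = fs.getFileStatus(new Path("/d100/f1"));
  BlockLocation[] blocks = fs.getFileBlockLocations(stat, 0, stat.getLen());
  for (BlockLocation blk : blocks) {
    System.out.println("offset=" + blk.getOffset()
        + " length=" + blk.getLength()
        + " hosts=" + java.util.Arrays.toString(blk.getHosts()));
  }
}

Each BlockLocation pairs a byte range of the file with the datanodes holding a replica of that block, which is exactly the "nodelist" the javadoc of DFSClient.open refers to.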

You can refer to the previous article, the analysis of the HDFS write process.