
HDFS(2)

2015-12-08 19:06


The process of reading a file in HDFS is shown in the figure above.

1. A file is read through the filesystem's (fs) open method. Since the file system here is distributed, the class that actually implements open is DistributedFileSystem. Let's look at its open method.

@Override
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF);
}

This returns an FSDataInputStream. How is it produced? The resolve method is called, which invokes doCall; if doCall throws an UnresolvedLinkException (the path turned out to be a symlink), resolve falls back to the next method. doCall essentially executes a single statement: DFSClient's open method, which returns a DFSInputStream. This input stream obtains a node list from the NameNode (nn), because the NameNode maintains two important tables: (1) filename -> block sequence and (2) block -> machine list. Through the filename, the NameNode finds the corresponding sequence of blocks; the blocks themselves are stored on DataNodes (dn), so what the client gets back is essentially a set of block address information.
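
Before continuing the trace, note that both tables can be observed from the client side through the public FileSystem API. Below is a minimal sketch, assuming fs.defaultFS points at an HDFS cluster; the path /demo/input.txt is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockMapDemo {
  public static void main(String[] args) throws Exception {
    // With fs.defaultFS pointing at HDFS, FileSystem.get returns the
    // DistributedFileSystem whose open() we just examined.
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/demo/input.txt"); // hypothetical path

    // filename -> block sequence -> machine list, as kept by the NameNode:
    FileStatus st = fs.getFileStatus(p);
    for (BlockLocation loc : fs.getFileBlockLocations(st, 0, st.getLen())) {
      System.out.println(loc); // prints offset, length and replica hosts
    }
  }
}

Next comes the DFSClient.open that doCall invokes.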

/**
 * Create an input stream that obtains a nodelist from the
 * namenode, and then reads from all the right places.  Creates
 * inner subclass of InputStream that does the right out-of-band
 * work.
 */
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException, UnresolvedLinkException {
  checkOpen();
  // Get block info from namenode
  return new DFSInputStream(this, src, buffersize, verifyChecksum);
}


2. Enter DFSClient's open method, which mainly creates a DFSInputStream. The constructor itself is unremarkable; its key step is the openInfo method, which fetches the file's block information from the NameNode.
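
openInfo itself is not shown here; conceptually it boils down to a single ClientProtocol RPC to the NameNode. A simplified sketch, not the verbatim source (the real method also measures the length of an under-construction last block and retries on failure):

// One RPC: ask the NameNode for the file's block list.
LocatedBlocks located = namenode.getBlockLocations(src, 0, Long.MAX_VALUE);
// 'located' holds the block sequence plus, for each block, the DataNodes
// storing a replica, sorted by network distance to the client.

With the block locations cached in the stream, reading starts at DFSInputStream's read method, shown next.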

/**
 * Read the entire buffer.
 */
@Override
public synchronized int read(final byte buf[], int off, int len)
    throws IOException {
  ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);

  return readWithStrategy(byteArrayReader, off, len);
}


3. Now study the DFSInputStream class carefully. Its read method creates an implementation of the read-strategy interface ReaderStrategy, which has only one method, doRead; the strategy chosen here, ByteArrayStrategy, reads into a byte array.
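
For intuition, here is a hedged sketch of this strategy pattern; the real interface inside DFSInputStream is private, and its doRead also carries a ReadStatistics argument, omitted here for clarity.

interface ReaderStrategy {
  int doRead(BlockReader blockReader, int off, int len) throws IOException;
}

class ByteArrayStrategy implements ReaderStrategy {
  private final byte[] buf;

  ByteArrayStrategy(byte[] buf) { this.buf = buf; }

  @Override
  public int doRead(BlockReader blockReader, int off, int len)
      throws IOException {
    // Fill the caller-supplied byte array straight from the BlockReader.
    return blockReader.read(buf, off, len);
  }
}

The heavy lifting then happens in readWithStrategy: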

private int readWithStrategy(ReaderStrategy strategy, int off, int len)
    throws IOException {
  dfsClient.checkOpen();
  if (closed) {
    throw new IOException("Stream closed");
  }
  Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
      new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
  failures = 0;
  if (pos < getFileLength()) {
    int retries = 2;
    while (retries > 0) {
      try {
        // currentNode can be left as null if previous read had a checksum
        // error on the same block. See HDFS-3067
        if (pos > blockEnd || currentNode == null) {
          currentNode = blockSeekTo(pos);
        }
        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
        if (locatedBlocks.isLastBlockComplete()) {
          realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
        }
        int result = readBuffer(strategy, off, realLen, corruptedBlockMap);

        if (result >= 0) {
          pos += result;
        } else {
          // got a EOS from reader though we expect more data on it.
          throw new IOException("Unexpected EOS from the reader");
        }
        if (dfsClient.stats != null) {
          dfsClient.stats.incrementBytesRead(result);
        }
        return result;
      } catch (ChecksumException ce) {
        throw ce;
      } catch (IOException e) {
        if (retries == 1) {
          DFSClient.LOG.warn("DFS Read", e);
        }
        blockEnd = -1;
        if (currentNode != null) { addToDeadNodes(currentNode); }
        if (--retries == 0) {
          throw e;
        }
      } finally {
        // Check if need to report block replicas corruption either read
        // was successful or ChecksumException occured.
        reportCheckSumFailure(corruptedBlockMap,
            currentLocatedBlock.getLocations().length);
      }
    }
  }
  return -1;
}


4. Looking at readWithStrategy: it first seeks to the address of the block containing the current position. The actual seeking happens in blockSeekTo.

/**
 * Open a DataInputStream to a DataNode so that it can be read from.
 * We get block ID and the IDs of the destinations at startup, from the namenode.
 */
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
  if (target >= getFileLength()) {
    throw new IOException("Attempted to read past end of file");
  }

  // Will be getting a new BlockReader.
  if (blockReader != null) {
    blockReader.close();
    blockReader = null;
  }

  //
  // Connect to best DataNode for desired Block, with potential offset
  //
  DatanodeInfo chosenNode = null;
  int refetchToken = 1; // only need to get a new access token once
  int refetchEncryptionKey = 1; // only need to get a new encryption key once

  boolean connectFailedOnce = false;

  while (true) {
    //
    // Compute desired block
    //
    LocatedBlock targetBlock = getBlockAt(target, true);
    assert (target == pos) : "Wrong postion " + pos + " expect " + target;
    long offsetIntoBlock = target - targetBlock.getStartOffset();

    DNAddrPair retval = chooseDataNode(targetBlock, null);
    chosenNode = retval.info;
    InetSocketAddress targetAddr = retval.addr;
    StorageType storageType = retval.storageType;

    try {
      ExtendedBlock blk = targetBlock.getBlock();
      Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
      blockReader = new BlockReaderFactory(dfsClient.getConf()).
          setInetSocketAddress(targetAddr).
          setRemotePeerFactory(dfsClient).
          setDatanodeInfo(chosenNode).
          setStorageType(storageType).
          setFileName(src).
          setBlock(blk).
          setBlockToken(accessToken).
          setStartOffset(offsetIntoBlock).
          setVerifyChecksum(verifyChecksum).
          setClientName(dfsClient.clientName).
          setLength(blk.getNumBytes() - offsetIntoBlock).
          setCachingStrategy(cachingStrategy).
          setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
          setClientCacheContext(dfsClient.getClientContext()).
          setUserGroupInformation(dfsClient.ugi).
          setConfiguration(dfsClient.getConfiguration()).
          build();
      if (connectFailedOnce) {
        DFSClient.LOG.info("Successfully connected to " + targetAddr +
            " for " + blk);
      }
      return chosenNode;
    } catch (IOException ex) {
      if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
            + "encryption key was invalid when connecting to " + targetAddr
            + " : " + ex);
        // The encryption key used is invalid.
        refetchEncryptionKey--;
        dfsClient.clearDataEncryptionKey();
      } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
        refetchToken--;
        fetchBlockAt(target);
      } else {
        connectFailedOnce = true;
        DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
            + ", add to deadNodes and continue. " + ex, ex);
        // Put chosen node into dead list, continue
        addToDeadNodes(chosenNode);
      }
    }
  }
}

5. The point of this whole seek is to find the best replica of the block: because of replication, the same block is stored on several DataNodes, and the best replica is a usable one on the DataNode closest to the client. The method also uses the factory design pattern, BlockReaderFactory with its fluent builder-style setters, to construct a blockReader matching the requirements. Once positioned on the best DataNode, the program starts reading data; the actual read happens in readBuffer.
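
For intuition, here is a conceptual sketch of the replica selection, simplified from chooseDataNode and not the verbatim source. It relies on the fact that the NameNode returns each block's replica locations already sorted by network distance to the client, plus the per-stream deadNodes blacklist maintained by addToDeadNodes.

DatanodeInfo chooseDataNodeSketch(LocatedBlock targetBlock,
    Map<DatanodeInfo, DatanodeInfo> deadNodes) throws IOException {
  for (DatanodeInfo node : targetBlock.getLocations()) {
    if (!deadNodes.containsKey(node)) {
      return node; // nearest replica still considered healthy
    }
  }
  // All known replicas failed; the real code refreshes the block's
  // locations from the NameNode and retries before giving up.
  throw new IOException("Could not obtain block: " + targetBlock.getBlock());
}

Back to the main path: readBuffer is shown next.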

/* This is a used by regular read() and handles ChecksumExceptions.
 * name readBuffer() is chosen to imply similarity to readBuffer() in
 * ChecksumFileSystem
 */
private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  IOException ioe;

  /* we retry current node only once. So this is set to true only here.
   * Intention is to handle one common case of an error that is not a
   * failure on datanode or client : when DataNode closes the connection
   * since client is idle. If there are other cases of "non-errors" then
   * then a datanode might be retried by setting this to true again.
   */
  boolean retryCurrentNode = true;

  while (true) {
    // retry as many times as seekToNewSource allows.
    try {
      return reader.doRead(blockReader, off, len, readStatistics);
    } catch ( ChecksumException ce ) {
      DFSClient.LOG.warn("Found Checksum error for "
          + getCurrentBlock() + " from " + currentNode
          + " at " + ce.getPos());
      ioe = ce;
      retryCurrentNode = false;
      // we want to remember which block replicas we have tried
      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
          corruptedBlockMap);
    } catch ( IOException e ) {
      if (!retryCurrentNode) {
        DFSClient.LOG.warn("Exception while reading from "
            + getCurrentBlock() + " of " + src + " from "
            + currentNode, e);
      }
      ioe = e;
    }
    boolean sourceFound = false;
    if (retryCurrentNode) {
      /* possibly retry the same node so that transient errors don't
       * result in application level failures (e.g. Datanode could have
       * closed the connection because the client is idle for too long).
       */
      sourceFound = seekToBlockSource(pos);
    } else {
      addToDeadNodes(currentNode);
      sourceFound = seekToNewSource(pos);
    }
    if (!sourceFound) {
      throw ioe;
    }
    retryCurrentNode = false;
  }
}

6. This method may invoke the strategy's doRead several times: a transient error on the current node is retried once (retryCurrentNode), while a checksum error sends the read to a different replica via seekToNewSource after blacklisting the current node.
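
To close the loop, a minimal caller-side sketch: every buffer-sized chunk copied below becomes one read call, which lands in readWithStrategy and, through the strategy, in doRead. The path is hypothetical; IOUtils.copyBytes is the standard Hadoop helper for such copy loops.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class CatDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical path; each chunk copied below triggers the
    // read -> readWithStrategy -> doRead chain traced in this article.
    try (FSDataInputStream in = fs.open(new Path("/demo/input.txt"))) {
      IOUtils.copyBytes(in, System.out, 4096, false);
    }
  }
}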