Migrating HDFS-RAID from Hadoop-0.22.0 to Hadoop-2.2.0
2014-05-26 10:39
HDFS-RAID was released as an add-on for Hadoop-0.22.0 and works with that version. Oddly, the Hadoop team did not keep maintaining HDFS-RAID afterwards. Getting HDFS-RAID to work on the latest 2.2.0 release therefore requires quite a few code changes, which break down into the following three parts:
Part 1: Rewrite the RaidBlockSender class.
RaidBlockSender is HDFS-RAID's internal block sender: once a corrupt block has been recovered through erasure coding (EC), the block has to be shipped to a target DataNode. The gap between 0.22.0 and 2.2.0 is large, and Hadoop gained many new features in between, most notably HA and Federation, so the underlying code changed considerably. The two changes that matter here are that Block was extended to ExtendedBlock and that the old Writable serialization was replaced by Google's Protobuf. The former is a simple field extension; the latter results in a rather different BlockSender implementation. Since the original HDFS-RAID RaidBlockSender was modeled on Hadoop's own BlockSender, migrating to 2.2.0 means rewriting RaidBlockSender along the lines of the 2.2.0 BlockSender; the full rewritten class is given below.
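As a side note on the first of those changes, ExtendedBlock is essentially the old Block plus the block pool ID that Federation introduced. A minimal sketch of the new addressing (the pool ID and block numbers are made-up values, not taken from the patch):

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    public class ExtendedBlockSketch {
        public static void main(String[] args) {
            // Pre-2.x code identified a block by id, length and generation stamp only.
            Block legacy = new Block(1073741825L, 134217728L, 1001L);

            // With Federation every block belongs to a block pool, so 2.2.0 code
            // passes an ExtendedBlock that also carries the pool id.
            ExtendedBlock extended =
                new ExtendedBlock("BP-12345-10.0.0.1-1400000000000", legacy);

            System.out.println(extended.getBlockPoolId()); // pool id added in 2.x
            System.out.println(extended.getLocalBlock());  // the wrapped legacy Block
        }
    }

The rewritten RaidBlockSender2 class: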
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.hdfs.server.datanode; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.SocketException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.util.DataChecksum; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** * in order to update the HDFS-RAID from Hadoop-0.22.0 to Hadoop-2.2.0, * we need to modify the class (@see RaidBlockSender) through chaning * the way of translating one block to another DataNode. * Basically, this class is totally different from the traditional class * (@see RaidBlockSender),we just change some code of (@see BlockSender) * in order to cater to our requirement after analyzing the principle * behind (@see BlockSender). * * @author Deng Changchun * @version 2 * @since 2014-05-24 */ public class RaidBlockSender2 implements java.io.Closeable { static final Log LOG = DataNode.LOG; static final Log ClientTraceLog = DataNode.ClientTraceLog; /** * Minimum buffer used while sending data to clients. Used only if * transferTo() is enabled. 64KB is not that large. It could be larger, but * not sure if there will be much more improvement. 
*/ private static final int MIN_BUFFER_WITH_TRANSFERTO = 64 * 1024; private static final int TRANSFERTO_BUFFER_SIZE = Math.max( HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO); /** the block to read from */ private final ExtendedBlock block; /** Stream to read block data from */ private InputStream blockIn; /** updated while using transferTo() */ private long blockInPosition = -1; /** Stream to read checksum */ private DataInputStream checksumIn; /** Checksum utility */ private final DataChecksum checksum; /** Initial position to read */ private long initialOffset; /** Current position of read */ private long offset; /** Position of last byte to read from block file */ private final long endOffset; /** Number of bytes in chunk used for computing checksum */ private final int chunkSize; /** Number bytes of checksum computed for a chunk */ private final int checksumSize; /** If true, failure to read checksum is ignored */ private final boolean corruptChecksumOk; /** Sequence number of packet being sent */ private long seqno; /** Set to true if transferTo is allowed for sending data to the client */ private final boolean transferToAllowed; /** Set to true once entire requested byte range has been sent to the client */ private boolean sentEntireByteRange; /** When true, verify checksum while reading from checksum file */ private final boolean verifyChecksum; /** Format used to print client trace log messages */ private final String clientTraceFmt; private volatile ChunkChecksum lastChunkChecksum = null; /** The file descriptor of the block being sent */ private FileDescriptor blockInFd; // Cache-management related fields private final long readaheadLength; private ReadaheadRequest curReadahead; private final boolean alwaysReadahead; private final boolean dropCacheBehindLargeReads; private final boolean dropCacheBehindAllReads; private long lastCacheDropOffset; @VisibleForTesting static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB /** * See {{@link BlockSender#isLongRead()} */ private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024; /** * Constructor * * @param block * Block that is being read * @param startOffset * starting offset to read from * @param length * length of data to read * @param corruptChecksumOk * @param verifyChecksum * verify checksum while reading the data * @param sendChecksum * send checksum to client. * @param datanode * datanode from which the block is being read * @param clientTraceFmt * format string used to print client trace logs * @throws IOException */ public RaidBlockSender2(ExtendedBlock block, long blockLength, long startOffset,long length, boolean corruptChecksumOk, boolean verifyChecksum, boolean sendChecksum, boolean transferToAllowed, DataInputStream metadataIn, InputStreamFactory streamFactory, String clientTraceFmt, CachingStrategy cachingStrategy) throws IOException { try { this.block = block; this.corruptChecksumOk = corruptChecksumOk; this.verifyChecksum = verifyChecksum; this.clientTraceFmt = clientTraceFmt; /* * If the client asked for the cache to be dropped behind all reads, * we honor that. Otherwise, we use the DataNode defaults. When * using DataNode defaults, we use a heuristic where we only drop * the cache for large reads. 
*/ if (cachingStrategy.getDropBehind() == null) { this.dropCacheBehindAllReads = false; this.dropCacheBehindLargeReads = DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT; } else { this.dropCacheBehindAllReads = this.dropCacheBehindLargeReads = cachingStrategy .getDropBehind().booleanValue(); } /* * Similarly, if readahead was explicitly requested, we always do * it. Otherwise, we read ahead based on the DataNode settings, and * only when the reads are large. */ if (cachingStrategy.getReadahead() == null) { this.alwaysReadahead = false; this.readaheadLength = DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; } else { this.alwaysReadahead = true; this.readaheadLength = cachingStrategy.getReadahead() .longValue(); } if (verifyChecksum) { // To simplify implementation, callers may not specify // verification // without sending. Preconditions.checkArgument(sendChecksum, "If verifying checksum, currently must also send it."); } final Replica replica; final long replicaVisibleLength = blockLength; ; // transferToFully() fails on 32 bit platforms for block sizes >= // 2GB, // use normal transfer in those cases this.transferToAllowed = transferToAllowed; /* * (corruptChecksumOK, meta_file_exist): operation True, True: will * verify checksum True, False: No verify, e.g., need to read data * from a corrupted file False, True: will verify checksum False, * False: throws IOException file not found */ DataChecksum csum = null; if (verifyChecksum || sendChecksum) { final InputStream metaIn = metadataIn; if (!corruptChecksumOk || metaIn != null) { if (metaIn == null) { // need checksum but meta-data not found throw new FileNotFoundException( "Meta-data not found for " + block); } checksumIn = new DataInputStream(new BufferedInputStream( metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); // read and handle the common header here. For now just a // version BlockMetadataHeader header = BlockMetadataHeader .readHeader(checksumIn); short version = header.getVersion(); if (version != BlockMetadataHeader.VERSION) { LOG.warn("Wrong version (" + version + ") for metadata file for " + block + " ignoring ..."); } csum = header.getChecksum(); } else { LOG.warn("Could not find metadata file for " + block); } } if (csum == null) { // The number of bytes per checksum here determines the // alignment // of reads: we always start reading at a checksum chunk // boundary, // even if the checksum type is NULL. So, choosing too big of a // value // would risk sending too much unnecessary data. 512 (1 disk // sector) // is likely to result in minimal extra IO. csum = DataChecksum .newDataChecksum(DataChecksum.Type.NULL, 512); } /* * If chunkSize is very large, then the metadata file is mostly * corrupted. For now just truncate bytesPerchecksum to blockLength. */ int size = csum.getBytesPerChecksum(); if (size > 10 * 1024 * 1024 && size > replicaVisibleLength) { csum = DataChecksum.newDataChecksum(csum.getChecksumType(), Math.max((int) replicaVisibleLength, 10 * 1024 * 1024)); size = csum.getBytesPerChecksum(); } chunkSize = size; checksum = csum; checksumSize = checksum.getChecksumSize(); length = length < 0 ? replicaVisibleLength : length; // end is either last byte on disk or the length for which we have a // checksum ChunkChecksum chunkChecksum = null; long end = chunkChecksum != null ? 
chunkChecksum.getDataLength() : blockLength; if (startOffset < 0 || startOffset > end || (length + startOffset) > end) { String msg = " Offset " + startOffset + " and length " + length + " don't match block " + block + " ( blockLen " + end + " )"; LOG.warn(this.getClass().getName() + ":sendBlock() : " + msg); throw new IOException(msg); } // Ensure read offset is position at the beginning of chunk offset = startOffset - (startOffset % chunkSize); if (length >= 0) { // Ensure endOffset points to end of chunk. long tmpLen = startOffset + length; if (tmpLen % chunkSize != 0) { tmpLen += (chunkSize - tmpLen % chunkSize); } if (tmpLen < end) { // will use on-disk checksum here since the end is a stable // chunk end = tmpLen; } else if (chunkChecksum != null) { // last chunk is changing. flag that we need to use // in-memory checksum this.lastChunkChecksum = chunkChecksum; } } endOffset = end; // seek to the right offsets if (offset > 0) { long checksumSkip = (offset / chunkSize) * checksumSize; // note blockInStream is seeked when created below if (checksumSkip > 0) { // Should we use seek() for checksum file as well? IOUtils.skipFully(checksumIn, checksumSkip); } } seqno = 0; blockIn = streamFactory.createStream(offset); // seek to offset if (blockIn instanceof FileInputStream) { blockInFd = ((FileInputStream) blockIn).getFD(); } else { blockInFd = null; } } catch (IOException ioe) { IOUtils.closeStream(this); IOUtils.closeStream(blockIn); throw ioe; } } /** * close opened files. */ @Override public void close() throws IOException { if (blockInFd != null && ((dropCacheBehindAllReads) || (dropCacheBehindLargeReads && isLongRead()))) { try { NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(), blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset, NativeIO.POSIX.POSIX_FADV_DONTNEED); } catch (Exception e) { LOG.warn("Unable to drop cache on file close", e); } } if (curReadahead != null) { curReadahead.cancel(); } IOException ioe = null; if (checksumIn != null) { try { checksumIn.close(); // close checksum file } catch (IOException e) { ioe = e; } checksumIn = null; } if (blockIn != null) { try { blockIn.close(); // close data file } catch (IOException e) { ioe = e; } blockIn = null; blockInFd = null; } // throw IOException if there is any if (ioe != null) { throw ioe; } } /** * Wait for rbw replica to reach the length * * @param rbw * replica that is being written to * @param len * minimum length to reach * @throws IOException * on failing to reach the len in given wait time */ private static void waitForMinLength(ReplicaBeingWritten rbw, long len) throws IOException { // Wait for 3 seconds for rbw replica to reach the minimum length for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) { try { Thread.sleep(100); } catch (InterruptedException ie) { throw new IOException(ie); } } long bytesOnDisk = rbw.getBytesOnDisk(); if (bytesOnDisk < len) { throw new IOException(String.format( "Need %d bytes, but only %d bytes available", len, bytesOnDisk)); } } /** * Converts an IOExcpetion (not subclasses) to SocketException. This is * typically done to indicate to upper layers that the error was a socket * error rather than often more serious exceptions like disk errors. */ private static IOException ioeToSocketException(IOException ioe) { if (ioe.getClass().equals(IOException.class)) { // "se" could be a new class in stead of SocketException. 
IOException se = new SocketException("Original Exception : " + ioe); se.initCause(ioe); /* * Change the stacktrace so that original trace is not truncated * when printed. */ se.setStackTrace(ioe.getStackTrace()); return se; } // otherwise just return the same exception. return ioe; } /** * @param datalen * Length of data * @return number of chunks for data of given size */ private int numberOfChunks(long datalen) { return (int) ((datalen + chunkSize - 1) / chunkSize); } /** * Sends a packet with up to maxChunks chunks of data. * * @param pkt * buffer used for writing packet data * @param maxChunks * maximum number of chunks to send * @param out * stream to send data to * @param transferTo * use transferTo to send data * @param throttler * used for throttling data transfer bandwidth */ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, boolean transferTo, DataTransferThrottler throttler) throws IOException { int dataLen = (int) Math.min(endOffset - offset, (chunkSize * (long) maxChunks)); int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in // the packet int checksumDataLen = numChunks * checksumSize; int packetLen = dataLen + checksumDataLen + 4; boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0; // The packet buffer is organized as follows: // _______HHHHCCCCD?D?D?D? // ^ ^ // | \ checksumOff // \ headerOff // _ padding, since the header is variable-length // H = header and length prefixes // C = checksums // D? = data, if transferTo is false. int headerLen = writePacketHeader(pkt, dataLen, packetLen); // Per above, the header doesn't start at the beginning of the // buffer int headerOff = pkt.position() - headerLen; int checksumOff = pkt.position(); byte[] buf = pkt.array(); if (checksumSize > 0 && checksumIn != null) { readChecksum(buf, checksumOff, checksumDataLen); // write in progress that we need to use to get last checksum if (lastDataPacket && lastChunkChecksum != null) { int start = checksumOff + checksumDataLen - checksumSize; byte[] updatedChecksum = lastChunkChecksum.getChecksum(); if (updatedChecksum != null) { System.arraycopy(updatedChecksum, 0, buf, start, checksumSize); } } } int dataOff = checksumOff + checksumDataLen; if (!transferTo) { // normal transfer IOUtils.readFully(blockIn, buf, dataOff, dataLen); if (verifyChecksum) { verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff); } } try { if (transferTo) { SocketOutputStream sockOut = (SocketOutputStream) out; // First write header and checksums sockOut.write(buf, headerOff, dataOff - headerOff); // no need to flush since we know out is not a buffered stream FileChannel fileCh = ((FileInputStream) blockIn).getChannel(); LongWritable waitTime = new LongWritable(); LongWritable transferTime = new LongWritable(); sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime); blockInPosition += dataLen; } else { // normal transfer out.write(buf, headerOff, dataOff + dataLen - headerOff); } } catch (IOException e) { if (e instanceof SocketTimeoutException) { /* * writing to client timed out. This happens if the client reads * part of a block and then decides not to read the rest (but * leaves the socket open). */ LOG.info("exception: ", e); } else { /* * Exception while writing to the client. Connection closure * from the other end is mostly the case and we do not care much * about it. But other things can go wrong, especially in * transferTo(), which we do not want to ignore. 
* * The message parsing below should not be considered as a good * coding example. NEVER do it to drive a program logic. NEVER. * It was done here because the NIO throws an IOException for * EPIPE. */ String ioem = e.getMessage(); if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); } } throw ioeToSocketException(e); } if (throttler != null) { // rebalancing so throttle throttler.throttle(packetLen); } return dataLen; } /** * Read checksum into given buffer * * @param buf * buffer to read the checksum into * @param checksumOffset * offset at which to write the checksum into buf * @param checksumLen * length of checksum to write * @throws IOException * on error */ private void readChecksum(byte[] buf, final int checksumOffset, final int checksumLen) throws IOException { if (checksumSize <= 0 && checksumIn == null) { return; } try { checksumIn.readFully(buf, checksumOffset, checksumLen); } catch (IOException e) { LOG.warn(" Could not read or failed to veirfy checksum for data" + " at offset " + offset + " for block " + block, e); IOUtils.closeStream(checksumIn); checksumIn = null; if (corruptChecksumOk) { if (checksumOffset < checksumLen) { // Just fill the array with zeros. Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0); } } else { throw e; } } } /** * Compute checksum for chunks and verify the checksum that is read from the * metadata file is correct. * * @param buf * buffer that has checksum and data * @param dataOffset * position where data is written in the buf * @param datalen * length of data * @param numChunks * number of chunks corresponding to data * @param checksumOffset * offset where checksum is written in the buf * @throws ChecksumException * on failed checksum verification */ public void verifyChecksum(final byte[] buf, final int dataOffset, final int datalen, final int numChunks, final int checksumOffset) throws ChecksumException { int dOff = dataOffset; int cOff = checksumOffset; int dLeft = datalen; for (int i = 0; i < numChunks; i++) { checksum.reset(); int dLen = Math.min(dLeft, chunkSize); checksum.update(buf, dOff, dLen); if (!checksum.compare(buf, cOff)) { long failedPos = offset + datalen - dLeft; throw new ChecksumException("Checksum failed at " + failedPos, failedPos); } dLeft -= dLen; dOff += dLen; cOff += checksumSize; } } /** * sendBlock() is used to read block and its metadata and stream the data to * either a client or to another datanode. * * @param out * stream to which the block is written to * @param baseStream * optional. if non-null, <code>out</code> is assumed to be a * wrapper over this stream. This enables optimizations for * sending the data, e.g. * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} * . * @param throttler * for sending data. * @return total bytes read, including checksum data. */ public long sendBlock(DataOutputStream out, OutputStream baseStream, DataTransferThrottler throttler) throws IOException { if (out == null) { throw new IOException("out stream is null"); } initialOffset = offset; long totalRead = 0; OutputStream streamForSendChunks = out; lastCacheDropOffset = initialOffset; if (isLongRead() && blockInFd != null) { // Advise that this file descriptor will be accessed sequentially. NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(), blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL); } // Trigger readahead of beginning of file if configured. 
manageOsCache(); final long startTime = ClientTraceLog.isInfoEnabled() ? System .nanoTime() : 0; try { int maxChunksPerPacket; int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; boolean transferTo = transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream && blockIn instanceof FileInputStream; if (transferTo) { FileChannel fileChannel = ((FileInputStream) blockIn) .getChannel(); blockInPosition = fileChannel.position(); streamForSendChunks = baseStream; maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); // Smaller packet size to only hold checksum when doing // transferTo pktBufSize += checksumSize * maxChunksPerPacket; } else { maxChunksPerPacket = Math.max(1, numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE)); // Packet size includes both checksum and data pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket; } ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize); while (endOffset > offset && !Thread.currentThread().isInterrupted()) { manageOsCache(); long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo, throttler); offset += len; totalRead += len + (numberOfChunks(len) * checksumSize); seqno++; } // If this thread was interrupted, then it did not send the full // block. if (!Thread.currentThread().isInterrupted()) { try { // send an empty packet to mark the end of the block sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo, throttler); out.flush(); } catch (IOException e) { // socket error throw ioeToSocketException(e); } sentEntireByteRange = true; } } finally { if (clientTraceFmt != null) { final long endTime = System.nanoTime(); ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime)); } close(); } return totalRead; } /** * Manage the OS buffer cache by performing read-ahead and drop-behind. */ private void manageOsCache() throws IOException { // We can't manage the cache for this block if we don't have a file // descriptor to work with. if (blockInFd == null) return; // Drop what we've just read from cache, since we aren't // likely to need it again if (dropCacheBehindAllReads || (dropCacheBehindLargeReads && isLongRead())) { long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES; if (offset >= nextCacheDropOffset) { long dropLength = offset - lastCacheDropOffset; NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(), blockInFd, lastCacheDropOffset, dropLength, NativeIO.POSIX.POSIX_FADV_DONTNEED); lastCacheDropOffset = offset; } } } /** * Returns true if we have done a long enough read for this block to qualify * for the DataNode-wide cache management defaults. We avoid applying the * cache management defaults to smaller reads because the overhead would be * too high. * * Note that if the client explicitly asked for dropBehind, we will do it * even on short reads. * * This is also used to determine when to invoke * posix_fadvise(POSIX_FADV_SEQUENTIAL). */ private boolean isLongRead() { return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES; } /** * Write packet header into {@code pkt}, return the length of the header * written. 
*/ private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) { pkt.clear(); // both syncBlock and syncPacket are false PacketHeader header = new PacketHeader(packetLen, offset, seqno, (dataLen == 0), dataLen, false); int size = header.getSerializedSize(); pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size); header.putInBuffer(pkt); return size; } boolean didSendEntireByteRange() { return sentEntireByteRange; } /** * @return the checksum type that will be used with this block transfer. */ DataChecksum getChecksum() { return checksum; } /** * @return the offset into the block file where the sender is currently * reading. */ long getOffset() { return offset; } public static interface InputStreamFactory { public InputStream createStream(long offset) throws IOException; } private static class BlockInputStreamFactory implements InputStreamFactory { private final ExtendedBlock block; private final FsDatasetSpi data; private BlockInputStreamFactory(ExtendedBlock block, FsDatasetSpi data) { this.block = block; this.data = data; } @Override public InputStream createStream(long offset) throws IOException { return data.getBlockInputStream(block, offset); } } }
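For completeness, here is a rough usage sketch of how the rewritten sender could be driven to push a recovered block to a target DataNode. It is not part of the patch: the recovered-block data/metadata files, the CachingStrategy construction, and the class name are assumptions for illustration, based only on the constructor shown above.

    import java.io.BufferedOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
    import org.apache.hadoop.hdfs.server.datanode.RaidBlockSender2;
    import org.apache.hadoop.io.IOUtils;

    public class SendRecoveredBlockSketch {

        public static long send(ExtendedBlock block, long blockLength,
                final File recoveredData, File recoveredMeta,
                OutputStream toTargetDatanode) throws IOException {

            // Checksum metadata produced while re-encoding the block
            // (assumed to be in the standard BlockMetadataHeader format).
            DataInputStream metaIn =
                new DataInputStream(new FileInputStream(recoveredMeta));

            RaidBlockSender2 sender = new RaidBlockSender2(
                block, blockLength,
                0, blockLength,            // send the whole block
                false,                     // corruptChecksumOk
                false,                     // verifyChecksum
                true,                      // sendChecksum
                false,                     // transferToAllowed: plain stream here
                metaIn,
                new RaidBlockSender2.InputStreamFactory() {
                    @Override
                    public InputStream createStream(long offset) throws IOException {
                        InputStream in = new FileInputStream(recoveredData);
                        IOUtils.skipFully(in, offset);
                        return in;
                    }
                },
                null,                              // clientTraceFmt: no trace logging
                new CachingStrategy(null, null));  // null/null = DataNode defaults

            DataOutputStream out =
                new DataOutputStream(new BufferedOutputStream(toTargetDatanode));
            return sender.sendBlock(out, null, null); // no base stream, no throttler
        }
    }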
Part 2: Rewrite DFSck; the new class is FilterDFSck.
HDFS-RAID calls HDFS's own DFSck to find corrupt blocks and then performs the repair. However, the 2.2.0 version of DFSck has a small defect:
    /**
     * Derive the namenode http address from the current file system,
     * either default or as set by "-fs" in the generic options.
     * @return Returns http address or null if failure.
     * @throws IOException if we can't determine the active NN address
     */
    private String getCurrentNamenodeAddress() throws IOException {
      //String nnAddress = null;
      Configuration conf = getConf();

      //get the filesystem object to verify it is an HDFS system
      FileSystem fs;
      try {
        fs = FileSystem.get(conf);
      } catch (IOException ioe) {
        System.err.println("FileSystem is inaccessible due to:\n"
            + StringUtils.stringifyException(ioe));
        return null;
      }
      if (!(fs instanceof DistributedFileSystem)) {
        System.err.println("FileSystem is " + fs.getUri());
        return null;
      }
      return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf, false);
    }
The problem with the code above is that DFSck only works when fs is a DistributedFileSystem. HDFS-RAID, however, wraps DistributedFileSystem in RaidDistributedFileSystem, which extends FilterFileSystem and holds a reference to the underlying DistributedFileSystem. The DFSck implementation is rather rigid here: many of its methods are private, so compatibility with HDFS-RAID cannot be achieved by subclassing, and the only option is to write a new class modeled on it:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.hdfs.tools; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.URL; import java.net.URLConnection; import java.net.URLEncoder; import java.security.PrivilegedExceptionAction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * In order to be compatible with HDFS-RAID, we should modify <code>DFSck<code>. * We just modify the method <code>DFSck.getCurrentNamenodeAddress()<code>. * * @author Deng Changchun * @version 2 * @since 2014-05-24 */ @InterfaceAudience.Private public class FilterDFSck extends Configured implements Tool { static{ HdfsConfiguration.init(); } private static final String USAGE = "Usage: DFSck <path> " + "[-list-corruptfileblocks | " + "[-move | -delete | -openforwrite] " + "[-files [-blocks [-locations | -racks]]]]\n" + "\t<path>\tstart checking from this path\n" + "\t-move\tmove corrupted files to /lost+found\n" + "\t-delete\tdelete corrupted files\n" + "\t-files\tprint out files being checked\n" + "\t-openforwrite\tprint out files opened for write\n" + "\t-list-corruptfileblocks\tprint out list of missing " + "blocks and files they belong to\n" + "\t-blocks\tprint out block report\n" + "\t-locations\tprint out locations for every block\n" + "\t-racks\tprint out network topology for data-node locations\n" + "\t\tBy default fsck ignores files opened for write, " + "use -openforwrite to report such files. They are usually " + " tagged CORRUPT or HEALTHY depending on their block " + "allocation status"; private final UserGroupInformation ugi; private final PrintStream out; /** * Filesystem checker. 
* @param conf current Configuration */ public FilterDFSck(Configuration conf) throws IOException { this(conf, System.out); } public FilterDFSck(Configuration conf, PrintStream out) throws IOException { super(conf); this.ugi = UserGroupInformation.getCurrentUser(); this.out = out; } /** * Print fsck usage information */ static void printUsage(PrintStream out) { out.println(USAGE + "\n"); ToolRunner.printGenericCommandUsage(out); } /** * @param args */ @Override public int run(final String[] args) throws IOException { if (args.length == 0) { printUsage(System.err); return -1; } try { return UserGroupInformation.getCurrentUser().doAs( new PrivilegedExceptionAction<Integer>() { @Override public Integer run() throws Exception { return doWork(args); } }); } catch (InterruptedException e) { throw new IOException(e); } } /* * To get the list, we need to call iteratively until the server says * there is no more left. */ private Integer listCorruptFileBlocks(String dir, String baseUrl) throws IOException { int errCode = -1; int numCorrupt = 0; int cookie = 0; final String noCorruptLine = "has no CORRUPT files"; final String noMoreCorruptLine = "has no more CORRUPT files"; final String cookiePrefix = "Cookie:"; boolean allDone = false; while (!allDone) { final StringBuffer url = new StringBuffer(baseUrl); if (cookie > 0) { url.append("&startblockafter=").append(String.valueOf(cookie)); } URL path = new URL(url.toString()); URLConnection connection = SecurityUtil.openSecureHttpConnection(path); InputStream stream = connection.getInputStream(); BufferedReader input = new BufferedReader(new InputStreamReader( stream, "UTF-8")); try { String line = null; while ((line = input.readLine()) != null) { if (line.startsWith(cookiePrefix)){ try{ cookie = Integer.parseInt(line.split("\t")[1]); } catch (Exception e){ allDone = true; break; } continue; } if ((line.endsWith(noCorruptLine)) || (line.endsWith(noMoreCorruptLine)) || (line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) { allDone = true; break; } if ((line.isEmpty()) || (line.startsWith("FSCK started by")) || (line.startsWith("The filesystem under path"))) continue; numCorrupt++; if (numCorrupt == 1) { out.println("The list of corrupt files under path '" + dir + "' are:"); } out.println(line); } } finally { input.close(); } } out.println("The filesystem under path '" + dir + "' has " + numCorrupt + " CORRUPT files"); if (numCorrupt == 0) errCode = 0; return errCode; } /** * Derive the namenode http address from the current file system, * either default or as set by "-fs" in the generic options. * @return Returns http address or null if failure. * @throws IOException if we can't determine the active NN address */ private String getCurrentNamenodeAddress() throws IOException { // String nnAddress = null; Configuration conf = getConf(); // get the filesystem object to verify it is an HDFS system FileSystem fs; try { fs = FileSystem.get(conf); } catch (IOException ioe) { System.err.println("FileSystem is inaccessible due to:\n" + StringUtils.stringifyException(ioe)); return null; } // We extend the DFSck to adapt to FilterFileSystem. // --Begin-- fs = fs instanceof FilterFileSystem ? 
((FilterFileSystem) fs) .getRawFileSystem() : fs; // --End-- if (!(fs instanceof DistributedFileSystem)) { System.err.println("FileSystem is " + fs.getUri()); return null; } return DFSUtil .getInfoServer(HAUtil.getAddressOfActive(fs), conf, false); } private int doWork(final String[] args) throws IOException { final StringBuilder url = new StringBuilder(HttpConfig.getSchemePrefix()); String namenodeAddress = getCurrentNamenodeAddress(); if (namenodeAddress == null) { //Error message already output in {@link #getCurrentNamenodeAddress()} System.err.println("DFSck exiting."); return 0; } url.append(namenodeAddress); System.err.println("Connecting to namenode via " + url.toString()); url.append("/fsck?ugi=").append(ugi.getShortUserName()); String dir = null; boolean doListCorruptFileBlocks = false; for (int idx = 0; idx < args.length; idx++) { if (args[idx].equals("-move")) { url.append("&move=1"); } else if (args[idx].equals("-delete")) { url.append("&delete=1"); } else if (args[idx].equals("-files")) { url.append("&files=1"); } else if (args[idx].equals("-openforwrite")) { url.append("&openforwrite=1"); } else if (args[idx].equals("-blocks")) { url.append("&blocks=1"); } else if (args[idx].equals("-locations")) { url.append("&locations=1"); } else if (args[idx].equals("-racks")) { url.append("&racks=1"); } else if (args[idx].equals("-list-corruptfileblocks")) { url.append("&listcorruptfileblocks=1"); doListCorruptFileBlocks = true; } else if (!args[idx].startsWith("-")) { if (null == dir) { dir = args[idx]; } else { System.err.println("fsck: can only operate on one path at a time '" + args[idx] + "'"); printUsage(System.err); return -1; } } else { System.err.println("fsck: Illegal option '" + args[idx] + "'"); printUsage(System.err); return -1; } } if (null == dir) { dir = "/"; } url.append("&path=").append(URLEncoder.encode(dir, "UTF-8")); if (doListCorruptFileBlocks) { return listCorruptFileBlocks(dir, url.toString()); } URL path = new URL(url.toString()); URLConnection connection = SecurityUtil.openSecureHttpConnection(path); InputStream stream = connection.getInputStream(); BufferedReader input = new BufferedReader(new InputStreamReader( stream, "UTF-8")); String line = null; String lastLine = null; int errCode = -1; try { while ((line = input.readLine()) != null) { out.println(line); lastLine = line; } } finally { input.close(); } if (lastLine.endsWith(NamenodeFsck.HEALTHY_STATUS)) { errCode = 0; } else if (lastLine.endsWith(NamenodeFsck.CORRUPT_STATUS)) { errCode = 1; } else if (lastLine.endsWith(NamenodeFsck.NONEXISTENT_STATUS)) { errCode = 0; } return errCode; } public static void main(String[] args) throws Exception { // -files option is also used by GenericOptionsParser // Make sure that is not the first argument for fsck int res = -1; if ((args.length == 0) || ("-files".equals(args[0]))) { printUsage(System.err); ToolRunner.printGenericCommandUsage(System.err); } else if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) { res = 0; } else { res = ToolRunner.run(new FilterDFSck(new HdfsConfiguration()), args); } System.exit(res); } }
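The only substantive change relative to the stock DFSck is the unwrapping step inside getCurrentNamenodeAddress(), marked with the --Begin--/--End-- comments above. Distilled into a standalone sketch (the class and method names here are illustrative only, not part of the patch):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FilterFileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class UnwrapRaidFsSketch {
        /**
         * If HDFS-RAID installed its RaidDistributedFileSystem (a FilterFileSystem
         * wrapping the real DistributedFileSystem), return the wrapped instance so
         * that fsck keeps working; otherwise behave like the original check.
         */
        static DistributedFileSystem unwrap(FileSystem fs) {
            if (fs instanceof FilterFileSystem) {
                fs = ((FilterFileSystem) fs).getRawFileSystem();
            }
            if (!(fs instanceof DistributedFileSystem)) {
                throw new IllegalArgumentException("FileSystem is " + fs.getUri());
            }
            return (DistributedFileSystem) fs;
        }
    }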
Part 3: Modify RaidDFSUtil and the bin/hdfs and bin/hadoop scripts.
HDFS-RAID looks up corrupt files through RaidDFSUtil's getCorruptFiles method, so the previous call to DFSck has to be replaced with a call to the rewritten FilterDFSck. The modified code is as follows:
    // DFSck fsck = new DFSck(conf, out);
    FilterDFSck fsck = new FilterDFSck(conf, out);
    String[] args = new String[]{"-list-corruptfileblocks"};
    try {
      ToolRunner.run(fsck, args);
    } catch (Exception e) {
      // throw new IOException("DFSck.run exception ", e);
      throw new IOException("FilterDFSck.run exception ", e);
    }
So that a Hadoop deployment with HDFS-RAID can keep using the fsck command, the class invoked for fsck in the bin/hdfs and bin/hadoop scripts must be changed as follows:
    elif [ "$COMMAND" = "fsck" ] ; then
      CLASS=org.apache.hadoop.hdfs.tools.FilterDFSck
      HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
With the changes above in place, HDFS-RAID is essentially migrated to Hadoop-2.2.0. (I don't know how to upload attachments here; to request the related files, please contact 2323162333.)