
Migrating HDFS-RAID from Hadoop-0.22.0 to Hadoop-2.2.0

2014-05-26 10:39
HDFS-RAID was released as an add-on component for Hadoop-0.22.0 and is compatible with that version. Oddly, the Hadoop team did not keep maintaining HDFS-RAID afterwards. Using HDFS-RAID on the latest 2.2.0 release therefore requires a fair number of code changes. They fall into the following three points:

First: rewrite the RaidBlockSender class.

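RaidBlockSender is the block sender inside HDFS-RAID: once a corrupt block has been recovered through erasure coding (EC), the block has to be sent to the target node. The gap between 0.22.0 and 2.2.0 is large, and Hadoop itself gained many new features in between, most notably HA and Federation, so a lot of the low-level code changed. Two changes matter here: Block was extended to ExtendedBlock, and the Writable serialization mechanism gave way to Google's Protobuf. The former mostly just adds fields; the latter changes how BlockSender is implemented. Because the original RaidBlockSender in HDFS-RAID was modeled on Hadoop's own BlockSender, migrating to 2.2.0 means writing a new RaidBlockSender modeled on the 2.2.0 BlockSender. The rewritten class is listed below.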
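To make the Block-to-ExtendedBlock change more concrete, here is a minimal sketch. `SimpleBlock` and `SimpleExtendedBlock` are hypothetical stand-ins written only for this illustration, not Hadoop's classes, and their equality rules are a choice made for the sketch. The one fact borrowed from Hadoop 2.x is that an ExtendedBlock pairs a block with the ID of the block pool it belongs to, which is how Federation tells blocks from different namespaces apart.

```java
import java.util.Objects;

/** Hypothetical stand-in for a pre-Federation block identity. */
final class SimpleBlock {
    final long blockId;
    final long numBytes;
    final long generationStamp;

    SimpleBlock(long blockId, long numBytes, long generationStamp) {
        this.blockId = blockId;
        this.numBytes = numBytes;
        this.generationStamp = generationStamp;
    }

    String getBlockName() {
        return "blk_" + blockId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleBlock)) {
            return false;
        }
        SimpleBlock other = (SimpleBlock) o;
        return blockId == other.blockId
                && generationStamp == other.generationStamp;
    }

    @Override
    public int hashCode() {
        return Objects.hash(blockId, generationStamp);
    }
}

/** Hypothetical stand-in for the extended identity: block pool ID plus block. */
final class SimpleExtendedBlock {
    final String blockPoolId;
    final SimpleBlock block;

    SimpleExtendedBlock(String blockPoolId, SimpleBlock block) {
        this.blockPoolId = Objects.requireNonNull(blockPoolId);
        this.block = Objects.requireNonNull(block);
    }

    /** Delegates to the wrapped block, so name-based callers keep working. */
    String getBlockName() {
        return block.getBlockName();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleExtendedBlock)) {
            return false;
        }
        SimpleExtendedBlock other = (SimpleExtendedBlock) o;
        // Same block ID in a different pool is a different block.
        return blockPoolId.equals(other.blockPoolId) && block.equals(other.block);
    }

    @Override
    public int hashCode() {
        return Objects.hash(blockPoolId, block);
    }
}
```

In this sketch the extended type only adds a field and delegates everything else, which matches the remark above that this part of the migration is mostly a matter of extra fields.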

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

/**
* To port HDFS-RAID from Hadoop-0.22.0 to Hadoop-2.2.0, the old
* RaidBlockSender has to be replaced, because the way a block is
* transferred to another DataNode has changed.
* This class is not derived from the old RaidBlockSender. It is a
* modified copy of the Hadoop-2.2.0 {@link BlockSender}, adapted to
* our requirements after analyzing how {@link BlockSender} works.
*
* @author Deng Changchun
* @version 2
* @since 2014-05-24
*/
public class RaidBlockSender2 implements java.io.Closeable {
static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;

/**
* Minimum buffer used while sending data to clients. Used only if
* transferTo() is enabled. 64KB is not that large. It could be larger, but
* not sure if there will be much more improvement.
*/
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64 * 1024;
private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);

/** the block to read from */
private final ExtendedBlock block;
/** Stream to read block data from */
private InputStream blockIn;
/** updated while using transferTo() */
private long blockInPosition = -1;
/** Stream to read checksum */
private DataInputStream checksumIn;
/** Checksum utility */
private final DataChecksum checksum;
/** Initial position to read */
private long initialOffset;
/** Current position of read */
private long offset;
/** Position of last byte to read from block file */
private final long endOffset;
/** Number of bytes in chunk used for computing checksum */
private final int chunkSize;
/** Number bytes of checksum computed for a chunk */
private final int checksumSize;
/** If true, failure to read checksum is ignored */
private final boolean corruptChecksumOk;
/** Sequence number of packet being sent */
private long seqno;
/** Set to true if transferTo is allowed for sending data to the client */
private final boolean transferToAllowed;
/** Set to true once entire requested byte range has been sent to the client */
private boolean sentEntireByteRange;
/** When true, verify checksum while reading from checksum file */
private final boolean verifyChecksum;
/** Format used to print client trace log messages */
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;

/** The file descriptor of the block being sent */
private FileDescriptor blockInFd;

// Cache-management related fields
private final long readaheadLength;

private ReadaheadRequest curReadahead;

private final boolean alwaysReadahead;

private final boolean dropCacheBehindLargeReads;

private final boolean dropCacheBehindAllReads;

private long lastCacheDropOffset;

@VisibleForTesting
static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB

/**
* See {@link BlockSender#isLongRead()}
*/
private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;

/**
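* Constructor
*
* @param block
*            Block that is being read
* @param blockLength
*            length of the block, used as the visible length of the data
* @param startOffset
*            starting offset to read from
* @param length
*            length of data to read; a negative value means the whole block
* @param corruptChecksumOk
*            if true, a missing or unreadable checksum is tolerated
* @param verifyChecksum
*            verify checksum while reading the data
* @param sendChecksum
*            send checksum to client.
* @param transferToAllowed
*            whether transferTo() may be used to send the data
* @param metadataIn
*            stream over the block's metadata (checksum) file; may be null
* @param streamFactory
*            creates the block data stream positioned at a given offset
* @param clientTraceFmt
*            format string used to print client trace logs
* @param cachingStrategy
*            drop-behind and readahead settings requested by the caller
* @throws IOException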
*/
public RaidBlockSender2(ExtendedBlock block, long blockLength,
long startOffset,long length,
boolean corruptChecksumOk, boolean verifyChecksum,
boolean sendChecksum, boolean transferToAllowed,
DataInputStream metadataIn, InputStreamFactory streamFactory,
String clientTraceFmt, CachingStrategy cachingStrategy)
throws IOException {
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;

/*
* If the client asked for the cache to be dropped behind all reads,
* we honor that. Otherwise, we use the DataNode defaults. When
* using DataNode defaults, we use a heuristic where we only drop
* the cache for large reads.
*/
if (cachingStrategy.getDropBehind() == null) {
this.dropCacheBehindAllReads = false;
this.dropCacheBehindLargeReads = DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT;
} else {
this.dropCacheBehindAllReads = this.dropCacheBehindLargeReads = cachingStrategy
.getDropBehind().booleanValue();
}
/*
* Similarly, if readahead was explicitly requested, we always do
* it. Otherwise, we read ahead based on the DataNode settings, and
* only when the reads are large.
*/
if (cachingStrategy.getReadahead() == null) {
this.alwaysReadahead = false;
this.readaheadLength = DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
} else {
this.alwaysReadahead = true;
this.readaheadLength = cachingStrategy.getReadahead()
.longValue();
}

if (verifyChecksum) {
// To simplify implementation, callers may not specify
// verification
// without sending.
Preconditions.checkArgument(sendChecksum,
"If verifying checksum, currently must also send it.");
}

final long replicaVisibleLength = blockLength;

// transferToFully() fails on 32 bit platforms for block sizes >=
// 2GB,
// use normal transfer in those cases
this.transferToAllowed = transferToAllowed;

/*
* (corruptChecksumOK, meta_file_exist): operation
* True,  True:  will verify checksum
* True,  False: No verify, e.g., need to read data from a corrupted file
* False, True:  will verify checksum
* False, False: throws IOException file not found
*/
DataChecksum csum = null;
if (verifyChecksum || sendChecksum) {
final InputStream metaIn = metadataIn;
if (!corruptChecksumOk || metaIn != null) {
if (metaIn == null) {
// need checksum but meta-data not found
throw new FileNotFoundException(
"Meta-data not found for " + block);
}

checksumIn = new DataInputStream(new BufferedInputStream(
metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));

// read and handle the common header here. For now just a
// version
BlockMetadataHeader header = BlockMetadataHeader
.readHeader(checksumIn);
short version = header.getVersion();
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version
+ ") for metadata file for " + block
+ " ignoring ...");
}
csum = header.getChecksum();
} else {
LOG.warn("Could not find metadata file for " + block);
}
}
if (csum == null) {
// The number of bytes per checksum here determines the alignment
// of reads: we always start reading at a checksum chunk boundary,
// even if the checksum type is NULL. So, choosing too big of a value
// would risk sending too much unnecessary data. 512 (1 disk sector)
// is likely to result in minimal extra IO.
csum = DataChecksum
.newDataChecksum(DataChecksum.Type.NULL, 512);
}

/*
* If chunkSize is very large, then the metadata file is mostly
* corrupted. For now just truncate bytesPerchecksum to blockLength.
*/
int size = csum.getBytesPerChecksum();
if (size > 10 * 1024 * 1024 && size > replicaVisibleLength) {
csum = DataChecksum.newDataChecksum(csum.getChecksumType(),
Math.max((int) replicaVisibleLength, 10 * 1024 * 1024));
size = csum.getBytesPerChecksum();
}
chunkSize = size;
checksum = csum;
checksumSize = checksum.getChecksumSize();
length = length < 0 ? replicaVisibleLength : length;

// end is either last byte on disk or the length for which we have a
// checksum
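// This port does not consult a replica, so there is never an in-memory
// checksum for a chunk still being written: chunkChecksum stays null
// and end is always blockLength.
ChunkChecksum chunkChecksum = null;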
long end = chunkChecksum != null ? chunkChecksum.getDataLength()
: blockLength;
if (startOffset < 0 || startOffset > end
|| (length + startOffset) > end) {
String msg = " Offset " + startOffset + " and length " + length
+ " don't match block " + block + " ( blockLen " + end
+ " )";
LOG.warn(this.getClass().getName() + ":sendBlock() : " + msg);
throw new IOException(msg);
}

// Ensure read offset is position at the beginning of chunk
offset = startOffset - (startOffset % chunkSize);
if (length >= 0) {
// Ensure endOffset points to end of chunk.
long tmpLen = startOffset + length;
if (tmpLen % chunkSize != 0) {
tmpLen += (chunkSize - tmpLen % chunkSize);
}
if (tmpLen < end) {
// will use on-disk checksum here since the end is a stable
// chunk
end = tmpLen;
} else if (chunkChecksum != null) {
// last chunk is changing. flag that we need to use
// in-memory checksum
this.lastChunkChecksum = chunkChecksum;
}
}
endOffset = end;

// seek to the right offsets
if (offset > 0) {
long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked when created below
if (checksumSkip > 0) {
// Should we use seek() for checksum file as well?
IOUtils.skipFully(checksumIn, checksumSkip);
}
}
seqno = 0;

blockIn = streamFactory.createStream(offset); // seek to offset
if (blockIn instanceof FileInputStream) {
blockInFd = ((FileInputStream) blockIn).getFD();
} else {
blockInFd = null;
}
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
throw ioe;
}
}

/**
* close opened files.
*/
@Override
public void close() throws IOException {
if (blockInFd != null
&& ((dropCacheBehindAllReads) || (dropCacheBehindLargeReads && isLongRead()))) {
try {
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, lastCacheDropOffset, offset
- lastCacheDropOffset,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Exception e) {
LOG.warn("Unable to drop cache on file close", e);
}
}
if (curReadahead != null) {
curReadahead.cancel();
}

IOException ioe = null;
if (checksumIn != null) {
try {
checksumIn.close(); // close checksum file
} catch (IOException e) {
ioe = e;
}
checksumIn = null;
}
if (blockIn != null) {
try {
blockIn.close(); // close data file
} catch (IOException e) {
ioe = e;
}
blockIn = null;
blockInFd = null;
}
// throw IOException if there is any
if (ioe != null) {
throw ioe;
}
}

/**
* Wait for rbw replica to reach the length
*
* @param rbw
*            replica that is being written to
* @param len
*            minimum length to reach
* @throws IOException
*             on failing to reach the len in given wait time
*/
private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
throws IOException {
// Wait for 3 seconds for rbw replica to reach the minimum length
for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
long bytesOnDisk = rbw.getBytesOnDisk();
if (bytesOnDisk < len) {
throw new IOException(String.format(
"Need %d bytes, but only %d bytes available", len,
bytesOnDisk));
}
}

/**
* Converts an IOException (not subclasses) to SocketException. This is
* typically done to indicate to upper layers that the error was a socket
* error rather than often more serious exceptions like disk errors.
*/
private static IOException ioeToSocketException(IOException ioe) {
if (ioe.getClass().equals(IOException.class)) {
// "se" could be a new class instead of SocketException.
IOException se = new SocketException("Original Exception : " + ioe);
se.initCause(ioe);
/*
* Change the stacktrace so that original trace is not truncated
* when printed.
*/
se.setStackTrace(ioe.getStackTrace());
return se;
}
// otherwise just return the same exception.
return ioe;
}

/**
* @param datalen
*            Length of data
* @return number of chunks for data of given size
*/
private int numberOfChunks(long datalen) {
return (int) ((datalen + chunkSize - 1) / chunkSize);
}

/**
* Sends a packet with up to maxChunks chunks of data.
*
* @param pkt
*            buffer used for writing packet data
* @param maxChunks
*            maximum number of chunks to send
* @param out
*            stream to send data to
* @param transferTo
*            use transferTo to send data
* @param throttler
*            used for throttling data transfer bandwidth
*/
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
boolean transferTo, DataTransferThrottler throttler)
throws IOException {
int dataLen = (int) Math.min(endOffset - offset,
(chunkSize * (long) maxChunks));

int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in
// the packet
int checksumDataLen = numChunks * checksumSize;
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;

// The packet buffer is organized as follows:
// _______HHHHCCCCD?D?D?D?
// ^ ^
// | \ checksumOff
// \ headerOff
// _ padding, since the header is variable-length
// H = header and length prefixes
// C = checksums
// D? = data, if transferTo is false.

int headerLen = writePacketHeader(pkt, dataLen, packetLen);

// Per above, the header doesn't start at the beginning of the
// buffer
int headerOff = pkt.position() - headerLen;

int checksumOff = pkt.position();
byte[] buf = pkt.array();

if (checksumSize > 0 && checksumIn != null) {
readChecksum(buf, checksumOff, checksumDataLen);

// write in progress that we need to use to get last checksum
if (lastDataPacket && lastChunkChecksum != null) {
int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();

if (updatedChecksum != null) {
System.arraycopy(updatedChecksum, 0, buf, start,
checksumSize);
}
}
}

int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
IOUtils.readFully(blockIn, buf, dataOff, dataLen);

if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
}
}

try {
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream) out;
// First write header and checksums
sockOut.write(buf, headerOff, dataOff - headerOff);

// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream) blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
blockInPosition += dataLen;
} else {
// normal transfer
out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException) {
/*
* writing to client timed out. This happens if the client reads
* part of a block and then decides not to read the rest (but
* leaves the socket open).
*/
LOG.info("exception: ", e);
} else {
/*
* Exception while writing to the client. Connection closure
* from the other end is mostly the case and we do not care much
* about it. But other things can go wrong, especially in
* transferTo(), which we do not want to ignore.
*
* The message parsing below should not be considered as a good
* coding example. NEVER do it to drive a program logic. NEVER.
* It was done here because the NIO throws an IOException for
* EPIPE.
*/
String ioem = e.getMessage();
// getMessage() may return null, so guard before matching on it.
if (ioem == null
|| (!ioem.startsWith("Broken pipe")
&& !ioem.startsWith("Connection reset"))) {
LOG.error("BlockSender.sendChunks() exception: ", e);
}
}
throw ioeToSocketException(e);
}

if (throttler != null) { // rebalancing so throttle
throttler.throttle(packetLen);
}

return dataLen;
}

/**
* Read checksum into given buffer
*
* @param buf
*            buffer to read the checksum into
* @param checksumOffset
*            offset at which to write the checksum into buf
* @param checksumLen
*            length of checksum to write
* @throws IOException
*             on error
*/
private void readChecksum(byte[] buf, final int checksumOffset,
final int checksumLen) throws IOException {
if (checksumSize <= 0 && checksumIn == null) {
return;
}
try {
checksumIn.readFully(buf, checksumOffset, checksumLen);
} catch (IOException e) {
LOG.warn(" Could not read or failed to verify checksum for data"
+ " at offset " + offset + " for block " + block, e);
IOUtils.closeStream(checksumIn);
checksumIn = null;
if (corruptChecksumOk) {
if (checksumLen > 0) {
// Just fill the checksum region with zeros. Arrays.fill takes an
// exclusive end index, not a length.
Arrays.fill(buf, checksumOffset, checksumOffset + checksumLen,
(byte) 0);
}
} else {
throw e;
}
}
}

/**
* Compute checksum for chunks and verify the checksum that is read from the
* metadata file is correct.
*
* @param buf
*            buffer that has checksum and data
* @param dataOffset
*            position where data is written in the buf
* @param datalen
*            length of data
* @param numChunks
*            number of chunks corresponding to data
* @param checksumOffset
*            offset where checksum is written in the buf
* @throws ChecksumException
*             on failed checksum verification
*/
public void verifyChecksum(final byte[] buf, final int dataOffset,
final int datalen, final int numChunks, final int checksumOffset)
throws ChecksumException {
int dOff = dataOffset;
int cOff = checksumOffset;
int dLeft = datalen;

for (int i = 0; i < numChunks; i++) {
checksum.reset();
int dLen = Math.min(dLeft, chunkSize);
checksum.update(buf, dOff, dLen);
if (!checksum.compare(buf, cOff)) {
long failedPos = offset + datalen - dLeft;
throw new ChecksumException("Checksum failed at " + failedPos,
failedPos);
}
dLeft -= dLen;
dOff += dLen;
cOff += checksumSize;
}
}

/**
* sendBlock() is used to read block and its metadata and stream the data to
* either a client or to another datanode.
*
* @param out
*            stream to which the block is written to
* @param baseStream
*            optional. if non-null, <code>out</code> is assumed to be a
*            wrapper over this stream. This enables optimizations for
*            sending the data, e.g.
*            {@link SocketOutputStream#transferToFully(FileChannel, long, int)}
*            .
* @param throttler
*            for sending data.
* @return total bytes read, including checksum data.
*/
public long sendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
if (out == null) {
throw new IOException("out stream is null");
}
initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;

lastCacheDropOffset = initialOffset;

if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessed sequentially.
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
}

// Drop data that has already been sent from the OS cache, if configured.
manageOsCache();

final long startTime = ClientTraceLog.isInfoEnabled() ? System
.nanoTime() : 0;
try {
int maxChunksPerPacket;
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
if (transferTo) {
FileChannel fileChannel = ((FileInputStream) blockIn)
.getChannel();
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);

// Smaller packet size to only hold checksum when doing
// transferTo
pktBufSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data
pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}

ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);

while (endOffset > offset
&& !Thread.currentThread().isInterrupted()) {
manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket,
streamForSendChunks, transferTo, throttler);
offset += len;
totalRead += len + (numberOfChunks(len) * checksumSize);
seqno++;
}
// If this thread was interrupted, then it did not send the full
// block.
if (!Thread.currentThread().isInterrupted()) {
try {
// send an empty packet to mark the end of the block
sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
out.flush();
} catch (IOException e) { // socket error
throw ioeToSocketException(e);
}

sentEntireByteRange = true;
}
} finally {
if (clientTraceFmt != null) {
final long endTime = System.nanoTime();
ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
initialOffset, endTime - startTime));
}
close();
}
return totalRead;
}

/**
* Manage the OS buffer cache by performing drop-behind. This class issues
* no read-ahead: curReadahead is never assigned.
*/
private void manageOsCache() throws IOException {
// We can't manage the cache for this block if we don't have a file
// descriptor to work with.
if (blockInFd == null)
return;

// Drop what we've just read from cache, since we aren't
// likely to need it again
if (dropCacheBehindAllReads
|| (dropCacheBehindLargeReads && isLongRead())) {
long nextCacheDropOffset = lastCacheDropOffset
+ CACHE_DROP_INTERVAL_BYTES;
if (offset >= nextCacheDropOffset) {
long dropLength = offset - lastCacheDropOffset;
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, lastCacheDropOffset, dropLength,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
lastCacheDropOffset = offset;
}
}
}

/**
* Returns true if we have done a long enough read for this block to qualify
* for the DataNode-wide cache management defaults. We avoid applying the
* cache management defaults to smaller reads because the overhead would be
* too high.
*
* Note that if the client explicitly asked for dropBehind, we will do it
* even on short reads.
*
* This is also used to determine when to invoke
* posix_fadvise(POSIX_FADV_SEQUENTIAL).
*/
private boolean isLongRead() {
return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
}

/**
* Write packet header into {@code pkt}, return the length of the header
* written.
*/
private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
// both syncBlock and syncPacket are false
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
(dataLen == 0), dataLen, false);

int size = header.getSerializedSize();
pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size);
header.putInBuffer(pkt);
return size;
}

boolean didSendEntireByteRange() {
return sentEntireByteRange;
}

/**
* @return the checksum type that will be used with this block transfer.
*/
DataChecksum getChecksum() {
return checksum;
}

/**
* @return the offset into the block file where the sender is currently
*         reading.
*/
long getOffset() {
return offset;
}

public static interface InputStreamFactory {
public InputStream createStream(long offset) throws IOException;
}

private static class BlockInputStreamFactory implements InputStreamFactory {
private final ExtendedBlock block;
private final FsDatasetSpi data;

private BlockInputStreamFactory(ExtendedBlock block, FsDatasetSpi data) {
this.block = block;
this.data = data;
}

@Override
public InputStream createStream(long offset) throws IOException {
return data.getBlockInputStream(block, offset);
}
}
}
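The offset handling in the constructor above is easier to follow in isolation. The sketch below repeats only that arithmetic: the start is rounded down to a chunk boundary, the end is rounded up and capped at the block length, and the checksum stream is skipped by one checksum per chunk that precedes the aligned start. `ChunkRange` and `align` are hypothetical names for this illustration, and the negative-length shortcut of the real constructor is left out.

```java
/** Hypothetical holder for a byte range aligned to checksum-chunk boundaries. */
final class ChunkRange {
    final long offset;        // aligned start of the read
    final long endOffset;     // aligned (and capped) end of the read
    final long checksumSkip;  // bytes to skip in the checksum stream

    private ChunkRange(long offset, long endOffset, long checksumSkip) {
        this.offset = offset;
        this.endOffset = endOffset;
        this.checksumSkip = checksumSkip;
    }

    static ChunkRange align(long startOffset, long length, long blockLength,
                            int chunkSize, int checksumSize) {
        if (chunkSize <= 0 || checksumSize < 0) {
            throw new IllegalArgumentException("bad chunk or checksum size");
        }
        if (startOffset < 0 || length < 0
                || startOffset + length > blockLength) {
            throw new IllegalArgumentException(
                    "offset " + startOffset + " and length " + length
                    + " don't match block length " + blockLength);
        }
        // Round the start down to the beginning of its chunk.
        long offset = startOffset - (startOffset % chunkSize);
        // Round the end up to the end of its chunk, but never past the block.
        long end = startOffset + length;
        if (end % chunkSize != 0) {
            end += chunkSize - end % chunkSize;
        }
        end = Math.min(end, blockLength);
        // One stored checksum per chunk that lies before the aligned start.
        long checksumSkip = (offset / chunkSize) * checksumSize;
        return new ChunkRange(offset, end, checksumSkip);
    }
}
```

For example, with 512-byte chunks and 4-byte checksums, a request for 600 bytes starting at byte 700 of a 2000-byte block is widened to the range 512 to 1536, and the first 4 bytes of checksum data are skipped.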

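The packet bookkeeping in sendPacket() and writePacketHeader() can also be tried on its own. The following sketch reproduces the chunk count, the packet length (data plus checksums plus the constant 4 that sendPacket() adds, which in Hadoop's packet format covers the length field itself), and the trick of writing a variable-length header right-aligned inside a fixed-size header region. `PacketLayout` and `MAX_HEADER_LEN` are hypothetical; the value 33 is chosen only for the example and is not taken from Hadoop.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper mirroring the packet arithmetic of the class above. */
final class PacketLayout {
    /** Assumed upper bound on the serialized header size, for this sketch only. */
    static final int MAX_HEADER_LEN = 33;

    /** Number of chunks needed to hold dataLen bytes (rounds up). */
    static int numberOfChunks(long dataLen, int chunkSize) {
        return (int) ((dataLen + chunkSize - 1) / chunkSize);
    }

    /** Data bytes + one checksum per chunk + 4 bytes for the length field. */
    static int packetLen(int dataLen, int chunkSize, int checksumSize) {
        return dataLen + numberOfChunks(dataLen, chunkSize) * checksumSize + 4;
    }

    /**
     * Writes the header so that it ends exactly at MAX_HEADER_LEN, leaving
     * unused padding in front of it. Returns the offset where the header
     * starts; checksums and data follow from MAX_HEADER_LEN onwards.
     */
    static int putHeader(ByteBuffer pkt, byte[] header) {
        if (header.length > MAX_HEADER_LEN) {
            throw new IllegalArgumentException("header too long");
        }
        pkt.clear();
        pkt.position(MAX_HEADER_LEN - header.length);
        pkt.put(header);
        return pkt.position() - header.length;
    }
}
```

Right-aligning the header means the checksum and data offsets never depend on the header's actual size, so header, checksums and data can be written to the socket as one contiguous slice of the buffer.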

Second: rewrite DFSck as a new class.

HDFS-RAID calls the DFSck tool that HDFS itself provides to find corrupt blocks, and then repairs them. The DFSck in 2.2.0, however, has a small flaw:

/**
* Derive the namenode http address from the current file system,
* either default or as set by "-fs" in the generic options.
* @return Returns http address or null if failure.
* @throws IOException if we can't determine the active NN address
*/
private String getCurrentNamenodeAddress() throws IOException {
//String nnAddress = null;
Configuration conf = getConf();

//get the filesystem object to verify it is an HDFS system
FileSystem fs;
try {
fs = FileSystem.get(conf);
} catch (IOException ioe) {
System.err.println("FileSystem is inaccessible due to:\n"
+ StringUtils.stringifyException(ioe));
return null;
}
if (!(fs instanceof DistributedFileSystem)) {
System.err.println("FileSystem is " + fs.getUri());
return null;
}

return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf, false);
}


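The problem with the code above is that DFSck only works when fs is a DistributedFileSystem. HDFS-RAID, however, uses RaidDistributedFileSystem, a wrapper around DistributedFileSystem: it extends FilterFileSystem and holds a reference to HDFS's own DistributedFileSystem. DFSck is implemented rather rigidly here: many of its methods are private, so subclassing cannot make it compatible with HDFS-RAID. The only option is to write a new class modeled on DFSck, which is listed below.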
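The underlying pattern, an instanceof check that fails because the object is wrapped, can be shown without Hadoop. In the sketch below `Fs`, `CoreFs`, `WrapperFs` and `FsUnwrapper` are hypothetical stand-ins for FileSystem, DistributedFileSystem, FilterFileSystem and the adapted check; the URI is a placeholder. The sketch unwraps in a loop so that nested wrappers are handled as well, which is a choice made for this illustration rather than something the original code requires.

```java
/** Hypothetical minimal file-system interface for this sketch. */
interface Fs {
    String getUri();
}

/** Plays the role of the concrete file system the tool insists on. */
final class CoreFs implements Fs {
    @Override
    public String getUri() {
        return "hdfs://example"; // placeholder URI
    }
}

/** Plays the role of a filter-style wrapper that delegates to an inner Fs. */
class WrapperFs implements Fs {
    private final Fs inner;

    WrapperFs(Fs inner) {
        this.inner = inner;
    }

    Fs getRawFs() {
        return inner;
    }

    @Override
    public String getUri() {
        return inner.getUri();
    }
}

final class FsUnwrapper {
    /** Peels off wrappers until a non-wrapper file system is reached. */
    static Fs unwrap(Fs fs) {
        Fs current = fs;
        while (current instanceof WrapperFs) {
            current = ((WrapperFs) current).getRawFs();
        }
        return current;
    }

    /** The type check a tool should perform: on the unwrapped instance. */
    static boolean isCore(Fs fs) {
        return unwrap(fs) instanceof CoreFs;
    }
}
```

A direct `fs instanceof CoreFs` test on the wrapped object is false even though a CoreFs sits underneath, while `FsUnwrapper.isCore(fs)` is true. That is the whole reason the type check has to run on the unwrapped file system.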

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* In order to be compatible with HDFS-RAID, we should modify <code>DFSck</code>.
* We just modify the method <code>DFSck.getCurrentNamenodeAddress()</code>.
*
* @author Deng Changchun
* @version 2
* @since 2014-05-24
*/
@InterfaceAudience.Private
public class FilterDFSck extends Configured implements Tool {
static{
HdfsConfiguration.init();
}

private static final String USAGE = "Usage: DFSck <path> "
+ "[-list-corruptfileblocks | "
+ "[-move | -delete | -openforwrite] "
+ "[-files [-blocks [-locations | -racks]]]]\n"
+ "\t<path>\tstart checking from this path\n"
+ "\t-move\tmove corrupted files to /lost+found\n"
+ "\t-delete\tdelete corrupted files\n"
+ "\t-files\tprint out files being checked\n"
+ "\t-openforwrite\tprint out files opened for write\n"
+ "\t-list-corruptfileblocks\tprint out list of missing "
+ "blocks and files they belong to\n"
+ "\t-blocks\tprint out block report\n"
+ "\t-locations\tprint out locations for every block\n"
+ "\t-racks\tprint out network topology for data-node locations\n"
+ "\t\tBy default fsck ignores files opened for write, "
+ "use -openforwrite to report such files. They are usually "
+ " tagged CORRUPT or HEALTHY depending on their block "
+ "allocation status";

private final UserGroupInformation ugi;
private final PrintStream out;

/**
* Filesystem checker.
* @param conf current Configuration
*/
public FilterDFSck(Configuration conf) throws IOException {
this(conf, System.out);
}

public FilterDFSck(Configuration conf, PrintStream out) throws IOException {
super(conf);
this.ugi = UserGroupInformation.getCurrentUser();
this.out = out;
}

/**
* Print fsck usage information
*/
static void printUsage(PrintStream out) {
out.println(USAGE + "\n");
ToolRunner.printGenericCommandUsage(out);
}
/**
* @param args
*/
@Override
public int run(final String[] args) throws IOException {
if (args.length == 0) {
printUsage(System.err);
return -1;
}

try {
return UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws Exception {
return doWork(args);
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
}

/*
* To get the list, we need to call iteratively until the server says
* there is no more left.
*/
private Integer listCorruptFileBlocks(String dir, String baseUrl)
throws IOException {
int errCode = -1;
int numCorrupt = 0;
int cookie = 0;
final String noCorruptLine = "has no CORRUPT files";
final String noMoreCorruptLine = "has no more CORRUPT files";
final String cookiePrefix = "Cookie:";
boolean allDone = false;
while (!allDone) {
final StringBuffer url = new StringBuffer(baseUrl);
if (cookie > 0) {
url.append("&startblockafter=").append(String.valueOf(cookie));
}
URL path = new URL(url.toString());
URLConnection connection = SecurityUtil.openSecureHttpConnection(path);
InputStream stream = connection.getInputStream();
BufferedReader input = new BufferedReader(new InputStreamReader(
stream, "UTF-8"));
try {
String line = null;
while ((line = input.readLine()) != null) {
if (line.startsWith(cookiePrefix)){
try{
cookie = Integer.parseInt(line.split("\t")[1]);
} catch (Exception e){
allDone = true;
break;
}
continue;
}
if ((line.endsWith(noCorruptLine)) ||
(line.endsWith(noMoreCorruptLine)) ||
(line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) {
allDone = true;
break;
}
if ((line.isEmpty())
|| (line.startsWith("FSCK started by"))
|| (line.startsWith("The filesystem under path")))
continue;
numCorrupt++;
if (numCorrupt == 1) {
out.println("The list of corrupt files under path '"
+ dir + "' are:");
}
out.println(line);
}
} finally {
input.close();
}
}
out.println("The filesystem under path '" + dir + "' has "
+ numCorrupt + " CORRUPT files");
if (numCorrupt == 0)
errCode = 0;
return errCode;
}

/**
* Derive the namenode http address from the current file system,
* either default or as set by "-fs" in the generic options.
* @return Returns http address or null if failure.
* @throws IOException if we can't determine the active NN address
*/
private String getCurrentNamenodeAddress() throws IOException {
// String nnAddress = null;
Configuration conf = getConf();

// get the filesystem object to verify it is an HDFS system
FileSystem fs;
try {
fs = FileSystem.get(conf);
} catch (IOException ioe) {
System.err.println("FileSystem is inaccessible due to:\n"
+ StringUtils.stringifyException(ioe));
return null;
}
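// HDFS-RAID change relative to DFSck: if the file system is wrapped in a
// FilterFileSystem, unwrap it so the DistributedFileSystem check below can pass.
// --Begin--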
fs = fs instanceof FilterFileSystem ? ((FilterFileSystem) fs)
.getRawFileSystem() : fs;
// --End--
if (!(fs instanceof DistributedFileSystem)) {
System.err.println("FileSystem is " + fs.getUri());
return null;
}

return DFSUtil
.getInfoServer(HAUtil.getAddressOfActive(fs), conf, false);
}

private int doWork(final String[] args) throws IOException {
final StringBuilder url = new StringBuilder(HttpConfig.getSchemePrefix());

String namenodeAddress = getCurrentNamenodeAddress();
if (namenodeAddress == null) {
//Error message already output in {@link #getCurrentNamenodeAddress()}
System.err.println("DFSck exiting.");
return 0;
}
url.append(namenodeAddress);
System.err.println("Connecting to namenode via " + url.toString());

url.append("/fsck?ugi=").append(ugi.getShortUserName());
String dir = null;
boolean doListCorruptFileBlocks = false;
for (int idx = 0; idx < args.length; idx++) {
if (args[idx].equals("-move")) { url.append("&move=1"); }
else if (args[idx].equals("-delete")) { url.append("&delete=1"); }
else if (args[idx].equals("-files")) { url.append("&files=1"); }
else if (args[idx].equals("-openforwrite")) { url.append("&openforwrite=1"); }
else if (args[idx].equals("-blocks")) { url.append("&blocks=1"); }
else if (args[idx].equals("-locations")) { url.append("&locations=1"); }
else if (args[idx].equals("-racks")) { url.append("&racks=1"); }
else if (args[idx].equals("-list-corruptfileblocks")) {
url.append("&listcorruptfileblocks=1");
doListCorruptFileBlocks = true;
} else if (!args[idx].startsWith("-")) {
if (null == dir) {
dir = args[idx];
} else {
System.err.println("fsck: can only operate on one path at a time '"
+ args[idx] + "'");
printUsage(System.err);
return -1;
}
} else {
System.err.println("fsck: Illegal option '" + args[idx] + "'");
printUsage(System.err);
return -1;
}
}
if (null == dir) {
dir = "/";
}
url.append("&path=").append(URLEncoder.encode(dir, "UTF-8"));
if (doListCorruptFileBlocks) {
return listCorruptFileBlocks(dir, url.toString());
}
URL path = new URL(url.toString());
URLConnection connection = SecurityUtil.openSecureHttpConnection(path);
InputStream stream = connection.getInputStream();
BufferedReader input = new BufferedReader(new InputStreamReader(
stream, "UTF-8"));
String line = null;
String lastLine = null;
int errCode = -1;
try {
while ((line = input.readLine()) != null) {
out.println(line);
lastLine = line;
}
} finally {
input.close();
}
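if (lastLine == null) {
// The namenode returned no output, so there is no status line to inspect.
return errCode;
}
if (lastLine.endsWith(NamenodeFsck.HEALTHY_STATUS)) {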
errCode = 0;
} else if (lastLine.endsWith(NamenodeFsck.CORRUPT_STATUS)) {
errCode = 1;
} else if (lastLine.endsWith(NamenodeFsck.NONEXISTENT_STATUS)) {
errCode = 0;
}
return errCode;
}

public static void main(String[] args) throws Exception {
// -files option is also used by GenericOptionsParser
// Make sure that is not the first argument for fsck
int res = -1;
if ((args.length == 0) || ("-files".equals(args[0]))) {
// printUsage already prints the generic command usage as well.
printUsage(System.err);
} else if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
res = 0;
} else {
res = ToolRunner.run(new FilterDFSck(new HdfsConfiguration()), args);
}
System.exit(res);
}
}
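
The only functional difference between FilterDFSck and the stock DFSck in the listing above is the unwrapping step marked --Begin-- / --End--. The following is a minimal sketch of that pattern, not the code from the listing. The classes FileSystem, DistributedFileSystem and FilterFileSystem below are hypothetical stand-ins defined inside the sketch, not the real Hadoop classes, so it compiles without Hadoop on the classpath. It also assumes that wrappers may be nested, and therefore unwraps in a loop, whereas the listing unwraps only once.

```java
import java.util.Optional;

public class UnwrapSketch {
    // Hypothetical stand-ins for the Hadoop file system hierarchy (assumption).
    public static class FileSystem {}

    public static class DistributedFileSystem extends FileSystem {}

    public static class FilterFileSystem extends FileSystem {
        private final FileSystem wrapped;

        public FilterFileSystem(FileSystem wrapped) {
            this.wrapped = wrapped;
        }

        // Mirrors the accessor name used in the listing above.
        public FileSystem getRawFileSystem() {
            return wrapped;
        }
    }

    /**
     * Peels off every FilterFileSystem layer, then reports whether the
     * innermost file system is a DistributedFileSystem.
     */
    public static Optional<DistributedFileSystem> unwrap(FileSystem fs) {
        while (fs instanceof FilterFileSystem) {
            fs = ((FilterFileSystem) fs).getRawFileSystem();
        }
        if (fs instanceof DistributedFileSystem) {
            return Optional.of((DistributedFileSystem) fs);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        FileSystem raid = new FilterFileSystem(new DistributedFileSystem());
        FileSystem local = new FilterFileSystem(new FileSystem());
        System.out.println("wrapped HDFS accepted: " + unwrap(raid).isPresent());
        System.out.println("wrapped non-HDFS accepted: " + unwrap(local).isPresent());
    }
}
```

Without the unwrapping step, a wrapped file system fails the instanceof DistributedFileSystem check, and fsck exits before contacting the namenode.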


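Third: Modify RaidDFSUtil and the bin/hdfs and bin/hadoop scripts.

HDFS-RAID looks up corrupt files through the getCorruptFiles method of RaidDFSUtil, so the call to DFSck in that method must be replaced with a call to the rewritten FilterDFSck. The modified code is as follows: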

// Before: DFSck fsck = new DFSck(conf, out);
FilterDFSck fsck = new FilterDFSck(conf, out);
String[] args = new String[]{"-list-corruptfileblocks"};
try {
  ToolRunner.run(fsck, args);
} catch (Exception e) {
  // Before: throw new IOException("DFSck.run exception ", e);
  throw new IOException("FilterDFSck.run exception ", e);
}
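
The snippet above only shows the fsck invocation. What getCorruptFiles does with the captured output is not shown in this post, so the following is a sketch under an assumption: each corrupt entry is printed as a block name, a tab, and the file path, while header and summary lines contain no tab. The class name CorruptFileListParser and the sample text in main are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class CorruptFileListParser {
    /**
     * Extracts the distinct file paths from captured fsck output.
     * Assumes corrupt entries have the form "blockName<TAB>path";
     * lines without a tab (headers, summary) are skipped.
     */
    public static Set<String> parse(String fsckOutput) {
        Set<String> files = new LinkedHashSet<>();
        for (String line : fsckOutput.split("\\R")) {
            int tab = line.indexOf('\t');
            if (tab != -1) {
                // A file with several corrupt blocks appears once in the set.
                files.add(line.substring(tab + 1));
            }
        }
        return files;
    }

    public static void main(String[] args) {
        // Hypothetical sample output, for illustration only.
        String sample = String.join("\n",
            "The list of corrupt files under path '/' are:",
            "blk_1001\t/raid/a.dat",
            "blk_1002\t/raid/a.dat",
            "blk_2001\t/raid/b.dat",
            "The filesystem under path '/' has 3 CORRUPT files");
        System.out.println(parse(sample));
    }
}
```

A set is used because one file can own several corrupt blocks, and the caller normally wants each file listed once.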


So that fsck keeps working on a Hadoop deployment with HDFS-RAID installed, the class invoked for fsck in the bin/hdfs and bin/hadoop scripts must also be changed, as follows:

elif [ "$COMMAND" = "fsck" ] ; then
  CLASS=org.apache.hadoop.hdfs.tools.FilterDFSck
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"


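With the changes above, HDFS-RAID is essentially migrated to Hadoop-2.2.0. (I do not know how to upload attachments here; to request the related files, contact 2323162333.)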