您的位置:首页 > 运维架构

自定义hadoop map/reduce输入文件切割InputFormat

2012-12-04 18:22 309 查看
那么,FileInputFormat是怎样将它们划分成splits的呢?FileInputFormat只划分比HDFS block大的文件,所以如果一个文件的大小比block小,将不会被划分,而是整个文件单独作为一个split、占用一个map任务。这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。

hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR = 13)或换行符(LF = 10)为行分隔符。




1)自定义一个InputFormat,继承FileInputFormat,重写createRecordReader方法,如果不需要分片或者需要改变分片的方式,则重写isSplitable方法,具体代码如下:

public class FileInputFormatB extends FileInputFormat<LongWritable, Text> {


public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) {

return new SearchRecordReader("\b");



protected boolean isSplitable(FileSystem fs, Path filename) {

// 输入文件不分片

return false;




public class IsearchRecordReader extends RecordReader<LongWritable, Text> {

private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);

private CompressionCodecFactory compressionCodecs = null;

private long start;

private long pos;

private long end;

private LineReader in;

private int maxLineLength;

private LongWritable key = null;

private Text value = null;


private byte[] separator = {'\b'};

private int sepLength = 1;

public IsearchRecordReader(){


public IsearchRecordReader(String seps){

this.separator = seps.getBytes();

sepLength = separator.length;


public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {

FileSplit split = (FileSplit) genericSplit;

Configuration job = context.getConfiguration();

this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

this.start = split.getStart();

this.end = (this.start + split.getLength());

Path file = split.getPath();

this.compressionCodecs = new CompressionCodecFactory(job);

CompressionCodec codec = this.compressionCodecs.getCodec(file);

// open the file and seek to the start of the split

FileSystem fs = file.getFileSystem(job);

FSDataInputStream fileIn = fs.open(split.getPath());

boolean skipFirstLine = false;

if (codec != null) {

this.in = new LineReader(codec.createInputStream(fileIn), job);

this.end = Long.MAX_VALUE;

} else {

if (this.start != 0L) {

skipFirstLine = true;

this.start -= sepLength;



this.in = new LineReader(fileIn, job);


if (skipFirstLine) { // skip first line and re-establish "start".

int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));

if(newSize > 0){

start += newSize;



this.pos = this.start;


public boolean nextKeyValue() throws IOException {

if (this.key == null) {

this.key = new LongWritable();



if (this.value == null) {

this.value = new Text();


int newSize = 0;

while (this.pos < this.end) {

newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(

(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));

if (newSize == 0) {



this.pos += newSize;

if (newSize < this.maxLineLength) {



LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));


if (newSize == 0) {


this.key = null;

this.value = null;

return false;



return true;


public LongWritable getCurrentKey() {

return this.key;


public Text getCurrentValue() {

return this.value;


public float getProgress() {

if (this.start == this.end) {

return 0.0F;


return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));


public synchronized void close() throws IOException {

if (this.in != null)





public class LineReader {


//private static final byte CR = 13;


//private static final byte LF = 10;


private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;

private int bufferSize = DEFAULT_BUFFER_SIZE;

private InputStream in;

private byte[] buffer;

private int bufferLength = 0;

private int bufferPosn = 0;

LineReader(InputStream in, int bufferSize) {

this.bufferLength = 0;

this.bufferPosn = 0;

this.in = in;

this.bufferSize = bufferSize;

this.buffer = new byte[this.bufferSize];


public LineReader(InputStream in, Configuration conf) throws IOException {

this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));


public void close() throws IOException {



public int readLine(Text str, int maxLineLength) throws IOException {

return readLine(str, maxLineLength, Integer.MAX_VALUE);


public int readLine(Text str) throws IOException {

return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);



public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{


Text record = new Text();

int txtLength = 0;

long bytesConsumed = 0L;

boolean newline = false;

int sepPosn = 0;

do {


if (this.bufferPosn >= this.bufferLength) {

bufferPosn = 0;

bufferLength = in.read(buffer);


if (bufferLength <= 0) {




int startPosn = this.bufferPosn;

for (; bufferPosn < bufferLength; bufferPosn ++) {


if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){

sepPosn = 0;



if (buffer[bufferPosn] == separator[sepPosn]) {

bufferPosn ++;

int i = 0;


for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){


if(bufferPosn + i >= bufferLength){

bufferPosn += i - 1;




if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){

sepPosn = 0;





if(sepPosn == sepLength){

bufferPosn += i;

newline = true;

sepPosn = 0;





int readLength = this.bufferPosn - startPosn;

bytesConsumed += readLength;


//int appendLength = readLength - newlineLength;

if (readLength > maxLineLength - txtLength) {

readLength = maxLineLength - txtLength;


if (readLength > 0) {

record.append(this.buffer, startPosn, readLength);

txtLength += readLength;



str.set(record.getBytes(), 0, record.getLength() - sepLength);



} while (!newline && (bytesConsumed < maxBytesToConsume));

if (bytesConsumed > (long)Integer.MAX_VALUE) {

throw new IOException("Too many bytes before newline: " + bytesConsumed);


return (int) bytesConsumed;




public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{


int txtLength = 0;

int newlineLength = 0;

boolean prevCharCR = false;

long bytesConsumed = 0L;

do {

int startPosn = this.bufferPosn;

if (this.bufferPosn >= this.bufferLength) {

startPosn = this.bufferPosn = 0;

if (prevCharCR) bytesConsumed ++;

this.bufferLength = this.in.read(this.buffer);

if (this.bufferLength <= 0) break;


for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {

if (this.buffer[this.bufferPosn] == LF) {

newlineLength = (prevCharCR) ? 2 : 1;

this.bufferPosn ++;



if (prevCharCR) {

newlineLength = 1;



prevCharCR = this.buffer[this.bufferPosn] == CR;


int readLength = this.bufferPosn - startPosn;

if ((prevCharCR) && (newlineLength == 0))


bytesConsumed += readLength;

int appendLength = readLength - newlineLength;

if (appendLength > maxLineLength - txtLength) {

appendLength = maxLineLength - txtLength;


if (appendLength > 0) {

str.append(this.buffer, startPosn, appendLength);

txtLength += appendLength; }


while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));

if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);

return (int)bytesConsumed;





内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息