
Advanced MapReduce Programming: Writing a Custom InputFormat

2013-03-21 18:21
Reposted from: http://datamining.xmu.edu.cn/bbs/home.php?mod=space&uid=91&do=blog&id=190

InputFormat is a concept you run into all the time in MapReduce. What role does it actually play while a job runs?

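InputFormat is in fact an interface with two methods. (This is the old org.apache.hadoop.mapred API; in the newer org.apache.hadoop.mapreduce API, InputFormat is an abstract class whose methods are getSplits and createRecordReader.)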

public interface InputFormat<K, V> {

    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

    RecordReader<K, V> getRecordReader(InputSplit split,
                                       JobConf job,
                                       Reporter reporter) throws IOException;
}

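The two methods do the following:

getSplits divides the input data into splits. The number of splits equals the number of map tasks, and by default a split is the size of one HDFS block (64 MB in Hadoop 1.x).
getRecordReader returns a RecordReader that parses each split into records and then turns each record into a <K,V> pair.
In other words, an InputFormat does this: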

InputFile --> splits --> <K,V>
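To make the first step concrete, here is a small Hadoop-free sketch of how one file might be cut into splits. It assumes a fixed split size equal to the block size and borrows the 10% "slop" rule used by FileInputFormat, under which the last split may grow to 1.1 times the split size instead of leaving a tiny tail. The real getSplits also weighs the minimum split size, the numSplits hint and block locations; the class name SplitPlanner is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class SplitPlanner {

    // The last split may be up to 10% larger than splitSize (modeled on FileInputFormat's SPLIT_SLOP).
    static final double SPLIT_SLOP = 1.1;

    // Returns {start, length} pairs for one file; each pair would become one map task.
    static List<long[]> plan(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLength - remaining, splitSize});
            remaining -= splitSize;
        }
        if (remaining != 0) {
            // whatever is left (at most 1.1 * splitSize) becomes the final split
            splits.add(new long[] {fileLength - remaining, remaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        for (long[] s : plan(134 * mb, 64 * mb)) {
            System.out.println("start=" + s[0] / mb + "MB length=" + s[1] / mb + "MB");
        }
    }
}
```

With 64 MB splits, a 134 MB file yields two splits (64 MB and 70 MB) rather than three, so this model would give the job two map tasks for that file.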

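Which InputFormats does Hadoop provide out of the box? Commonly used ones include TextInputFormat, KeyValueTextInputFormat, NLineInputFormat and SequenceFileInputFormat.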
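TextInputFormat is the most commonly used of these (it is also the default). Its <K,V> pair is <byte offset of the line within the file, contents of the line>, typed as <LongWritable, Text>.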
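As an illustration, the plain-Java sketch below computes the pairs TextInputFormat would hand to a mapper for a small file: the key is the byte offset where each line starts, the value is the line without its newline. It assumes '\n' line endings and treats the whole input as a single split; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class TextRecordsDemo {

    // Returns "offset -> line" strings; offsets are counted in bytes (UTF-8), not characters.
    static List<String> records(String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        List<String> out = new ArrayList<>();
        int pos = 0;
        while (pos < data.length) {
            int nl = pos;
            while (nl < data.length && data[nl] != '\n') {
                nl++;
            }
            out.add(pos + " -> " + new String(data, pos, nl - pos, StandardCharsets.UTF_8));
            pos = nl + 1;  // the next line starts right after the newline
        }
        return out;
    }

    public static void main(String[] args) {
        String file = "ball 3.5,12.7,9.0\ncar 15,23.76,42.23\ndevice 0.0,12.4,-67.1\n";
        for (String r : records(file)) {
            System.out.println(r);
        }
    }
}
```

For the sample data used later in this post, the keys come out as 0, 18 and 37. Because the key is a byte offset, a line containing multi-byte characters shifts every following key by more than its character count.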

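Sometimes, however, the fixed ways in which these built-in classes turn an input file into <K,V> pairs do not fit our needs. In that case we write a custom InputFormat, so that the Hadoop framework parses the input file into <K,V> pairs the way we want.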

Before writing a custom InputFormat, you need to understand the following interfaces and classes and how they relate to each other (old mapred API).

InputFormat (interface), FileInputFormat (abstract class), TextInputFormat (class), RecordReader (interface), LineRecordReader (class):
FileInputFormat implements InputFormat
TextInputFormat extends FileInputFormat
TextInputFormat.getRecordReader creates and returns a LineRecordReader
LineRecordReader implements RecordReader
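These relationships follow a template-method design: the abstract base class owns the generic splitting logic and leaves record parsing to subclasses. The Hadoop-free sketch below mirrors that shape with hypothetical Mini* types (they are not Hadoop classes). To stay short, a "split" here is just a group of whole lines, and the key is the line's index within its split rather than a byte offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HierarchyDemo {
    public static void main(String[] args) {
        MiniInputFormat format = new MiniTextInputFormat();
        List<String> input = Arrays.asList("ball 3.5,12.7,9.0", "car 15,23.76,42.23", "device 0.0,12.4,-67.1");
        for (List<String> split : format.getSplits(input, 2)) {   // 2 splits -> 2 "map tasks"
            MiniRecordReader reader = format.getRecordReader(split);
            StringBuilder key = new StringBuilder();
            StringBuilder value = new StringBuilder();
            while (reader.next(key, value)) {
                System.out.println("<" + key + ", " + value + ">");
            }
        }
    }
}

// Role of RecordReader.
interface MiniRecordReader {
    boolean next(StringBuilder key, StringBuilder value);  // false once the split is exhausted
}

// Role of InputFormat.
interface MiniInputFormat {
    List<List<String>> getSplits(List<String> lines, int numSplits);
    MiniRecordReader getRecordReader(List<String> split);
}

// Role of FileInputFormat: implements the generic splitting, leaves the reader abstract.
abstract class MiniFileInputFormat implements MiniInputFormat {
    @Override
    public List<List<String>> getSplits(List<String> lines, int numSplits) {
        List<List<String>> splits = new ArrayList<>();
        int size = Math.max(1, (lines.size() + numSplits - 1) / numSplits);  // ceiling division
        for (int i = 0; i < lines.size(); i += size) {
            splits.add(lines.subList(i, Math.min(i + size, lines.size())));
        }
        return splits;
    }

    @Override
    public abstract MiniRecordReader getRecordReader(List<String> split);
}

// Role of LineRecordReader: one record per line.
class MiniLineRecordReader implements MiniRecordReader {
    private final List<String> split;
    private int index = 0;

    MiniLineRecordReader(List<String> split) {
        this.split = split;
    }

    @Override
    public boolean next(StringBuilder key, StringBuilder value) {
        if (index >= split.size()) {
            return false;
        }
        key.setLength(0);
        key.append(index);
        value.setLength(0);
        value.append(split.get(index++));
        return true;
    }
}

// Role of TextInputFormat: only supplies the reader.
class MiniTextInputFormat extends MiniFileInputFormat {
    @Override
    public MiniRecordReader getRecordReader(List<String> split) {
        return new MiniLineRecordReader(split);
    }
}
```

A custom format in this model only needs its own MiniRecordReader plus a three-line subclass of MiniFileInputFormat, which is the same division of labor the rest of this post relies on.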

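The InputFormat interface was described in detail above.

Next, FileInputFormat. It implements the getSplits method of the InputFormat interface, leaves getRecordReader abstract for concrete classes such as TextInputFormat, and provides a default isSplitable (returning true) that subclasses may override. Because isSplitable rarely needs changing, a custom InputFormat usually only has to implement getRecordReader.

In TextInputFormat, the core of getRecordReader is creating a LineRecordReader: it is the LineRecordReader class that "parses each split into records and then turns each record into a <K,V> pair", and LineRecordReader implements the RecordReader interface: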

public interface RecordReader<K, V> {

    boolean next(K key, V value) throws IOException;

    K createKey();

    V createValue();

    long getPos() throws IOException;

    void close() throws IOException;

    float getProgress() throws IOException;
}
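Note that next(K key, V value) fills in objects supplied by the caller instead of returning new ones: the old-API map loop calls createKey() and createValue() once and reuses that pair for every record, which avoids allocating objects per record. A consequence is that any code that keeps records around must copy them. The sketch below illustrates this with a hypothetical, Hadoop-free ReusingReader interface modeled on the contract above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ReuseDemo {
    // Drives the reader like the old-API map loop: one key/value pair for all records.
    static List<String> collect(List<String> lines, boolean copy) {
        ListReader reader = new ListReader(lines);
        long[] key = reader.createKey();
        StringBuilder value = reader.createValue();
        List<CharSequence> kept = new ArrayList<>();
        while (reader.next(key, value)) {
            // Without a copy, every entry is the very same StringBuilder object.
            CharSequence entry = copy ? value.toString() : value;
            kept.add(entry);
        }
        List<String> result = new ArrayList<>();
        for (CharSequence cs : kept) {
            result.add(cs.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("ball", "car", "device");
        System.out.println(collect(lines, true));   // [ball, car, device]
        System.out.println(collect(lines, false));  // [device, device, device]
    }
}

// Hypothetical mirror of the old-API RecordReader contract (not a Hadoop type).
interface ReusingReader<K, V> {
    K createKey();
    V createValue();
    boolean next(K key, V value);  // overwrite key/value in place; false at end of input
}

// Reads from an in-memory list; the key (record index) lives in a one-element array.
class ListReader implements ReusingReader<long[], StringBuilder> {
    private final Iterator<String> it;
    private long index = 0;

    ListReader(List<String> lines) {
        this.it = lines.iterator();
    }

    @Override
    public long[] createKey() {
        return new long[1];
    }

    @Override
    public StringBuilder createValue() {
        return new StringBuilder();
    }

    @Override
    public boolean next(long[] key, StringBuilder value) {
        if (!it.hasNext()) {
            return false;  // leave key/value untouched at end of input
        }
        key[0] = index++;
        value.setLength(0);
        value.append(it.next());
        return true;
    }
}
```

This is also why the RecordReader implementations below assign into the key and value they are given (key.set(...), value.x = ...) rather than creating new objects.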

So the core of a custom InputFormat is your own class that, like LineRecordReader, implements the RecordReader interface, and writing that class mostly means implementing the methods of RecordReader listed above.

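Example: each line of the data gives an object name followed by its x, y and z coordinates:

ball 3.5,12.7,9.0
car 15,23.76,42.23
device 0.0,12.4,-67.1

Each line is to be parsed into a <Text, Point3D> pair (Point3D is the custom data type defined in the previous post).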
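The parsing step itself can be tried outside Hadoop. The following is a sketch under stated assumptions: the author's Point3D is defined in an earlier post and not shown here, so it uses a hypothetical nested Point3D with float fields x, y and z. It splits on runs of commas and/or whitespace, so both "ball 3.5,12.7,9.0" and "ball, 3.5, 12.7, 9.0" yield four fields.

```java
class ObjectPositionParser {

    // Hypothetical stand-in for the author's Point3D (defined in an earlier post).
    static final class Point3D {
        float x, y, z;
    }

    // One parsed line: the object name (the future Text key) and its coordinates.
    static final class Record {
        final String name;
        final Point3D point;

        Record(String name, Point3D point) {
            this.name = name;
            this.point = point;
        }
    }

    static Record parse(String line) {
        String[] pieces = line.trim().split("[,\\s]+");
        if (pieces.length != 4) {
            throw new IllegalArgumentException("Invalid record: " + line);
        }
        Point3D p = new Point3D();
        try {
            p.x = Float.parseFloat(pieces[1]);
            p.y = Float.parseFloat(pieces[2]);
            p.z = Float.parseFloat(pieces[3]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Bad coordinate in record: " + line, e);
        }
        return new Record(pieces[0], p);
    }

    public static void main(String[] args) {
        Record r = parse("device 0.0,12.4,-67.1");
        System.out.println(r.name + " -> (" + r.point.x + ", " + r.point.y + ", " + r.point.z + ")");
    }
}
```

Inside a RecordReader the same logic would throw IOException instead, as the implementations below do.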



Method 1: the custom RecordReader wraps a LineRecordReader (old org.apache.hadoop.mapred API).

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> {

    @Override
    public RecordReader<Text, Point3D> getRecordReader(
            InputSplit input, JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(input.toString());
        return new ObjPosRecordReader(job, (FileSplit) input);
    }
}

class ObjPosRecordReader implements RecordReader<Text, Point3D> {

    private final LineRecordReader lineReader;  // does the actual line splitting
    private final LongWritable lineKey;
    private final Text lineValue;

    public ObjPosRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();
    }

    @Override
    public boolean next(Text key, Point3D value) throws IOException {
        // get the next line (its byte-offset key is not used)
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }

        // parse a line of the form "objName x,y,z" (a comma after the name also works):
        // splitting on runs of commas and/or whitespace yields exactly 4 fields
        String[] pieces = lineValue.toString().trim().split("[,\\s]+");
        if (pieces.length != 4) {
            throw new IOException("Invalid record received: " + lineValue);
        }

        // try to parse the floating-point components of the value
        float fx, fy, fz;
        try {
            fx = Float.parseFloat(pieces[1]);
            fy = Float.parseFloat(pieces[2]);
            fz = Float.parseFloat(pieces[3]);
        } catch (NumberFormatException nfe) {
            throw new IOException("Error parsing floating point value in record: " + lineValue);
        }

        // now that we know we'll succeed, overwrite the output objects
        key.set(pieces[0]);  // objName is the output key
        value.x = fx;        // assumes Point3D exposes public float fields x, y, z
        value.y = fy;
        value.z = fz;
        return true;
    }

    @Override
    public Text createKey() {
        return new Text("");
    }

    @Override
    public Point3D createValue() {
        return new Point3D();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }
}
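Because method 1 delegates getPos, getProgress and close to the wrapped LineRecordReader, it gets sensible progress reporting for free. Roughly, LineRecordReader reports the fraction of its split consumed so far, clamped to 1 because the last line of a split may run past the split's end. A minimal sketch of that formula, with a hypothetical helper name:

```java
class ProgressDemo {

    // Fraction of the split [start, end) consumed at position pos, clamped to [.., 1];
    // an empty split reports 0 (simplified model of LineRecordReader.getProgress).
    static float progress(long start, long end, long pos) {
        if (start == end) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    public static void main(String[] args) {
        System.out.println(progress(0, 200, 50));   // a quarter of the split read
        System.out.println(progress(0, 200, 210));  // last line ran past the split end
    }
}
```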

Method 2: the custom RecordReader reads lines with LineReader directly (new org.apache.hadoop.mapreduce API).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // The reader below always starts at byte 0 and reads to end of file,
        // ignoring split boundaries, so each file must form exactly one split.
        return false;
    }

    @Override
    public RecordReader<Text, Point3D> createRecordReader(InputSplit inputSplit,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new ObjPosRecordReader();
    }

    public static class ObjPosRecordReader extends RecordReader<Text, Point3D> {

        private LineReader in;
        private Text line;
        private Text lineKey;
        private Point3D lineValue;
        private long bytesRead;   // bytes consumed so far, used by getProgress()
        private long fileLength;

        @Override
        public void initialize(InputSplit input, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) input;
            Configuration job = context.getConfiguration();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);

            FSDataInputStream fileIn = fs.open(file);
            in = new LineReader(fileIn, job);

            line = new Text();
            lineKey = new Text();
            lineValue = new Point3D();
            bytesRead = 0;
            fileLength = split.getLength();  // the whole file, since isSplitable() is false
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            while (true) {
                int lineSize = in.readLine(line);  // bytes consumed, 0 at end of file
                if (lineSize == 0) {
                    return false;
                }
                bytesRead += lineSize;

                String text = line.toString().trim();
                if (text.isEmpty()) {
                    continue;  // skip blank lines instead of failing on them
                }
                // expected format: "objName x,y,z"
                String[] fields = text.split("[,\\s]+");
                if (fields.length != 4) {
                    throw new IOException("Invalid record: " + text);
                }
                try {
                    float x = Float.parseFloat(fields[1]);
                    float y = Float.parseFloat(fields[2]);
                    float z = Float.parseFloat(fields[3]);
                    lineKey.set(fields[0]);
                    lineValue.set(x, y, z);  // assumes Point3D has set(float, float, float)
                } catch (NumberFormatException nfe) {
                    throw new IOException("Error parsing coordinates in record: " + text, nfe);
                }
                return true;
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return lineKey;
        }

        @Override
        public Point3D getCurrentValue() throws IOException, InterruptedException {
            return lineValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return fileLength == 0 ? 0.0f : Math.min(1.0f, bytesRead / (float) fileLength);
        }

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }
}
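Method 2 has to declare its files non-splittable because its reader always starts at byte 0 and reads to the end of the file. LineRecordReader, by contrast, copes with splits that cut a line in half. The plain-Java model below is a simplified version of the rule it follows (modeled on Hadoop 1.x behavior; compression, "\r\n" and custom delimiters are ignored): a split that does not begin at byte 0 skips ahead to the first line that starts at or after its start offset, and every split keeps reading whole lines as long as they start before the split's end, even when the last one finishes inside the next split. Class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SplitLineReaderDemo {

    // Returns the index just past the next '\n' at or after pos (or data.length).
    static int skipLine(byte[] data, int pos) {
        while (pos < data.length && data[pos] != '\n') {
            pos++;
        }
        return Math.min(pos + 1, data.length);
    }

    // Lines owned by the split [start, start + length).
    static List<String> readSplit(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // Back up one byte, then discard through the next newline: a partial first
            // line is dropped (the previous split reads it), but a line that begins
            // exactly at start is kept.
            pos = skipLine(data, start - 1);
        }
        List<String> lines = new ArrayList<>();
        // Read whole lines while they start before end; the last may run past end.
        while (pos < end && pos < data.length) {
            int next = skipLine(data, pos);
            int lineEnd = (data[next - 1] == '\n') ? next - 1 : next;
            lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "aaa\nbbb\nccc\n".getBytes(StandardCharsets.UTF_8);
        // Boundaries at bytes 5 and 10 fall in the middle of lines.
        System.out.println(readSplit(data, 0, 5));   // [aaa, bbb]
        System.out.println(readSplit(data, 5, 5));   // [ccc]
        System.out.println(readSplit(data, 10, 2));  // []
    }
}
```

Every line is read by exactly one split, whether the boundaries fall mid-line or exactly on a line start, which is what lets TextInputFormat keep isSplitable returning true for uncompressed files.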

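As the examples show, the core of a custom InputFormat is your own RecordReader, modeled on LineRecordReader, which may in turn build on LineReader, LineRecordReader or KeyValueLineRecordReader.

So before writing a custom InputFormat, it is well worth getting familiar with the source code of these three classes.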