
How to implement your own InputFormat in MapReduce

2011-09-30 16:18
At the start of a MapReduce job, Hadoop has to split the input files and read them according to a predefined format. These operations are all handled by the InputFormat, whose main responsibilities are the following three:

1. Validate the input-specification of the job.

2. Split up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.

3. Provide the RecordReader implementation to be used to glean input records from the logical InputSplit for processing by the Mapper.

InputFormat is an abstract class with two abstract methods, getSplits() and createRecordReader(), which must be implemented in a subclass; these are its two basic operations. getSplits() determines how the input is divided into splits, while createRecordReader() reads data from a split in a particular format. Usually you do not implement InputFormat directly but extend FileInputFormat instead, which already provides many file-handling methods. One commonly used method is isSplitable(), which decides whether a file may be broken into multiple splits. The other is createRecordReader(), which supplies a RecordReader for each split; to read records in your own format you only need to override this method.
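
For reference, the two abstract methods on the new-API org.apache.hadoop.mapreduce.InputFormat look roughly like this (a simplified sketch of the class shape, not the full source):

public abstract class InputFormat<K, V> {

    // Compute how the job's input is divided into logical InputSplits.
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Create the RecordReader that turns one InputSplit into (key, value) records.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException;
}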

Below I use the example from http://developer.yahoo.com/hadoop/tutorial/module5.html#types to implement my own MyInputFormat, customized to my own needs.

For example, suppose the data looks like this:

ball 3.5,12.7,9.0
car 15,23.76,42.23
device 0.0,12.4,-67.1


We want to read the data as follows: split each line so that the leading word (e.g. ball) becomes the key, and the three floating-point numbers that follow are read into a Point3D object. How do we implement this? Below is the implementation I wrote while learning.

First comes the custom Point3D data type. A custom type must implement the Writable interface so that it can be sent over the network as a stream of bytes; in addition, MapReduce sorts and partitions records by key, so the type must also be comparable. Implementing the higher-level WritableComparable interface satisfies both requirements at once.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Point3D implements WritableComparable<Point3D> {

    public float x;
    public float y;
    public float z;

    public Point3D(float x, float y, float z) {
        super();
        this.x = x;
        this.y = y;
        this.z = z;
    }

    public Point3D() {
        this(0.0f, 0.0f, 0.0f);
    }

    public void set(float x, float y, float z) {
        this.x = x;
        this.y = y;
        this.z = z;
    }

    // Deserialize the three coordinates from the input stream.
    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readFloat();
        y = in.readFloat();
        z = in.readFloat();
    }

    // Serialize the three coordinates to the output stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeFloat(x);
        out.writeFloat(y);
        out.writeFloat(z);
    }

    public float distanceFromOrigin() {
        return (float) Math.sqrt(x * x + y * y + z * z);
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Point3D))
            return false;
        Point3D other = (Point3D) obj;
        return this.x == other.x && this.y == other.y && this.z == other.z;
    }

    @Override
    public int hashCode() {
        return Float.floatToIntBits(x)
                ^ Float.floatToIntBits(y)
                ^ Float.floatToIntBits(z);
    }

    @Override
    public String toString() {
        return Float.toString(x) + "," + Float.toString(y) + "," + Float.toString(z);
    }

    // Order points by their distance from the origin.
    @Override
    public int compareTo(Point3D other) {
        float myDistance = this.distanceFromOrigin();
        float otherDistance = other.distanceFromOrigin();
        return Float.compare(myDistance, otherDistance);
    }

}
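
As a quick sanity check of the Writable methods, a Point3D can be written to a byte buffer and read back entirely outside of Hadoop; the class name Point3DRoundTrip below is just made up for this sketch:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class Point3DRoundTrip {

    public static void main(String[] args) throws IOException {
        // Serialize a Point3D to a byte buffer, as Hadoop would when moving it between tasks.
        Point3D original = new Point3D(3.5f, 12.7f, 9.0f);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Read it back into a fresh instance and compare with the original.
        Point3D copy = new Point3D();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(original.equals(copy)); // expected: true
    }

}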


Next comes the custom InputFormat itself:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyInputFormat extends FileInputFormat<Text, Point3D> {

    // Returning false means each file becomes a single split handled by one mapper,
    // which keeps every line intact but limits parallelism to the number of input files.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Point3D> createRecordReader(InputSplit inputsplit,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new objPosRecordReader();
    }

    public static class objPosRecordReader extends RecordReader<Text, Point3D> {

        public LineReader in;
        public Text lineKey;
        public Point3D lineValue;
        public StringTokenizer token = null;

        public Text line;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return lineKey;
        }

        @Override
        public Point3D getCurrentValue() throws IOException, InterruptedException {
            return lineValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        // Open the file backing this split and prepare a line reader over it.
        @Override
        public void initialize(InputSplit input, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) input;
            Configuration job = context.getConfiguration();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);

            FSDataInputStream filein = fs.open(file);
            in = new LineReader(filein, job);

            line = new Text();
            lineKey = new Text();
            lineValue = new Point3D();
        }

        // Read one line, take the leading word as the key and parse the
        // comma-separated "x,y,z" part into the Point3D value.
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int linesize = in.readLine(line);
            if (linesize == 0)
                return false;

            token = new StringTokenizer(line.toString());
            String[] temp = new String[2];
            if (token.hasMoreElements()) {
                temp[0] = token.nextToken();
                if (token.hasMoreElements()) {
                    temp[1] = token.nextToken();
                }
            }
            String[] points = temp[1].split(",");
            lineKey.set(temp[0]);
            lineValue.set(Float.parseFloat(points[0]), Float.parseFloat(points[1]),
                    Float.parseFloat(points[2]));
            return true;
        }

    }

}


Below is the map function I wrote for testing. There is no reduce, so when the job is configured later the number of reducers must be set to 0 with job.setNumReduceTasks(0).

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TestMapper extends Mapper<Text, Point3D, Text, Point3D> {

    // Identity mapper: pass each (name, Point3D) record through unchanged.
    @Override
    protected void map(Text key, Point3D value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }

}

Finally, the driver program that configures and submits the job:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestMyInputFormat {

    /**
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists so the job can be rerun.
        FileSystem fs = FileSystem.get(URI.create(args[1]), conf);
        fs.delete(new Path(args[1]), true);

        Job job = new Job(conf);
        job.setJobName("Test MyInputFormat program");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(MyInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Point3D.class);
        job.setMapperClass(TestMapper.class);
        // Map-only job: no reducers, so map output is written directly.
        job.setNumReduceTasks(0);
        job.waitForCompletion(false);
    }

}
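
Since the job is map-only and no OutputFormat is set explicitly, Hadoop's default TextOutputFormat writes each record as the key, a tab, and then value.toString(). For the sample input above, the output files (e.g. part-m-00000) should therefore contain lines such as ball followed by a tab and 3.5,12.7,9.0, assuming the sample data and paths used here.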