Mapreduce如何实现自己的InputFormat
2011-09-30 16:18
211 查看
在mapreduce程序运行的开始阶段,hadoop需要将待处理的输入文件进行分割,按预定义的格式对文件读取等操作,这些操作都在InputFormat中进行。主要工作有以下3个:
1.Validatetheinput-specificationofthejob.
2.Split-uptheinputfile(s)intologicalInputSplits,eachofwhichisthenassignedtoanindividualMapper.
3.Providethe
InputFormat是一个抽象类,他含有getSplits()和createRecordReader()抽象方法,在子类中必须被实现。这两个就是InputFormat的基本方法。getSplits()确定输入对象的切分原则,而createRecordReader()则可以按一定格式读取相应数据。通常默认情况下,不是直接实现InputFormat类,而是直接继承FileInputFormat类,这个类提供了很多对文件操作的方法,其中比较常用的就是isSpiitable()方法,该方法决定该文件是否进行分片操作。另外还有就是createRecordReader方法,该方法是为文件的分片定制一个recordreader,可以根据自己的需求来进行定制,只需要重写该函数。
下面我就以http://developer.yahoo.com/hadoop/tutorial/module5.html#types中的例子来实现自己的MyInputFormat,根据自己的需求定制自己的InputFormat。
比如数据格式如下:
下面我们以这样的一种形式读取数据,分割每一行数据,前面的比如ball作为key,后面的3个浮点数读入到Point3D对象中,那么该如何实现呢?以下是我在学习过程中的实现。
首先是对Point3D数据类型的定制,首先定制数据类型为了能够在网络中以流的形式进行传输,必须实现Writable接口,同时在mapreduce编程的过程中,需要根据key来对数据进行排序与分区,所以必须实现Writable接口,因此就实现了一个更加高级的接口Writableomprable,同时可以满足上面两个要求。
其次就是定制自己的Fortmat了
测试的时候写的map函数,没有reudce,必须要在后面设置job的时候设置reduce的个数为0,job.setNumReduceTasks(0)。
1.Validatetheinput-specificationofthejob.
2.Split-uptheinputfile(s)intologicalInputSplits,eachofwhichisthenassignedtoanindividualMapper
3.Providethe
RecordReaderimplementationtobeusedtogleaninputrecordsfromthelogical
InputSplitforprocessingbythe
Mapper.
InputFormat是一个抽象类,他含有getSplits()和createRecordReader()抽象方法,在子类中必须被实现。这两个就是InputFormat的基本方法。getSplits()确定输入对象的切分原则,而createRecordReader()则可以按一定格式读取相应数据。通常默认情况下,不是直接实现InputFormat类,而是直接继承FileInputFormat类,这个类提供了很多对文件操作的方法,其中比较常用的就是isSpiitable()方法,该方法决定该文件是否进行分片操作。另外还有就是createRecordReader方法,该方法是为文件的分片定制一个recordreader,可以根据自己的需求来进行定制,只需要重写该函数。
下面我就以
比如数据格式如下:
ball3.5,12.7,9.0
car15,23.76,42.23
device0.0,12.4,-67.1
下面我们以这样的一种形式读取数据,分割每一行数据,前面的比如ball作为key,后面的3个浮点数读入到Point3D对象中,那么该如何实现呢?以下是我在学习过程中的实现。
首先是对Point3D数据类型的定制,首先定制数据类型为了能够在网络中以流的形式进行传输,必须实现Writable接口,同时在mapreduce编程的过程中,需要根据key来对数据进行排序与分区,所以必须实现Writable接口,因此就实现了一个更加高级的接口Writableomprable,同时可以满足上面两个要求。
importjava.io.DataInput;
importjava.io.DataOutput;
importjava.io.IOException;
importorg.apache.hadoop.io.WritableComparable;
publicclassPoint3DimplementsWritableComparable{
publicfloatx;
publicfloaty;
publicfloatz;
publicPoint3D(floatx,floaty,floatz){
super();
this.x=x;
this.y=y;
this.z=z;
}
publicPoint3D(){
this(0.0f,0.0f,0.0f);
}
publicvoidset(floatx,floaty,floatz){
this.x=x;
this.y=y;
this.z=z;
}
@Override
publicvoidreadFields(DataInputin)throwsIOException{
//TODOAuto-generatedmethodstub
x=in.readFloat();
y=in.readFloat();
z=in.readFloat();
}
@Override
publicvoidwrite(DataOutputout)throwsIOException{
//TODOAuto-generatedmethodstub
out.writeFloat(x);
out.writeFloat(y);
out.writeFloat(z);
}
publicfloatdistanceFromOrigin(){
return(float)Math.sqrt(x*x+y*y+z*z);
}
@Override
publicbooleanequals(Objectobj){
//TODOAuto-generatedmethodstub
if(!(objinstanceofPoint3D))
returnfalse;
Point3Dother=(Point3D)obj;
returnthis.x==other.x&&this.y==other.y&&this.z==other.z;
}
@Override
publicinthashCode(){
//TODOAuto-generatedmethodstub
returnFloat.floatToIntBits(x)
^Float.floatToIntBits(y)
^Float.floatToIntBits(z);
}
@Override
publicStringtoString(){
//TODOAuto-generatedmethodstub
returnFloat.toString(x)+","+Float.toString(y)+","+Float.toString(z);
}
@Override
publicintcompareTo(Objectot){
//TODOAuto-generatedmethodstub
Point3Dother=(Point3D)ot;
floatmyDistance=this.distanceFromOrigin();
floatotherDistance=other.distanceFromOrigin();
returnFloat.compare(myDistance,otherDistance);
}
}
其次就是定制自己的Fortmat了
importjava.io.IOException;
importjava.util.StringTokenizer;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.FSDataInputStream;
importorg.apache.hadoop.fs.FileSystem;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.InputSplit;
importorg.apache.hadoop.mapreduce.JobContext;
importorg.apache.hadoop.mapreduce.RecordReader;
importorg.apache.hadoop.mapreduce.TaskAttemptContext;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.input.FileSplit;
importorg.apache.hadoop.util.LineReader;
publicclassMyInputFormatextendsFileInputFormat<Text,Point3D>{
@Override
protectedbooleanisSplitable(JobContextcontext,Pathfilename){
//TODOAuto-generatedmethodstub
returnfalse;
}
@Override
publicRecordReader<Text,Point3D>createRecordReader(InputSplitinputsplit,
TaskAttemptContextcontext)throwsIOException,InterruptedException{
//TODOAuto-generatedmethodstub
returnnewobjPosRecordReader();
}
publicstaticclassobjPosRecordReaderextendsRecordReader<Text,Point3D>{
publicLineReaderin;
publicTextlineKey;
publicPoint3DlineValue;
publicStringTokenizertoken=null;
publicTextline;
@Override
publicvoidclose()throwsIOException{
//TODOAuto-generatedmethodstub
}
@Override
publicTextgetCurrentKey()throwsIOException,InterruptedException{
//TODOAuto-generatedmethodstub
System.out.println("key");
//lineKey.set(token.nextToken());
System.out.println("hello");
returnlineKey;
}
@Override
publicPoint3DgetCurrentValue()throwsIOException,
InterruptedException{
//TODOAuto-generatedmethodstub
returnlineValue;
}
@Override
publicfloatgetProgress()throwsIOException,InterruptedException{
//TODOAuto-generatedmethodstub
return0;
}
@Override
publicvoidinitialize(InputSplitinput,TaskAttemptContextcontext)
throwsIOException,InterruptedException{
//TODOAuto-generatedmethodstub
FileSplitsplit=(FileSplit)input;
Configurationjob=context.getConfiguration();
Pathfile=split.getPath();
FileSystemfs=file.getFileSystem(job);
FSDataInputStreamfilein=fs.open(file);
in=newLineReader(filein,job);
line=newText();
lineKey=newText();
lineValue=newPoint3D();
}
@Override
publicbooleannextKeyValue()throwsIOException,InterruptedException{
//TODOAuto-generatedmethodstub
intlinesize=in.readLine(line);
if(linesize==0)
returnfalse;
token=newStringTokenizer(line.toString());
String[]temp=newString[2];
if(token.hasMoreElements()){
temp[0]=token.nextToken();
if(token.hasMoreElements()){
temp[1]=token.nextToken();
}
}
System.out.println(temp[0]);
System.out.println(temp[1]);
String[]points=temp[1].split(",");
System.out.println(points[0]);
System.out.println(points[1]);
System.out.println(points[2]);
lineKey.set(temp[0]);
lineValue.set(Float.parseFloat(points[0]),Float.parseFloat(points[1]),Float.parseFloat(points[2]));
System.out.println("pp");
returntrue;
}
}
}
测试的时候写的map函数,没有reudce,必须要在后面设置job的时候设置reduce的个数为0,job.setNumReduceTasks(0)。
importjava.io.IOException;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Mapper;
publicclassTestMapperextendsMapper<Text,Point3D,Text,Point3D>{
@Override
protectedvoidmap(Textkey,Point3Dvalue,
org.apache.hadoop.mapreduce.Mapper.Contextcontext)
throwsIOException,InterruptedException{
//TODOAuto-generatedmethodstub
context.write(key,value);
}
}
importjava.io.IOException;
importjava.net.URI;
importjavax.xml.soap.Text;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.FileSystem;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
publicclassTestMyInputFormat{
/**
*@paramargs
*@throwsIOException
*@throwsClassNotFoundException
*@throwsInterruptedException
*/
publicstaticvoidmain(String[]args)throwsIOException,InterruptedException,ClassNotFoundException{
//TODOAuto-generatedmethodstub
System.out.println("nihao");
Jobjob=newJob();
Configurationconf=newConfiguration();
FileSystemfs=FileSystem.get(URI.create(args[1]),conf);
fs.delete(newPath(args[1]));
job.setJobName("测试MyInputFormat程序。。。。。");
FileInputFormat.addInputPath(job,newPath(args[0]));
FileOutputFormat.setOutputPath(job,newPath(args[1]));
job.setInputFormatClass(MyInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Point3D.class);
job.setMapperClass(TestMapper.class);
job.setNumReduceTasks(0);
job.waitForCompletion(false);
}
}
相关文章推荐
- [C/C++标准库]_[初级]_[如何实现std::string自己的Format(sprintf)函数]
- [C/C++标准库]_[初级]_[如何实现std::string自己的Format(sprintf)函数]
- MapReduce小文件处理之CombineFileInputFormat实现
- MapReduce小文件处理之CombineFileInputFormat实现
- hadoop用MultipleInputs/MultiInputFormat实现一个mapreduce job中读取不同格式的文件
- Mapreduce中的RCFile输入RCFileInputFormat实现及其应用
- hadoop学习;自己定义Input/OutputFormat;类引用mapreduce.mapper;三种模式
- 如何实现通过自己的程序发送邮件
- 如何用一天时间实现自己的RPC框架
- MapReduce InputFormat之FileInputFormat
- 如何在自己的网页中实现调用百度搜索引擎
- Windows 2000下如何在自己的程序中实现关机!
- 如何实现自己的ClassLoader
- MapReduce的InputFormat过程的学习
- Android应用如何监听自己是否被卸载及卸载反馈功能的实现
- MapReduce的SequenceFileInputFormat使用
- 自己写的,用onpropertychange 和 oninput时间实现对文本框的实时触发
- Android应用如何监听自己是否被卸载及卸载反馈功能的实现(第三版)
- Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)