MapReduce高级编程——自定义InputFormat
2014-12-03 09:20
323 查看
0、测试集样例
Java代码
ball, 3.5, 12.7, 9.0
car, 15, 23.76, 42.23
device, 0.0, 12.4, -67.1
1、测试Point3D InputFormat
Java代码
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* desc:Custom Data Types <code>TestPoint3DInputFormat</code>
*
* @author chenwq
*/
/**
 * Driver for testing the custom {@link Point3DinputFormat}: reads records of
 * the form "name, x, y, z" and writes them straight back out via the identity
 * {@link Point3DMapper} (map-only job, no reducer).
 *
 * <p>Usage: {@code TestPoint3DInputFormat <input path> <output path>}
 *
 * @author chenwq
 */
public class TestPoint3DInputFormat {
    /**
     * @param args args[0] = input path, args[1] = output path (deleted first if present)
     * @throws IOException if the filesystem cannot be reached
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException if the job is interrupted while running
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        System.out.println("hello,chenwq!");
        // Build the Configuration BEFORE the Job and pass it in; the original
        // created the Job first, so its Configuration was never used by the job.
        Configuration conf = new Configuration();
        Job job = new Job(conf, "测试MyInputFormat程序。。。。。");
        // Ship the jar containing this driver (and the custom classes) to the cluster.
        job.setJarByClass(TestPoint3DInputFormat.class);
        // Remove a stale output directory so the job does not fail on start.
        // delete(Path) is deprecated; use the explicit recursive overload.
        FileSystem fs = FileSystem.get(URI.create(args[1]), conf);
        fs.delete(new Path(args[1]), true);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(Point3DinputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Point3D.class);
        job.setMapperClass(Point3DMapper.class);
        // Map-only job: mapper output is the final output.
        job.setNumReduceTasks(0);
        // Report success/failure to the shell instead of discarding the result.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
2、自定义类型Point3D必须实现WritableComparable接口,才能在Hadoop环境中传输
Java代码
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
* desc:Custom Data Types <code>Point</code>
*
* @author chenwq
*/
public class Point3D implements WritableComparable {
public float x;
public float y;
public float z;
public Point3D(float x, float y, float z) {
this.x = x;
this.y = y;
this.z = z;
}
public Point3D() {
this(0.0f, 0.0f, 0.0f);
}
public void set(float x, float y, float z) {
this.x = x;
this.y = y;
this.z = z;
}
public void write(DataOutput out) throws IOException {
out.writeFloat(x);
out.writeFloat(y);
out.writeFloat(z);
}
public void readFields(DataInput in) throws IOException {
x = in.readFloat();
y = in.readFloat();
z = in.readFloat();
}
public String toString() {
return Float.toString(x) + ", " + Float.toString(y) + ", "
+ Float.toString(z);
}
public float distanceFromOrigin() {
return (float) Math.sqrt(x * x + y * y + z * z);
}
public int compareTo(Object other) {
float myDistance = this.distanceFromOrigin();
float otherDistance = ((Point3D) other).distanceFromOrigin();
return Float.compare(myDistance, otherDistance);
}
public boolean equals(Object o) {
Point3D other = (Point3D) o;
if (!(other instanceof Point3D)) {
return false;
}
return this.x == other.x && this.y == other.y && this.z == other.z;
}
public int hashCode() {
return Float.floatToIntBits(x) ^ Float.floatToIntBits(y)
^ Float.floatToIntBits(z);
}
}
3、自定义Point3DInputFormat类型,供MapReduce编程模型使用
Java代码
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
/**
 * InputFormat producing (Text name, Point3D point) pairs from lines of the
 * form "name, x, y, z". Files are read whole (not splittable) so each record
 * stays on one line within one reader.
 */
public class Point3DinputFormat extends FileInputFormat<Text, Point3D> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // One reader per file: avoids a split boundary landing mid-line.
        return false;
    }

    @Override
    public RecordReader<Text, Point3D> createRecordReader(InputSplit inputsplit,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new objPosRecordReader();
    }

    /** Reads one "name, x, y, z" line per call to {@link #nextKeyValue()}. */
    public static class objPosRecordReader extends RecordReader<Text, Point3D> {
        public LineReader in;
        public Text lineKey;
        public Point3D lineValue;
        public StringTokenizer token = null;
        public Text line;
        // Underlying stream, kept to report progress through the split.
        private FSDataInputStream filein;
        private long splitLength;

        @Override
        public void close() throws IOException {
            // The original leaked the reader; closing the LineReader also
            // closes the wrapped FSDataInputStream.
            if (in != null) {
                in.close();
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return lineKey;
        }

        @Override
        public Point3D getCurrentValue() throws IOException,
                InterruptedException {
            return lineValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // Fraction of the (whole-file) split consumed; the original
            // always returned 0, so the UI showed no progress.
            if (filein == null || splitLength == 0) {
                return 0.0f;
            }
            return Math.min(1.0f, filein.getPos() / (float) splitLength);
        }

        @Override
        public void initialize(InputSplit input, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) input;
            Configuration job = context.getConfiguration();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);
            filein = fs.open(file);
            splitLength = split.getLength();
            in = new LineReader(filein, job);
            line = new Text();
            lineKey = new Text();
            lineValue = new Point3D();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int linesize = in.readLine(line);
            if (linesize == 0) {
                // End of stream.
                return false;
            }
            // Expect exactly: name, x, y, z (trim() absorbs the space after each comma).
            String[] pieces = line.toString().split(",");
            if (pieces.length != 4) {
                throw new IOException("Invalid record received: " + line);
            }
            float fx, fy, fz;
            try {
                fx = Float.parseFloat(pieces[1].trim());
                fy = Float.parseFloat(pieces[2].trim());
                fz = Float.parseFloat(pieces[3].trim());
            } catch (NumberFormatException nfe) {
                // Preserve the cause (and fix the "poing" typo in the message).
                throw new IOException("Error parsing floating point value in record: " + line, nfe);
            }
            lineKey.set(pieces[0]);
            lineValue.set(fx, fy, fz);
            return true;
        }
    }
}
4、编写Mapper类,这里仅仅测试自定义类型Point3D的InputFormat,不需要Reducer
Java代码
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class Point3DMapper extends Mapper<Text, Point3D, Text, Point3D>{
protected void map(Text key, Point3D value, Context context) throws IOException, InterruptedException{
context.write(key, value);
}
}
相关文章推荐
- MapReduce高级编程之自定义InputFormat
- [Hadoop] - 自定义Mapreduce InputFormat&OutputFormat
- 自定义 hadoop MapReduce InputFormat 切分输入文件
- MapReduce高级编程之自定义InputFormat
- Hadoop MapReduce编程模型之InputFormat接口学习
- Mapreduce之自定义InputFormat-小文件合并
- 自定义 hadoop MapReduce InputFormat 切分输入文件
- MapReduce高级编程——自定义InputFormat
- 自定义 hadoop MapReduce InputFormat 切分输入文件
- hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式
- MapReduce中的InputFormat(2)自定义
- MapReduce高级编程之自定义DataType
- 自定义MapReduce的InputFormat,提取指定开始与结束限定符间的内容
- [Hadoop] - 自定义Mapreduce InputFormat&OutputFormat
- Hadoop MapReduce处理海量小文件:自定义InputFormat和RecordReader
- 自定义MapReduce的InputFormat,提取指定开始与结束限定符间的内容
- hadoop编程小技巧(5)---自定义输入文件格式类InputFormat
- hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式
- MapReduce高级编程之自定义DataType