您的位置:首页 > 编程语言

MapReduce高级编程——自定义InputFormat

2014-12-03 09:20 323 查看


 0、测试集样例

Java代码  


ball, 3.5, 12.7, 9.0  

car, 15, 23.76, 42.23  

device, 0.0, 12.4, -67.1  

 

1、测试Point3D InputFormat

Java代码  


import java.io.IOException;  

import java.net.URI;  

  

import javax.xml.soap.Text;  

  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.fs.FileSystem;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.mapreduce.Job;  

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

  

/** 

 * desc:Custom Data Types <code>TestPoint3DInputFormat</code> 

 *  

 * @author chenwq 

 */  

public class TestPoint3DInputFormat {  

     /** 

     * @param args 

     * @throws IOException  

     * @throws ClassNotFoundException  

     * @throws InterruptedException  

     */  

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  

        // TODO Auto-generated method stub  

        System.out.println("hello,chenwq!");  

        Job job=new Job();  

        Configuration conf=new Configuration();  

        FileSystem fs=FileSystem.get(URI.create(args[1]), conf);  

        fs.delete(new Path(args[1]));  

        job.setJobName("测试MyInputFormat程序。。。。。");  

        FileInputFormat.addInputPath(job, new Path(args[0]));  

        FileOutputFormat.setOutputPath(job, new Path(args[1]));  

        job.setInputFormatClass(Point3DinputFormat.class);  

        job.setMapOutputKeyClass(Text.class);  

        job.setMapOutputValueClass(Point3D.class);  

        job.setMapperClass(Point3DMapper.class);  

        job.setNumReduceTasks(0);  

        job.waitForCompletion(false);  

    }  

}  

 

2、自定义类型Point3D必须实现WritableComparable接口,才能在Hadoop环境中传输

Java代码  


import java.io.DataInput;  

import java.io.DataOutput;  

import java.io.IOException;  

  

import org.apache.hadoop.io.WritableComparable;  

  

/** 

 * desc:Custom Data Types <code>Point</code> 

 *  

 * @author chenwq 

 */  

public class Point3D implements WritableComparable {  

    public float x;  

    public float y;  

    public float z;  

  

    public Point3D(float x, float y, float z) {  

        this.x = x;  

        this.y = y;  

        this.z = z;  

    }  

  

    public Point3D() {  

        this(0.0f, 0.0f, 0.0f);  

    }  

  

    public void set(float x, float y, float z) {  

        this.x = x;  

        this.y = y;  

        this.z = z;  

    }  

  

    public void write(DataOutput out) throws IOException {  

        out.writeFloat(x);  

        out.writeFloat(y);  

        out.writeFloat(z);  

    }  

  

    public void readFields(DataInput in) throws IOException {  

        x = in.readFloat();  

        y = in.readFloat();  

        z = in.readFloat();  

    }  

  

    public String toString() {  

        return Float.toString(x) + ", " + Float.toString(y) + ", "  

                + Float.toString(z);  

    }  

  

    public float distanceFromOrigin() {  

        return (float) Math.sqrt(x * x + y * y + z * z);  

    }  

  

    public int compareTo(Object other) {  

        float myDistance = this.distanceFromOrigin();  

        float otherDistance = ((Point3D) other).distanceFromOrigin();  

  

        return Float.compare(myDistance, otherDistance);  

    }  

  

    public boolean equals(Object o) {  

        Point3D other = (Point3D) o;  

        if (!(other instanceof Point3D)) {  

            return false;  

        }  

  

        return this.x == other.x && this.y == other.y && this.z == other.z;  

    }  

  

    public int hashCode() {  

        return Float.floatToIntBits(x) ^ Float.floatToIntBits(y)  

                ^ Float.floatToIntBits(z);  

    }  

  

}  

 3、自定义Point3DInputFormat类型,供MapReduce编程模型使用

Java代码  


import java.io.IOException;  

  

import java.util.StringTokenizer;  

  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.fs.FSDataInputStream;  

import org.apache.hadoop.fs.FileSystem;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.InputSplit;  

import org.apache.hadoop.mapreduce.JobContext;  

import org.apache.hadoop.mapreduce.RecordReader;  

import org.apache.hadoop.mapreduce.TaskAttemptContext;  

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

import org.apache.hadoop.mapreduce.lib.input.FileSplit;  

import org.apache.hadoop.util.LineReader;  

  

public class Point3DinputFormat extends FileInputFormat<Text, Point3D> {  

      

    @Override  

    protected boolean isSplitable(JobContext context, Path filename) {  

        // TODO Auto-generated method stub  

        return false;  

    }  

    @Override  

    public RecordReader<Text, Point3D> createRecordReader(InputSplit inputsplit,  

            TaskAttemptContext context) throws IOException, InterruptedException {  

        // TODO Auto-generated method stub  

        return new objPosRecordReader();  

    }  

    public static class objPosRecordReader extends RecordReader<Text,Point3D>{  

  

        public LineReader in;  

        public Text lineKey;  

        public Point3D lineValue;  

        public StringTokenizer token=null;  

          

        public Text line;  

        

        @Override  

        public void close() throws IOException {  

            // TODO Auto-generated method stub  

              

        }  

  

        @Override  

        public Text getCurrentKey() throws IOException, InterruptedException {  

            //lineKey.set(token.nextToken());  

            return lineKey;  

        }  

  

        @Override  

        public Point3D getCurrentValue() throws IOException,  

                InterruptedException {  

            // TODO Auto-generated method stub  

            return lineValue;  

        }  

  

        @Override  

        public float getProgress() throws IOException, InterruptedException {  

            // TODO Auto-generated method stub  

            return 0;  

        }  

  

        @Override  

        public void initialize(InputSplit input, TaskAttemptContext context)  

                throws IOException, InterruptedException {  

            // TODO Auto-generated method stub  

            FileSplit split=(FileSplit)input;  

            Configuration job=context.getConfiguration();  

            Path file=split.getPath();  

            FileSystem fs=file.getFileSystem(job);  

              

            FSDataInputStream filein=fs.open(file);  

            in=new LineReader(filein,job);  

              

            line=new Text();  

            lineKey=new Text();  

            lineValue=new Point3D();  

        }  

  

        @Override  

        public boolean nextKeyValue() throws IOException, InterruptedException {  

            // TODO Auto-generated method stub  

            int linesize=in.readLine(line);  

            if(linesize==0)  

                return false;  

              

            String[] pieces = line.toString().split(",");  

            if(pieces.length != 4){  

                throw new IOException("Invalid record received");  

            }  

              

            // try to parse floating point components of value  

            float fx, fy, fz;  

            try{  

                fx = Float.parseFloat(pieces[1].trim());  

                fy = Float.parseFloat(pieces[2].trim());  

                fz = Float.parseFloat(pieces[3].trim());  

            }catch(NumberFormatException nfe){  

                throw new IOException("Error parsing floating poing value in record");  

            }  

            lineKey.set(pieces[0]);  

              

            lineValue.set(fx, fy, fz);  

              

            return true;  

        }  

    }  

}  

 

4、编写Mapper类,这里仅仅测试自定义类型Point3D的InputFormat,不需要Reducer

Java代码  


import java.io.IOException;  

  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Mapper;  

  

  

public class Point3DMapper extends Mapper<Text, Point3D, Text, Point3D>{  

    protected void map(Text key, Point3D value, Context context) throws IOException, InterruptedException{  

        context.write(key, value);  

    }  

}  

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: