您的位置:首页 > 其它

自定义InputFormat /InputSplit/RecordReader

2016-08-21 09:19 330 查看
在编写MapReduce的时候,自带的输入格式有时候满足不了我们的需求,这个时候就要自己来定制InputFprmat、InputSplit和RecordReader。
MapReduce自带的输入类型都是基于HDFS的,这个例子的功能是,不从HDFS上面读取输入内容,在内存中随机生成100个(0-1)float型的小数,然后求这100个小数的最大值。

1、类 FindMaxValueInputFormat  继承抽象类 InputFormat: 

主要实现的方法:

public abstract List<InputSplit> getSplits(JobContext context)

public abstract  RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context)

[java]
view plain
copy
print?

package inputformat;  
  
  
import java.io.IOException;  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;  
  
  
import org.apache.hadoop.io.ArrayWritable;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.mapreduce.InputFormat;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  
  
public class FindMaxValueInputFormat extends InputFormat<IntWritable, ArrayWritable>{  
    public static float[] floatvalues;  
    /** 
     * 返回一个InputSplit 集合 
     * 这个例子一共有两个InputSplit,两个map 
     * 随机产生100个 0-1 的数组,放到float数组里面 
     */  
    @Override  
    public List<InputSplit> getSplits(JobContext context) throws IOException,  
            InterruptedException {  
        int NumOfValues = context.getConfiguration().getInt("NumOfValues", 100);  
        floatvalues = new float[NumOfValues];  
        Random rand =new Random();  
          
        for (int i = 0; i < NumOfValues; i++) {  
            floatvalues[i]=rand.nextFloat();  
        }  
        int NumSplits = context.getConfiguration().getInt("mapred.map.tasks", 2);  
        int beg=0;  
        int length =(int) Math.floor(NumOfValues/NumSplits);  
          
        ArrayList<InputSplit> splits =new ArrayList<InputSplit>();  
        int end = length-1;  
          
        for (int i = 0; i < NumSplits-1; i++) {  
            FindMaxValueInputSplit split =new FindMaxValueInputSplit(beg,end);  
            splits.add(split);  
              
            beg=end+1;  
            end=end+length-1;  
        }  
          
        FindMaxValueInputSplit split=new FindMaxValueInputSplit(beg,NumOfValues-1);  
        splits.add(split);  
          
        return splits;  
    }  
      
    /** 
     * 自定义 RecordReader 
     */  
    @Override  
    public RecordReader<IntWritable, ArrayWritable> createRecordReader(  
            InputSplit split, TaskAttemptContext context) throws IOException,  
            InterruptedException {  
        return new FindMaxValueRecordReader();  
    }  
      
}  

2、类 FindMaxValueInputSplit 继承 InputSplit 实现 Writable :

[java]
view plain
copy
print?

package inputformat;  
  
  
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
  
  
import org.apache.hadoop.io.ArrayWritable;  
import org.apache.hadoop.io.FloatWritable;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapreduce.InputSplit;  
  
  
public class FindMaxValueInputSplit extends InputSplit implements Writable{  
  
  
    private int m_StartIndex;  
    private int m_EndIndex;  
    private ArrayWritable m_FloatArray=new ArrayWritable(FloatWritable.class);  
      
    public FindMaxValueInputSplit(){};  
    /** 
     * 这个自定义分类主要记录了Map函数的开始索引和结束索引,第一个map处理前50个小数,第二个map后50个小数 
     * @param start 开始位置 
     * @param end 结束位置 
     */  
    public FindMaxValueInputSplit(int start,int end){  
        m_StartIndex=start;  
        m_EndIndex=end;  
        int len=m_EndIndex-m_StartIndex+1;  
        int index=m_StartIndex;  
          
        FloatWritable[] result = new FloatWritable[len];  
          
        for (int i = 0; i < result.length; i++) {  
            float f=FindMaxValueInputFormat.floatvalues[index];  
            FloatWritable fW =new FloatWritable();  
            fW.set(f);  
            result[i]=fW;  
            index++;  
        }  
        m_FloatArray.set(result);  
    }  
      
    public void write(DataOutput out) throws IOException {  
        out.writeInt(this.m_StartIndex);  
        out.writeInt(this.m_EndIndex);  
        this.m_FloatArray.write(out);  
    }  
  
  
    public void readFields(DataInput in) throws IOException {  
        this.m_StartIndex=in.readInt();  
        this.m_EndIndex=in.readInt();  
        this.m_FloatArray.readFields(in);  
    }  
  
  
    @Override  
    public long getLength() throws IOException, InterruptedException {  
        return (this.m_EndIndex-this.m_StartIndex+1);  
    }  
  
  
    @Override  
    public String[] getLocations() throws IOException, InterruptedException {  
        return new String[]{"hadoop-2","hadoop-1"};  
    }  
      
    public int getM_StartIndex(){   
        return m_StartIndex;  
    }  
      
    public int getM_EndIndex(){  
        return m_EndIndex;  
    }  
      
    public ArrayWritable getM_FloatArray(){   
        return m_FloatArray;  
    }  
      
}  

3、类 FindMaxValueRecordReader 继承 RecordReader :

[java]
view plain
copy
print?

package inputformat;  
  
  
import java.io.IOException;  
  
  
import org.apache.hadoop.io.ArrayWritable;  
import org.apache.hadoop.io.FloatWritable;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
/** 
 * 这个自定义 RecordReader 类,定义输入到map函数的输入格式 
 * Key为偏移量 
 * Value为float数组,长度为50 
 * @author Administrator 
 * 
 */  
public class FindMaxValueRecordReader extends RecordReader<IntWritable,ArrayWritable>{  
    private int m_End;  
    private int m_Index;  
    private int m_Start;  
    private IntWritable key=null;  
    private ArrayWritable value=null;  
    private FindMaxValueInputSplit fmvsplit=null;  
      
    @Override  
    public void initialize(InputSplit split, TaskAttemptContext context)  
            throws IOException, InterruptedException {  
        fmvsplit=(FindMaxValueInputSplit)split;  
        this.m_Start=fmvsplit.getM_StartIndex();  
        this.m_End=fmvsplit.getM_EndIndex();  
        this.m_Index=this.m_Start;  
    }  
    /** 
     * 输出的key为  IntWritable 
     * 输出的value为  ArrayWritable 
     */  
    @Override  
    public boolean nextKeyValue() throws IOException, InterruptedException {  
        if (key == null) {  
            key=new IntWritable();  
        }  
        if (value == null) {  
            value = new ArrayWritable(FloatWritable.class);  
        }  
        if (m_Index <= m_End) {  
            key.set(m_Index);  
            value=fmvsplit.getM_FloatArray();  
            m_Index=m_End+1;  
            return true;  
        }else {  
            return false;     
        }  
    }  
  
  
    @Override  
    public IntWritable getCurrentKey() throws IOException, InterruptedException {  
        return key;  
    }  
  
  
    @Override  
    public ArrayWritable getCurrentValue() throws IOException,  
            InterruptedException {  
        return value;  
    }  
  
  
    @Override  
    public float getProgress() throws IOException, InterruptedException {  
          
        if (this.m_Index == this.m_End) {  
            return 0.0f;  
        }else {  
            return Math.min(1.0f, (this.m_Index - this.m_Start)/(float)(this.m_End-this.m_Start));  
        }  
    }  
  
  
    @Override  
    public void close() throws IOException {  
    }  
      
}  

4、mapper类 FindMaxValueMapper:

[java]
view plain
copy
print?

package inputformat;  
  
  
import java.io.IOException;  
  
  
import org.apache.hadoop.io.ArrayWritable;  
import org.apache.hadoop.io.FloatWritable;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.mapreduce.Mapper;  
/** 
 * Map函数的输入格式有所改变 
 * @author Administrator 
 * 
 */  
public class FindMaxValueMapper extends Mapper<IntWritable, ArrayWritable,IntWritable, FloatWritable>{  
    private final static IntWritable one =new IntWritable(1);  
    @Override  
    protected void map(  
            IntWritable key,  
            ArrayWritable value,  
            Mapper<IntWritable, ArrayWritable, IntWritable, FloatWritable>.Context context)  
            throws IOException, InterruptedException {  
          
        FloatWritable[] floatArray =(FloatWritable[])value.toArray();  
        float maxfloat=floatArray[0].get();  
        float tmp;  
        /** 
         * 求一个InputSplit中的最大值 
         */  
        for (int i = 0; i < floatArray.length; i++) {  
            tmp=floatArray[i].get();  
            if (tmp>maxfloat) {  
                maxfloat=tmp;  
            }  
        }  
        /** 
         * 把一个map中的最大值输出出来 
         */  
        context.write(one, new FloatWritable(maxfloat));  
    }  
  
  
}  

5、Reducer 类 FindMaxValueReducer :

[java]
view plain
copy
print?

package inputformat;  
  
  
import java.io.IOException;  
import java.util.Iterator;  
  
  
import org.apache.hadoop.io.FloatWritable;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  
/** 
 * Ruducer比较两个Map函数输出的最大值,结果输出在HDFS上面 
 * 这个例子就比较两个值,有几个Map比较几个 
 * @author Administrator 
 * 
 */  
public class FindMaxValueReducer extends Reducer<IntWritable, FloatWritable, Text, FloatWritable>{  
      
    @SuppressWarnings("rawtypes")  
    @Override  
    protected void reduce(  
            IntWritable k2,  
            Iterable<FloatWritable> v2s,  
            Reducer<IntWritable, FloatWritable, Text, FloatWritable>.Context context)  
            throws IOException, InterruptedException {  
          
        Iterator it = v2s.iterator();  
        float maxfloat=0,tmp;  
        /** 
         * 取第一个数 
         */  
        if (it.hasNext()) {  
            maxfloat=((FloatWritable)(it.next())).get();  
        }else {  
            //集合为空时,输出迭代失败信息  
            context.write(new Text("Max float value : "), null);  
            return;  
        }  
        /** 
         * 求最大值 
         */  
        while (it.hasNext()) {  
            tmp=((FloatWritable)(it.next())).get();  
            if (tmp > maxfloat) {  
                maxfloat = tmp;  
            }  
        }  
        //把最大的那个值输出来  
        context.write(new Text("Max float value : "), new FloatWritable(maxfloat));  
    }  
      
}  

6、驱动类 MaxValueDriver :

[java]
view plain
copy
print?

package inputformat;  
  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.FloatWritable;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  
  
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;  
/** 
 * 驱动类 
 * @author Administrator 
 * 
 */  
public class MaxValueDriver {  
    public static void main(String[] args) throws Exception {  
        Configuration conf = new Configuration();  
        Job job=Job.getInstance(conf, MaxValueDriver.class.getSimpleName());  
        job.setJarByClass(MaxValueDriver.class);  
          
        job.setNumReduceTasks(1);  
          
        job.setMapperClass(FindMaxValueMapper.class);  
        job.setReducerClass(FindMaxValueReducer.class);  
          
        job.setMapOutputKeyClass(IntWritable.class);  
        job.setMapOutputValueClass(FloatWritable.class);  
          
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(FloatWritable.class);  
          
        job.setInputFormatClass(FindMaxValueInputFormat.class);  
        job.setOutputFormatClass(TextOutputFormat.class);  
          
    //  FileInputFormat.setInputPaths(job, args[0]);  
        FileOutputFormat.setOutputPath(job, new Path(args[0]));  
          
        job.waitForCompletion(true);  
          
    }  
}  

第一次运行:

[java]
view plain
copy
print?

[hadoop@hadoop ~]$ hdfs dfs -ls  /user/hadoop/output_inputformat1  
Found 2 items  
-rw-r--r--   3 hadoop supergroup          0 2015-09-04 15:09 /user/hadoop/output_inputformat1/_SUCCESS  
-rw-r--r--   3 hadoop supergroup         29 2015-09-04 15:09 /user/hadoop/output_inputformat1/part-r-00000  
[hadoop@hadoop ~]$ hdfs dfs -text  /user/hadoop/output_inputformat1/part-r-00000  
Max float value :       0.9981093  
[hadoop@hadoop ~]$  

第二次运行:

[java]
view plain
copy
print?

[hadoop@hadoop ~]$ hdfs dfs -ls  /user/hadoop/output_inputformat2  
Found 2 items  
-rw-r--r--   3 hadoop supergroup          0 2015-09-05 11:43 /user/hadoop/output_inputformat2/_SUCCESS  
-rw-r--r--   3 hadoop supergroup         29 2015-09-05 11:43 /user/hadoop/output_inputformat2/part-r-00000  
[hadoop@hadoop ~]$  

查看两次运行内容:

[java]
view plain
copy
print?

[hadoop@hadoop ~]$ hdfs dfs -text  /user/hadoop/output_inputformat1/part-r-00000  
Max float value :       0.9981093  
[hadoop@hadoop ~]$ hdfs dfs -text  /user/hadoop/output_inputformat2/part-r-00000  
Max float value :       0.9966892  
[hadoop@hadoop ~]$  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: