Custom InputFormat / InputSplit / RecordReader
2016-08-21 09:19
When writing MapReduce jobs, the built-in input formats sometimes cannot meet our needs; in that case we have to implement our own InputFormat, InputSplit, and RecordReader.
The input types that ship with MapReduce all read from HDFS. This example takes no input from HDFS at all: it generates 100 random floats in the range (0, 1) in memory and then finds the maximum of those 100 values.
1. FindMaxValueInputFormat, a subclass of the abstract class InputFormat:
The methods to implement:
public abstract List<InputSplit> getSplits(JobContext context)
public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context)
package inputformat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FindMaxValueInputFormat extends InputFormat<IntWritable, ArrayWritable> {

    public static float[] floatvalues;

    /**
     * Returns the list of InputSplits.
     * In this example there are two InputSplits, hence two map tasks.
     * 100 random floats in (0, 1) are generated and stored in a float array.
     */
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException,
            InterruptedException {
        int numOfValues = context.getConfiguration().getInt("NumOfValues", 100);
        floatvalues = new float[numOfValues];
        Random rand = new Random();
        for (int i = 0; i < numOfValues; i++) {
            floatvalues[i] = rand.nextFloat();
        }
        int numSplits = context.getConfiguration().getInt("mapred.map.tasks", 2);
        int length = numOfValues / numSplits;   // integer division already floors
        ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
        int beg = 0;
        int end = length - 1;
        for (int i = 0; i < numSplits - 1; i++) {
            splits.add(new FindMaxValueInputSplit(beg, end));
            beg = end + 1;
            end = end + length;   // was "end + length - 1", an off-by-one that shrank every split after the first
        }
        // The last split takes whatever remains, so no value is dropped.
        splits.add(new FindMaxValueInputSplit(beg, numOfValues - 1));
        return splits;
    }

    /**
     * Returns the custom RecordReader.
     */
    @Override
    public RecordReader<IntWritable, ArrayWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new FindMaxValueRecordReader();
    }
}
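Both values that getSplits() reads come from the job configuration, so the data size and split count can be tuned without touching this class. A minimal sketch, assuming it sits in the driver's main() before the job is created (the key names come from the code above; the values here are examples):

        // Fragment for the driver's main(): tune the generator and split count
        // through the configuration keys that getSplits() reads.
        Configuration conf = new Configuration();
        conf.setInt("NumOfValues", 200);      // number of random floats to generate
        conf.setInt("mapred.map.tasks", 4);   // number of splits, hence map tasks
        Job job = Job.getInstance(conf, "maxvalue");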
2. FindMaxValueInputSplit, extending InputSplit and implementing Writable:
package inputformat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class FindMaxValueInputSplit extends InputSplit implements Writable {

    private int m_StartIndex;
    private int m_EndIndex;
    private ArrayWritable m_FloatArray = new ArrayWritable(FloatWritable.class);

    // No-arg constructor, required so the framework can instantiate the split
    // before calling readFields().
    public FindMaxValueInputSplit() {}

    /**
     * This custom split records the start and end indices for one map task:
     * with two splits over 100 values, the first map handles the first 50
     * floats and the second map the remaining 50.
     * @param start start index (inclusive)
     * @param end   end index (inclusive)
     */
    public FindMaxValueInputSplit(int start, int end) {
        m_StartIndex = start;
        m_EndIndex = end;
        int len = m_EndIndex - m_StartIndex + 1;
        int index = m_StartIndex;
        FloatWritable[] result = new FloatWritable[len];
        for (int i = 0; i < result.length; i++) {
            float f = FindMaxValueInputFormat.floatvalues[index];
            FloatWritable fW = new FloatWritable();
            fW.set(f);
            result[i] = fW;
            index++;
        }
        m_FloatArray.set(result);
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(this.m_StartIndex);
        out.writeInt(this.m_EndIndex);
        this.m_FloatArray.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        this.m_StartIndex = in.readInt();
        this.m_EndIndex = in.readInt();
        this.m_FloatArray.readFields(in);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        return (this.m_EndIndex - this.m_StartIndex + 1);
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[]{"hadoop-2", "hadoop-1"};
    }

    public int getM_StartIndex() {
        return m_StartIndex;
    }

    public int getM_EndIndex() {
        return m_EndIndex;
    }

    public ArrayWritable getM_FloatArray() {
        return m_FloatArray;
    }
}
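Because this split carries the float array itself rather than an HDFS block location, the write()/readFields() pair is what actually ships the data from the client to the map tasks. A minimal local check of that round trip, mimicking what the framework does when it deserializes a split (this test class is our own addition, not part of the original code):

package inputformat;

import java.io.*;

import org.apache.hadoop.io.FloatWritable;

public class SplitRoundTripCheck {
    public static void main(String[] args) throws IOException {
        // Hypothetical check: serialize a split, read it back, inspect it.
        FindMaxValueInputFormat.floatvalues = new float[]{0.1f, 0.5f, 0.9f, 0.3f};
        FindMaxValueInputSplit original = new FindMaxValueInputSplit(0, 3);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        FindMaxValueInputSplit copy = new FindMaxValueInputSplit();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        FloatWritable[] values = (FloatWritable[]) copy.getM_FloatArray().toArray();
        System.out.println(copy.getM_StartIndex() + ".." + copy.getM_EndIndex()
                + " first=" + values[0].get());   // expect: 0..3 first=0.1
    }
}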
3. FindMaxValueRecordReader, extending RecordReader:
package inputformat;

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * This custom RecordReader defines the record format fed to the map function:
 * the key is the start offset of the split, the value is the float array
 * (length 50 with the default configuration). Each split yields exactly one record.
 */
public class FindMaxValueRecordReader extends RecordReader<IntWritable, ArrayWritable> {

    private int m_End;
    private int m_Index;
    private int m_Start;
    private IntWritable key = null;
    private ArrayWritable value = null;
    private FindMaxValueInputSplit fmvsplit = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        fmvsplit = (FindMaxValueInputSplit) split;
        this.m_Start = fmvsplit.getM_StartIndex();
        this.m_End = fmvsplit.getM_EndIndex();
        this.m_Index = this.m_Start;
    }

    /**
     * The output key is an IntWritable, the output value an ArrayWritable.
     * The whole array is emitted as a single record, so this returns
     * true exactly once per split.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new IntWritable();
        }
        if (value == null) {
            value = new ArrayWritable(FloatWritable.class);
        }
        if (m_Index <= m_End) {
            key.set(m_Index);
            value = fmvsplit.getM_FloatArray();
            m_Index = m_End + 1;   // jump past the end: one record per split
            return true;
        } else {
            return false;
        }
    }

    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public ArrayWritable getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Guard against division by zero when the split holds a single element
        // (the original tested m_Index == m_End, which misreported progress).
        if (this.m_End == this.m_Start) {
            return (this.m_Index > this.m_End) ? 1.0f : 0.0f;
        }
        return Math.min(1.0f,
                (this.m_Index - this.m_Start) / (float) (this.m_End - this.m_Start));
    }

    @Override
    public void close() throws IOException {
    }
}
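The framework calls initialize() once, then alternates nextKeyValue() / getCurrentKey() / getCurrentValue() until nextKeyValue() returns false. The InputFormat and reader can therefore be exercised outside MapReduce with the same calling sequence. A sketch under the assumption that passing null contexts is acceptable (both arguments are ignored by this implementation; the class name is our own):

package inputformat;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;

public class ReaderLoopCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("NumOfValues", 10);    // small run for inspection
        Job job = Job.getInstance(conf);   // Job implements JobContext in Hadoop 2.x

        FindMaxValueInputFormat inputFormat = new FindMaxValueInputFormat();
        List<InputSplit> splits = inputFormat.getSplits(job);
        for (InputSplit split : splits) {
            RecordReader<IntWritable, ArrayWritable> reader =
                    inputFormat.createRecordReader(split, null);
            reader.initialize(split, null);  // this reader ignores the context
            while (reader.nextKeyValue()) {  // true exactly once per split here
                FloatWritable[] arr = (FloatWritable[]) reader.getCurrentValue().toArray();
                System.out.println("key=" + reader.getCurrentKey() + " length=" + arr.length);
            }
            reader.close();
        }
    }
}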
4. The mapper class FindMaxValueMapper:
package inputformat;

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * The input format of the map function differs from the usual
 * (LongWritable, Text) pair: the key is an offset, the value a float array.
 */
public class FindMaxValueMapper extends Mapper<IntWritable, ArrayWritable, IntWritable, FloatWritable> {

    // Every map emits the same key, so the single reducer receives all
    // per-split maxima in one group.
    private final static IntWritable one = new IntWritable(1);

    @Override
    protected void map(
            IntWritable key,
            ArrayWritable value,
            Context context)
            throws IOException, InterruptedException {
        FloatWritable[] floatArray = (FloatWritable[]) value.toArray();
        float maxfloat = floatArray[0].get();
        float tmp;
        // Find the maximum within one InputSplit.
        for (int i = 0; i < floatArray.length; i++) {
            tmp = floatArray[i].get();
            if (tmp > maxfloat) {
                maxfloat = tmp;
            }
        }
        // Emit the per-map maximum.
        context.write(one, new FloatWritable(maxfloat));
    }
}
5. The reducer class FindMaxValueReducer:
package inputformat;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * The reducer compares the maxima output by the map functions and writes the
 * result to HDFS. With two maps this example compares two values; in general
 * it compares as many values as there are maps.
 */
public class FindMaxValueReducer extends Reducer<IntWritable, FloatWritable, Text, FloatWritable> {

    @Override
    protected void reduce(
            IntWritable k2,
            Iterable<FloatWritable> v2s,
            Context context)
            throws IOException, InterruptedException {
        // A typed iterator avoids the raw type and its @SuppressWarnings.
        Iterator<FloatWritable> it = v2s.iterator();
        float maxfloat = 0, tmp;
        // Seed the maximum with the first value.
        if (it.hasNext()) {
            maxfloat = it.next().get();
        } else {
            // Empty group: write the label with no value and stop.
            context.write(new Text("Max float value : "), null);
            return;
        }
        // Scan the remaining values for the maximum.
        while (it.hasNext()) {
            tmp = it.next().get();
            if (tmp > maxfloat) {
                maxfloat = tmp;
            }
        }
        // Write out the overall maximum.
        context.write(new Text("Max float value : "), new FloatWritable(maxfloat));
    }
}
6. The driver class MaxValueDriver:
package inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;   // was mistakenly imported from com.sun.jersey
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Driver class.
 */
public class MaxValueDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, MaxValueDriver.class.getSimpleName());
        job.setJarByClass(MaxValueDriver.class);
        job.setNumReduceTasks(1);

        job.setMapperClass(FindMaxValueMapper.class);
        job.setReducerClass(FindMaxValueReducer.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        job.setInputFormatClass(FindMaxValueInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // No input path is set: the custom InputFormat generates its data in memory.
        // FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        job.waitForCompletion(true);
    }
}
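Launching the job might look like the following (the jar name is a placeholder of ours); only the output directory is passed, since the job reads no input from HDFS:

[hadoop@hadoop ~]$ hadoop jar maxvalue.jar inputformat.MaxValueDriver /user/hadoop/output_inputformat1

Note that this driver builds a plain Configuration without ToolRunner, so generic options such as -D NumOfValues=200 on the command line would not be picked up; the keys have to be set in code, as in the configuration fragment shown after the InputFormat above.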
First run:
[hadoop@hadoop ~]$ hdfs dfs -ls /user/hadoop/output_inputformat1
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2015-09-04 15:09 /user/hadoop/output_inputformat1/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 29 2015-09-04 15:09 /user/hadoop/output_inputformat1/part-r-00000
[hadoop@hadoop ~]$ hdfs dfs -text /user/hadoop/output_inputformat1/part-r-00000
Max float value : 0.9981093
[hadoop@hadoop ~]$
Second run:
[hadoop@hadoop ~]$ hdfs dfs -ls /user/hadoop/output_inputformat2
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2015-09-05 11:43 /user/hadoop/output_inputformat2/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 29 2015-09-05 11:43 /user/hadoop/output_inputformat2/part-r-00000
[hadoop@hadoop ~]$
Comparing the output of the two runs (the maxima differ because getSplits() creates an unseeded Random on every run):
[hadoop@hadoop ~]$ hdfs dfs -text /user/hadoop/output_inputformat1/part-r-00000
Max float value : 0.9981093
[hadoop@hadoop ~]$ hdfs dfs -text /user/hadoop/output_inputformat2/part-r-00000
Max float value : 0.9966892
[hadoop@hadoop ~]$