您的位置:首页 > 运维架构

MapReduce-输入数据整块处理

2015-11-12 16:46 375 查看
通常编程时会使用TextInputFormat作为输入方式(记录是按行输入的)

现通过自定义方法实现整个数据文件输入

适用场景:在map阶段检验数据是否有序

运行环境:windows下VM虚拟机,centos 6.5系统,hadoop2.2.0,三节点 ,java 1.7

原数据:



第一步:自定义wholeFileInputFormat方法继承FileInputFormat

//这个例子是以<NullWritable,BytesWritable>为输入<key,value>类型
public class wholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable>{
//重写isSplitable方法是为了对于输入文件不进行分片操作
protected boolean isSplitable(JobContext context,Path filename)
{
//返回false就可以实现不分片
return false;
}
//这个类是创建读取文件内容实例(返回值)
public RecordReader<NullWritable,BytesWritable> createRecordReader(
InputSplit split ,TaskAttemptContext context)throws IOException,InterruptedException
{
//这里需要自定义一个读取方式,代码在下面
wholeFileRecordReader reader=new wholeFileRecordReader();
//传入job配置信息和split分片信息
reader.initialize(split, context);
//
return reader;
}

}


第二步:自定义wholeFileRecordReader类

//指定输入key,value类型
public class wholeFileRecordReader extends RecordReader<NullWritable,BytesWritable>{
//fileSplit是分片信息(文件位置,文件字节数)注:这里的分片只有1个
private FileSplit fileSplit;
private Configuration conf;
//这存的是文件内容,也是输出结果
private BytesWritable value=new BytesWritable();
//这是一个标志,用来表示数据是否被处理过
private boolean processed=false;
//通过context和分片信息初始化
public void initialize(InputSplit split,TaskAttemptContext context)throws IOException,InterruptedException
{
this.fileSplit=(FileSplit)split;
this.conf=context.getConfiguration();
}
//相当于迭代器的next()方法,读取文件内容
public boolean nextKeyValue()throws IOException,InterruptedException
{

if(!processed)
{
//根据文件的字节大小创建一个字节数组
byte[] contents =new byte[(int)fileSplit.getLength()];
//获取文件的位置
Path file=fileSplit.getPath();
//获取一个HDFS的系统实例
FileSystem fs=file.getFileSystem(conf);
//文件输入流
FSDataInputStream in=null;
try
{
//文件输入流指向文件
in=fs.open(file);
//IOUtils的readFully方法把文件内容写入contents数组中
IOUtils.readFully(in, contents, 0, contents.length);
//把数据放到上面定义过的BytesWritable类型的value(用于输出)
value.set(contents, 0, contents.length);
}finally
{
IOUtils.closeStream(in);
}
//这里赋值true表示数据被处理过了
processed=true;
//返回true,表示方法执行成功
return true;
}
//数据被处理过了,不用被再次读取数据,方法调用返回false
return false;
}
//下面是继承RecordReader需要实现的几个方法
//key值的返回结果
public NullWritable getCurrentKey()throws IOException,InterruptedException
{

return NullWritable.get();

}
//value的返回结果
public BytesWritable getCurrentValue()throws IOException,InterruptedException
{
return value;
}
//判断数据是否成功处理
public float getProgress()throws IOException
{
return processed?1.0f:0.0f;
}
//关闭
public void close()throws IOException{}
}


第三步:在具体场景中使用

public class test1 extends Configured implements Tool{
public static class Map extends Mapper<NullWritable,BytesWritable,Text,BytesWritable>
{
private Text filenameKey;
//setup类每次在执行map方法之前都会被调用,通常用来初始化
protected void setup(Context context)throws IOException,InterruptedException
{
//这段话其实就是让finenameKey记录输入文件所在的hdfs位置,可有可无
InputSplit split=context.getInputSplit();
Path path=((FileSplit)split).getPath();
filenameKey=new Text(path.toString());
}
public void map(NullWritable key,BytesWritable value,Context context)throws IOException,InterruptedException
{
context.write(filenameKey, value);
//通过这个输出观察map任务调用了几次(TextoutputFormat是一行调用一次)
System.out.println("-------------------------once");

}

}
public int run(String[] args)throws Exception
{
Configuration conf=getConf();
Job job=new Job(conf,"test11");
job.setJarByClass(test1.class);
job.setMapperClass(Map.class);
job.setInputFormatClass(wholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return job.isSuccessful()?1:0;
}
public static void main(String[] args)throws Exception
{
int rsa=ToolRunner.run(new Configuration(), new test1(), args);
System.exit(rsa);
}
}


可以看出针对多行数据只执行了一次map任务

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop