您的位置:首页 > 大数据 > Hadoop

MapReduce程序之数据去重

2018-03-08 00:02 176 查看

[toc]

MapReduce程序之数据去重

需求

有下面两个文件:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/duplication$ cat file1.txt
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/duplication$ cat file2.txt
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

要求去除两个文件中的重复行并输出到文件中。

MapReduce程序

关于如何去重,思路已经在代码注释中有说明,不过需要注意的是,这里使用了前面开发的Job工具类来开发驱动程序,程序代码如下:

package com.uplooking.bigdata.mr.duplication;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
* 数据去重
*/
public class DuplicationJob {

/**
* 驱动程序,使用工具类来生成job
*
* @param args
*/
public static void main(String[] args) throws Exception {

if (args == null || args.length < 2) {
System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");
System.exit(-1);
}

Job job = MapReduceJobUtil.buildJob(new Configuration(),
DuplicationJob.class,
args[0],
TextInputFormat.class,
DuplicationMapper.class,
Text.class,
NullWritable.class,
new Path(args[1]),
TextOutputFormat.class,
DuplicationReducer.class,
Text.class,
NullWritable.class);

job.setNumReduceTasks(1);
job.waitForCompletion(true);
}

/**
* 单词去重Mapper操作,主要是将每一行的内容作为key输出到reduce中
* 即map输出的key为某一行的内容,value为NullWritable
*/
public static class DuplicationMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 直接将行作为key写到context中
context.write(value, NullWritable.get());
}
}

/**
* 因为shuffle会将map输出中key相同的key-value对都拉取到同一个reducer中
* 所以数据到达reducer后,key就是唯一的key,而values则为空的集合
* 所以在reducer中也是直接将数据写入到context中,让reducer写出数据即可
*/
public static class DuplicationReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
}

测试

这里使用本地环境来运行MapReduce程序,输入的参数如下:

/Users/yeyonghao/data/input/duplication /Users/yeyonghao/data/output/mr/duplication

也可以将其打包成jar包,然后上传到Hadoop环境中运行。

运行程序后,查看输出结果如下:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/duplication$ cat part-r-00000
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d

可以看到,使用我们的MapReduce程序,已经完成了数据去重的目的。

数据去重的另一种思路

除了前面的数据去重思路外,还有另外一种思路,虽然相对会复杂一些,不过这里也提及一下。

Map处理时,可以将每一行数据处理成

&lt;2012-3-3, c&gt;
的形式,然后输出,作为reduce的输入,比如经过shuffle之后,到达reducer的输入为<2012-3-3, [c, c]>,那么就可以对迭代列表中的数据通过Java中的set集合来进行去重,这也可以作为解决这个上面的数据去重案例的一种思路,但显然这个方法没有前面的方法好,不具有通用性,所以还是建议使用第一种方法。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  大数据 Hadoop MapReduce