基于hadoop2.2的map端表关联(map side join)mapreduce实现
2014-05-11 20:53
405 查看
大数据工作组交流Q-Q群:161636262
原因:之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。但 Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash
table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
表文件:
login:
1 0
20121213
2 0
20121213
3 1
20121213
4 1
20121213
1 0
20121114
2 0
20121114
3 1
20121114
4 1
20121114
1 0
20121213
1 0
20121114
9 0
20121114
user:
1
rolin 北京
3
王五 tianjin
4
赵六 guangzhou
2
李四 北京
sex:
0
男
1
女
实现三张表的关联查询:
sql:select count(*) from login l,user u,sex s where l.外键=u.主键 and l.外键=s.主键 group by u.主键 ;
就是求每个用户一段时间内的登陆次数.
代码:
package youling.studio.mapjoin;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Main extends Configured implements Tool{
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
//map短关联操作小表的缓存
private Map<String,String> userMap = new HashMap<String,String>();
private Map<String,String> sexMap = new HashMap<String,String>();
private Text outputKey = new Text();
private Text outputValue = new Text();
private String[] kv;
//map类初始化时执行,资源准备
@SuppressWarnings("deprecation")
@Override
protected void setup(Context context){
BufferedReader in = null;
try{
//获取缓存文件列表
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String userLine = null;
String sexLine = null;
for (Path path : localCacheFiles) {
if(path.toString().contains("user")){
//读user的缓存文件
in = new BufferedReader(new InputStreamReader(new FileInputStream("USER")));
while(null!=(userLine = in.readLine())){
userMap.put(userLine.split("\t",-1)[0], userLine.split("\t",-1)[1]);
}
}else if(path.toString().contains("sex")){
//读sex的缓存文件
in = new BufferedReader(new InputStreamReader(new FileInputStream("SEX")));
while(null!=(sexLine=in.readLine())){
sexMap.put(sexLine.split("\t",-1)[0], sexLine.split("\t",-1)[1]);
}
}
}
}catch(Exception e){
e.printStackTrace();
}finally{
try{
if(in!=null){
in.close();
}
}catch(Exception e1){
e1.printStackTrace();
}
}
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
kv = value.toString().split("\t");
//符合条件的进行关联,过滤掉不合条件的
if(userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])){
outputKey.set(userMap.get(kv[0])+"\t"+sexMap.get(kv[1]));
outputValue.set("1");
context.write(outputKey, outputValue);
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text>{
private Text outputValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
int count = 0;
for (Text text : value) {
count++ ;//计数
}
outputValue.set(count+"");
context.write(key, outputValue);
}
}
@SuppressWarnings("deprecation")
@Override
public int run(String[] args) throws Exception {
Job job = new Job(getConf(),"MyJob");
job.setJobName("myjob");
job.setJarByClass(Main.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String[] arg = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
//缓存文件,第一第二个输入
String uri1 = new Path(arg[0]).toUri().toString()+"#"+"SEX";
String uri2 = new Path(arg[1]).toUri().toString()+"#"+"USER";
DistributedCache.addCacheFile(new URI(uri1), job.getConfiguration());
DistributedCache.addCacheFile(new URI(uri2), job.getConfiguration());
System.out.println(new Path(arg[1]).toUri());
FileInputFormat.addInputPath(job, new Path(arg[2]));
FileOutputFormat.setOutputPath(job, new Path(arg[3]));
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Main(),args);
System.exit(res);
}
}
使用hadoop-eclipse插件运行:
添加命令行参数: hdfs://192.168.1.186:8000/user/root/mapjoin/in/sex hdfs://192.168.1.186:8000/user/root/mapjoin/in/user hdfs://192.168.1.186:8000/user/root/mapjoin/in/login hdfs://192.168.1.186:8000/user/root/mapjoin/out/
或者打成jar包:
hadoop jar Main.jar main方法的全类名 hdfs://192.168.1.186:8000/user/root/mapjoin/in/sex hdfs://192.168.1.186:8000/user/root/mapjoin/in/user hdfs://192.168.1.186:8000/user/root/mapjoin/in/login hdfs://192.168.1.186:8000/user/root/mapjoin/out/
结果:
rolin
男 4
李四
男 2
王五
女 2
赵六
女 2
源码下载地址:
pan.baidu.com/s/1bnqzBxP
原因:之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。但 Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash
table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
表文件:
login:
1 0
20121213
2 0
20121213
3 1
20121213
4 1
20121213
1 0
20121114
2 0
20121114
3 1
20121114
4 1
20121114
1 0
20121213
1 0
20121114
9 0
20121114
user:
1
rolin 北京
3
王五 tianjin
4
赵六 guangzhou
2
李四 北京
sex:
0
男
1
女
实现三张表的关联查询:
sql:select count(*) from login l,user u,sex s where l.外键=u.主键 and l.外键=s.主键 group by u.主键 ;
就是求每个用户一段时间内的登陆次数.
代码:
package youling.studio.mapjoin;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Main extends Configured implements Tool{
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
//map短关联操作小表的缓存
private Map<String,String> userMap = new HashMap<String,String>();
private Map<String,String> sexMap = new HashMap<String,String>();
private Text outputKey = new Text();
private Text outputValue = new Text();
private String[] kv;
//map类初始化时执行,资源准备
@SuppressWarnings("deprecation")
@Override
protected void setup(Context context){
BufferedReader in = null;
try{
//获取缓存文件列表
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String userLine = null;
String sexLine = null;
for (Path path : localCacheFiles) {
if(path.toString().contains("user")){
//读user的缓存文件
in = new BufferedReader(new InputStreamReader(new FileInputStream("USER")));
while(null!=(userLine = in.readLine())){
userMap.put(userLine.split("\t",-1)[0], userLine.split("\t",-1)[1]);
}
}else if(path.toString().contains("sex")){
//读sex的缓存文件
in = new BufferedReader(new InputStreamReader(new FileInputStream("SEX")));
while(null!=(sexLine=in.readLine())){
sexMap.put(sexLine.split("\t",-1)[0], sexLine.split("\t",-1)[1]);
}
}
}
}catch(Exception e){
e.printStackTrace();
}finally{
try{
if(in!=null){
in.close();
}
}catch(Exception e1){
e1.printStackTrace();
}
}
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
kv = value.toString().split("\t");
//符合条件的进行关联,过滤掉不合条件的
if(userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])){
outputKey.set(userMap.get(kv[0])+"\t"+sexMap.get(kv[1]));
outputValue.set("1");
context.write(outputKey, outputValue);
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text>{
private Text outputValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
int count = 0;
for (Text text : value) {
count++ ;//计数
}
outputValue.set(count+"");
context.write(key, outputValue);
}
}
@SuppressWarnings("deprecation")
@Override
public int run(String[] args) throws Exception {
Job job = new Job(getConf(),"MyJob");
job.setJobName("myjob");
job.setJarByClass(Main.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String[] arg = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
//缓存文件,第一第二个输入
String uri1 = new Path(arg[0]).toUri().toString()+"#"+"SEX";
String uri2 = new Path(arg[1]).toUri().toString()+"#"+"USER";
DistributedCache.addCacheFile(new URI(uri1), job.getConfiguration());
DistributedCache.addCacheFile(new URI(uri2), job.getConfiguration());
System.out.println(new Path(arg[1]).toUri());
FileInputFormat.addInputPath(job, new Path(arg[2]));
FileOutputFormat.setOutputPath(job, new Path(arg[3]));
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Main(),args);
System.exit(res);
}
}
使用hadoop-eclipse插件运行:
添加命令行参数: hdfs://192.168.1.186:8000/user/root/mapjoin/in/sex hdfs://192.168.1.186:8000/user/root/mapjoin/in/user hdfs://192.168.1.186:8000/user/root/mapjoin/in/login hdfs://192.168.1.186:8000/user/root/mapjoin/out/
或者打成jar包:
hadoop jar Main.jar main方法的全类名 hdfs://192.168.1.186:8000/user/root/mapjoin/in/sex hdfs://192.168.1.186:8000/user/root/mapjoin/in/user hdfs://192.168.1.186:8000/user/root/mapjoin/in/login hdfs://192.168.1.186:8000/user/root/mapjoin/out/
结果:
rolin
男 4
李四
男 2
王五
女 2
赵六
女 2
源码下载地址:
pan.baidu.com/s/1bnqzBxP
相关文章推荐
- 基于mapreduce的Hadoop join实现分析(一)
- 基于mapreduce的 Hadoop join 实现分析(二)
- 基于mapreduce的Hadoop join实现
- 基于mapreduce的Hadoop join实现
- 基于mapreduce的Hadoop join实现分析(一)
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- MapReduce map side join实例
- MapReduce实现CommonJoin和MapJoin
- MapReduce之Map端Join实现
- Hadoop 多表 join:map side join 范例
- Hadoop中MapReduce多种join实现实例分析
- Spark map-side-join 关联优化
- Hadoop 2.6 使用MapReduce实现基于物品的推荐系统
- Hadoop 2.6 使用MapReduce实现基于用户的推荐系统
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- [置顶] Hadoop伪分布安装详解+MapReduce运行原理+基于MapReduce的KNN算法实现
- Hadoop MapReduce处理海量小文件:基于CombineFileInputFormat(每次往map中读入1行)
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- 用Hadoop流实现mapreduce版推荐系统基于物品的协同过滤算法
- Hadoop中MapReduce实现join多种实例分析