您的位置:首页 > 其它

MapReduce中join操作流程

2018-03-01 23:16 197 查看

数据准备

首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。准备好下面两张表:(1)m_ys_lab_jointest_a(以下简称表A)建表语句为:[sql] view plain copycreate table if not exists m_ys_lab_jointest_a (  
     id bigint,  
     name string  
)  
row format delimited  
fields terminated by '9'  
lines terminated by '10'  
stored as textfile;  
数据:
id     name
1     北京
2     天津
3     河北
4     山西
5     内蒙古
6     辽宁
7     吉林
8     黑龙江
(2)m_ys_lab_jointest_b(以下简称表B)建表语句为:[sql] view plain copycreate table if not exists m_ys_lab_jointest_b (  
     id bigint,  
     statyear bigint,  
     num bigint  
)  
row format delimited  
fields terminated by '9'  
lines terminated by '10'  
stored as textfile;  
数据:
id     statyear     num
1     2010     1962
1     2011     2019
2     2010     1299
2     2011     1355
4     2010     3574
4     2011     3593
9     2010     2303
9     2011     2347
我们的目的是,以id为key做join操作,得到以下表:m_ys_lab_jointest_ab
id     name    statyear     num
1       北京    2011    2019
1       北京    2010    1962
2       天津    2011    1355
2       天津    2010    1299
4       山西    2011    3593
4       山西    2010    3574

计算模型

整个计算过程是:(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。如下图所示:


代码

代码如下:[java] view plain copyimport java.io.IOException;  
import java.util.HashMap;  
import java.util.Iterator;  
import java.util.Vector;  
  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapred.FileSplit;  
import org.apache.hadoop.mapred.JobConf;  
import org.apache.hadoop.mapred.MapReduceBase;  
import org.apache.hadoop.mapred.Mapper;  
import org.apache.hadoop.mapred.OutputCollector;  
import org.apache.hadoop.mapred.RecordWriter;  
import org.apache.hadoop.mapred.Reducer;  
import org.apache.hadoop.mapred.Reporter;  
  
/** 
 * MapReduce实现Join操作 
 */  
public class MapRedJoin {  
    public static final String DELIMITER = "\u0009"; // 字段分隔符  
      
    // map过程  
    public static class MapClass extends MapReduceBase implements  
            Mapper<LongWritable, Text, Text, Text> {  
                          
        public void configure(JobConf job) {  
            super.configure(job);  
        }  
          
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,  
                Reporter reporter) throws IOException, ClassCastException {  
            // 获取输入文件的全路径和名称  
            String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();  
            // 获取记录字符串  
            String line = value.toString();  
            // 抛弃空记录  
            if (line == null || line.equals("")) return;   
              
            // 处理来自表A的记录  
            if (filePath.contains("m_ys_lab_jointest_a")) {  
                String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
                if (values.length < 2) return;  
                  
                String id = values[0]; // id  
                String name = values[1]; // name  
                  
                output.collect(new Text(id), new Text("a#"+name));  
            }  
            // 处理来自表B的记录  
            else if (filePath.contains("m_ys_lab_jointest_b")) {  
                String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
                if (values.length < 3) return;  
                  
                String id = values[0]; // id  
                String statyear = values[1]; // statyear  
                String num = values[2]; //num  
                  
                output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));  
            }  
        }  
    }  
      
    // reduce过程  
    public static class Reduce extends MapReduceBase  
            implements Reducer<Text, Text, Text, Text> {  
        public void reduce(Text key, Iterator<Text> values,  
                OutputCollector<Text, Text> output, Reporter reporter)  
                throws IOException {  
                      
            Vector<String> vecA = new Vector<String>(); // 存放来自表A的值  
            Vector<String> vecB = new Vector<String>(); // 存放来自表B的值  
              
            while (values.hasNext()) {  
                String value = values.next().toString();  
                if (value.startsWith("a#")) {  
                    vecA.add(value.substring(2));  
                } else if (value.startsWith("b#")) {  
                    vecB.add(value.substring(2));  
                }  
            }  
              
            int sizeA = vecA.size();  
            int sizeB = vecB.size();  
              
            // 遍历两个向量  
            int i, j;  
            for (i = 0; i < sizeA; i ++) {  
                for (j = 0; j < sizeB; j ++) {  
                    output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));  
                }  
            }     
        }  
    }  
      
    protected void configJob(JobConf conf) {  
        conf.setMapOutputKeyClass(Text.class);  
        conf.setMapOutputValueClass(Text.class);  
        conf.setOutputKeyClass(Text.class);  
        conf.setOutputValueClass(Text.class);  
        conf.setOutputFormat(ReportOutFormat.class);  
    }  
}  

技术细节

下面说一下其中的若干技术细节:(1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:[align=left]String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();[/align][align=left](2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。[/align][align=left](3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。[/align]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mapreduce join