您的位置:首页 > 运维架构

Hadoop 学习研究(五): hadoop中的join操作

2017-05-27 20:22 417 查看
Hadoop中的Join操作:

考虑如下问题:

假设有两个数据集:一个是城市名称编号,一个是日期和产出,考虑如何将这两个不同的数据集合二为一。或者有如下需求:获取某个城市在指定年份的产出等等问题。

需要涉及到多个数据集或者不同的日志类型文件。

连接操作的具体实现技术取决于数据集的规模大小和分区情况。

reduce side join:

假设要进行join的数据分别来自File1和File2.

reduce side join是一种最简单的join方式,其主要思想如下:

       在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。

       在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。

整个计算过程是:

(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。

(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。



代码示例:

Map阶段:
// 处理来自表A的记录  

            if (filePath.contains("m_ys_lab_jointest_a")) {  

               
String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
             .........................

                 out.write(new Text(id), new Text("a#"+name));  

            }  
// 处理来自表B的记录  

            else if (filePath.contains("m_ys_lab_jointest_b")) {  

                String[] values = line.split(DELIMITER); // 按分隔符分割出字段  

                if (values.length < 3) return;  

                    .........................                           

               output.write(new Text(id), new Text("b#"+statyear+DELIMITER+num));  

                  }  

 //reduce过程  

         public static class Reduce<.........>{ 

                   Vector<String> vecA = new Vector<String>(); // 存放来自表A的值  

                   Vector<String> vecB = new Vector<String>(); // 存放来自表B的值  

              

               while (values.hasNext()) {  

                String value = values.next().toString();  

                if (value.startsWith("a#")) {  

                    vecA.add(value.substring(2));  

                      } else if (value.startsWith("b#")) {  

                           vecB.add(value.substring(2));  

                }  

               }  

                       .........................................

            // 遍历两个向量  

             int i=0, j=0;  

               for (i = 0; i < sizeA; i ++) {  

                    for (j = 0; j < sizeB; j ++) {  

                        output.write(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));  

                    }  

                   }   

Map side join:(大表对小表)

        之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

  Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

这个地方就涉及到:如何在Hadoop架构中实现全局变量(也就是如何让每个map task都能缓存小表)

对于大型的数据:

         (1)可以使用分布式缓存:DistributedCache.addCacheFile()指定要复制的文件,在每个Task任务开始之前,会将必要的运行jar包、.xml配置文件和分布式缓存文件下载到localFS。

         (2)用户可以使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

示例如下:

1、在提交job的时候,把小数据通过DistributedCache分发到各个节点。

2、map端使用DistributedCache读到数据,在内存中构建映射关系,处于效率的 考虑多数使用hashtable。

3、map()函数中,对每一对,根据key到第2)步构建的映射里面中找出数据,进行连接,输出。

                                     

Map:
protected void setup(Context context)          
FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL),
context.getConfiguration());
FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));

        ................................       
BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));

            ................................        
CUSTOMER_MAP.put(bean.getCustId(), bean);

protected void map(LongWritable key, Text value, Context context)              

   // 格式: 订单编号 客户编号    订单金额

        int custId = Integer.parseInt(cols[1]);     // 取出客户编号

         CustomerBean customerBean = CUSTOMER_MAP.get(custId);

            

         if (customerBean == null){ //没有对应的customer信息可以连接

                 return;   }

                      ............................            

         context.write(outputKey, outputValue);

  

SemiJoin:(大表对大表)

SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。

        实现方法很简单:选取其中一个表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,(需要考虑这个表中有用的字段大小必须在可接受的范围内)File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个Task Container中,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。

这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。

        另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: