您的位置:首页 > 编程语言

MapReduce数据类型、二次排序及Join编程

2018-02-23 12:24 218 查看
1、数据类型

      MapReduce中的所有数据类型都统一地实现了writable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储。当程序间传递对象或者持久化对象的时候,就需要序列化对象成字节流,writable就是Hadoop中序列化的格式。

      BooleanWritable:标准布尔型数值                        ByteWritable:单字节数值

      DoubleWritable:双字节数值                                 FloatWritable:浮点数

      IntWritable:整型数                                                LongWritable:长整型数

      Text:使用UTF8格式存储的文本                              NullWritable:当<key,value>中的key或value为空时使用

      Writable   - value

               write()是把每个对象序列化到输出流,readFields()是把输入流字节反序列化。

      WritableComparable    - key  必须要实现。

      对象比较需要使用比较器,comparable就是一个排序接口,如果一个类实现了它,就代表着这个类支持排序。两个对象比较就是两个值比较。

      Java值对象的比较:重写toString()、hashCode()、equals()方法。

2、二次排序

      MapReduce框架无论是默认排序还是自定义排序,都是只对于key值进行排序的,当我们用第一个字段排序后,还需要对第二个字段排序时,需要用到二次排序。

      需求:对于reduce输出结果进行二次排序。

      每个分片对应一个map任务,shuffle进入内存缓冲区(分区、排序、合并),将数据传到reduce。

      排序特性:对于第一个字段排序,将第一个作为key,第一个字段相同,第二个就会放在一起。

      排序消耗的是资源(CPU、内存),主要指的是内存。

      将两个字段放到一个key中,组合一个新的key。

      例:a,1->a#1,1

      保证原来的第一个字段进行分组,当key的第一个字段相同,就代表key相同。优势:利用了MapReduce中的shuffle过程。

      实现二次排序:

      (1)组合key,key是一个组合的字段,自定义数据类型,继承WritableComparable接口。

      (2)保证原来的分组,需要自定义分组规则,需要继承RawComparator,在Job中使用setGroupingComparatorClass设置。在Reduce阶段,构造一个与Key相对应的Value迭代器的时候,只要first相同就属于同一个组,放在一个Value迭代器中。

      (3)保证原来的分区,需要自定义分区规则,继承Partitioner接口,此为Key的第一次比较,在Job中使用setPartitionerClass设置。hashPartitioner分区是MapReduce框架默认的分区计算方法。

      默认情况下reduce的数目是1个,但是可以自定义数目。通过测试来设置数目,我们通常会将所有的任务规划到一个时间区域内。

3、RawComparator

      Hadoop为序列化提供了优化,类型的比较对MapReduce而言至关重要,Key和Key的比较也是在排序阶段完成的,hadoop提供了原生的比较器接口RawComparator<T>用于序列化字节间的比较,该接口允许其实现直接比较数据流中的记录,无需反序列化为对象,RawComparator是一个原生的优化接口类,它只是简单的提供了用于数据流中简单的数据对比方法,从而提供优化。

      该类并非被多数的衍生类所实现,其具体的子类为WritableComparator,多数情况下是作为实现Writable接口的类的内置类,提供序列化字节的比较。类似于一个注册表,里面记录了所有Comparator类的集合,Comparators成员用一张Hash表记录Key=Class,value=WritableComparator的注册信息。

4、MapReduce Join编程

      (1)设计<key, value>,两张表中的相同字段作为一个公共的连接点,作为map输出的key。

      (2)setup()准备工作-->map()-->cleanup()清理工作

               stepup():在map没有处理之前,先将小表的数据读到并存储到HDFS中,<cid, customerinfo>

               map():读取大表,<cid, orderinfo>

               map join适合数据量较小的情形

               map端输出:

                     customer

                           <cid, cInfo>

                     order

                           <cid, orderInfo>

      (3)reduce join,可以叫shuffle join

               设置一个标识,判断是用户信息还是订单信息。

               reduce端join:

                     customer

                           <cid, cInfo>

                                  tag: customer

                                  data: cInfo

                     order

                           <cid, orderInfo>

                                  tag: order

                                  data: orderInfo

               MySQL中导出的数据文件都是CSV文件。

               reduce端输出:<cid, list(cInfo, orderInfo, orderInfo, orderInfo,orderInfo,...)>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: