一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现
2017-09-25 17:40
465 查看
1:首先搞好实体类对象:
write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法
2:流量分区处理操作的步骤:
2. 1:对流量原始日志进行流量统计,将不同的省份的用户统计结果输出到不同文件;
2.2:需要自定义改造两个机制:
2.2.1:改造分区的逻辑,自定义一个partitioner
2.2.2:自定义reducer task的并发任务数
3:从key中拿到手机号,查询手机归属地字典,不同的省份返回不同的组号:
3.1:Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。
3.2:HashPartitioner是mapreduce的默认partitioner。计算方法是 which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。
4:将打好的jar包上传到虚拟机上面:
然后启动搭建的集群start-dfs.sh,start-yarn.sh:
然后操作如下所示:
5:复制多份测试数据操作如下,测试map的多线程执行:
5.1:map task 的并发数是切片的数量决定的,有多少个切片,就启动多少个map task。
5.2:切片是一个逻辑的概念,指的就是文件中数据的偏移量的范围。
5.3:切片的具体大小应该根据所处理的文件的大小来调整。
6:Combiners编程
6.1:每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
6.2:combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
6.3: 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
6.4:注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
7:shuffle机制:
7.1:每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
7.2:写磁盘前,要partition(分组),sort(排序)。如果有combiner,combine排序后数据。
7.3:等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
7.4:Reducer通过Http方式得到输出文件的分区。
7.5:TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
7.6:排序阶段合并map输出。然后走Reduce阶段。
write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法
package com.areapartition; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; /*** * * @author Administrator * 1:write 是把每个对象序列化到输出流 * 2:readFields是把输入流字节反序列化 * 3:实现WritableComparable * Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法 * */ public class FlowBean implements WritableComparable<FlowBean>{ private String phoneNumber;//电话号码 private long upFlow;//上行流量 private long downFlow;//下行流量 private long sumFlow;//总流量 public String getPhoneNumber() { return phoneNumber; } public void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } //为了对象数据的初始化方便,加入一个带参的构造函数 public FlowBean(String phoneNumber, long upFlow, long downFlow) { this.phoneNumber = phoneNumber; this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } //在反序列化时候,反射机制需要调用空参的构造函数,所以定义了一个空参的构造函数 public FlowBean() { } //重写toString()方法 @Override public String toString() { return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + ""; } //从数据流中反序列出对象的数据 //从数据流中读取字段时必须和序列化的顺序保持一致 @Override public void readFields(DataInput in) throws IOException { phoneNumber = in.readUTF(); upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } //将对象数据序列化到流中 @Override public void write(DataOutput out) throws IOException { out.writeUTF(phoneNumber); out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } //流量比较的实现方法 @Override public int compareTo(FlowBean o) { //大就返回-1,小于等于返回1,进行倒序排序 return sumFlow > o.sumFlow ? -1 : 1; } }
2:流量分区处理操作的步骤:
2. 1:对流量原始日志进行流量统计,将不同的省份的用户统计结果输出到不同文件;
2.2:需要自定义改造两个机制:
2.2.1:改造分区的逻辑,自定义一个partitioner
2.2.2:自定义reducer task的并发任务数
package com.areapartition; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /*** * 流量分区处理操作 * @author Administrator * 1:对流量原始日志进行流量统计,将不同的省份的用户统计结果输出到不同文件; * 2:需要自定义改造两个机制: * 2.1:改造分区的逻辑,自定义一个partitioner * 2.2:自定义reducer task的并发任务数 */ public class FlowSumArea { public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到一行数据 String line = value.toString(); //切分成各个字段 String[] fields = StringUtils.split(line, "\t"); //获取到我们需要的字段 String phoneNumber = fields[1]; long up_flow = Long.parseLong(fields[7]); long down_flow = Long.parseLong(fields[8]); //封装成key-value并且输出 context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow)); } } public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { //遍历求和 long up_flowSum = 0; long down_flowSum = 0; for(FlowBean fb : values){ up_flowSum += fb.getUpFlow(); down_flowSum += fb.getDownFlow(); } //封装成key-value并且输出 context.write(key, new FlowBean(key.toString(),up_flowSum,down_flowSum)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //创建配置文件 Configuration conf = new Configuration(); //获取一个作业 Job job = Job.getInstance(conf); //设置整个job所用的那些类在哪个jar包 job.setJarByClass(FlowSumArea.class); //本job使用的mapper和reducer的类 job.setMapperClass(FlowSumAreaMapper.class); job.setReducerClass(FlowSumAreaReducer.class); //设置我们自定义的分组逻辑定义 job.setPartitionerClass(AreaPartitioner.class); //指定mapper的输出数据key-value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //指定reduce的输出数据key-value类型Text job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //设置reduce的任务并发数,应该跟分组的数量保持一致 job.setNumReduceTasks(7); //指定要处理的输入数据存放路径 //FileInputFormat是所有以文件作为数据源的InputFormat实现的基类, //FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。 //至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定处理结果的输出数据存放路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job提交给集群运行 //job.waitForCompletion(true); //正常执行成功返回0,否则返回1 System.exit(job.waitForCompletion(true) ? 0 : 1);; } }
3:从key中拿到手机号,查询手机归属地字典,不同的省份返回不同的组号:
3.1:Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。
3.2:HashPartitioner是mapreduce的默认partitioner。计算方法是 which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。
package com.areapartition; import java.util.HashMap; import org.apache.hadoop.mapreduce.Partitioner; public class AreaPartitioner<KEY,VALUE> extends Partitioner<KEY, VALUE>{ private static HashMap<String, Integer> areaMap = new HashMap<String,Integer>(); static{ areaMap.put("135", 0); areaMap.put("136", 1); areaMap.put("137", 2); areaMap.put("138", 3); areaMap.put("139", 4); areaMap.put("841", 5); } @Override public int getPartition(KEY key, VALUE value, int numPartitions) { //从key中拿到手机号,查询手机归属地字典,不同的省份返回不同的组号 Integer areaCoder = areaMap.get(key.toString().subSequence(0, 3)) == null ? 6 : areaMap.get(key.toString().subSequence(0, 3)); return areaCoder; } }
4:将打好的jar包上传到虚拟机上面:
然后启动搭建的集群start-dfs.sh,start-yarn.sh:
然后操作如下所示:
[root@master hadoop]# hadoop jar flowarea.jar com.areapartition.FlowSumArea /flow/data /flow/areaoutput4 17/09/25 15:36:38 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032 17/09/25 15:36:38 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 17/09/25 15:36:38 INFO input.FileInputFormat: Total input paths to process : 1 17/09/25 15:36:38 INFO mapreduce.JobSubmitter: number of splits:1 17/09/25 15:36:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1506324201206_0004 17/09/25 15:36:38 INFO impl.YarnClientImpl: Submitted application application_1506324201206_0004 17/09/25 15:36:38 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1506324201206_0004/ 17/09/25 15:36:38 INFO mapreduce.Job: Running job: job_1506324201206_0004 17/09/25 15:36:43 INFO mapreduce.Job: Job job_1506324201206_0004 running in uber mode : false 17/09/25 15:36:43 INFO mapreduce.Job: map 0% reduce 0% 17/09/25 15:36:48 INFO mapreduce.Job: map 100% reduce 0% 17/09/25 15:36:56 INFO mapreduce.Job: map 100% reduce 14% 17/09/25 15:37:04 INFO mapreduce.Job: map 100% reduce 29% 17/09/25 15:37:08 INFO mapreduce.Job: map 100% reduce 43% 17/09/25 15:37:10 INFO mapreduce.Job: map 100% reduce 71% 17/09/25 15:37:11 INFO mapreduce.Job: map 100% reduce 86% 17/09/25 15:37:12 INFO mapreduce.Job: map 100% reduce 100% 17/09/25 15:37:12 INFO mapreduce.Job: Job job_1506324201206_0004 completed successfully 17/09/25 15:37:12 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=1158 FILE: Number of bytes written=746635 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=2322 HDFS: Number of bytes written=526 HDFS: Number of read operations=24 HDFS: Number of large read operations=0 HDFS: Number of write operations=14 Job Counters Launched map tasks=1 Launched reduce tasks=7 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=2781 Total time spent by all reduces in occupied slots (ms)=98540 Total time spent by all map tasks (ms)=2781 Total time spent by all reduce tasks (ms)=98540 Total vcore-seconds taken by all map tasks=2781 Total vcore-seconds taken by all reduce tasks=98540 Total megabyte-seconds taken by all map tasks=2847744 Total megabyte-seconds taken by all reduce tasks=100904960 Map-Reduce Framework Map input records=22 Map output records=22 Map output bytes=1072 Map output materialized bytes=1158 Input split bytes=93 Combine input records=0 Combine output records=0 Reduce input groups=21 Reduce shuffle bytes=1158 Reduce input records=22 Reduce output records=21 Spilled Records=44 Shuffled Maps =7 Failed Shuffles=0 Merged Map outputs=7 GC time elapsed (ms)=1751 CPU time spent (ms)=4130 Physical memory (bytes) snapshot=570224640 Virtual memory (bytes) snapshot=2914865152 Total committed heap usage (bytes)=234950656 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=2229 File Output Format Counters Bytes Written=526 [root@master hadoop]# hadoop fs -ls /flow/ Found 10 items drwxr-xr-x - root supergroup 0 2017-09-25 15:25 /flow/areaoutput drwxr-xr-x - root supergroup 0 2017-09-25 15:34 /flow/areaoutput2 drwxr-xr-x - root supergroup 0 2017-09-25 15:35 /flow/areaoutput3 drwxr-xr-x - root supergroup 0 2017-09-25 15:37 /flow/areaoutput4 -rw-r--r-- 1 root supergroup 2229 2017-09-20 10:00 /flow/data drwxr-xr-x - root supergroup 0 2017-09-20 09:35 /flow/output drwxr-xr-x - root supergroup 0 2017-09-20 09:47 /flow/output2 drwxr-xr-x - root supergroup 0 2017-09-20 10:01 /flow/output3 drwxr-xr-x - root supergroup 0 2017-09-20 10:21 /flow/output4 drwxr-xr-x - root supergroup 0 2017-09-21 19:32 /flow/sortoutput [root@master hadoop]# hadoop fs -ls /flow/areaoutput4 Found 8 items -rw-r--r-- 1 root supergroup 0 2017-09-25 15:37 /flow/areaoutput4/_SUCCESS -rw-r--r-- 1 root supergroup 77 2017-09-25 15:36 /flow/areaoutput4/part-r-00000 -rw-r--r-- 1 root supergroup 49 2017-09-25 15:37 /flow/areaoutput4/part-r-00001 -rw-r--r-- 1 root supergroup 104 2017-09-25 15:37 /flow/areaoutput4/part-r-00002 -rw-r--r-- 1 root supergroup 22 2017-09-25 15:37 /flow/areaoutput4/part-r-00003 -rw-r--r-- 1 root supergroup 102 2017-09-25 15:37 /flow/areaoutput4/part-r-00004 -rw-r--r-- 1 root supergroup 24 2017-09-25 15:37 /flow/areaoutput4/part-r-00005 -rw-r--r-- 1 root supergroup 148 2017-09-25 15:37 /flow/areaoutput4/part-r-00006 [root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00000 13502468823 102 7335 7437 13560436666 954 200 1154 13560439658 5892 400 6292 [root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00001 13602846565 12 1938 1950 13660577991 9 6960 6969 [root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00002 13719199419 0 200 200 13726230503 2481 24681 27162 13726238888 2481 24681 27162 13760778710 120 200 320 [root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00003 13826544101 0 200 200 [root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00004 13922314466 3008 3720 6728 13925057413 63 11058 11121 13926251106 0 200 200 13926435656 1512 200 1712 [root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00005 84138413 4116 1432 5548 [root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00006 13480253104 180 200 380 15013685858 27 3659 3686 15920133257 20 3156 3176 15989002119 3 1938 1941 18211575961 12 1527 1539 18320173382 18 9531 9549
5:复制多份测试数据操作如下,测试map的多线程执行:
5.1:map task 的并发数是切片的数量决定的,有多少个切片,就启动多少个map task。
5.2:切片是一个逻辑的概念,指的就是文件中数据的偏移量的范围。
5.3:切片的具体大小应该根据所处理的文件的大小来调整。
[root@master hadoop]# hadoop fs -mkdir /flow/data/ [root@master hadoop]# hadoop fs -put HTTP_20130313143750.dat /flow/data/ [root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.2 [root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.3 [root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.4 [root@master hadoop]# hadoop fs -ls /flow/data/ Found 4 items -rw-r--r-- 1 root supergroup 2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat -rw-r--r-- 1 root supergroup 2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat.2 -rw-r--r-- 1 root supergroup 2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.3 -rw-r--r-- 1 root supergroup 2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.4 [root@master hadoop]#
6:Combiners编程
6.1:每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
6.2:combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
6.3: 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
6.4:注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
7:shuffle机制:
7.1:每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
7.2:写磁盘前,要partition(分组),sort(排序)。如果有combiner,combine排序后数据。
7.3:等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
7.4:Reducer通过Http方式得到输出文件的分区。
7.5:TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
7.6:排序阶段合并map输出。然后走Reduce阶段。
相关文章推荐
- 一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序
- 一脸懵逼学习Hive的安装(将sql语句翻译成MapReduce程序的一个工具)
- hadoop-2.7.1 MapReduce自定义分组的实现
- 从零开始学习Hadoop--第2章 第一个MapReduce程序
- Hadoop 学习笔记 (十) MapReduce实现排序 全局变量
- [转载]Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- 使用Python实现Hadoop MapReduce程序
- hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式
- 使用Python实现Hadoop MapReduce程序
- 使用Python实现Hadoop MapReduce程序
- Hadoop-2.3.0学习(4)——第一个简单的mapreduce程序
- Hadoop-2.3.0学习(5)——mapreduce程序
- 关于SQLServer2005的学习笔记——自定义分组的实现
- hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式
- 使用Python实现Hadoop MapReduce程序
- Hadoop学习历程(四、运行一个真正的MapReduce程序)
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- 关于hadoop的mapreduce编程中自定义key,value建立的类实现writable接口
- 使用Python实现Hadoop MapReduce程序
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序(转)