
Learning Hadoop, Totally Confused: Implementing Custom Partitioning in a MapReduce Program

2017-09-25 17:40
1: First, set up the entity class:

  write serializes each object to the output stream, and readFields deserializes it from the input-stream bytes; the class implements WritableComparable. For comparing Java value objects you generally also override toString(), hashCode(), and equals().

package com.areapartition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/***
 *
 * @author Administrator
 * 1: write serializes each object to the output stream
 * 2: readFields deserializes the object from the input-stream bytes
 * 3: implements WritableComparable
 *      For comparing Java value objects, you generally override toString(), hashCode(), equals()
 *
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // A parameterized constructor makes it convenient to initialize the object's data
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // During deserialization the framework instantiates the class reflectively,
    // so a no-arg constructor is required
    public FlowBean() {
    }

    // Override toString()
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's data from the input stream.
    // Fields must be read in exactly the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's data to the output stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Comparison by traffic volume
    @Override
    public int compareTo(FlowBean o) {
        // return -1 if greater, 1 otherwise, so the sort is descending
        return sumFlow > o.sumFlow ? -1 : 1;
    }

}


2: Steps for the traffic partitioning job:

   2.1: Aggregate traffic from the raw logs and write the statistics for users from different provinces to different output files;

   2.2: Two mechanisms need to be customized:

    2.2.1: Replace the partitioning logic with a custom Partitioner

    2.2.2: Set the number of concurrent reduce tasks

package com.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/***
 * Traffic summing with custom partitioning
 * @author Administrator
 * 1: Aggregate traffic from the raw logs and write per-province user statistics to different files;
 * 2: Two mechanisms need to be customized:
 *      2.1: the partitioning logic, via a custom Partitioner
 *      2.2: the number of concurrent reduce tasks
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // take one line of input
            String line = value.toString();
            // split it into fields
            String[] fields = StringUtils.split(line, "\t");

            // pick out the fields we need
            String phoneNumber = fields[1];
            long up_flow = Long.parseLong(fields[7]);
            long down_flow = Long.parseLong(fields[8]);

            // wrap them as a key-value pair and emit
            context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
        }
    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            // iterate and sum
            long up_flowSum = 0;
            long down_flowSum = 0;
            for (FlowBean fb : values) {
                up_flowSum += fb.getUpFlow();
                down_flowSum += fb.getDownFlow();
            }

            // wrap the totals as a key-value pair and emit
            context.write(key, new FlowBean(key.toString(), up_flowSum, down_flowSum));
        }

    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job instance
        Job job = Job.getInstance(conf);

        // the jar that contains the classes this job uses
        job.setJarByClass(FlowSumArea.class);
        // the mapper and reducer classes for this job
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // plug in our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        // key-value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // number of concurrent reduce tasks; it should match the number of partitions
        job.setNumReduceTasks(7);

        // input path of the data to process.
        // FileInputFormat is the base class of all file-based InputFormat implementations;
        // it keeps track of the job's input files and computes the input splits.
        // How individual records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // output path for the results
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster and wait for it to finish;
        // exit with 0 on success, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}


3: Take the phone number from the key, look it up in the phone home-area dictionary, and return a different partition number for each province:

  3.1: Partitioner is the base class for partitioners; a custom partitioner must extend it.

  3.2: HashPartitioner is MapReduce's default partitioner. It picks the target reducer as which reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks (a sketch follows below).
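
For reference, the stock HashPartitioner boils down to that one-line formula. The following is a simplified sketch of the built-in class, not code you need to write yourself:

import org.apache.hadoop.mapreduce.Partitioner;

// Simplified sketch of Hadoop's default HashPartitioner:
// mask off the sign bit of the key's hash, then take it modulo the number of reduce tasks.
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}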

package com.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<String, Integer>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
        areaMap.put("841", 5);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // take the phone number from the key, look up its prefix in the home-area dictionary,
        // and return a different partition number per province; unknown prefixes go to partition 6
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3));
        return areaCoder == null ? 6 : areaCoder;
    }

}


4: Upload the packaged jar to the virtual machine:

Then start the cluster with start-dfs.sh and start-yarn.sh:

Then run the job as follows:
[root@master hadoop]# hadoop jar flowarea.jar com.areapartition.FlowSumArea /flow/data /flow/areaoutput4
17/09/25 15:36:38 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/25 15:36:38 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/25 15:36:38 INFO input.FileInputFormat: Total input paths to process : 1
17/09/25 15:36:38 INFO mapreduce.JobSubmitter: number of splits:1
17/09/25 15:36:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1506324201206_0004
17/09/25 15:36:38 INFO impl.YarnClientImpl: Submitted application application_1506324201206_0004
17/09/25 15:36:38 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1506324201206_0004/
17/09/25 15:36:38 INFO mapreduce.Job: Running job: job_1506324201206_0004
17/09/25 15:36:43 INFO mapreduce.Job: Job job_1506324201206_0004 running in uber mode : false
17/09/25 15:36:43 INFO mapreduce.Job:  map 0% reduce 0%
17/09/25 15:36:48 INFO mapreduce.Job:  map 100% reduce 0%
17/09/25 15:36:56 INFO mapreduce.Job:  map 100% reduce 14%
17/09/25 15:37:04 INFO mapreduce.Job:  map 100% reduce 29%
17/09/25 15:37:08 INFO mapreduce.Job:  map 100% reduce 43%
17/09/25 15:37:10 INFO mapreduce.Job:  map 100% reduce 71%
17/09/25 15:37:11 INFO mapreduce.Job:  map 100% reduce 86%
17/09/25 15:37:12 INFO mapreduce.Job:  map 100% reduce 100%
17/09/25 15:37:12 INFO mapreduce.Job: Job job_1506324201206_0004 completed successfully
17/09/25 15:37:12 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1158
FILE: Number of bytes written=746635
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2322
HDFS: Number of bytes written=526
HDFS: Number of read operations=24
HDFS: Number of large read operations=0
HDFS: Number of write operations=14
Job Counters
Launched map tasks=1
Launched reduce tasks=7
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2781
Total time spent by all reduces in occupied slots (ms)=98540
Total time spent by all map tasks (ms)=2781
Total time spent by all reduce tasks (ms)=98540
Total vcore-seconds taken by all map tasks=2781
Total vcore-seconds taken by all reduce tasks=98540
Total megabyte-seconds taken by all map tasks=2847744
Total megabyte-seconds taken by all reduce tasks=100904960
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=1072
Map output materialized bytes=1158
Input split bytes=93
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=1158
Reduce input records=22
Reduce output records=21
Spilled Records=44
Shuffled Maps =7
Failed Shuffles=0
Merged Map outputs=7
GC time elapsed (ms)=1751
CPU time spent (ms)=4130
Physical memory (bytes) snapshot=570224640
Virtual memory (bytes) snapshot=2914865152
Total committed heap usage (bytes)=234950656
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2229
File Output Format Counters
Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/
Found 10 items
drwxr-xr-x   - root supergroup          0 2017-09-25 15:25 /flow/areaoutput
drwxr-xr-x   - root supergroup          0 2017-09-25 15:34 /flow/areaoutput2
drwxr-xr-x   - root supergroup          0 2017-09-25 15:35 /flow/areaoutput3
drwxr-xr-x   - root supergroup          0 2017-09-25 15:37 /flow/areaoutput4
-rw-r--r--   1 root supergroup       2229 2017-09-20 10:00 /flow/data
drwxr-xr-x   - root supergroup          0 2017-09-20 09:35 /flow/output
drwxr-xr-x   - root supergroup          0 2017-09-20 09:47 /flow/output2
drwxr-xr-x   - root supergroup          0 2017-09-20 10:01 /flow/output3
drwxr-xr-x   - root supergroup          0 2017-09-20 10:21 /flow/output4
drwxr-xr-x   - root supergroup          0 2017-09-21 19:32 /flow/sortoutput
[root@master hadoop]# hadoop fs -ls /flow/areaoutput4
Found 8 items
-rw-r--r--   1 root supergroup          0 2017-09-25 15:37 /flow/areaoutput4/_SUCCESS
-rw-r--r--   1 root supergroup         77 2017-09-25 15:36 /flow/areaoutput4/part-r-00000
-rw-r--r--   1 root supergroup         49 2017-09-25 15:37 /flow/areaoutput4/part-r-00001
-rw-r--r--   1 root supergroup        104 2017-09-25 15:37 /flow/areaoutput4/part-r-00002
-rw-r--r--   1 root supergroup         22 2017-09-25 15:37 /flow/areaoutput4/part-r-00003
-rw-r--r--   1 root supergroup        102 2017-09-25 15:37 /flow/areaoutput4/part-r-00004
-rw-r--r--   1 root supergroup         24 2017-09-25 15:37 /flow/areaoutput4/part-r-00005
-rw-r--r--   1 root supergroup        148 2017-09-25 15:37 /flow/areaoutput4/part-r-00006
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00000
13502468823    102    7335    7437
13560436666    954    200    1154
13560439658    5892    400    6292
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00001
13602846565    12    1938    1950
13660577991    9    6960    6969
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00002
13719199419    0    200    200
13726230503    2481    24681    27162
13726238888    2481    24681    27162
13760778710    120    200    320
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00003
13826544101    0    200    200
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00004
13922314466    3008    3720    6728
13925057413    63    11058    11121
13926251106    0    200    200
13926435656    1512    200    1712
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00005
84138413    4116    1432    5548
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00006
13480253104    180    200    380
15013685858    27    3659    3686
15920133257    20    3156    3176
15989002119    3    1938    1941
18211575961    12    1527    1539
18320173382    18    9531    9549


5: Copy the test data several times as shown below, to check that multiple map tasks run in parallel:

  5.1: The number of concurrent map tasks is determined by the number of input splits: one map task is launched per split.

  5.2: A split is a logical concept; it refers to a range of byte offsets within the input data.

  5.3: The split size should be adjusted according to the size of the files being processed (a sketch of the split-size calculation follows this list).
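
For reference, FileInputFormat derives each split's size from the HDFS block size clamped between a configurable minimum and maximum. The sketch below approximates that calculation; the property names are those used by Hadoop 2.x and the exact details vary by version:

public class SplitSizeSketch {
    // Approximation of FileInputFormat's split-size logic (a sketch, not the exact source):
    // minSize comes from mapreduce.input.fileinputformat.split.minsize,
    // maxSize from mapreduce.input.fileinputformat.split.maxsize,
    // blockSize is the HDFS block size of the input file.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        // With a 128 MB block and default min/max, each of the small 2229-byte files above
        // still becomes its own split, because a split never spans more than one file.
        System.out.println(computeSplitSize(128L * 1024 * 1024, 1L, Long.MAX_VALUE));
    }
}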

[root@master hadoop]# hadoop fs -mkdir /flow/data/
[root@master hadoop]# hadoop fs -put HTTP_20130313143750.dat /flow/data/
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.2
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.3
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.4
[root@master hadoop]# hadoop fs -ls /flow/data/
Found 4 items
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat.2
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.3
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.4
[root@master hadoop]#


6: Combiner programming

  6.1: Each map task may produce a large amount of output. A combiner merges that output on the map side first, reducing the amount of data shuffled to the reducers.

  6.2: At its core a combiner merges the values of identical keys locally; it behaves like a local reduce.

  6.3: Without a combiner, all aggregation is left to the reducers, which is relatively inefficient. With one, each map aggregates its own output locally as soon as it finishes, which speeds the job up.

  6.4: Note: the combiner's output becomes the reducer's input, and since the framework may run the combiner any number of times (or not at all), adding one must never change the final result. A combiner is therefore only suitable when the reduce input and output key/value types are identical and the operation is insensitive to partial aggregation, e.g. sums or maximums (a sketch follows this list).
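
A minimal sketch of wiring a combiner into the driver above, assuming the existing FlowSumAreaReducer is reused as the combiner (valid here because its input and output types both match the map output types, Text and FlowBean, and summing flows is commutative and associative):

// Sketch: the relevant part of FlowSumArea.main() with a combiner added
job.setMapperClass(FlowSumAreaMapper.class);
job.setCombinerClass(FlowSumAreaReducer.class);   // map-side local aggregation
job.setReducerClass(FlowSumAreaReducer.class);
job.setPartitionerClass(AreaPartitioner.class);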

7: The shuffle mechanism:

   7.1: Each map task has a circular in-memory buffer for its output, 100 MB by default (io.sort.mb). Once the buffer reaches the 0.8 spill threshold (io.sort.spill.percent), a background thread spills its contents to a new spill file under the directories given by mapred.local.dir.

   7.2: Before the data hits disk it is partitioned and sorted; if a combiner is configured, it runs over the sorted data.

   7.3: Once the last record has been written, all spill files are merged into a single partitioned and sorted file.

   7.4: Reducers fetch their partitions of the map output over HTTP.

   7.5: The TaskTracker runs reduce tasks against those partition files. In the copy phase the map output is copied into the reducer's memory or onto its disk; copying starts for a map's output as soon as that map task completes.

   7.6: In the sort phase the map outputs are merged, and then the reduce phase runs (a configuration sketch follows this list).
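
A minimal sketch of overriding the spill settings from 7.1 in the driver, using the legacy property names quoted above (newer Hadoop releases call them mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent):

// Sketch: set the shuffle spill parameters in main() before creating the Job
Configuration conf = new Configuration();
// size of the circular map-output buffer, in MB (the 100 MB default mentioned in 7.1)
conf.setInt("io.sort.mb", 200);
// buffer fill fraction that triggers a background spill (the 0.8 default mentioned in 7.1)
conf.setFloat("io.sort.spill.percent", 0.80f);
Job job = Job.getInstance(conf);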