Hadoop系列学习–Partitioner内置分区与Partitioner自定义分区
2015-05-15 14:58
399 查看
Hadoop系列学习–Partitioner内置分区与Partitioner自定义分区
MapReduce的编程灵活性很高,其中Partitioner分区函数的作用也很重要。
Partitioner分区函数的作用:
根据业务的需求,灵活的根据业务输出多个日志
多个Reduce并发处理日志,提高工作运行的效率
使数据能够均匀分布在reduce上进行操作,避免产生热点区域。
**
默认的Partitioner分区
**
默认的partitioner是HashPartitioner,他对每条记录的键进行哈希操作以决定该记录应该属于哪个分区。每个分区对应一个reducer任务,所以分区数等于作业的reducer的个数。
自定义Partitioner分区
默认分区具有限制性,由于它是根据HashCode的值去分区,而有一些业务,例如我们要将日志的11位数与不是11位数的输出日志分开。例如:
我们需要将key是11位数的分到一个reduce任务,不是11位数的分到其他的reduce中。
自定义Partitioner完整分区代码:
得到的日志为
MapReduce的编程灵活性很高,其中Partitioner分区函数的作用也很重要。
Partitioner分区函数的作用:
根据业务的需求,灵活的根据业务输出多个日志
多个Reduce并发处理日志,提高工作运行的效率
使数据能够均匀分布在reduce上进行操作,避免产生热点区域。
**
默认的Partitioner分区
**
默认的partitioner是HashPartitioner,他对每条记录的键进行哈希操作以决定该记录应该属于哪个分区。每个分区对应一个reducer任务,所以分区数等于作业的reducer的个数。
public class HashPartitioner<K,V> extends Partitioner<K,V>{ @Override public int getPartition(K k, V v, int numPartitions) { return (k.hashCode() & Integer.MAX_VALUE) % numPartitions; } }
自定义Partitioner分区
默认分区具有限制性,由于它是根据HashCode的值去分区,而有一些业务,例如我们要将日志的11位数与不是11位数的输出日志分开。例如:
log1: 1111111111 aaa 1111111111 aaa1 1111111111 aaa14 1111111111 aaa2 11111111112 aaa3 log2: 222222 aaa 222222 aaa1 222222 aaa14 222222 aaa2 2222222 aaa3
我们需要将key是11位数的分到一个reduce任务,不是11位数的分到其他的reduce中。
/*Partition分区*/ public static class ParitionParition<K,V> extends Partitioner<K,V> { @Override public int getPartition(K k, V v, int i) { int length = k.toString().length(); if(length == 11){ return 0; } else { return 1; } } }
自定义Partitioner完整分区代码:
/**
* 分区
* 通过Partition使Maper的输出均匀分配到Reduce中
* 哪个key分配到哪个reduce中,
* Created with IntelliJ IDEA.
* User: Administrator
* Date: 15-5-14
* Time: 下午5:02
* To change this template use File | Settings | File Templates.
*/
public class PartitionerTest {
private static String keystr;
private static String valuestr;
/*Map*/
public static class PartitionMap extends Mapper<LongWritable, Text, LongWritable, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
while (stringTokenizer.hasMoreTokens()){
keystr = stringTokenizer.nextToken();
valuestr = stringTokenizer.nextToken();
}
LongWritable keywritable = new LongWritable(Long.parseLong(keystr));
context.write(keywritable, new Text(valuestr));
}
}
/*Reduce*/
public static class PartitionReduce extends Reducer<LongWritable, Text, LongWritable, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values){
context.write(key, new Text("1"));
}
}
}
/*Partition分区*/ public static class ParitionParition<K,V> extends Partitioner<K,V> { @Override public int getPartition(K k, V v, int i) { int length = k.toString().length(); if(length == 11){ return 0; } else { return 1; } } }
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
configuration.set("mapred.jar", "test1-1.0-SNAPSHOT-jar-with-dependencies.jar");
configuration.set("fs.default.name", "hdfs://127.0.0.1:9000");//namenode的地址
configuration.set("mapred.job.tracker", "127.0.0.1:9001");
String [] arg = new String[]{"/user/4xt0fuqksoqhq9p/partitioninput","/partitionoutput"};
String [] otherArgs = new GenericOptionsParser(configuration, arg).getRemainingArgs();
if(otherArgs.length < 2){
System.out.println("Use <Word> <Count>");
System.exit(2);
}
Job job = new Job(configuration, "ParitionerTest");
job.setJarByClass(PartitionerTest.class);
job.setMapperClass(PartitionMap.class);
job.setCombinerClass(PartitionReduce.class);
job.setReducerClass(PartitionReduce.class);
//Partition
job.setPartitionerClass(ParitionParition.class);
//
job.setNumReduceTasks(2);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ?0:1);
}
}
得到的日志为
相关文章推荐
- Hadoop概念学习系列之Hadoop、Spark学习路线
- hadoop学习系列1之在eclipse中手动编译hadoop-eclipse-plugin-1.1.1.jar包
- Hadoop概念学习系列之Hadoop新手学习指导之hadoop核心知识学习(二十一)
- hadoop 入门学习系列十----oozie安装
- 大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解
- 【Hadoop入门学习系列之二】HDFS架构和编程
- 学习Hadoop不错的系列文章
- 学习Hadoop不错的系列文章
- Hadoop 从零开始学习系列-hadoop版本升级之文件迁移
- Hadoop学习笔记系列文章导航
- Hadoop学习笔记系列文章导航
- hadoop 学习总结系列 (一)
- Hadoop家族学习路线图(干货系列)
- [置顶] 大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解
- hadoop入门学习系列之二hadoop的mapreduce的wordcount流程介绍
- hadoop系列学习之WordCount运行详解
- 大数据学习系列之一 ----- Hadoop环境搭建(单机)
- 【Python学习系列四】Python程序通过hadoop-streaming提交到Hadoop集群执行MapReduce
- 学习Hadoop不错的系列文章
- hadoop 入门学习系列七-----hadoop集群搭建