自定义-Hadoop自定义分组Group
2016-08-21 10:13
519 查看
自定义分组MyGroup:
主要是继承WritableComparator类,重写compare函数
我这里重写的是该源码函数:
[java]
view plain
copy
print?
/** Compare two WritableComparables.
*
* <p> The default implementation uses the natural ordering, calling {@link
* Comparable#compareTo(Object)}. */
@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
return a.compareTo(b);
}
原始文件数据:
[java]
view plain
copy
print?
hadoop a
spark a
hive a
hbase a
tachyon a
storm a
redis a
代码:
[java]
view plain
copy
print?
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MyGroup {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length!=2){
System.err.println("Usage databaseV1 <inputpath> <outputpath>");
}
Job job = Job.getInstance(conf, MyGroup.class.getSimpleName() + "1");
job.setJarByClass(MyGroup.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper1.class);
job.setGroupingComparatorClass(MyGroupComparator.class);
job.setReducerClass(MyReducer1.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.waitForCompletion(true);
}
public static class MyMapper1 extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] spl=value.toString().split("\t");
context.write(new Text(spl[0].trim()), new Text(spl[1].trim()));
}
}
public static class MyReducer1 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text k2, Iterable<Text> v2s, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
Long count=0L;
for (@SuppressWarnings("unused") Text v2 : v2s) {
count++;
context.write(new Text("in--"+k2), new Text(count.toString()));
}
context.write(new Text("out--"+k2), new Text(count.toString()));
}
}
public static class MyGroupComparator extends WritableComparator{
public MyGroupComparator(){
super(Text.class,true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable a, WritableComparable b) {
Text p1 = (Text) a;
Text p2 = (Text) b;
p1.compareTo(p2);
return 0;
}
}
}
默认分组结果是这样的:
[java]
view plain
copy
print?
public static class MyGroupComparator extends WritableComparator{
public MyGroupComparator(){
super(Text.class,true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable a, WritableComparable b) {
Text p1 = (Text) a;
Text p2 = (Text) b;
return p1.compareTo(p2);
}
}
相同Key为一组:
[java]
view plain
copy
print?
in--hadoop 1
out--hadoop 1
in--hbase 1
out--hbase 1
in--hive 1
out--hive 1
in--redis 1
out--redis 1
in--spark 1
out--spark 1
in--storm 1
out--storm 1
in--tachyon 1
out--tachyon 1
自定义后:
[java]
view plain
copy
print?
public static class MyGroupComparator extends WritableComparator{
public MyGroupComparator(){
super(Text.class,true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable a, WritableComparable b) {
Text p1 = (Text) a;
Text p2 = (Text) b;
p1.compareTo(p2);
return 0;
}
}
所有key均为一组:
[java]
view plain
copy
print?
in--hadoop 1
in--hbase 2
in--hive 3
in--redis 4
in--spark 5
in--storm 6
in--tachyon 7
out--tachyon 7
主要是继承WritableComparator类,重写compare函数
我这里重写的是该源码函数:
[java]
view plain
copy
print?
/** Compare two WritableComparables.
*
* <p> The default implementation uses the natural ordering, calling {@link
* Comparable#compareTo(Object)}. */
@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
return a.compareTo(b);
}
原始文件数据:
[java]
view plain
copy
print?
hadoop a
spark a
hive a
hbase a
tachyon a
storm a
redis a
代码:
[java]
view plain
copy
print?
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MyGroup {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length!=2){
System.err.println("Usage databaseV1 <inputpath> <outputpath>");
}
Job job = Job.getInstance(conf, MyGroup.class.getSimpleName() + "1");
job.setJarByClass(MyGroup.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper1.class);
job.setGroupingComparatorClass(MyGroupComparator.class);
job.setReducerClass(MyReducer1.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.waitForCompletion(true);
}
public static class MyMapper1 extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] spl=value.toString().split("\t");
context.write(new Text(spl[0].trim()), new Text(spl[1].trim()));
}
}
public static class MyReducer1 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text k2, Iterable<Text> v2s, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
Long count=0L;
for (@SuppressWarnings("unused") Text v2 : v2s) {
count++;
context.write(new Text("in--"+k2), new Text(count.toString()));
}
context.write(new Text("out--"+k2), new Text(count.toString()));
}
}
public static class MyGroupComparator extends WritableComparator{
public MyGroupComparator(){
super(Text.class,true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable a, WritableComparable b) {
Text p1 = (Text) a;
Text p2 = (Text) b;
p1.compareTo(p2);
return 0;
}
}
}
默认分组结果是这样的:
[java]
view plain
copy
print?
public static class MyGroupComparator extends WritableComparator{
public MyGroupComparator(){
super(Text.class,true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable a, WritableComparable b) {
Text p1 = (Text) a;
Text p2 = (Text) b;
return p1.compareTo(p2);
}
}
相同Key为一组:
[java]
view plain
copy
print?
in--hadoop 1
out--hadoop 1
in--hbase 1
out--hbase 1
in--hive 1
out--hive 1
in--redis 1
out--redis 1
in--spark 1
out--spark 1
in--storm 1
out--storm 1
in--tachyon 1
out--tachyon 1
自定义后:
[java]
view plain
copy
print?
public static class MyGroupComparator extends WritableComparator{
public MyGroupComparator(){
super(Text.class,true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable a, WritableComparable b) {
Text p1 = (Text) a;
Text p2 = (Text) b;
p1.compareTo(p2);
return 0;
}
}
所有key均为一组:
[java]
view plain
copy
print?
in--hadoop 1
in--hbase 2
in--hive 3
in--redis 4
in--spark 5
in--storm 6
in--tachyon 7
out--tachyon 7
相关文章推荐
- Hadoop自定义分组Group
- hadoop自定义分组group
- Hadoop自定义分组Group
- 「 Hadoop」mapreduce对温度数据进行自定义排序、分组、分区等
- Hadoop实战-MapReduce之分组(group-by)统计(七)
- Hadoop 自定义排序,自定义分区,自定义分组
- Hadoop mapreduce自定义分组RawComparator
- hadoop自定义排序、分组、分区(温度统计)
- Hadoop——自定义数据类型,实现WritableComparable, 并且 分组,排序
- LXR_CHNGroup自定义分组与系统通讯录分组
- 流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计
- 【Hadoop】Hadoop MR 自定义分组 Partition机制
- hadoop自定义分组 步骤1.4
- MapReduce自定义分组Group
- 自定义分组Group
- Hadoop之——自定义分组比较器实现分组功能
- hadoop提交作业自定义排序和分组
- 一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现
- hadoop-2.7.1 MapReduce自定义分组的实现
- hadoop提交作业自定义排序和分组