您的位置:首页 > 运维架构

User Defined Hadoop DataType

2016-08-25 16:01 337 查看

User Defined Hadoop DataType

目录

User Defined Hadoop DataType

目录

需求

实现

运行

需求

有时候 Hadoop 内置的数据类型不能满足我们的要求,这个时候就需要自定义类型了。

假设输入文件是很多电话号码,每行一个:

13612345678
13051812535
13051812535
13912345677
13412345678


要求按照如下格式输出

13412345678 is subscribed from ***China Mobile***, appearing 1 times


其中的 China Mobile1,都是算出来的。

实现

需要一个电话号码类 TelNo,需要实现 WritableComparable 接口。

// TelNo.java
package com.stephen.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TelNo implements WritableComparable<TelNo>{
private String no;
private String operator;
private Integer times;
private transient final int BEGINPOS = 0;
private transient final int ENDPOS = 3;

public TelNo() {}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(no);
out.writeUTF(operator);
out.writeInt(times);
}

@Override
public void readFields(DataInput in) throws IOException {
no = in.readUTF();
operator = in.readUTF();
times = in.readInt();
}

@Override
public int compareTo(TelNo o) {
return this.no.compareTo(o.getNo());
}

public boolean equals(Object o) {
if( !(o instanceof TelNo)) {
return false;
}
TelNo other = (TelNo) o;
return this.no.compareTo(other.getNo()) == 0;
}

public int hashCode() {
return no.hashCode();
}

public Integer getTimes() {
return times;
}

public void setTimes(Integer times) {
this.times = times;
}

public void setNo(String no) {
this.no = no;
}

public String getNo() {
return no;
}

public String getOperator() {
String header = no.substring(BEGINPOS, ENDPOS);
if (header.compareTo("130") >= 0) {
if (header.compareTo("135") <= 0) {
operator = "***China Mobile***";
} else if (header.compareTo("137") <= 0) {
operator = "***China Unicom***";
} else if (header.compareTo("139") <= 0) {
operator = "***China Telecom***";
} else {
operator = "***Invalid Operator***";
}
}
return operator;
}

@Override
public String toString() {
return "is subscribed from " + getOperator() + ", appearing " + times + " times";
}
}


MapReduce 实现如下(Partitioner 类没有使用)

// TelNoCategorizerTool.java
package com.stephen.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

public class TelNoCategorizerTool extends Configured implements Tool {

public static class TelNoMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private Text telno = new Text();
private final static LongWritable one = new LongWritable(1);

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String newkey = value.toString();
telno.set(newkey);
context.write(telno, one);
}
}

public static class TelNoReducer extends
Reducer<Text, LongWritable, Text, TelNo> {

private TelNo telNo = new TelNo();

public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable val : values) {
sum += val.get();
}
telNo.setNo(key.toString());
telNo.setTimes(sum);
context.write(key, telNo);
}
}

public static class OperatorPartitioner<K, V> extends Partitioner<K, V> {
private static final List<String> mobileNumList = new ArrayList<>();
private static final List<String> unicomNumList = new ArrayList<>();
private static final List<String> telecomNumList = new ArrayList<>();

static {
mobileNumList.add("130");
mobileNumList.add("131");
mobileNumList.add("132");
mobileNumList.add("133");
mobileNumList.add("134");
mobileNumList.add("135");

unicomNumList.add("136");
unicomNumList.add("137");

telecomNumList.add("138");
telecomNumList.add("139");
}

@Override
public int getPartition(K key, V value, int numReduceTasks) {
String telNoHead = key.toString().substring(0, 3);
if (mobileNumList.contains(telNoHead)) {
return 1;
} else if (unicomNumList.contains(telNoHead)) {
return 2;
} else if (telecomNumList.contains(telNoHead)) {
return 3;
} else {
return 0;
}
}
}

@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();

Job job = Job.getInstance(conf, "Telno Categorizer");
job.setJarByClass(TelNoCategorizerTool.class);

job.setMapperClass(TelNoMapper.class);
job.setReducerClass(TelNoReducer.class);

// 只对 Mapper 生效
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

/**
* 这两个方法对 Mapper 和 Reducer 都生效
* 所以要在上面单独指定 Mapper 的Key 和 Value 的格式
* 没有 setReduceOutputKeyClass...方法
*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TelNo.class);

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

job.setPartitionerClass(OperatorPartitioner.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),
new TelNoCategorizerTool(), args);
System.exit(exitCode);
}
}


运行

执行一下:

hadoop jar TelNoCategorizerTool.jar com.stephen.hadoop.TelNoCategorizerTool /user/stephen/input/ /user/stephen/output


查看结果:

hadoop fs -cat /user/stephen/output/part-r-00000

#output
13051812535 is subscribed from ***China Mobile***, appearing 2 times
13412345678 is subscribed from ***China Mobile***, appearing 1 times
13612345678 is subscribed from ***China Unicom***, appearing 1 times
13912345677 is subscribed from ***China Telecom***, appearing 1 times


如果想要分区:

hadoop jar TelNoCategorizerTool.jar com.stephen.hadoop.TelNoCategorizerTool -D mapreduce.job.reduces=4 /user/stephen/input/ /user/stephen/output


能看到 3 个文件(使用了 LazyOutputFormat,不会输出空记录),分别包含了分区后的记录。

hadoop fs -ls /user/stephen/output/

#output
-rw-r--r--   1 root supergroup          0 2016-08-26 13:48 /user/stephen/output/_SUCCESS
-rw-r--r--   1 root supergroup        144 2016-08-26 13:48 /user/stephen/output/part-r-00001
-rw-r--r--   1 root supergroup         72 2016-08-26 13:48 /user/stephen/output/part-r-00002
-rw-r--r--   1 root supergroup         73 2016-08-26 13:48 /user/stephen/output/part-r-00003


3 个文件的内容合并起来就是之前的 part-r-00000 的内容。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  自定义类型 Hadoop
相关文章推荐