User Defined Hadoop DataType
2016-08-25 16:01
337 查看
User Defined Hadoop DataType
目录
User Defined Hadoop DataType目录
需求
实现
运行
需求
有时候 Hadoop 内置的数据类型不能满足我们的要求,这个时候就需要自定义类型了。假设输入文件是很多电话号码,每行一个:
13612345678
13051812535
13051812535
13912345677
13412345678
要求按照如下格式输出
13412345678 is subscribed from ***China Mobile***, appearing 1 times
其中的 China Mobile 和 1,都是算出来的。
实现
需要一个电话号码类 TelNo,需要实现 WritableComparable 接口。

// TelNo.java
package com.stephen.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Hadoop data type holding a telephone number, its operator name, and
 * an occurrence count. Implements WritableComparable so Hadoop can serialize
 * it and sort/compare instances during the shuffle phase.
 */
public class TelNo implements WritableComparable<TelNo> {

    private String no;        // raw telephone number, e.g. "13612345678"
    private String operator;  // operator display name, computed lazily by getOperator()
    private Integer times;    // how many times this number appeared in the input

    // The operator is decided by the first three digits of the number.
    // These are compile-time constants, so they belong to the class, not each instance.
    private static final int BEGIN_POS = 0;
    private static final int END_POS = 3;

    /** No-arg constructor required by Hadoop's reflection-based instantiation. */
    public TelNo() {}

    /**
     * Serializes the fields in a fixed order; must mirror readFields().
     * Uses getOperator() rather than the raw field: writeUTF(null) throws a
     * NullPointerException, and the original code left operator null until
     * getOperator() happened to be called.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(no);
        out.writeUTF(getOperator());
        out.writeInt(times);
    }

    /** Deserializes the fields in exactly the order write() emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        no = in.readUTF();
        operator = in.readUTF();
        times = in.readInt();
    }

    /** Orders instances lexicographically by the phone number string. */
    @Override
    public int compareTo(TelNo o) {
        return this.no.compareTo(o.getNo());
    }

    /** Equality is based on the phone number only, consistent with compareTo(). */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TelNo)) {
            return false;
        }
        TelNo other = (TelNo) o;
        return this.no.compareTo(other.getNo()) == 0;
    }

    /** Must agree with equals(): equal numbers produce equal hashes. */
    @Override
    public int hashCode() {
        return no.hashCode();
    }

    public Integer getTimes() {
        return times;
    }

    public void setTimes(Integer times) {
        this.times = times;
    }

    public void setNo(String no) {
        this.no = no;
    }

    public String getNo() {
        return no;
    }

    /**
     * Maps the number's 3-digit prefix to an operator display name:
     * 130-135 China Mobile, 136-137 China Unicom, 138-139 China Telecom.
     * Any other prefix (including numbers too short to have one) yields
     * "***Invalid Operator***". The original version returned null for
     * prefixes below "130" because the outer if had no else branch.
     */
    public String getOperator() {
        if (no == null || no.length() < END_POS) {
            operator = "***Invalid Operator***";
            return operator;
        }
        String header = no.substring(BEGIN_POS, END_POS);
        if (header.compareTo("130") >= 0 && header.compareTo("135") <= 0) {
            operator = "***China Mobile***";
        } else if (header.compareTo("136") >= 0 && header.compareTo("137") <= 0) {
            operator = "***China Unicom***";
        } else if (header.compareTo("138") >= 0 && header.compareTo("139") <= 0) {
            operator = "***China Telecom***";
        } else {
            operator = "***Invalid Operator***";
        }
        return operator;
    }

    /** Renders the value part of the final output line. */
    @Override
    public String toString() {
        return "is subscribed from " + getOperator() + ", appearing " + times + " times";
    }
}
MapReduce 实现如下(Partitioner 类没有使用)
// TelNoCategorizerTool.java
package com.stephen.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

/**
 * MapReduce job that counts occurrences of each telephone number and emits
 * each number with its operator (resolved by the custom TelNo type) and its
 * count. An optional Partitioner groups output files by operator.
 */
public class TelNoCategorizerTool extends Configured implements Tool {

    /** Emits (number, 1) for every non-blank input line. */
    public static class TelNoMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        private Text telno = new Text();
        private final static LongWritable one = new LongWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Trim so trailing whitespace doesn't create distinct keys,
            // and skip blank lines entirely.
            String newkey = value.toString().trim();
            if (newkey.isEmpty()) {
                return;
            }
            telno.set(newkey);
            context.write(telno, one);
        }
    }

    /** Sums the counts per number and wraps the result in a TelNo value. */
    public static class TelNoReducer
            extends Reducer<Text, LongWritable, Text, TelNo> {

        private TelNo telNo = new TelNo();

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            telNo.setNo(key.toString());
            telNo.setTimes(sum);
            context.write(key, telNo);
        }
    }

    /**
     * Routes records to reducers by operator prefix. Bucket 1 = China Mobile,
     * 2 = China Unicom, 3 = China Telecom, 0 = everything else. The bucket is
     * reduced modulo numReduceTasks: the original version returned 1..3
     * unconditionally, which throws "Illegal partition" whenever the job runs
     * with fewer than 4 reducers.
     */
    public static class OperatorPartitioner<K, V> extends Partitioner<K, V> {

        private static final List<String> mobileNumList = new ArrayList<>();
        private static final List<String> unicomNumList = new ArrayList<>();
        private static final List<String> telecomNumList = new ArrayList<>();

        static {
            mobileNumList.add("130");
            mobileNumList.add("131");
            mobileNumList.add("132");
            mobileNumList.add("133");
            mobileNumList.add("134");
            mobileNumList.add("135");
            unicomNumList.add("136");
            unicomNumList.add("137");
            telecomNumList.add("138");
            telecomNumList.add("139");
        }

        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            String keyText = key.toString();
            int bucket;
            if (keyText.length() < 3) {
                // Too short to carry an operator prefix; original code would
                // have thrown StringIndexOutOfBoundsException here.
                bucket = 0;
            } else {
                String telNoHead = keyText.substring(0, 3);
                if (mobileNumList.contains(telNoHead)) {
                    bucket = 1;
                } else if (unicomNumList.contains(telNoHead)) {
                    bucket = 2;
                } else if (telecomNumList.contains(telNoHead)) {
                    bucket = 3;
                } else {
                    bucket = 0;
                }
            }
            return bucket % numReduceTasks;
        }
    }

    /** Configures and submits the job; returns 0 on success, 1 on failure. */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "Telno Categorizer");
        job.setJarByClass(TelNoCategorizerTool.class);
        job.setMapperClass(TelNoMapper.class);
        job.setReducerClass(TelNoReducer.class);

        // These two only affect the Mapper's output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        /**
         * setOutputKeyClass/setOutputValueClass apply to both Mapper and
         * Reducer, which is why the Mapper's key/value types were specified
         * separately above — there is no setReduceOutputKeyClass method.
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TelNo.class);

        // LazyOutputFormat avoids creating empty part files for empty partitions.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        job.setPartitionerClass(OperatorPartitioner.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(),
                new TelNoCategorizerTool(), args);
        System.exit(exitCode);
    }
}
运行
执行一下:hadoop jar TelNoCategorizerTool.jar com.stephen.hadoop.TelNoCategorizerTool /user/stephen/input/ /user/stephen/output
查看结果:
hadoop fs -cat /user/stephen/output/part-r-00000
#output
13051812535 is subscribed from ***China Mobile***, appearing 2 times
13412345678 is subscribed from ***China Mobile***, appearing 1 times
13612345678 is subscribed from ***China Unicom***, appearing 1 times
13912345677 is subscribed from ***China Telecom***, appearing 1 times
如果想要分区:
hadoop jar TelNoCategorizerTool.jar com.stephen.hadoop.TelNoCategorizerTool -D mapreduce.job.reduces=4 /user/stephen/input/ /user/stephen/output
能看到 3 个文件(使用了 LazyOutputFormat,不会输出空记录),分别包含了分区后的记录。
hadoop fs -ls /user/stephen/output/
#output
-rw-r--r--   1 root supergroup     0 2016-08-26 13:48 /user/stephen/output/_SUCCESS
-rw-r--r--   1 root supergroup   144 2016-08-26 13:48 /user/stephen/output/part-r-00001
-rw-r--r--   1 root supergroup    72 2016-08-26 13:48 /user/stephen/output/part-r-00002
-rw-r--r--   1 root supergroup    73 2016-08-26 13:48 /user/stephen/output/part-r-00003
3 个文件的内容合并起来就是之前的 part-r-00000 的内容。
相关文章推荐
- Unsupported Oracle data type USERDEFINED encountered 的一种解决方案
- HOW TO: Change the Owner of a User-Defined Data Type That Is in Use in SQL Server 2000
- appfuse1.8.0安装app_user.enabled data type (1111, ‘bit’) not recognized and will be ignored错误解决办法
- How to export/import data with LOB type from one user/schema to another
- Do SQL Server User Defined Datatypes (UDT) affect performance?
- Linux mount Windows共享后编译出现“Value too large for defined data type”
- VMware共享文件夹编译出现“Value too large for defined data type”错误的解决办法
- 在mount windows 文件,编译时 cc1plus: error: hello.cpp: Value too large for defined data type
- No property sql found for type xxx.xxx.entity.UserdefinedGroup
- Linux mount Windows共享后编译出现“Value too large for defined data type”
- ls:Value too large for defined data type 解决办法
- EntityType 'UserInfo' has no key defined. Define the key for this EntityType.
- VHDL之User-defined data types
- Value too large for defined data type
- WJ的Direct3D简明教程3:Create Texture with User-defined Image Data
- mount目录访问出现“Value too large for defined data type”错误解决办法
- New ADODB.Connection ADOX.Catalog 提示user-defined type not defined
- delphi 开发的系统在win7下出现的 'Invalid data type for 'UserPreferencesMask'的解决
- C# 获取ORACLE SYS.XMLTYPE "遇到不支持的 Oracle 数据类型 USERDEFINED"
- ORA-22804: remote operations not permitted on object tables or user-defined type columns