Hadoop Programming: Analyzing the Distribution of CSDN Registration Email Domains
2014-07-08 15:48
Original blog link: http://blog.csdn.net/jdh99. Author: jdh. Please credit the source when reposting.
Environment: host OS: Ubuntu 10.04; Hadoop version: 1.2.1
Development tool: Eclipse 4.4.0
Overview: Requirement: the raw data set contains 6,428,632 records. Determine how many registrations each email domain has and sort the domains by user count in descending order. Analysis: Hadoop's built-in sort orders records by key. To order the results by value instead, a secondary sort is required.
Steps:
1. Job 1: count the users of each registration email domain, sorted by the default key ordering, and save the result to HDFS.
2. Job 2: apply a secondary sort to job 1's output, ordering by value from largest to smallest.
Output:
The 24 email domains with more than 10,000 users each:
qq.com 1976196
163.com 1766927
126.com 807895
sina.com 351596
yahoo.com.cn 205491
hotmail.com 202948
gmail.com 186843
sohu.com 104736
yahoo.cn 87048
tom.com 72365
yeah.net 53295
21cn.com 50710
vip.qq.com 35119
139.com 29207
263.net 24779
sina.com.cn 19156
live.cn 18920
sina.cn 18601
yahoo.com 18454
foxmail.com 16432
163.net 15176
msn.com 14211
eyou.com 13372
yahoo.com.tw 10810
Source code:
Job 1: count the users of each registration email domain. CsdnData.java:
package com.bazhangkeji.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// Driver for job 1: count how many users each email domain has.
public class CsdnData {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: csdndata <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "csdndata");
        job.setJarByClass(CsdnData.class);
        job.setMapperClass(MapData.class);
        job.setReducerClass(ReducerData.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MapData.java
package com.bazhangkeji.hadoop;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper for job 1: extract the email domain from each record and emit (domain, 1).
public class MapData extends Mapper<Object, Text, Text, IntWritable> {
    IntWritable one = new IntWritable(1);
    Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Locate the start of the email domain: the last '@' in the record.
        int index = line.lastIndexOf('@');
        if (index != -1) {
            // Normalize the domain (trim whitespace, lowercase) before emitting it.
            word.set(line.substring(index + 1).trim().toLowerCase());
            context.write(word, one);
        }
    }
}
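The extraction logic can be sanity-checked outside MapReduce. A minimal standalone sketch, with a made-up input record (the exact line layout of the dump does not matter here, only the trailing email address):

// Standalone check of MapData's domain extraction; the input line is hypothetical.
public class ExtractDemo {
    public static void main(String[] args) {
        String line = "someuser # somepass # SomeOne@QQ.com";
        int index = line.lastIndexOf('@');
        if (index != -1) {
            // Same normalization as the mapper: substring after '@', trim, lowercase.
            System.out.println(line.substring(index + 1).trim().toLowerCase()); // prints: qq.com
        }
    }
}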
ReducerData.java
package com.bazhangkeji.hadoop;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer for job 1: sum the per-record counts for each email domain.
public class ReducerData extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
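Since this reducer only sums, the operation is associative and commutative, so the same class could double as a combiner in job 1, pre-aggregating map output and shrinking the shuffle across the 6,428,632 records. A hypothetical one-line addition to CsdnData.main() (not in the original code):

// Pre-aggregate (domain, 1) pairs on the map side before they are shuffled.
job.setCombinerClass(ReducerData.class);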
Job 2: secondary sort of job 1's output, by value in descending order. SortSecond.java:
package com.bazhangkeji.hadoop2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// Driver for job 2: re-sort job 1's output by user count, descending.
public class SortSecond {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: sortsecond <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "sortsecond");
        job.setJarByClass(SortSecond.class);
        job.setMapperClass(MapSecond.class);
        job.setReducerClass(ReduceSecond.class);
        job.setSortComparatorClass(SortMy.class); // custom secondary-sort strategy
        job.setOutputKeyClass(KeyMy.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
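Two details of this driver are implicit and worth spelling out. First, because setMapOutputKeyClass() is never called, setOutputKeyClass(KeyMy.class) also determines the map output key type, even though ReduceSecond actually writes Text keys; a stricter configuration would declare the two separately. Second, the globally sorted result relies on the default single reduce task; with several reducers each output file would only be sorted internally. A sketch of hypothetical additions to SortSecond.main():

// Declare map-side and final output types separately (the reducer emits Text keys).
job.setMapOutputKeyClass(KeyMy.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Make the single-reducer assumption explicit; a global descending order across
// several reducers would need a total-order partitioner instead.
job.setNumReduceTasks(1);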
MapSecond.java
package com.bazhangkeji.hadoop2;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper for job 2: parse each "domain<TAB>count" line from job 1's output and
// emit a composite key (domain, count) so the count can drive the sort.
public class MapSecond extends Mapper<LongWritable, Text, KeyMy, IntWritable> {
    IntWritable one = new IntWritable(1); // reused to hold the parsed count, despite its name
    Text word = new Text();
    KeyMy keymy = new KeyMy();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String str_in = value.toString();
        int index = str_in.indexOf('\t');
        if (str_in.length() > 3 && index != -1) {
            String str1 = str_in.substring(0, index);  // email domain
            String str2 = str_in.substring(index + 1); // user count; assumed to be a valid integer
            if (str1.length() != 0 && str2.length() != 0) {
                one.set(Integer.parseInt(str2));
                word.set(str1);
                keymy.setFirstKey(word);
                keymy.setSecondKey(one);
                context.write(keymy, one);
            }
        }
    }
}
ReduceSecond.java
package com.bazhangkeji.hadoop2;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer for job 2: unpack the composite key and emit "domain<TAB>count",
// now in descending order of count.
public class ReduceSecond extends Reducer<KeyMy, IntWritable, Text, IntWritable> {
    public void reduce(KeyMy key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key.getFirstKey(), key.getSecondKey());
    }
}
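One subtlety: the driver never sets a grouping comparator, and in that case Hadoop falls back to the sort comparator (SortMy), which compares only the count. Two different domains with exactly the same count would then share one reduce group, and only the first would be written out. A hypothetical grouping comparator (not part of the original post) that keeps such domains apart could look like this, registered in SortSecond.main() via job.setGroupingComparatorClass(GroupMy.class):

package com.bazhangkeji.hadoop2;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical grouping comparator: distinguish keys by count AND domain, so
// domains that happen to share a count are not merged into one reduce group.
public class GroupMy extends WritableComparator {
    public GroupMy() {
        super(KeyMy.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyMy k1 = (KeyMy) a;
        KeyMy k2 = (KeyMy) b;
        int cmp = k2.getSecondKey().compareTo(k1.getSecondKey()); // descending count, matching SortMy
        if (cmp != 0) {
            return cmp;
        }
        return k1.getFirstKey().compareTo(k2.getFirstKey()); // break ties by domain
    }
}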
KeyMy.java
package com.bazhangkeji.hadoop2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom composite key: (email domain, user count).
 */
public class KeyMy implements WritableComparable<KeyMy> {
    private static final Logger logger = LoggerFactory.getLogger(KeyMy.class);
    private Text firstKey;
    private IntWritable secondKey;

    public KeyMy() {
        this.firstKey = new Text();
        this.secondKey = new IntWritable();
    }

    public Text getFirstKey() {
        return this.firstKey;
    }

    public void setFirstKey(Text firstKey) {
        this.firstKey = firstKey;
    }

    public IntWritable getSecondKey() {
        return this.secondKey;
    }

    public void setSecondKey(IntWritable secondKey) {
        this.secondKey = secondKey;
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.firstKey.readFields(dataInput);
        this.secondKey.readFields(dataInput);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        this.firstKey.write(output);
        this.secondKey.write(output);
    }

    /**
     * Custom comparison strategy.
     * Note: this comparator serves MapReduce's first, default sort, i.e. the sort
     * sub-phase of the map stage, which takes place in the circular memory buffer
     * (whose size can be tuned via io.sort.mb). In job 2 it is superseded by the
     * sort comparator registered in SortSecond.
     */
    @Override
    public int compareTo(KeyMy keyMy) {
        logger.info("-------KeyMy flag-------"); // debug trace; fires on every comparison
        return this.firstKey.compareTo(keyMy.getFirstKey());
    }
}
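KeyMy does not override hashCode() and equals(). With the single default reducer in job 2 that is harmless, but if more reducers were used, the default HashPartitioner would partition on hashCode(). A sketch of the conventional overrides for a two-field composite key (following the pattern used in "Hadoop: The Definitive Guide", referenced below):

// Hypothetical additions to KeyMy, keeping HashPartitioner behavior well defined.
@Override
public int hashCode() {
    return firstKey.hashCode() * 163 + secondKey.hashCode();
}

@Override
public boolean equals(Object o) {
    if (!(o instanceof KeyMy)) {
        return false;
    }
    KeyMy other = (KeyMy) o;
    return firstKey.equals(other.firstKey) && secondKey.equals(other.secondKey);
}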
SortMy.java
package com.bazhangkeji.hadoop2;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom secondary-sort strategy: order keys by their count field, descending.
 */
public class SortMy extends WritableComparator {
    private static final Logger logger = LoggerFactory.getLogger(SortMy.class);

    public SortMy() {
        super(KeyMy.class, true);
    }

    @Override
    public int compare(WritableComparable keyMyOne, WritableComparable keyMyOther) {
        logger.info("---------enter SortMy flag---------"); // debug trace; fires on every comparison
        KeyMy c1 = (KeyMy) keyMyOne;
        KeyMy c2 = (KeyMy) keyMyOther;
        return c2.getSecondKey().get() - c1.getSecondKey().get(); // zero, negative, or positive
    }
}
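Subtraction-based comparators can overflow when the difference of the two ints exceeds the int range; with counts under two million that cannot happen here, but the overflow-safe form is just as short (a sketch, assuming Java 7+ for Integer.compare):

// Overflow-safe variant of SortMy.compare(): descending by count, without subtraction.
@Override
public int compare(WritableComparable keyMyOne, WritableComparable keyMyOther) {
    KeyMy c1 = (KeyMy) keyMyOne;
    KeyMy c2 = (KeyMy) keyMyOther;
    return Integer.compare(c2.getSecondKey().get(), c1.getSecondKey().get());
}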
References:
1. "Hadoop: The Definitive Guide"
2. http://zengzhaozheng.blog.51cto.com/8219051/1379271