MapReduce编程学习(1)--简要分析并附源代码
2016-02-20 18:45
288 查看
在完成了完全分布式Hadoop平台的搭建之后,一直在忙于C语言与计算机应用基础两门课程的教学与考试,好容易放了寒假,前几天在忙于LAMP与Oracle两门课程的教学备课。备课,永远是一名教师的无奈。顿时发现除了长了几斤肉肉之外,学习进展是一无是处。呵呵,也是醉了。抽了三个多小时把MapReduce编程的理念学习了一下,感觉相对来说还算好学,因为模板固定,以下记录几个实例供参考吧。在此声明:部分案例与代码来源于《炼数成金》课堂资源(没有打广告的嫌疑哦。。)
实例1:经典WordCount程序:
package wordCount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class wordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(wordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
再在MyEclipse的Run Configuration中配置好输入与输出参数如下:
之后运行即可。
实例2:主被叫号码统计
将如下左图文件中的数据整理成下右图的数据:
算法思想:
实例3:合并运营商的位置数据与上网数据,以划分时段
主要有两段Java代码:
实例1:经典WordCount程序:
package wordCount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class wordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(wordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
再在MyEclipse的Run Configuration中配置好输入与输出参数如下:
之后运行即可。
实例2:主被叫号码统计
将如下左图文件中的数据整理成下右图的数据:
算法思想:
package test2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Groups calling numbers (A-numbers) by the called number (B-number).
 * Each input line is "anum bnum" separated by a space; the output is
 * one line per B-number: "bnum \t anum1|anum2|...|".
 *
 * Usage: test_2 &lt;in&gt; &lt;out&gt;
 */
public class test_2 extends Configured implements Tool {

    /** Counters for rejected input. */
    enum Counter {
        LINESKIP, // lines that could not be parsed
    }

    /** Mapper: parses "anum bnum" and emits (bnum, anum). */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            try {
                String[] lineSplit = line.split(" ");
                String anum = lineSplit[0];
                String bnum = lineSplit[1];
                context.write(new Text(bnum), new Text(anum));
            } catch (ArrayIndexOutOfBoundsException e) {
                // Malformed line: count it and skip the record.
                context.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }

    /** Reducer: joins all A-numbers for one B-number with '|' separators. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // StringBuilder avoids O(n^2) string concatenation; declaring the
            // checked exceptions replaces the original try/catch blocks that
            // swallowed IOException/InterruptedException via printStackTrace.
            StringBuilder out = new StringBuilder();
            for (Text value : values) {
                out.append(value.toString()).append('|');
            }
            context.write(key, new Text(out.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Job.getInstance replaces the deprecated Job(Configuration, String) ctor.
        Job job = Job.getInstance(conf, "test_2");
        job.setJarByClass(test_2.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        System.out.println("任务名称:" + job.getJobName());
        System.out.println("任务成功:" + (job.isSuccessful() ? "Yes" : "No"));
        System.out.println("跳过行数:"
                + job.getCounters().findCounter(Counter.LINESKIP).getValue());
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new test_2(), args);
        System.exit(res);
    }
}
顺便说一下MapReduce程序的导出:
实例3:合并运营商的位置数据与上网数据,以划分时段
主要有两段Java代码:
package cn.dataguru.hadoop;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Position records ("POS*" files): IMSI|IMEI|UPDATETYPE|CGI|TIME
// Network records  ("NET*" files): IMSI|IMEI|CGI|TIME|CGI|URL
/**
 * Aggregates base-station records: computes how long each user (IMSI)
 * stayed at each base station (CGI) within each configured time window.
 *
 * Input parameters: &lt;input path&gt; &lt;output path&gt; &lt;date&gt; &lt;timepoint&gt;
 * Example: "/base /output 2012-09-12 09-17-24" means: read from "/base",
 * write to "/output", process records for 2012-09-12 only, split the day
 * into the windows 00-09, 09-17 and 17-24.
 *
 * Output format: "IMSI|CGI|TIMEFLAG|STAY_TIME"
 */
public class BaseStationDataPreprocess extends Configured implements Tool {

    /** Counters for the various kinds of rejected input. */
    enum Counter {
        TIMESKIP,      // malformed timestamp
        OUTOFTIMESKIP, // timestamp outside the requested date/windows
        LINESKIP,      // malformed source line
        USERSKIP       // a whole (user, window) group was discarded
    }

    /**
     * Mapper: reads one record, determines the time window it falls into,
     * and emits key = "IMSI|timeFlag", value = "CGI|epochSeconds".
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        String date;
        String[] timepoint;
        // true = position ("POS") file layout, false = network ("NET") layout.
        boolean dataSource;

        /** Reads job parameters and detects the input file type from its name. */
        @Override
        public void setup(Context context) throws IOException {
            this.date = context.getConfiguration().get("date");
            this.timepoint = context.getConfiguration().get("timepoint").split("-");
            // The field layout depends on which source file the split comes from.
            FileSplit fs = (FileSplit) context.getInputSplit();
            String fileName = fs.getPath().getName();
            if (fileName.startsWith("POS"))
                dataSource = true;
            else if (fileName.startsWith("NET"))
                dataSource = false;
            else
                throw new IOException("File Name should starts with POS or NET");
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            TableLine tableLine = new TableLine();

            // Parse the line; count and skip anything invalid.
            try {
                tableLine.set(line, this.dataSource, this.date, this.timepoint);
            } catch (LineException e) {
                // flag -1 = outside the requested date/windows, 0 = bad timestamp.
                if (e.getFlag() == -1)
                    context.getCounter(Counter.OUTOFTIMESKIP).increment(1);
                else
                    context.getCounter(Counter.TIMESKIP).increment(1);
                return;
            } catch (Exception e) {
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }

            context.write(tableLine.outKey(), tableLine.outValue());
        }
    }

    /**
     * Reducer: for one (IMSI, window) group, orders the sightings by time
     * and sums the minutes spent at each base station.
     */
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
        private String date;
        // NOTE(review): SimpleDateFormat is not thread-safe, but each reduce
        // task has its own Reducer instance, so this is safe here.
        private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void setup(Context context) {
            this.date = context.getConfiguration().get("date");
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String imsi = key.toString().split("\\|")[0];
            String timeFlag = key.toString().split("\\|")[1];

            // TreeMap orders the sightings by timestamp (epoch seconds -> CGI).
            TreeMap<Long, String> uploads = new TreeMap<Long, String>();
            String valueString;
            for (Text value : values) {
                valueString = value.toString();
                try {
                    uploads.put(Long.valueOf(valueString.split("\\|")[1]),
                                valueString.split("\\|")[0]);
                } catch (NumberFormatException e) {
                    context.getCounter(Counter.TIMESKIP).increment(1);
                    continue;
                }
            }

            try {
                // Append a sentinel "OFF" entry at the end of the window so the
                // last real sighting gets a duration too.
                Date tmp = this.formatter.parse(
                        this.date + " " + timeFlag.split("-")[1] + ":00:00");
                uploads.put((tmp.getTime() / 1000L), "OFF");

                // Sum the per-station stay times.
                HashMap<String, Float> locs = getStayTime(uploads);

                // Emit "IMSI|CGI|TIMEFLAG|STAY_TIME" per station.
                for (Entry<String, Float> entry : locs.entrySet()) {
                    StringBuilder builder = new StringBuilder();
                    builder.append(imsi).append("|");
                    builder.append(entry.getKey()).append("|");
                    builder.append(timeFlag).append("|");
                    builder.append(entry.getValue());
                    context.write(NullWritable.get(), new Text(builder.toString()));
                }
            } catch (Exception e) {
                // Best-effort: discard the whole group if anything fails.
                context.getCounter(Counter.USERSKIP).increment(1);
                return;
            }
        }

        /**
         * Computes minutes spent at each station from time-ordered sightings.
         * Consecutive sightings more than 60 minutes apart are treated as
         * "phone off" and contribute no stay time.
         */
        private HashMap<String, Float> getStayTime(TreeMap<Long, String> uploads) {
            Entry<Long, String> upload, nextUpload;
            HashMap<String, Float> locs = new HashMap<String, Float>();

            Iterator<Entry<Long, String>> it = uploads.entrySet().iterator();
            upload = it.next();

            while (it.hasNext()) {
                nextUpload = it.next();
                // Gap between consecutive sightings, in minutes.
                float diff = (float) (nextUpload.getKey() - upload.getKey()) / 60.0f;
                if (diff <= 60.0) // a larger gap means the phone was off
                {
                    if (locs.containsKey(upload.getValue()))
                        locs.put(upload.getValue(), locs.get(upload.getValue()) + diff);
                    else
                        locs.put(upload.getValue(), diff);
                }
                upload = nextUpload;
            }
            return locs;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Pass the date and window boundaries to the tasks via the job config.
        conf.set("date", args[2]);
        conf.set("timepoint", args[3]);

        // Job.getInstance replaces the deprecated Job(Configuration, String) ctor.
        Job job = Job.getInstance(conf, "BaseStationDataPreprocess");
        job.setJarByClass(BaseStationDataPreprocess.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.err.println("");
            System.err.println("Usage: BaseStationDataPreprocess < input path > < output path > < date > < timepoint >");
            System.err.println("Example: BaseStationDataPreprocess /user/james/Base /user/james/Output 2012-09-12 07-09-17-24");
            System.err.println("Warning: Timepoints should be begined with a 0+ two digit number and the last timepoint should be 24");
            System.err.println("Counter:");
            System.err.println("\t" + "TIMESKIP" + "\t" + "Lines which contain wrong date format");
            System.err.println("\t" + "OUTOFTIMESKIP" + "\t" + "Lines which contain times that out of range");
            System.err.println("\t" + "LINESKIP" + "\t" + "Lines which are invalid");
            System.err.println("\t" + "USERSKIP" + "\t" + "Users in some time are invalid");
            System.exit(-1);
        }
        int res = ToolRunner.run(new Configuration(), new BaseStationDataPreprocess(), args);
        System.exit(res);
    }
}
package cn.dataguru.hadoop;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.Text;

/**
 * Exception thrown when a record is rejected during parsing.
 * flag -1 = record outside the requested date/time windows;
 * flag  0 = unparsable timestamp.
 */
class LineException extends Exception {
    private static final long serialVersionUID = 8245008693589452584L;

    int flag;

    public LineException(String msg, int flag) {
        super(msg);
        this.flag = flag;
    }

    public int getFlag() {
        return flag;
    }
}

/**
 * Parses one tab-separated record (position or network layout) and
 * exposes the map output key/value for it.
 */
public class TableLine {
    private String imsi, position, time, timeFlag;
    private Date day;
    private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Extracts the fields from one line and validates them.
     *
     * @param line      raw tab-separated record
     * @param source    true = position ("POS") layout, false = network ("NET") layout
     * @param date      the only date ("yyyy-MM-dd") being processed
     * @param timepoint window end hours, e.g. {"09","17","24"}
     * @throws LineException if the record is out of range (flag -1)
     *                       or its timestamp is malformed (flag 0)
     */
    public void set(String line, boolean source, String date, String[] timepoint)
            throws LineException {
        String[] lineSplit = line.split("\t");

        // The two file layouts keep CGI and TIME in different columns.
        if (source) {
            this.imsi = lineSplit[0];
            this.position = lineSplit[3];
            this.time = lineSplit[4];
        } else {
            this.imsi = lineSplit[0];
            this.position = lineSplit[2];
            this.time = lineSplit[3];
        }

        // The record's calendar date must match the requested date exactly.
        if (!this.time.startsWith(date))
            throw new LineException("", -1);

        try {
            this.day = this.formatter.parse(this.time);
        } catch (ParseException e) {
            throw new LineException("", 0);
        }

        // Locate the window whose end hour first exceeds this record's hour.
        int idx = 0;
        int count = timepoint.length;
        int hour = Integer.valueOf(this.time.split(" ")[1].split(":")[0]);
        while (idx < count && Integer.valueOf(timepoint[idx]) <= hour)
            idx++;

        if (idx < count) {
            this.timeFlag = (idx == 0)
                    ? ("00-" + timepoint[idx])
                    : (timepoint[idx - 1] + "-" + timepoint[idx]);
        } else {
            // The hour lies past the last window boundary.
            throw new LineException("", -1);
        }
    }

    /** Map output key: "IMSI|timeFlag". */
    public Text outKey() {
        return new Text(this.imsi + "|" + this.timeFlag);
    }

    /** Map output value: "CGI|epochSeconds" (timestamp as seconds offset). */
    public Text outValue() {
        long t = (day.getTime() / 1000L);
        return new Text(this.position + "|" + String.valueOf(t));
    }
}
相关文章推荐
- Golang(Go语言)的三大设计目标
- C/C++程序员应聘常见面试题剖析
- Spring AOP
- JAVA:Random的种子含义
- PHP 生成HTML文件
- mybatis0211 mybatis和spring整合
- 多线程学习
- ubuntu安装java eclipse
- 2014 I/O归来:Google连接一切
- java(16)--利用session存储购买商品
- ThinkPHP学习笔记
- spring-aop学习笔记
- java中transient关键字
- STM32的bootloader IAP编程(转载总结)
- Git从零教你入门(4):Git服务之 gogs部署安装
- Git从零教你入门(4):Git服务之 gogs部署安装
- Go for Visual Studio Code
- JAVA多线程-线程间通信(三)-通过管道进行线程间通信
- How to improve Java's I/O performance( 提升 java i/o 性能)
- C#继承/this/base/new