Classic Hadoop Examples Reimplemented in Spark (Part 1): Finding Each Year's Maximum Temperature from Collected Weather Data
2016-06-29 17:18
1. Analyzing the raw data
0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
0067011990999991950032418004888888880500001N9+00001+9999999999999999999999
0067011990999991950051507004888888880500001N9+00781+9999999999999999999999
Notes on the data (all offsets are 0-based and follow Java's substring(begin, end) convention, where end is exclusive):
Characters 15-19 hold the year.
Characters 45-50 hold the temperature: a leading + means above zero and - means below zero. A temperature value of 9999 marks a missing or invalid reading and must be discarded.
The character at position 50 is the quality code and may only be one of 0, 1, 4, 5, or 9.
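As a quick sanity check of these offsets, the first sample record can be sliced exactly the way the jobs below do (a minimal standalone sketch; the record string is copied from the sample data above):

```java
public class RecordSliceDemo {
    public static void main(String[] args) {
        String rec = "0067011990999991950051507004888888889999999N9"
                   + "+00001+9999999999999999999999";
        // Characters 15-19: the year.
        String year = rec.substring(15, 19);                 // "1950"
        // Character 45 is the sign; the four digits after it are the temperature.
        int temp = rec.charAt(45) == '+'
                ? Integer.parseInt(rec.substring(46, 50))    // "0000" -> 0
                : Integer.parseInt(rec.substring(45, 50));   // sign included -> negative
        // Character 50 is the quality code; valid values are 0, 1, 4, 5, 9.
        String quality = rec.substring(50, 51);
        System.out.println(year + " " + temp + " " + quality); // prints "1950 0 1"
    }
}
```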
2. First, the MapReduce implementation
1) The map task
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewMaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // 9999 marks a missing or invalid temperature reading.
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Characters 15-19 hold the year.
        String year = line.substring(15, 19);
        // Character 45 is the sign; the following four digits are the temperature.
        int airTemperature;
        if (line.charAt(45) == '+') {
            airTemperature = Integer.parseInt(line.substring(46, 50));
        } else {
            airTemperature = Integer.parseInt(line.substring(45, 50));
        }
        // Character 50 is the quality code; emit only valid, non-missing readings.
        String quality = line.substring(50, 51);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
2) The reduce task
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewMaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Keep the largest temperature seen for this year.
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
3) Job submission
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: NewMaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // Job.getInstance() replaces the deprecated new Job() constructor.
        Job job = Job.getInstance();
        job.setJarByClass(NewMaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(NewMaxTemperatureMapper.class);
        job.setReducerClass(NewMaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
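With the three classes compiled, the job is packaged and submitted in the usual way (a sketch: the jar name max-temperature.jar and the HDFS paths below are placeholders chosen for this example, not names from the original post):

```shell
# Package the compiled job classes (names/paths are placeholders).
jar cf max-temperature.jar NewMaxTemperature*.class

# Run against an input directory on HDFS; the output directory must not exist yet.
hadoop jar max-temperature.jar NewMaxTemperature /tmp/hadoop/one /tmp/hadoop/out

# Inspect the result.
hdfs dfs -cat /tmp/hadoop/out/part-r-*
```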
3. The Spark implementation (Scala)
val one = sc.textFile("/tmp/hadoop/one")

// Character 45 is the sign; the four digits after it are the temperature.
def parseTemp(line: String): Int =
  if (line.charAt(45) == '+') line.substring(46, 50).toInt
  else line.substring(45, 50).toInt

val yearAndTemp = one
  .filter { line =>
    // Keep only non-missing readings with a valid quality code.
    val quality = line.substring(50, 51)
    parseTemp(line) != 9999 && quality.matches("[01459]")
  }
  .map { line => (line.substring(15, 19), parseTemp(line)) }

val res = yearAndTemp.reduceByKey((x, y) => if (x > y) x else y)
res.collect.foreach(x => println("year : " + x._1 + ", max : " + x._2))
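For readers without a cluster at hand, the same filter → map → reduce-by-key logic can be sketched in plain Java streams (a standalone illustration of what the Spark job computes, not Spark API; the class name MaxTempStreams is my own):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MaxTempStreams {

    // Parse the signed four-digit temperature starting at character 45.
    static int temp(String line) {
        return line.charAt(45) == '+'
                ? Integer.parseInt(line.substring(46, 50))
                : Integer.parseInt(line.substring(45, 50));
    }

    // filter -> map -> "reduceByKey" with max, mirroring the Spark job above.
    public static Map<String, Integer> maxByYear(List<String> lines) {
        return lines.stream()
                .filter(l -> temp(l) != 9999 && l.substring(50, 51).matches("[01459]"))
                .collect(Collectors.toMap(
                        l -> l.substring(15, 19),   // key: year
                        MaxTempStreams::temp,       // value: temperature
                        Integer::max));             // merge values: keep the maximum
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "0067011990999991949032412004888888889999999N9+01111+9999999999999999999999",
                "0067011990999991950051507004888888880500001N9+00781+9999999999999999999999");
        // Prints one "year : Y, max : T" line per year.
        maxByYear(sample).forEach((y, t) -> System.out.println("year : " + y + ", max : " + t));
    }
}
```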
To drop invalid records, the code above applies a filter before the map. The MapReduce and Spark jobs produce the same result:
year : 1949, max : 111
year : 1950, max : 78