从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序
2013-07-30 10:19
645 查看
编写一个气象数据挖掘的MapReduce程序
1. 气象数据在哪里?
NCDC 美国国家气候数据中心
获取数据的方式在www.hadoopbook.com里给出了,是这里http://hadoopbook.com/code.html
两个示例的数据在这里下载https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all ,文件名分别是1901,1902
原始记录是许多小文件,现在已经按照年份被拼接成上面的1901, 1902这两个单独的文件。
2. 最重要的是第5节,不想看长篇大论的,可以直接看第5节就行。
3. 用Hadoop分析数据
3.1 分析一条记录:
0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
记录的含义:
0029029070
99999
19010101 观察日期
0600 观察时间
4+64333 经度
+023450 纬度
FM-12
+0005
99999
V020
270
1
N
0159
1
99999
9
9
N
000000
1
N
9-0078
1
+99999
102001ADDGF108991999999999999999999
3.2
源代码文件MaxTemperatureMapper.java
3.3 源代码MaxTemperatureReducer.java
3.4 源代码MaxTemperature.java
3.5 编译命令:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
3.6 打jar包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ jar -cvf maxtemp.jar -C ./classes/ .
3.7 用put命令将1901文件放到hdfs。
3.8 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar com.ifis.MaxTemperature 1901 o4
3.9 查看结果:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o4/part-00000
4. 一个新版api的MapReducer。
4.1 说明
这个例子书里没有给出import的类和接口的细节。如果有些类或者接口不知道import,到这里找http://hadoop.apache.org/docs/current/api/
从这里找到类名,然后点击进入,可以看到包的信息。
如果是java的类和接口,在这里找http://www.javaweb.cc/JavaAPI1.6/
注意,这个新的api里,各种类要import org.apache.hadoop.mapreduce包,而不是org.apache.hadoop.mapred包,如Context,Job等等。
4.2 源代码
4.3 编译:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes ./src/*.java
4.4 打包:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ jar -cvf newmaxtemp.jar -C ./classes/ .
4.5 将1901和1902两个文件put到hdfs上,分别更名为1901.dat和1902.dat
4.6 执行,从1901.dat和1902.dat这两个文件里找当年的最高温度:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar newmaxtemp.jar com.ifis.NewMaxTemperature 190*.dat o2
4.7 查看结果:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o2/part-r-00000
5. 将4的例子改造成mapper,reducer分开写的形式。
假设项目的目录是p3,此目录下的目录结构和文件结构如下:
|-- classes
`-- src
|-- MaxTemperature.java
|-- MaxTemperatureMapper.java
`-- MaxTemperatureReducer.java
5.1 源代码MaxTemperatureMapper.java
5.2 源代码MaxTemperatureReducer.java
5.3 源代码MaxTemperature.java
5.4 编译:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
5.5 打包:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ jar -cvf maxtemp.jar -C ./classes/ .
5.6 要处理的数据文件1901.dat和1902.dat在上一步已经put到hdfs,这次不需要再做这个动作了。
5.7 执行:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar com.ifis.MaxTemperature 190*.dat o3
5.8 查看执行结果:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o3/part-r-00000
6. Combiner可以提高map阶段的效率。注意,求最大值和最小值可以通过combiner提升效率,但求平均值就不行了。
7. Hadoop的streaming,通过unix标准流的方式,允许用其他语言写MapReduce程序。具体细节在这里不讲了,知道有这个功能就行。我们主要关注java语言。
8. 请熟悉第5节的源代码形式,熟悉到可以手工默写出来,这是MapReducer的主要形式,对本文来说,能掌握这个就足够了。
1. 气象数据在哪里?
NCDC 美国国家气候数据中心
获取数据的方式在www.hadoopbook.com里给出了,是这里http://hadoopbook.com/code.html
两个示例的数据在这里下载https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all ,文件名分别是1901,1902
原始记录是许多小文件,现在已经按照年份被拼接成上面的1901, 1902这两个单独的文件。
2. 最重要的是第5节,不想看长篇大论的,可以直接看第5节就行。
3. 用Hadoop分析数据
3.1 分析一条记录:
0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
记录的含义:
0029029070
99999
19010101 观察日期
0600 观察时间
4+64333 经度
+023450 纬度
FM-12
+0005
99999
V020
270
1
N
0159
1
99999
9
9
N
000000
1
N
9-0078
1
+99999
102001ADDGF108991999999999999999999
3.2
源代码文件MaxTemperatureMapper.java
package com.ifis; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{ private static final int MISSING = 9999; public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{ String line = value.toString(); String year = line.substring(15,19); int airTemperature; if(line.charAt(87) == '+'){ airTemperature = Integer.parseInt(line.substring(88, 92)); }else{ airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if(airTemperature != MISSING && quality.matches("[01459]")){ output.collect(new Text(year), new IntWritable(airTemperature)); } } }
3.3 源代码MaxTemperatureReducer.java
package com.ifis; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{ int maxValue = Integer.MIN_VALUE; while(values.hasNext()){ maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } }
3.4 源代码MaxTemperature.java
package com.ifis; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; public class MaxTemperature{ public static void main(String[] args) throws IOException{ if (args.length != 2){ System.err.println("Usage: MaxTemperature <intput path> <output path>"); System.exit(-1); } JobConf conf = new JobConf(MaxTemperature.class); conf.setJobName("Max Temperature"); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(MaxTemperatureMapper.class); conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); } }
3.5 编译命令:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
3.6 打jar包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ jar -cvf maxtemp.jar -C ./classes/ .
3.7 用put命令将1901文件放到hdfs。
3.8 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar com.ifis.MaxTemperature 1901 o4
3.9 查看结果:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o4/part-00000
4. 一个新版api的MapReducer。
4.1 说明
这个例子书里没有给出import的类和接口的细节。如果有些类或者接口不知道import,到这里找http://hadoop.apache.org/docs/current/api/
从这里找到类名,然后点击进入,可以看到包的信息。
如果是java的类和接口,在这里找http://www.javaweb.cc/JavaAPI1.6/
注意,这个新的api里,各种类要import org.apache.hadoop.mapreduce包,而不是org.apache.hadoop.mapred包,如Context,Job等等。
4.2 源代码
package com.ifis; import java.io.IOException; import java.lang.InterruptedException; import java.lang.Iterable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; public class NewMaxTemperature{ static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private static final int MISSING = 9999; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+'){ airTemperature = Integer.parseInt(line.substring(88, 92)); }else{ airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")){ context.write(new Text(year), new IntWritable(airTemperature)); } } } static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int maxValue = Integer.MIN_VALUE; for(IntWritable v : values){ maxValue = Math.max(maxValue, v.get()); } context.write(key, new IntWritable(maxValue)); } } public static void main(String[] args) throws Exception{ if (args.length != 2){ System.err.println("Usage NewMaxTemerapture <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(NewMaxTemperature.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true)?0:1); } }
4.3 编译:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes ./src/*.java
4.4 打包:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ jar -cvf newmaxtemp.jar -C ./classes/ .
4.5 将1901和1902两个文件put到hdfs上,分别更名为1901.dat和1902.dat
4.6 执行,从1901.dat和1902.dat这两个文件里找当年的最高温度:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar newmaxtemp.jar com.ifis.NewMaxTemperature 190*.dat o2
4.7 查看结果:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o2/part-r-00000
5. 将4的例子改造成mapper,reducer分开写的形式。
假设项目的目录是p3,此目录下的目录结构和文件结构如下:
|-- classes
`-- src
|-- MaxTemperature.java
|-- MaxTemperatureMapper.java
`-- MaxTemperatureReducer.java
5.1 源代码MaxTemperatureMapper.java
package com.ifis; import java.io.IOException; import java.lang.InterruptedException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private static final int MISSING = 9999; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+'){ airTemperature = Integer.parseInt(line.substring(88, 92)); }else{ airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")){ context.write(new Text(year), new IntWritable(airTemperature)); } } }
5.2 源代码MaxTemperatureReducer.java
package com.ifis; import java.io.IOException; import java.lang.InterruptedException; import java.lang.Iterable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int maxValue = Integer.MIN_VALUE; for(IntWritable v : values){ maxValue = Math.max(maxValue, v.get()); } context.write(key, new IntWritable(maxValue)); } }
5.3 源代码MaxTemperature.java
package com.ifis; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; public class MaxTemperature{ public static void main(String[] args) throws Exception{ if (args.length != 2){ System.err.println("Usage NewMaxTemerapture <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperature.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true)?0:1); } }
5.4 编译:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
5.5 打包:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ jar -cvf maxtemp.jar -C ./classes/ .
5.6 要处理的数据文件1901.dat和1902.dat在上一步已经put到hdfs,这次不需要再做这个动作了。
5.7 执行:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar com.ifis.MaxTemperature 190*.dat o3
5.8 查看执行结果:brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o3/part-r-00000
6. Combiner可以提高map阶段的效率。注意,求最大值和最小值可以通过combiner提升效率,但求平均值就不行了。
7. Hadoop的streaming,通过unix标准流的方式,允许用其他语言写MapReduce程序。具体细节在这里不讲了,知道有这个功能就行。我们主要关注java语言。
8. 请熟悉第5节的源代码形式,熟悉到可以手工默写出来,这是MapReducer的主要形式,对本文来说,能掌握这个就足够了。
相关文章推荐
- 从零开始最短路径学习Hadoop之05----MapReduce应用开发
- 从零开始学习Hadoop--第2章 第一个MapReduce程序
- 从零开始学习Hadoop--第2章 第一个MapReduce程序
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- Hadoop 中文编码相关问题 -- mapreduce程序处理GBK编码数据并输出GBK编码数据
- Hadoop学习笔记2--第一个Mapreduce程序
- 从零开始最短路径学习Hadoop之03----HDFS分布式文件系统
- [转载]Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- 从零开始最短路径学习Hadoop之01----Hadoop的安装配置测试
- Hadoop-2.3.0学习(4)——第一个简单的mapreduce程序
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序(3)
- Hadoop 中文编码相关问题 -- mapreduce程序处理GBK编码数据并输出GBK编码数据
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- 从零开始最短路径学习Hadoop之04----Hadoop的I/O
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序
- 一步两步,学习大数据(四)——IDEA 搭建hadoop mapreduce程序
- 从零开始最短路径学习Hadoop之06----构建Hadoop集群
- Hadoop学习全程记录——在Eclipse中运行第一个MapReduce程序(转)