
The Shortest Path to Learning Hadoop from Scratch, Part 02: A First MapReduce Program for Processing Weather Data

Writing a MapReduce program that mines weather data

1. Where does the weather data come from?

    NCDC, the National Climatic Data Center of the United States.

    How to obtain the data is explained on www.hadoopbook.com, specifically here: http://hadoopbook.com/code.html

    The data for the two examples can be downloaded from https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all ; the files are named 1901 and 1902.

    The raw records come as many small files; for convenience they have already been concatenated by year into the two single files above, 1901 and 1902.

2. The most important part is Section 5. If you would rather skip the long walkthrough, reading Section 5 alone is enough.

3. Analyzing the data with Hadoop

    3.1 Analyzing one record:

    0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999

    What the record means, field by field:

    0029
    029070        USAF weather station identifier
    99999         WBAN weather station identifier
    19010101      observation date
    0600          observation time
    4
    +64333        latitude (degrees x 1000)
    +023450       longitude (degrees x 1000)
    FM-12
    +0005         elevation (meters)
    99999
    V020
    270           wind direction (degrees)
    1             quality code
    N
    0159          wind speed (meters per second x 10)
    1             quality code
    99999         sky ceiling height (missing)
    9
    9
    N
    000000        visibility distance (meters)
    1             quality code
    N
    9
    -0078         air temperature (degrees Celsius x 10); starts at character offset 87
    1             quality code
    +9999         dew point temperature (missing)
    9             quality code
    10200         atmospheric pressure (hectopascals x 10)
    1             quality code
    ADDGF108991999999999999999999    additional data section
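
    These offsets matter because the mapper below extracts fields by character position. As a minimal standalone check (not part of the original post) of the three offsets the mapper relies on:

    public class RecordOffsets {
        public static void main(String[] args) {
            // the sample record from section 3.1, split only to keep the source line short
            String line = "0029029070999991901010106004+64333+023450FM-12+000599999"
                    + "V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999";
            System.out.println(line.substring(15, 19)); // year: 1901
            System.out.println(line.substring(87, 92)); // air temperature: -0078, i.e. -7.8 degrees Celsius
            System.out.println(line.substring(92, 93)); // quality code: 1
        }
    }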

    3.2 Source file MaxTemperatureMapper.java

package com.ifis;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999; // NCDC encodes a missing temperature as +9999

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        String year = line.substring(15, 19); // the year is the first four digits of the observation date
        int airTemperature;
        if (line.charAt(87) == '+') { // Integer.parseInt does not accept a leading plus sign
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) { // emit only readings with an acceptable quality code
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }
}


    3.3 Source file MaxTemperatureReducer.java

package com.ifis;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) { // scan every temperature recorded for this year
            maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue)); // one (year, max temperature) pair per year
    }
}


    3.4 Source file MaxTemperature.java

package com.ifis;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max Temperature");
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf); // submit the job and block until it finishes
    }
}


    3.5 Compile: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar  -d ./classes/ ./src/*.java

    3.6 Build the jar: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ jar -cvf maxtemp.jar -C ./classes/ .
    3.7 Use the put command to copy the 1901 file to HDFS, for example with the command shown below.
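    A sketch (not from the original post), assuming the local file 1901 sits in the current directory and the HDFS path matches the input argument used in 3.8: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -put 1901 1901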
    3.8 Run: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar  com.ifis.MaxTemperature 1901 o4

    3.9 View the result: brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o4/part-00000

4. A MapReduce program written against the new API.
    4.1 Notes
    The book does not list the classes and interfaces this example needs to import. If you are not sure where a class or interface should be imported from, look it up at http://hadoop.apache.org/docs/current/api/ : find the class name there, click through, and you can see its package.
    For the Java standard library classes and interfaces, look here: http://www.javaweb.cc/JavaAPI1.6/
    Note that with the new API, classes such as Context and Job come from the org.apache.hadoop.mapreduce package, not the org.apache.hadoop.mapred package.

    4.2 Source code
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class NewMaxTemperature {

    static class MaxTemperatureMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int MISSING = 9999; // NCDC encodes a missing temperature as +9999

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if (line.charAt(87) == '+') { // Integer.parseInt does not accept a leading plus sign
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            if (airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

    static class MaxTemperatureReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable v : values) {
                maxValue = Math.max(maxValue, v.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: NewMaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(NewMaxTemperature.class); // lets Hadoop locate the jar that contains this class
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1); // true: report progress to the console
    }
}


    4.3 Compile: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes ./src/*.java
    4.4 Package: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ jar -cvf newmaxtemp.jar -C ./classes/ .
    4.5 Put the 1901 and 1902 files to HDFS, renaming them to 1901.dat and 1902.dat, for example with the commands shown below.
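    A sketch (not from the original post), assuming the local files 1901 and 1902 sit in the current directory:
    brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -put 1901 1901.dat
    brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -put 1902 1902.dat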

    4.6 Run, finding the maximum temperature per year across 1901.dat and 1902.dat: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar newmaxtemp.jar com.ifis.NewMaxTemperature 190*.dat o2
    4.7 View the result: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o2/part-r-00000

5. The example from Section 4, restructured with the mapper and reducer in separate source files.
    Assume the project directory is p3, with the following directory and file layout:
    |-- classes
    `-- src
        |-- MaxTemperature.java
        |-- MaxTemperatureMapper.java
        `-- MaxTemperatureReducer.java

    5.1 Source file MaxTemperatureMapper.java
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999; // NCDC encodes a missing temperature as +9999

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19); // the year is the first four digits of the observation date
        int airTemperature;
        if (line.charAt(87) == '+') { // Integer.parseInt does not accept a leading plus sign
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) { // emit only readings with an acceptable quality code
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}


    5.2 Source file MaxTemperatureReducer.java
package com.ifis;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable v : values) { // scan every temperature recorded for this year
            maxValue = Math.max(maxValue, v.get());
        }
        context.write(key, new IntWritable(maxValue)); // one (year, max temperature) pair per year
    }
}


    5.3 Source file MaxTemperature.java
package com.ifis;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class); // lets Hadoop locate the jar that contains this class
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1); // true: report progress to the console
    }
}


    5.4 Compile: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
    5.5 Package: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ jar -cvf maxtemp.jar -C ./classes/ .
    5.6 The data files 1901.dat and 1902.dat were already put to HDFS in the previous section, so that step is not needed again.

    5.7 Run: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar  com.ifis.MaxTemperature 190*.dat o3

    5.8 View the result: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o3/part-r-00000

6. A combiner can make the map stage more efficient by merging map outputs locally before they are shuffled to the reducers. Note that a maximum or a minimum can be computed through a combiner, because a max of partial maxima is the overall max; a mean cannot, because a mean of partial means is not, in general, the overall mean. A driver sketch follows below.
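
    As a sketch (this fragment is not in the original post; it assumes the Section 5 classes), only one line changes relative to the driver in 5.3: the reducer is registered as the combiner as well.

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class); // new: compute a partial max per year on the map side
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);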

7. Hadoop Streaming lets you write MapReduce programs in other languages, communicating with Hadoop through Unix standard streams. We will not go into the details here; it is enough to know the feature exists. Our focus is Java.

8. Get familiar with the code in Section 5, to the point where you can write it out from memory. It is the canonical shape of a MapReduce program, and for the purposes of this article, mastering it is enough.