
Classic Hadoop Examples in Spark (Part 1): Finding Each Year's Maximum Temperature from Collected Weather Data

2016-06-29 17:18
1. Analyzing the Raw Data
0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
0067011990999991950032418004888888880500001N9+00001+9999999999999999999999
0067011990999991950051507004888888880500001N9+00781+9999999999999999999999

Record layout (offsets are 0-based, matching the substring calls in the code below):
Characters 15-18 hold the year.
Characters 45-49 hold the temperature: character 45 is the sign, '+' for above zero and '-' for below zero, and a value of 9999 marks an invalid reading.
Character 50 is the quality code, which can only be one of 0, 1, 4, 5, or 9.
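
As a quick sanity check, you can slice the first sample record in a Scala REPL (or spark-shell) and confirm the offsets line up with the substring calls used in the code below:

// First sample record from above; offsets are 0-based, as in String.substring.
val line = "0067011990999991950051507004888888889999999N9+00001+9999999999999999999999"

val year = line.substring(15, 19)        // "1950"
val temp =
  if (line.charAt(45) == '+') line.substring(46, 50).toInt  // digits after the '+'
  else line.substring(45, 50).toInt                         // keep the '-' so toInt is negative
val quality = line.substring(50, 51)     // "1"; valid codes are 0, 1, 4, 5, 9

println(s"year=$year temp=$temp quality=$quality")  // year=1950 temp=0 quality=1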

2. The MapReduce Implementation

1) The map task

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewMaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    // 9999 marks a missing or invalid temperature reading.
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();

        // Characters 15-18 hold the year.
        String year = line.substring(15, 19);

        // Character 45 is the sign: skip a '+', but keep a '-'
        // so that parseInt produces a negative value.
        int airTemperature;
        if (line.charAt(45) == '+') {
            airTemperature = Integer.parseInt(line.substring(46, 50));
        } else {
            airTemperature = Integer.parseInt(line.substring(45, 50));
        }

        // Character 50 is the quality code; only 0, 1, 4, 5 and 9 are valid.
        String quality = line.substring(50, 51);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

2) The reduce task
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewMaxTemperatureReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Fold over all temperatures seen for this year, keeping the maximum.
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
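
To see the reducer's contract concretely: after the shuffle, all temperatures for a given year arrive grouped under that key, and the reducer folds them down to a single maximum. The same fold, sketched in plain Scala over the values parsed from the six sample records above:

// (year -> temperatures) as the reducer would receive them for the sample data.
val grouped = Map("1949" -> Seq(111), "1950" -> Seq(0, 22, -11, 0, 78))
grouped.map { case (year, temps) => (year, temps.max) }
       .foreach { case (year, max) => println(s"$year\t$max") }  // 1949 111, 1950 78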


3) Job submission
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: NewMaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        // Job.getInstance() replaces the deprecated new Job() constructor.
        Job job = Job.getInstance();
        job.setJarByClass(NewMaxTemperature.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(NewMaxTemperatureMapper.class);
        job.setReducerClass(NewMaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


3. The Spark Implementation (Scala)
// Runs in spark-shell, where sc (the SparkContext) is predefined.
val one = sc.textFile("/tmp/hadoop/one")

// Keep only valid records: temperature != 9999 and quality code in {0, 1, 4, 5, 9}.
val yearAndTemp = one.filter(line => {
  val quality = line.substring(50, 51)
  val airTemperature =
    if (line.charAt(45) == '+') line.substring(46, 50).toInt
    else line.substring(45, 50).toInt
  airTemperature != 9999 && quality.matches("[01459]")
}).map(line => {
  // Parse the surviving records into (year, temperature) pairs.
  val year = line.substring(15, 19)
  val airTemperature =
    if (line.charAt(45) == '+') line.substring(46, 50).toInt
    else line.substring(45, 50).toInt
  (year, airTemperature)
})

// Reduce each year's temperatures to the maximum.
val res = yearAndTemp.reduceByKey((x, y) => if (x > y) x else y)

res.collect.foreach(x => println("year : " + x._1 + ", max : " + x._2))


Above, to drop invalid records, a filter transformation is applied before the map.
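
This does mean every line is parsed twice, once in the filter and once in the map. One possible refactor (a sketch under the same fixed-width assumptions, not part of the original post) parses each record once into an Option and drops invalid rows with flatMap; reduceByKey also aggregates map-side, playing the role a combiner would in MapReduce:

// Parse a record once; None drops invalid rows when flatMapped.
def parse(line: String): Option[(String, Int)] = {
  val temp =
    if (line.charAt(45) == '+') line.substring(46, 50).toInt
    else line.substring(45, 50).toInt
  val quality = line.substring(50, 51)
  if (temp != 9999 && quality.matches("[01459]")) Some((line.substring(15, 19), temp))
  else None
}

val res2 = sc.textFile("/tmp/hadoop/one")
  .flatMap(line => parse(line))
  .reduceByKey((x, y) => math.max(x, y))

res2.collect.foreach { case (year, max) => println("year : " + year + ", max : " + max) }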

The MapReduce and Spark jobs produce identical results:
year : 1949, max : 111

year : 1950, max : 78