您的位置:首页 > 编程语言

Hadoop MapReduce 计算平均气温的代码（原创）

2015-05-23 16:25 134 查看
1901    46
1902    21
1903    48
1904    33
1905    43
1906    47
1907    31
1908    28
1909    26
1910    35
1911    30
1912    16
1913    29
1914    29
1915    5
1916    21
1917    22
1918    31
1919    27
1920    43
1921    34
1922    27
1923    26

以上为程序的输出结果（每行依次为年份和该年的平均气温）。

package com.teset;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that computes the average temperature for each year.
 *
 * <p>Each input line is split on whitespace. The first token is used as the
 * year and the fifth token as the temperature reading; readings equal to
 * {@code -9999} are treated as missing and skipped. The job writes one
 * {@code year <TAB> average} pair per year, where the average is the integer
 * quotient of the sum of readings and the number of readings.
 *
 * <p>Usage: {@code AvgTemprature <input path> <output path>}
 */
public class AvgTemprature extends Configured implements Tool {

  /** 1-based position of the year token within an input line. */
  private static final int YEAR_FIELD = 1;

  /** 1-based position of the temperature token within an input line. */
  private static final int TEMPERATURE_FIELD = 5;

  /** Sentinel reading that marks a missing temperature. */
  private static final int MISSING_TEMPERATURE = -9999;

  /** Counter group used to report records that could not be parsed. */
  private static final String COUNTER_GROUP = "AvgTemprature";

  /** Counter name for records whose temperature token is not an integer. */
  private static final String MALFORMED_COUNTER = "MALFORMED_TEMPERATURE";

  /** Exit code returned when the command-line arguments are invalid. */
  private static final int EXIT_USAGE = 2;

  /**
   * Emits one {@code (year, temperature)} pair for every input line that
   * carries a usable temperature reading.
   */
  public static class AvgMapper extends
      Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Parses one input line and emits its year and temperature.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of input text
     * @param context sink for the emitted pair and for counters
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokens = new StringTokenizer(value.toString());
      String year = null;
      int position = 0;
      while (tokens.hasMoreTokens()) {
        String token = tokens.nextToken();
        position++;
        if (position == YEAR_FIELD) {
          year = token;
        } else if (position == TEMPERATURE_FIELD) {
          int temperature;
          try {
            // Parse once; the original parsed the same token twice.
            temperature = Integer.parseInt(token);
          } catch (NumberFormatException malformed) {
            // One bad record should not fail the whole task: count it and
            // skip the line so the problem stays visible in the job counters.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
          }
          if (temperature != MISSING_TEMPERATURE) {
            context.write(new Text(year), new IntWritable(temperature));
          }
          // Nothing after the temperature token is of interest.
          break;
        }
      }
    }
  }

  /**
   * Averages all temperature readings received for one year.
   *
   * <p>This class must not be used as a combiner: averaging partial averages
   * does not yield the overall average.
   */
  public static class AvgReducer extends
      Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Writes the integer average of {@code values} for {@code key}.
     *
     * @param key     the year
     * @param values  every temperature reading emitted for that year
     * @param context sink for the {@code (year, average)} pair
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
        Reducer<Text, IntWritable, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
      // Accumulate in long so a large number of readings cannot overflow.
      long sum = 0;
      long count = 0;
      for (IntWritable value : values) {
        sum += value.get();
        count++;
      }
      if (count != 0) {
        // The quotient lies between the smallest and largest reading, so it
        // always fits in an int.
        context.write(key, new IntWritable((int) (sum / count)));
      }
    }
  }

  /**
   * Command-line entry point; exits with the status returned by {@link #run}.
   *
   * @param args generic Hadoop options followed by the input and output paths
   * @throws Exception if the job cannot be configured or submitted
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new AvgTemprature(), args);
    System.exit(res);
  }

  /**
   * Configures and runs the job, blocking until it finishes.
   *
   * @param arg0 {@code arg0[0]} is the input path, {@code arg0[1]} the output path
   * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are invalid
   * @throws Exception if the job cannot be configured or submitted
   */
  @Override
  public int run(String[] arg0) throws Exception {
    if (arg0 == null || arg0.length < 2) {
      System.err.println("Usage: AvgTemprature <input path> <output path>");
      return EXIT_USAGE;
    }

    Configuration conf = getConf();
    Job job = new Job(conf, "AvgTem"); // job name
    job.setJarByClass(AvgTemprature.class);

    job.setMapperClass(AvgMapper.class);
    job.setReducerClass(AvgReducer.class);
    // Deliberately no combiner: AvgReducer emits averages, and re-averaging
    // those partial results in the reducer would produce wrong output.

    FileInputFormat.addInputPath(job, new Path(arg0[0]));
    FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: