您的位置:首页 > 编程语言 > Java开发

使用Hadoop分析数据——简单案例java语言编程之MaxTemperature

2016-02-04 16:18 861 查看
 为了充分发挥Hadoop提供的并行处理优势,我们需要将查询表示成MapReduce作业。经过一些本地的小规模测试,我们将能够在集群设备上运行Hadoop。

  Map阶段和reduce阶段

  

 MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值对作为输入和输出,并由程序员选择他们的类型。程序员还需要具体定义两个函数:map函数和reduce函数。

 

 map阶段的输入是原始的数据(从NCDC获取)。我们选择文本格式作为输入格式,以便将数据集的每一行作为一个文本值进行输入。键为该行起始位置相对于文件其实位置的偏移量,但是我们不需要这个信息,忽略。

 我们要的map函数很简答。由于我们只对年份和气温这两个属性感兴趣,所以只需要取出这两个属性数据。在本例中,map函数只是一个数据准备阶段,通过这种方式来准备数据,是reduce函数能在该准备数据上继续处理:即找出每年的最高气温

 为了全面了解map的工作方式,我们思考一下几行作为输入数据的示例数据:

 

0067011990999991950051507004…9999999N9+00001+99999…

0043011990999991950051512004…9999999N9+00221+99999…

0043011990999991950051518004…9999999N9-00111+999999…

0043012650999991949032412004…0500001N9+01111+99999…

0043012650999991949032418004…0500001N9+00781+99999…

这些行以键/值对的方式来表示map函数:

0067011990999991950051507004…9999999N9+00001+99999…

0043011990999991950051512004…9999999N9+00221+99999…

0043011990999991950051518004…9999999N9-00111+99999…

0043012650999991949032412004…0500001N9+01111+99999…

0043012650999991949032418004…0500001N9+00781+99999…

map函数的功能仅限于提取年份和气温信息(分别为红色和黑色),并将它们输出(气温已经用整数处理表示):

(1950,0)

(1950,22)

(1950,-11)

(1950,111)

(1950,78)

map函数的输出经过MapReduce框架处理后,最后被发送到reduce函数。这一处理过程中需要根据键对键/值进行排序和分组。因此,示例中,reduce函数会看到如下输入

(1949,[111,78])

(1950,[0,22,-11])

每一年份后紧跟着一系列气温数据。所有reduce函数现在需要做的是遍历整个列表并从中找出最大的读数。

(1949,111)

(1950,22)

这是最终的输出结果:每一年的全球最高气温纪录。

流程图工作原理如下所示:



Java  Mapreduce

明白MapReduce程序的 工作原理之后,下一步便是通过代码来是实现它,我们需要三样东西:一个map函数、一个reduce函数和一些用来运行作业的代码。map函数由Mapper借口实现来表示,后者声明了一个map()方法。下面是我们的map函数实现。

  查找最高气温的mapper

  

package com.paic.elis.test;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final int MISSING = 9999;

@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}


 该mapper接口是一个泛型类型,它由4个参数,分别制定map函数的输入键,输入值,输出键,输出值。就目前的示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop自身提供一套可优化网络序列化传输的基本类型,而不直接使用java内嵌的类型。这些类型均可在org.apache.hadoop.io包中找到。我们这里使用LongWritable类型(相当于java中的long类型)、Text类型(就相当于java中的string类型)和Intwritable类型(相当于java中的integer类型)。

 map()方法的输入是一个键和一个值。我们首先将包含有一行输入的Text值转化成java的string类型,之后使用substring()方法提取我们感兴趣的列。

 map()方法还提供了outputcollector示例用于输出内容的写入。在这种情况下,我们将年份数据按照Text对象进行读/写,(因为我们把年份当做键)将气温值封装咋iintweitable类型中。

 我们只在气温数据不缺失并且所对应质量代码显示为正确的气温读数时,才将其写入输出记录中。

 

 查找最高气温的reducer

 

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {

int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}


同样,针对reduce函数也有四个形式参数类型用于指定其输入和输出类型。reduce函数的输入类型必须与map函数的输出类型匹配:吉text类型和intwritable类型。在这种情况下,reduce函数的输出类型也必须是text和intwritable这两种类型,分别输出年份和最高气温。该最高气温是通过循环比较当前气温与一看到的最高气温获得的。

下面的应用程序在气象数据集中找出最高气温

package com.paic.elis.test;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}

Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


job对象指定了作业执行规范。我们可以用它来控制整个作业的运行。在hadoop集群上运行这个作业时,我们需要将代码打包成一个jar文件(hadoop会在集群上分发这个文件)。我们无需明确指定jar文件的名称,而只需要在job对象的setJarByClass方法中传递一个类,hadoop将通过该类查找包含有该类的jar文件今儿找到相关的jar文件。

  构造job对象之后,需要指定输入和输出数据的路径。调用fileInputFormat类的静态函数addInputPath()来定义输入数据的路径,该路径可以使单个文件,目录(此时,将目录下所有文件当做输入)或符合特定文件模式的一组文件。由函数名克制,可以多次调用addinputpath()实现多路径的输入。

  通过调用fileouputFormat类中的静态函数setOutputPath()来指定输出路径。该函数指定了reduce函数输出文件的写入路径。在运行任务前,该目录不应该存在,否则hadoop会报错,并拒绝运行该任务。这种预防措施是为了防止数据丢失(一个长时间运行任务的结果被意外的覆盖式非常恼人的)。

  接着,通过setMapperclass()和setReducerClass()指定map和reduce类型。

  setOutputkeyClass()和setOutputvalueClass()控制map和reduce函数的输出类型,正如本例所示,这两个输出类型往往相同。如果不同,map函数的输出类型则通过setmapoutputkeyclass()和setmapoutputvalueclass()函数来实现。

  输入的类型通过inputFormat类来控制,我们的例子中没有进行设置,因为使用的是默认的TextinputFormat(文本输入格式)

  在设置定义map和reduce 函数的类后,便可以开始运行任务JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台

  运行测试

  写好MapReduce作业后,通常会拿一个小型的数据集进行测试以排除代码相关问题。首先,以独立(本机)模式安装Hadoop。在这种模式下,Hadoop在本地文件系统上运行作业运行程序。现在就用上面的那5行数据进行测试。

  本人由于在公司,具有真实的分布式环境,于是登陆到系统。将在eclipse 的java已经相关的jar进行打包,通过ssh传到linux系统(公司用的是付费版的RedHat)。这个过程,我再重新写一个博客进行阐释。点击链接,即可查看。

  测试命令1: hadoop fs -put ./sample.txt /user/hduser0009/

上面的意思是将这个输入参数的文本放到集群中去。

测试命令2:hadoop jar MaxTemperature.jar sample.txt output

上面的意思是执行我打包好的jar文件,并传入两个参数。

  测试输出过程:

  



  





  从这个输出所提供的信息可以看出很多有用的信息,比如,这个作业有指定的表示,即,job_1454549337586_0042,并且执行了一个map任务和一个reduce任务。输出中的Map-Reduce Framework中有一些统计信息,例如,可以看出有5个map的输入以及5个map的输出,然后有2个reduce组的输入和2个reduce结果的输出。

   这个最终的结果和我们之前手动寻找的结果一样。我们把这个解释为1949年的最高气温纪录为11.1℃,而1950年为2.2℃。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息