您的位置:首页 > 产品设计 > UI/UE

Hadoop:The Definitive Guid 总结 Chapter 1~2 初识Hadoop、MapReduce

2013-06-13 15:14 441 查看
1.数据存储与分析

问题:当磁盘的存储量随着时间的推移越来越大的时候,对磁盘上的数据的读取速度却没有多大的增长
从多个磁盘上进行并行读写操作是可行的,但是存在以下几个方面的问题:
1).第一个问题是硬件错误。硬件越多出错的几率就越大。一种常用的解决方式是数据冗余,保留多分拷贝,即使一份数据处理出错,还有另外的数据。HDFS使用的也是类似的方式,但稍有不同。
2).第二个问题是数据处理的相关性问题。例如很多分析工作在一快磁盘上处理出来的结果需要与其他磁盘上处理处理出来的结果合并才能完成任务。各种分布式系统也都给出了合并的策略,但是做好这方面确实是一个挑战。MapReduce提供了一种编程模型,他将从硬盘上读写数据的问题抽象出来,转化成对一系列键值对的计算
简而言之,Hadoop提供了一个可靠的存储和分析系统。存储又HDFS提供,分析由MapReduce提供。

2.与其他系统比较
1).RDBMS
为什么需要MapReduce?

a.磁盘的寻道时间提高的速度低于数据的传输速度,如果数据访问模式由寻道时间支配的话,在读写数据集的一大部分的时候速度就会较流式读取慢很多,这样就出现了瓶颈。

b.另一方面在更新数据集的少量数据的时候,传统的B-树工作的比较好,但是在更新数据集的大部分数据的时候B-树就显得比MapReduce方式慢了。MapReduce使用排序/合并操作去重建数据库(完成数据更新).

c.MapReduce比较适合于需要分析整个数据集,并且要使用批处理方式,特别是特定的分析的情况;RDBMS点查询方面占优势,或在已编制索引的数据集提供低延迟的检索和更新的数据,但是数据量不能太大。MapReduce适合一次写入,多次读取的操作,但是关系数据库就比较适合对数据集的持续更新。

d.MapReduce比较适合处理半结构化,非结构化的数据

e.MapReduce是可以进行线性扩展的编程模型。一个对集群级别的数据量而写的MapReduce可以不加修改的应用于小数据量或者更大数据量的处理上。更重要的是当你的输入数据增长一倍的时候,相应的处理时间也会增加一倍。但是如果你把集群也增长一倍的话,处理的速度则会和没有增加数据量时候的速度一样快,这方面对SQL查询来说不见得是正确的。

f.关系数据往往进行规则化以保证数据完整性,并删除冗余。这样做给MapReduce提出了新的问题:它使得读数据变成了非本地执行,而MapReduce的一个重要前提(假设)就是数据可以进行高速的流式读写。

2).Grid Compuing 网格计算

a.MapReduce使数据和计算在一个节点上完成,这样就变成了本地的读取。这是MapReduce高性能的核心.

b.MPI将控制权大大的交给了程序员,但是这就要求程序员明确的处理数据流等情况,而MapReduce只提供高层次的操作:程序员只需考虑处理键值对的函数,而对数据流则是比较隐晦的。

c.MapReduce是一种非共享(Shared-nothing)的架构,当MapReduce实现检测到map或者reduce过程出错的时候,他可以将错误的部分再执行一次。MPI程序员则需要明确的考虑检查点和恢复,这虽然给程序员很大自由,但是也使得程序变得难写。

3).志愿计算
MapReduce是针对在一个高聚合网络连接的数据中心中进行的可信的、使用专用的硬件工作持续数分钟或者数个小时而设计的。相比之下,志愿计算则是在不可信的、链接速度有很大差异的、没有数据本地化特性的,互联网上的计算机上运行永久的(超长时间的)计算,

3.天气数据集
数据是NCDC的数据,我们关注以下特点:
1) 数据是半格式化的
2) 目录里面存放的是从1901-2001年一个世纪的记录,是gzip压缩过的文件。
3) 以行为单位,使用ASCII格式存储,每行就是一条记录
4) 每条记录我们关注一些基本的元素,比如温度,这些数据在每条数据中都会出现,并且宽度也是固定的。
下面是一条记录的格式,为了便于显示,做了一部分调整。



4.使用Unix工具分析数据
以分析某年份的最高温度为例,下面是一段Unix的脚本程序:

#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done


这段脚本的执行过程如下:
脚本循环处理每一个压缩的年份文件,首先打印出年份,然后对每一个文件使用awk处理。Awk脚本从数据中解析出两个字段:一个air temperature,一个quality code。air temperature值加0被转换成整形。接下来查看温度数据是否有效(9999表示在NCDC数据集中丢失的值),并且检查quality code是不是可信并没有错误的。如果读取一切正常,temp将与目前的最大值比较,如果出现新的最大值,则更新当前max的值。当文件中所有行的数据都被处理之后,开始执行End程序块,并且打印出最大值。
下面是某次运行结果的起始部分:



为了加速处理速度,我们将程序的某些部分进行并行执行。这在理论上是比较简单的,我们可以按照年份来在不同的处理器上执行,使用所有可用的硬件线程,但是还是有些问题:
1).把任务切分成相同大小的块不总是那么容易的。
2).合并单独处理出来的结果还需要进一步的处理
3).人们仍旧被单机的处理能力所束缚。

5. 使用Hadoop分析数据
1).Map和Reduce
MapReduce将工作分为map阶段和reduce阶段,每个阶段都将键值对作为输入输入,键值对的类型可以由程序员选择。程序员还指定两个函数:map和reduce函数。
Map阶段的输入数据是NCDC的原始数据,我们选择文本格式输入,这样可以把记录中的每一行作为文本value。Key是当前行离开始行的偏移量,但是我们并不需要这个信息,所以忽略不要。
我们的Map函数比较简单,仅仅从输入中析取出temperature。其中,map函数仅仅是完成了数据的准备阶段,这样使得reducer函数可以基于它查找历年的最高温度。Map函数也是一个很好的过滤阶段,这里可以过滤掉丢失、置疑、错误的temperature数据。
下面是输入数据样例:



下面这些行以键值对的方式来给map函数处理,其中加粗的是我们需要处理的数据



上面的键(key)是文件中的行偏移量,map函数不需要,这里的map函数的功能仅限于提取年份和气温,并将他们作为输出:



map函数输出经由mapreduce框架中进行进一步的处理后,主要需要根据键对键/值对进行排序和分组。经过这一番处理之后,Reduce收来的结果如下:



处理这些数据,reduce所需要做的工作仅仅是遍历这些数据,找出最大值,产生最终的输出结果:



MapReduce的逻辑数据流:



2).Java MapReduce 程序
这里需要三块代码:Map函数、Reduce函数、用来运行作业的main函数
Map函数的实现
package com.hadoop.definitive.guid.MaXTemperature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

public class MaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {

private static final int MISSING = 9999;

public static Logger log = Logger.getLogger(MaxTemperatureMapper.class);

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
// signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
log.info("Year:" + year + "  tmperature:" + airTemperature);
}
}
}

Hadoop提供了他自己的基本类型,这些类型为网络序列化做了专门的优化。可以在org.apache.hadoop.io包中找到他们。比如LongWritable相当于Java中的Long,Text相当于String而IntWritable在相当于Integer。map()方法传入一个key和一个value。我们将Text类型的value转化成Java的String,然后用String的substring方法取出我偶们需要的部分,最后利用context.write按照键/值的形式收集数据。

Reduce函数的实现

package com.hadoop.definitive.guid.MaXTemperature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
//System.out.println("Key=" + key + " value=" + value + " MaxValue=" + maxValue);
}
context.write(key, new IntWritable(maxValue));
}
}


负责运行MapReduce作业的main函数

package com.hadoop.definitive.guid.MaXTemperature;

import java.io.IOException;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
if (args.length != 2) {
System.err
.println("Usage: MaxTemperature<local data path> <hdfs input path> <hdfs output path>");
System.exit(-1);
}

Configuration conf = new Configuration();
//Take care. If the job is running in eclipse, not in hadoop jar, then must be add special conf manally, otherwise, program will just read default conf, not the conf in hadoop/conf
conf.addResource("conf/core-site.xml");

Job job = new Job(conf);
job.setJobName("Max temperature");
job.setJarByClass(MaxTemperature.class);

// Try to upload ncdc input file "/mnt/hgfs/Shared/1901" to hdfs
FileSystem fs = FileSystem.get(conf);
upload(fs, "/mnt/hgfs/Shared/1901", args[0]);
//FileUtil.copy(FileSystem.getLocal(conf), new Path("/mnt/hgfs/Shared/1901"), fs, new Path("/definitive_guid/ncdc/input/1901"), false, true, conf);

//check output status, if exist, delete it
if(fs.exists(new Path(args[1]))) fs.delete(new Path(args[1]), true);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setCombinerClass(MaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

/**
* 上传目录或文件
*
* @param fs
* @param local
* @param remote
*/
public synchronized static void upload(FileSystem fs, String local,
String remote) {
//Path homeDir = fs.getHomeDirectory();
//Path workDir = fs.getWorkingDirectory();
//System.out.println("homeDir = " + homeDir);
//System.out.println("workDir = " + workDir);

Path dst = new Path(remote);
Path src = new Path(local);
try {
fs.copyFromLocalFile(false, true, src, dst);
System.out.println("upload " + local + " to  " + remote + " successed. ");
} catch (Exception e) {
System.err.println("upload " + local + " to  " + remote + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
}

}


6.数据流
1).MapReduce作业的工作单元由如下组成:输入数据、MapReduce程序、配置信息
2).Hadoop将Task分成两类:MapTask、ReduceTask.
3).为了控制Hadoop的Job运行,Hadoop有两累节点:一种是jobtracker,一种是tasktracker。Jobtracker通过调度tasktracker协调所有工作的执行。Tasktracker运行任务并将报告发送给jobtracker,jobtracker所有工作的进度。如果一个任务失败,jobtracker再重新调度一个不同的tasktracker进行工作。但是在Hadoop
0.23和2.0版本ongoing他们已经被Resource Manager、Node Manager和ApplicationMaster所替代,相应的资源并被Container所封装
4).Hadoop将MapReduce的输入数据分成固定大小的分片,成为数据分片(input split),然后为每个分片创建一个MapTask (分片默认64M大小)
5).Hadoop可以通过在存储有输入的数据节点上运行相应Map任务,提高性能(数据本地优化),另外Map任务将其树立后的数据写到本地磁盘,减少Hadoop分布式Node之间的数据传输压力。相比之下任务没有数据本地优化的优势-----单个Reduce任务的输入通常来自所有的map输出
下图展现了Data-local(数据本地),Rack-local与Off-local Map任务的区别:



下图给出MapReduce的集中执行方式数据流的情况:
一个Reduce的情况:



两个Reduce的情况:



无Reduce情况:



7.Combiner函数
为了优化集群数据传输,减少Map任务和Reduce任务之间的数据传输,Hadop针对Map任务指定一个Combiner函数(合并函数)--对本地键(key)相同的做合并处理,实际上Combiner函数很像Reduce函数,只不过运行在本地,最后Combiner函数的输出作为作为Reduce的函数的输入,Hadoop执行函数的顺序就变为Map--->Combiner--->Reduce
示例如下:
第一个Map的输出:



第二个Map的输出:



Reduce函数被调用时:



最后输出结果:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: