
Learning Hadoop from Scratch -- Supplement 1: A Working Example

2013-11-04 15:27

1. How the Pi Estimate Works

Among the examples bundled with Hadoop there is one that estimates the value of Pi. It is a fairly complete example, but it is written against the old API. This chapter first walks through that example and then re-implements it with the new API.

The program works as follows. Take a square with side length 1. Using one corner of the square as the center and 1 as the radius, draw an arc; this marks out a quarter circle inside the square. Now generate a number of random points inside the square: some fall inside the quarter circle, others fall outside it. The area of the square is 1 and the area of the quarter circle is 0.25*Pi. Let n be the total number of points and nc the number of points inside the quarter circle. With enough points, densely distributed, the ratio nc/n approaches the ratio of the quarter circle's area to the square's area, that is nc/n = 0.25*Pi/1, so Pi = 4*nc/n.
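
Before bringing Hadoop into the picture, the ratio argument can be tried out in a few lines of plain Java. This is only a minimal local sketch using pseudo-random points rather than the Halton sequence or MapReduce used later; the class name and the sample count of 1,000,000 are arbitrary choices for illustration.

import java.util.Random;

// Minimal local illustration of the ratio Pi = 4 * nc / n.
public class LocalPiSketch {
  public static void main(String[] args) {
    final long n = 1000000;          // total number of points (arbitrary)
    long nc = 0;                     // points inside the quarter circle
    Random rnd = new Random();
    for (long i = 0; i < n; i++) {
      double x = rnd.nextDouble();   // random point in the unit square
      double y = rnd.nextDouble();
      if (x * x + y * y <= 1.0) {    // inside the quarter circle of radius 1
        nc++;
      }
    }
    System.out.println("Pi is roughly " + 4.0 * nc / n);
  }
}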

How should the random points be generated? The simplest approach is to draw two random numbers in the interval [0,1] each time and use them as the x and y coordinates of a point. Unfortunately this does not work very well: purely random points can leave large gaps or pile up on top of each other, which limits the accuracy of the estimate. The Halton sequence produces much better sample points: they are spread more evenly, so the estimate is more accurate than with purely random points. That is why this example uses the Halton sequence to generate the point set. For background on the Halton sequence see http://orion.math.iastate.edu/reu/2001/voronoi/halton_sequence.html and http://www.aae.wisc.edu/dphaneuf/AAE%20875/Halton%20sequences.pdf; the details are not covered here.
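
To give a feel for what the Halton sequence produces, the sketch below computes the base-b radical inverse, which is the building block of a Halton point: the index i is written in base b and its digits are mirrored around the decimal point. This is a simplified standalone illustration (the helper name radicalInverse is made up here); the HaltonSequence class in the Hadoop example below computes the same values incrementally for bases 2 and 3.

// Simplified illustration of a Halton value: the base-b radical inverse of i.
public class HaltonSketch {
  static double radicalInverse(long i, int base) {
    double result = 0.0;
    double f = 1.0 / base;
    while (i > 0) {
      result += f * (i % base);   // take the next base-b digit of i
      i /= base;
      f /= base;                  // and shift it one place after the point
    }
    return result;
  }

  public static void main(String[] args) {
    // First 2D Halton points: (radicalInverse(i, 2), radicalInverse(i, 3))
    for (long i = 1; i <= 5; i++) {
      System.out.println(radicalInverse(i, 2) + ", " + radicalInverse(i, 3));
    }
  }
}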

The more sample points generated inside the square, the more accurate the Pi estimate, which makes this problem a good fit for Hadoop. Suppose 10 million points are to be generated inside the square: you can use 10 map tasks with each task handling 1 million points, or 100 map tasks with each task handling 100,000 points.

2. The Pi Estimation MapReduce Program with the Old API

The program presented here comes from Hadoop's bundled examples.
For this calculation, 10 map tasks are set up and each task handles 1000 points. The concrete flow is as follows:
1) Run the PiEstimator MapReduce program with the input arguments 10 and 1000, meaning 10 map tasks with 1000 points per task.

2) PiEstimator initializes itself. One of the initialization steps creates a directory on HDFS: the input directory. This directory holds 10 sequence files; the number of map tasks determines the number of sequence files, so PiEstimator creates 10 of them. Each sequence file stores two integers: the index in the Halton sequence at which this task's sample points start, and the number of sample points to generate. In other words the first file contains "0, 1000", the second "1000, 1000", the third "2000, 1000", the fourth "3000, 1000", and so on. If the Halton sequence is used to generate ten thousand sample points in total, the first map task generates the points with indices 0 through 999, the second 1000 through 1999, the third 2000 through 2999, and so on. The index is the only parameter the Halton sequence needs to generate a point. (A small sketch after this list shows how to read one of these files back.)

3) PiEstimator runs the MapReduce job.

4) PiEstimator reads two integers from the MapReduce output directory: the number of points inside the quarter circle and the number of points outside it.

5) From the numbers obtained in step 4), the Pi value is computed and returned.
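
As a quick check of step 2), the small sketch below reads one of the generated input files back and prints the (offset, size) pair it holds. It is only an illustrative snippet: the class name ReadPiInput is made up here, a Hadoop 1.x classpath is assumed, and the path of the part file (for example PiEstimator_TMP_3_141592654/in/part0) is passed as the first argument.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

// Reads back one of the generated input sequence files and prints its record.
public class ReadPiInput {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(args[0]);        // path of the part file to inspect
    LongWritable offset = new LongWritable();
    LongWritable size = new LongWritable();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    try {
      reader.next(offset, size);          // each file holds exactly one record
      System.out.println("offset = " + offset.get() + ", size = " + size.get());
    } finally {
      reader.close();
    }
  }
}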

The PiEstimator.java file defines the PiEstimator class. PiEstimator has three inner classes: HaltonSequence, PiMapper and PiReducer. HaltonSequence generates the sample points, PiMapper implements the map phase, and PiReducer implements the reduce phase.

The code of PiEstimator.java is as follows:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PiEstimator extends Configured implements Tool {
  // Path of the temporary directory that holds the job's intermediate files.
  static private final Path TMP_DIR = new Path(
      PiEstimator.class.getSimpleName() + "_TMP_3_141592654");

  // Halton sequence generator: produces the sample points.
  private static class HaltonSequence {
    static final int[] P = {2, 3};
    static final int[] K = {63, 40};

    private long index;
    private double[] x;
    private double[][] q;
    private int[][] d;

    // Constructor: initializes the sequence at the given start index.
    HaltonSequence(long startindex) {
      index = startindex;
      x = new double[K.length];
      q = new double[K.length][];
      d = new int[K.length][];
      for (int i = 0; i < K.length; i++) {
        q[i] = new double[K[i]];
        d[i] = new int[K[i]];
      }

      for (int i = 0; i < K.length; i++) {
        long k = index;
        x[i] = 0;
        for (int j = 0; j < K[i]; j++) {
          q[i][j] = (j == 0 ? 1.0 : q[i][j-1]) / P[i];
          d[i][j] = (int)(k % P[i]);
          k = (k - d[i][j]) / P[i];
          x[i] += d[i][j] * q[i][j];
        }
      }
    }

    // Generates the next sample point.
    double[] nextPoint() {
      index++;
      for (int i = 0; i < K.length; i++) {
        for (int j = 0; j < K[i]; j++) {
          d[i][j]++;
          x[i] += q[i][j];
          if (d[i][j] < P[i]) {
            break;
          }
          d[i][j] = 0;
          x[i] -= (j == 0 ? 1.0 : q[i][j-1]);
        }
      }
      return x;
    }
  }

  // PiMapper: defines the map phase.
  public static class PiMapper extends MapReduceBase
      implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
    public void map(LongWritable offset,
                    LongWritable size,
                    OutputCollector<BooleanWritable, LongWritable> out,
                    Reporter reporter) throws IOException {
      /*
      map() reads offset and size from the input sequence file: offset is the
      index in the Halton sequence at which this task starts generating points,
      and size is the number of sample points this map task must generate.
      */
      final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
      long numInside = 0;
      long numOutside = 0;

      for (long i = 0; i < size.get(); ) {
        // Generate the next sample point.
        final double[] point = haltonsequence.nextPoint();
        // Check whether the point falls inside the circle inscribed in the
        // unit square (its area is also 0.25*Pi, so the ratio is the same
        // as for the quarter circle described earlier).
        final double x = point[0] - 0.5;
        final double y = point[1] - 0.5;
        if (x*x + y*y > 0.25) {
          numOutside++;
        } else {
          numInside++;
        }
        i++;
        // Update the status every 1000 points, as a progress hint.
        if (i % 1000 == 0) {
          reporter.setStatus("Generated " + i + " samples.");
        }
      }

      // The map task is done; emit the two counts.
      out.collect(new BooleanWritable(true), new LongWritable(numInside));
      out.collect(new BooleanWritable(false), new LongWritable(numOutside));
    }
  }

  // PiReducer: defines the reduce phase.
  public static class PiReducer extends MapReduceBase
      implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
    private long numInside = 0;
    private long numOutside = 0;
    private JobConf conf;

    @Override
    public void configure(JobConf job) {
      conf = job;
    }

    public void reduce(BooleanWritable isInside,
                       Iterator<LongWritable> values,
                       OutputCollector<WritableComparable<?>, Writable> output,
                       Reporter reporter) throws IOException {
      /*
      isInside is a boolean key: either true or false.
      values holds 10 numbers, one from each of the 10 map tasks.
      */

      // Accumulate the inside and outside counts produced by the 10 map tasks.
      if (isInside.get()) {
        for (; values.hasNext(); numInside += values.next().get());
      } else {
        for (; values.hasNext(); numOutside += values.next().get());
      }
    }

    // After the reduce phase finishes, write the totals to a file in the
    // temporary directory so that the PiEstimator class can read them back.
    @Override
    public void close() throws IOException {
      Path outDir = new Path(TMP_DIR, "out");
      Path outFile = new Path(outDir, "reduce-out");
      FileSystem fileSys = FileSystem.get(conf);
      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
          outFile, LongWritable.class, LongWritable.class,
          CompressionType.NONE);
      writer.append(new LongWritable(numInside), new LongWritable(numOutside));
      writer.close();
    }
  }

  // estimate(): runs the MapReduce job, then computes the Pi value.
  public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
      ) throws IOException {
    jobConf.setJobName(PiEstimator.class.getSimpleName());
    jobConf.setInputFormat(SequenceFileInputFormat.class);

    jobConf.setOutputKeyClass(BooleanWritable.class);
    jobConf.setOutputValueClass(LongWritable.class);
    jobConf.setOutputFormat(SequenceFileOutputFormat.class);

    jobConf.setMapperClass(PiMapper.class);
    jobConf.setNumMapTasks(numMaps);

    jobConf.setReducerClass(PiReducer.class);
    jobConf.setNumReduceTasks(1);

    /*
    Turn off speculative execution.
    With speculative execution, Hadoop launches a duplicate of any task it
    considers slow on another node and keeps whichever copy finishes first.
    Here the reduce task writes its result into an HDFS file with a fixed name,
    so running two or more reduce attempts at the same time could corrupt that
    file. Therefore the feature is disabled.
    */
    jobConf.setSpeculativeExecution(false);

    // Set the input and output directories.
    final Path inDir = new Path(TMP_DIR, "in");
    final Path outDir = new Path(TMP_DIR, "out");
    FileInputFormat.setInputPaths(jobConf, inDir);
    FileOutputFormat.setOutputPath(jobConf, outDir);

    // Create the input directory.
    final FileSystem fs = FileSystem.get(jobConf);
    if (fs.exists(TMP_DIR)) {
      throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
          + " already exists. Please remove it first.");
    }

    if (!fs.mkdirs(inDir)) {
      throw new IOException("Cannot create input directory " + inDir);
    }

    try {
      // Generate one sequence file per map task and write its two numbers.
      for (int i = 0; i < numMaps; ++i) {
        final Path file = new Path(inDir, "part" + i);
        final LongWritable offset = new LongWritable(i * numPoints);
        final LongWritable size = new LongWritable(numPoints);
        final SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, jobConf, file,
            LongWritable.class, LongWritable.class, CompressionType.NONE);
        try {
          writer.append(offset, size);
        } finally {
          writer.close();
        }
        System.out.println("Wrote input for Map #" + i);
      }

      // Run the MapReduce job.
      System.out.println("Starting Job");
      final long startTime = System.currentTimeMillis();
      JobClient.runJob(jobConf);
      final double duration = (System.currentTimeMillis() - startTime) / 1000.0;
      System.out.println("Job Finished in " + duration + " seconds");

      // Read the job's output.
      Path inFile = new Path(outDir, "reduce-out");
      LongWritable numInside = new LongWritable();
      LongWritable numOutside = new LongWritable();
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
      try {
        reader.next(numInside, numOutside);
      } finally {
        reader.close();
      }

      // Compute the Pi value and return it: Pi = 4 * numInside / (numMaps * numPoints).
      return BigDecimal.valueOf(4).setScale(20)
          .multiply(BigDecimal.valueOf(numInside.get()))
          .divide(BigDecimal.valueOf(numMaps))
          .divide(BigDecimal.valueOf(numPoints));
    } finally {
      fs.delete(TMP_DIR, true);
    }
  }

  // run(): implements the Tool interface.
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: " + getClass().getName() + " <nMaps> <nSamples>");
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    final int nMaps = Integer.parseInt(args[0]);
    final long nSamples = Long.parseLong(args[1]);

    System.out.println("Number of Maps = " + nMaps);
    System.out.println("Samples per Map = " + nSamples);

    final JobConf jobConf = new JobConf(getConf(), getClass());
    System.out.println("Estimated value of Pi is "
        + estimate(nMaps, nSamples, jobConf));
    return 0;
  }

  // main() of the PiEstimator class.
  public static void main(String[] argv) throws Exception {
    System.exit(ToolRunner.run(null, new PiEstimator(), argv));
  }
}

PiEstimator executes in this order: 1) PiEstimator's main function; 2) PiEstimator's run function; 3) PiEstimator's estimate function.
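
Since PiEstimator ships as one of Hadoop's bundled examples, it can also be tried without compiling anything. Assuming a Hadoop 1.2.1 installation (the examples jar name varies between releases), the command looks roughly like this: "./bin/hadoop jar hadoop-examples-1.2.1.jar pi 10 1000", where 10 is the number of map tasks and 1000 is the number of samples per map.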

3. The Pi Estimation MapReduce Program with the New API

The new API differs considerably from the old one: Mapper, Reducer and Job have all changed. This chapter takes the old-API code as the starting point and modifies it into a program based on the new API. Most places are already commented, so they are not explained again one by one.
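
The most visible difference is the shape of the Mapper: the old API passes an OutputCollector and a Reporter into map(), while the new API passes a single Context. The fragment below is only a schematic side-by-side comparison, not a complete program: the class names OldStyleMapper and NewStyleMapper are made up, the method bodies are placeholders, and the imports are the same ones used in the listings of sections 2 and 3.1.

// Old API (org.apache.hadoop.mapred): interface-based, output goes through OutputCollector.
public static class OldStyleMapper extends MapReduceBase
    implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
  public void map(LongWritable key, LongWritable value,
                  OutputCollector<BooleanWritable, LongWritable> out,
                  Reporter reporter) throws IOException {
    out.collect(new BooleanWritable(true), value);   // placeholder body
  }
}

// New API (org.apache.hadoop.mapreduce): abstract class, output goes through Context.
public static class NewStyleMapper
    extends Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {
  public void map(LongWritable key, LongWritable value, Context context)
      throws IOException, InterruptedException {
    context.write(new LongWritable(1), value);        // placeholder body
  }
}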

3.1 The Source Code of NewPiEst.java

package com.brianchen.hadoop;

import java.lang.Exception;
import java.lang.Integer;
import java.math.BigDecimal;
import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.GenericOptionsParser;

public class NewPiEst extends Configured implements Tool {
  // Temporary directory used for the job's input and output files.
  static private final Path TMP_DIR = new Path("pitmp");
  // Log
  static final Log LOG = LogFactory.getLog(NewPiEst.class);

  // Halton sequence generator.
  private static class HaltonSequence {
    // bases
    static final int[] P = {2, 3};
    // maximum number of digits allowed
    static final int[] K = {63, 40};

    private long index;
    private double[] x;
    private double[][] q;
    private int[][] d;

    HaltonSequence(long startindex) {
      index = startindex;
      x = new double[K.length];
      q = new double[K.length][];
      d = new int[K.length][];
      for (int i = 0; i < K.length; i++) {
        q[i] = new double[K[i]];
        d[i] = new int[K[i]];
      }

      for (int i = 0; i < K.length; i++) {
        long k = index;
        x[i] = 0;

        for (int j = 0; j < K[i]; j++) {
          q[i][j] = (j == 0 ? 1.0 : q[i][j-1]) / P[i];
          d[i][j] = (int)(k % P[i]);
          k = (k - d[i][j]) / P[i];
          x[i] += d[i][j] * q[i][j];
        }
      }
    }

    double[] nextPoint() {
      index++;
      for (int i = 0; i < K.length; i++) {
        for (int j = 0; j < K[i]; j++) {
          d[i][j]++;
          x[i] += q[i][j];
          if (d[i][j] < P[i]) {
            break;
          }
          d[i][j] = 0;
          x[i] -= (j == 0 ? 1.0 : q[i][j-1]);
        }
      }
      return x;
    }
  }

  // Mapper class for the new API.
  public static class PiMapper extends Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {
    public void map(LongWritable offset, LongWritable size, Context context)
        throws IOException, InterruptedException {
      final HaltonSequence hs = new HaltonSequence(offset.get());
      long nInside = 0;
      long nOutside = 0;

      for (int i = 0; i < size.get(); i++) {
        final double[] point = hs.nextPoint();
        // Is the point inside the quarter circle of radius 1 centered at the origin?
        if (point[0]*point[0] + point[1]*point[1] > 1) {
          nOutside++;
        } else {
          nInside++;
        }
      }

      // Emit the two totals once, after all points have been counted
      // (emitting inside the loop would send partial counts to the reducer).
      context.write(new LongWritable(1), new LongWritable(nOutside));
      context.write(new LongWritable(2), new LongWritable(nInside));
    }
  }

  // Reducer class for the new API.
  public static class PiReducer extends
      Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
    long nInside = 0;
    long nOutside = 0;

    public void reduce(LongWritable isInside, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Key 2 carries the inside counts, key 1 the outside counts.
      if (isInside.get() == 2) {
        for (LongWritable val : values) {
          nInside += val.get();
        }
      } else {
        for (LongWritable val : values) {
          nOutside += val.get();
        }
      }

      LOG.info("reduce-log: isInside = " + isInside.get() + ", nInside = " + nInside + ", nOutside = " + nOutside);
    }

    // The Reducer runs cleanup() before it finishes, so the accumulated
    // nInside and nOutside are written to a file here for estimate() to read.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      Path outDir = new Path(TMP_DIR, "out");
      Path outFile = new Path(outDir, "reduce-out");
      Configuration conf = context.getConfiguration();
      FileSystem fs = FileSystem.get(conf);
      SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, outFile, LongWritable.class, LongWritable.class, CompressionType.NONE);
      writer.append(new LongWritable(nInside), new LongWritable(nOutside));
      writer.close();
    }
  }

  public static BigDecimal estimate(int nMaps, int nSamples, Job job) throws Exception {
    LOG.info("\n\n estimate \n\n");

    // Configure the job: jar, Mapper, Reducer, and so on.
    job.setJarByClass(NewPiEst.class);
    job.setMapperClass(PiMapper.class);
    job.setReducerClass(PiReducer.class);
    job.setNumReduceTasks(1);

    // Both input and output use the sequence file format.
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    // Set the types of the output key and the output value.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setSpeculativeExecution(false);

    Path inDir = new Path(TMP_DIR, "in");
    Path outDir = new Path(TMP_DIR, "out");

    // Set the directory of the input files and the directory of the output.
    FileInputFormat.addInputPath(job, inDir);
    FileOutputFormat.setOutputPath(job, outDir);

    // Check the temporary directory.
    FileSystem fs = FileSystem.get(job.getConfiguration());
    if (fs.exists(TMP_DIR)) {
      throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR) + " already exists, pls remove it.");
    }

    // Create the input directory.
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Cannot create input directory " + inDir);
    }

    try {
      // Generate one sequence file per map task; each file holds two numbers.
      for (int i = 0; i < nMaps; i++) {
        final Path file = new Path(inDir, "part" + i);
        final LongWritable offset = new LongWritable(i * nSamples);
        final LongWritable size = new LongWritable(nSamples);
        final SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, job.getConfiguration(), file,
            LongWritable.class, LongWritable.class, CompressionType.NONE);
        writer.append(offset, size);
        writer.close();
        System.out.println("wrote input for Map #" + i);
      }

      // Run the MapReduce job.
      System.out.println("starting mapreduce job");
      final long startTime = System.currentTimeMillis();
      boolean ret = job.waitForCompletion(true);
      final double duration = (System.currentTimeMillis() - startTime) / 1000.0;
      System.out.println("Job finished in " + duration + " seconds.");

      // Read the MapReduce result back from HDFS.
      Path inFile = new Path(outDir, "reduce-out");
      LongWritable nInside = new LongWritable();
      LongWritable nOutside = new LongWritable();
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, job.getConfiguration());
      reader.next(nInside, nOutside);
      reader.close();
      LOG.info("estimate-log: nInside = " + nInside.get() + ", nOutside = " + nOutside.get());

      // Compute the Pi value and return it: Pi = 4 * nInside / (nInside + nOutside).
      return BigDecimal.valueOf(4).multiply(
          BigDecimal.valueOf(nInside.get())
      ).divide(
          BigDecimal.valueOf(nInside.get() + nOutside.get()), 20, BigDecimal.ROUND_HALF_DOWN
      );
    } finally {
      fs.delete(TMP_DIR, true);
    }
  }

  public int run(String[] args) throws Exception {
    LOG.info("\n\n run \n\n");
    if (args.length != 2) {
      System.err.println("Usage: NewPiEst <nMaps> <nSamples>, e.g. NewPiEst 10 10000");
      System.exit(1);
    }

    // Parse the arguments.
    int nMaps = Integer.parseInt(args[0]);
    int nSamples = Integer.parseInt(args[1]);
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    Job job = new Job(conf, "Pi estimating job");

    System.out.println("Pi = " + estimate(nMaps, nSamples, job));

    return 0;
  }

  public static void main(String[] argv) throws Exception {
    LOG.info("\n\n main \n\n");
    System.exit(ToolRunner.run(null, new NewPiEst(), argv));
  }
}

3.2 Compiling and Running

The project's directory and file layout is similar to the one used in earlier chapters, so setting it up is left as an exercise; the approximate commands are given below.
Compile: "javac -cp /home/brian/usr/hadoop/hadoop-1.2.1/hadoop-core-1.2.1.jar:/home/brian/usr/hadoop/hadoop-1.2.1/lib/commons-logging-1.1.1.jar:/home/brian/usr/hadoop/hadoop-1.2.1/lib/commons-cli-1.2.jar -d ./classes/ src/*.java"
Package: "jar -cvf newpiest.jar -C ./classes/ ."
Run: "./bin/hadoop jar ~/all/work/ifitus/ifitus-dev-svn/hadoop_from_zero/src/chp08/newpiest/newpiest.jar com.brianchen.hadoop.NewPiEst 10 1000"