Learning Hadoop from Scratch -- Supplement 1: A Practical Example
2013-11-04 15:27
1. The principle of estimating Pi
Among the examples that ship with Hadoop there is one that computes the value of Pi. It is a fairly complete example, but it uses the old API. This chapter first analyzes that example and then reimplements it with the new API. The idea of the program is as follows. Take a square with side length 1. With one corner of the square as the center and 1 as the radius, draw an arc; this gives a quarter circle inside the square. Now generate a number of points at random inside the square: some of them fall inside the quarter circle and some fall outside it. The area of the square is 1 and the area of the quarter circle is 0.25*Pi. If the total number of points is n and the number of points inside the quarter circle is nc, then, provided the points are numerous and dense enough, the ratio nc/n approximates the ratio of the quarter circle's area to the square's area, that is nc/n ≈ 0.25*Pi/1, so Pi = 4*nc/n.
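To make the relationship concrete, here is a minimal standalone sketch (not part of the Hadoop example; the class name SimplePi and the point count are made up for illustration) that estimates Pi this way with plain pseudorandom points. The next paragraph explains why the Hadoop example does not generate its points like this.

import java.util.Random;

// Minimal standalone sketch: estimate Pi from n pseudorandom points in the
// unit square, counting how many fall inside the quarter circle of radius 1
// centered at one corner of the square.
public class SimplePi {
  public static void main(String[] args) {
    long n = 10000000;            // total number of points (arbitrary choice)
    long nc = 0;                  // points inside the quarter circle
    Random rnd = new Random(42);
    for (long i = 0; i < n; i++) {
      double x = rnd.nextDouble();
      double y = rnd.nextDouble();
      if (x * x + y * y <= 1.0) { // inside the quarter circle
        nc++;
      }
    }
    System.out.println("Pi is approximately " + 4.0 * nc / n);
  }
}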
How are the random points generated? The simplest way is to draw two random decimals in [0,1] each time and use them as the x and y coordinates of a point. Unfortunately this does not work very well: the random points may leave large gaps or overlap one another, which limits the precision of the computation. The Halton sequence produces much better sample points: they are more evenly distributed, and the resulting estimate is more precise than with plain random points, so this example uses the Halton sequence to generate the point set. For background on Halton sequences see http://orion.math.iastate.edu/reu/2001/voronoi/halton_sequence.html and http://www.aae.wisc.edu/dphaneuf/AAE%20875/Halton%20sequences.pdf; the details are not repeated here.
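As a rough illustration of the idea (this is only a sketch of the textbook definition, not the incremental HaltonSequence class used in the example below; the class name HaltonDemo is made up), the i-th point of a 2-D Halton sequence takes as coordinates the radical inverses of i in two prime bases, here 2 and 3:

// Sketch of the Halton sequence idea: the radical inverse reverses the
// base-b digits of the index and places them after the "decimal" point.
public class HaltonDemo {
  static double radicalInverse(long index, int base) {
    double result = 0.0;
    double fraction = 1.0 / base;
    while (index > 0) {
      result += (index % base) * fraction;
      index /= base;
      fraction /= base;
    }
    return result;
  }

  public static void main(String[] args) {
    // Print the first few 2-D Halton points (bases 2 and 3).
    for (long i = 1; i <= 5; i++) {
      double x = radicalInverse(i, 2);
      double y = radicalInverse(i, 3);
      System.out.println("point " + i + ": (" + x + ", " + y + ")");
    }
  }
}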
The more sample points are generated inside the square, the more precise the computed value of Pi, which makes this problem well suited to Hadoop. Suppose 10 million points are to be generated inside the square: you can set up 10 map tasks with each map task handling 1 million points, or 100 map tasks with each map task handling 100,000 points.
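To get a feel for the job before writing any code, the version that ships with Hadoop can be run directly, for example with "./bin/hadoop jar hadoop-examples-1.2.1.jar pi 10 1000000" for 10 map tasks of 1,000,000 points each (the examples jar name depends on the Hadoop version, and the numbers here are only illustrative).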
2. The Pi estimation MapReduce program with the old API
What follows is the example program that ships with Hadoop. For the calculation we set up 10 map tasks, each handling 1000 points. The overall flow is as follows:
1) Run the PiEstimator MapReduce program with the input arguments 10 and 1000, meaning 10 map tasks with 1000 points per map task.
2) PiEstimator initializes itself. One of the initialization steps is to create a directory on HDFS, the input directory, which holds 10 sequence files. The number of map tasks determines the number of sequence files, so PiEstimator creates 10 of them. Each sequence file stores two integers: the index in the Halton sequence of the first sample point to be processed, and the number of sample points to generate. In other words, the first file contains "0, 1000", the second contains "1000, 1000", the third contains "2000, 1000", the fourth contains "3000, 1000", and so on. If the Halton sequence is used to generate ten thousand sample points in total, the first map task generates the points with indices 0 to 999, the second map task generates the points with indices 1000 to 1999, the third map task generates the points with indices 2000 to 2999, and so on. The only parameter the Halton sequence needs to generate a point is its index. (A small sketch for inspecting these input files follows the list below.)
3) PiEstimator runs the MapReduce job.
4) PiEstimator reads two integers from the MapReduce output directory: the number of points inside the quarter circle and the number of points outside it.
5) From the numbers obtained in 4), compute the value of Pi and return it.
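If you want to verify what step 2) wrote, a small reader along the following lines can dump the (offset, size) pair from one of the generated part files while the job's temporary directory still exists on HDFS. The class name ShowPiInput and the path passed on the command line are assumptions for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

// Illustrative helper: print the two LongWritable values stored in one of
// the input sequence files, e.g. PiEstimator_TMP_3_141592654/in/part0.
public class ShowPiInput {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(args[0]);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    LongWritable offset = new LongWritable();
    LongWritable size = new LongWritable();
    try {
      while (reader.next(offset, size)) {
        System.out.println("offset = " + offset.get() + ", size = " + size.get());
      }
    } finally {
      reader.close();
    }
  }
}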
PiEstimator.java defines the PiEstimator class. PiEstimator has three inner classes: HaltonSequence, PiMapper and PiReducer. HaltonSequence generates the sample points, PiMapper implements the map phase, and PiReducer implements the reduce phase.
The code of PiEstimator.java is as follows:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PiEstimator extends Configured implements Tool {
  // Temporary directory that holds the job's input and output files.
  static private final Path TMP_DIR = new Path(
      PiEstimator.class.getSimpleName() + "_TMP_3_141592654");

  // Halton sequence class, generates the sample points.
  private static class HaltonSequence {
    static final int[] P = {2, 3};
    static final int[] K = {63, 40};

    private long index;
    private double[] x;
    private double[][] q;
    private int[][] d;

    // Constructor: position the sequence at startindex.
    HaltonSequence(long startindex) {
      index = startindex;
      x = new double[K.length];
      q = new double[K.length][];
      d = new int[K.length][];
      for (int i = 0; i < K.length; i++) {
        q[i] = new double[K[i]];
        d[i] = new int[K[i]];
      }
      for (int i = 0; i < K.length; i++) {
        long k = index;
        x[i] = 0;
        for (int j = 0; j < K[i]; j++) {
          q[i][j] = (j == 0 ? 1.0 : q[i][j-1]) / P[i];
          d[i][j] = (int)(k % P[i]);
          k = (k - d[i][j]) / P[i];
          x[i] += d[i][j] * q[i][j];
        }
      }
    }

    // Generate the next point.
    double[] nextPoint() {
      index++;
      for (int i = 0; i < K.length; i++) {
        for (int j = 0; j < K[i]; j++) {
          d[i][j]++;
          x[i] += q[i][j];
          if (d[i][j] < P[i]) {
            break;
          }
          d[i][j] = 0;
          x[i] -= (j == 0 ? 1.0 : q[i][j-1]);
        }
      }
      return x;
    }
  }

  // PiMapper class, the map phase.
  public static class PiMapper extends MapReduceBase
      implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {

    public void map(LongWritable offset,
                    LongWritable size,
                    OutputCollector<BooleanWritable, LongWritable> out,
                    Reporter reporter) throws IOException {
      /*
       * map() gets offset and size from the sequence file. offset is the index
       * in the Halton sequence of the first sample point to generate, size is
       * the number of sample points this map task has to generate.
       */
      final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
      long numInside = 0;
      long numOutside = 0;

      for (long i = 0; i < size.get(); ) {
        // Generate a point.
        final double[] point = haltonsequence.nextPoint();
        // Check whether the point falls inside the quarter circle.
        final double x = point[0] - 0.5;
        final double y = point[1] - 0.5;
        if (x*x + y*y > 0.25) {
          numOutside++;
        } else {
          numInside++;
        }
        i++;
        // Every 1000 points, update the task status as a progress hint.
        if (i % 1000 == 0) {
          reporter.setStatus("Generated " + i + " samples.");
        }
      }

      // The map task is done, emit the results.
      out.collect(new BooleanWritable(true), new LongWritable(numInside));
      out.collect(new BooleanWritable(false), new LongWritable(numOutside));
    }
  }

  // PiReducer class, the reduce phase.
  public static class PiReducer extends MapReduceBase
      implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {

    private long numInside = 0;
    private long numOutside = 0;
    private JobConf conf;

    @Override
    public void configure(JobConf job) {
      conf = job;
    }

    public void reduce(BooleanWritable isInside,
                       Iterator<LongWritable> values,
                       OutputCollector<WritableComparable<?>, Writable> output,
                       Reporter reporter) throws IOException {
      /*
       * isInside is a boolean, either true or false.
       * values holds 10 numbers, one from each of the 10 map tasks.
       */
      // Accumulate, over all 10 map tasks, the counts of sample points outside
      // and inside the quarter circle.
      if (isInside.get()) {
        for (; values.hasNext(); numInside += values.next().get());
      } else {
        for (; values.hasNext(); numOutside += values.next().get());
      }
    }

    // After the reduce phase ends, write the result to a file in the temporary
    // directory so that the PiEstimator class can read it back.
    @Override
    public void close() throws IOException {
      Path outDir = new Path(TMP_DIR, "out");
      Path outFile = new Path(outDir, "reduce-out");
      FileSystem fileSys = FileSystem.get(conf);
      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, outFile,
          LongWritable.class, LongWritable.class, CompressionType.NONE);
      writer.append(new LongWritable(numInside), new LongWritable(numOutside));
      writer.close();
    }
  }

  // The estimate() method of PiEstimator: run MapReduce, then compute the Pi value.
  public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf)
      throws IOException {
    jobConf.setJobName(PiEstimator.class.getSimpleName());

    jobConf.setInputFormat(SequenceFileInputFormat.class);
    jobConf.setOutputKeyClass(BooleanWritable.class);
    jobConf.setOutputValueClass(LongWritable.class);
    jobConf.setOutputFormat(SequenceFileOutputFormat.class);

    jobConf.setMapperClass(PiMapper.class);
    jobConf.setNumMapTasks(numMaps);
    jobConf.setReducerClass(PiReducer.class);
    jobConf.setNumReduceTasks(1);

    /*
     * Turn off speculative execution.
     * With speculative execution, if Hadoop notices that some tasks run slowly,
     * it launches a duplicate of them on another node and takes whichever copy
     * finishes first. But the reduce task writes its result to an HDFS file
     * with a fixed name, so running two or more copies of it at the same time
     * would make the write fail. Hence the feature must be disabled here.
     */
    jobConf.setSpeculativeExecution(false);

    // Set the input and output directories.
    final Path inDir = new Path(TMP_DIR, "in");
    final Path outDir = new Path(TMP_DIR, "out");
    FileInputFormat.setInputPaths(jobConf, inDir);
    FileOutputFormat.setOutputPath(jobConf, outDir);

    // Create the input directory.
    final FileSystem fs = FileSystem.get(jobConf);
    if (fs.exists(TMP_DIR)) {
      throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
          + " already exists.  Please remove it first.");
    }
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Cannot create input directory " + inDir);
    }

    try {
      // Write one sequence file per map task, each holding two numbers.
      for (int i = 0; i < numMaps; ++i) {
        final Path file = new Path(inDir, "part" + i);
        final LongWritable offset = new LongWritable(i * numPoints);
        final LongWritable size = new LongWritable(numPoints);
        final SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, jobConf, file,
            LongWritable.class, LongWritable.class, CompressionType.NONE);
        try {
          writer.append(offset, size);
        } finally {
          writer.close();
        }
        System.out.println("Wrote input for Map #" + i);
      }

      // Run the MapReduce job.
      System.out.println("Starting Job");
      final long startTime = System.currentTimeMillis();
      JobClient.runJob(jobConf);
      final double duration = (System.currentTimeMillis() - startTime) / 1000.0;
      System.out.println("Job Finished in " + duration + " seconds");

      // Read the output.
      Path inFile = new Path(outDir, "reduce-out");
      LongWritable numInside = new LongWritable();
      LongWritable numOutside = new LongWritable();
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
      try {
        reader.next(numInside, numOutside);
      } finally {
        reader.close();
      }

      // Compute the Pi value and return it.
      return BigDecimal.valueOf(4).setScale(20)
          .multiply(BigDecimal.valueOf(numInside.get()))
          .divide(BigDecimal.valueOf(numMaps))
          .divide(BigDecimal.valueOf(numPoints));
    } finally {
      fs.delete(TMP_DIR, true);
    }
  }

  // run() implements the Tool interface.
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: " + getClass().getName() + " <nMaps> <nSamples>");
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    final int nMaps = Integer.parseInt(args[0]);
    final long nSamples = Long.parseLong(args[1]);

    System.out.println("Number of Maps  = " + nMaps);
    System.out.println("Samples per Map = " + nSamples);

    final JobConf jobConf = new JobConf(getConf(), getClass());
    System.out.println("Estimated value of Pi is "
        + estimate(nMaps, nSamples, jobConf));
    return 0;
  }

  // The main() function of the PiEstimator class.
  public static void main(String[] argv) throws Exception {
    System.exit(ToolRunner.run(null, new PiEstimator(), argv));
  }
}
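Note that the value returned by estimate() is 4 * numInside / (numMaps * numPoints): since numMaps * numPoints is the total number of generated points, this is exactly the Pi = 4*nc/n formula from section 1. As a purely illustrative check, with 10 map tasks of 1000 points each and numInside = 7854, the estimate would be 4 * 7854 / 10000 = 3.1416.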
3. The Pi estimation MapReduce program with the new API
The new API differs considerably from the old one; Mapper, Reducer and Job have all changed. This chapter modifies the old-API code and presents a program based on the new API. Most places are already commented, so they are not explained again one by one.
3.1 The source code of NewPiEst.java
package com.brianchen.hadoop;

import java.lang.Exception;
import java.lang.Integer;
import java.math.BigDecimal;
import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.GenericOptionsParser;

public class NewPiEst extends Configured implements Tool {
  // Temporary directory used for the job's input and output.
  static private final Path TMP_DIR = new Path("pitmp");

  // Log
  static final Log LOG = LogFactory.getLog(NewPiEst.class);

  // Halton sequence class.
  private static class HaltonSequence {
    // bases
    static final int[] P = {2, 3};
    // maximum number of digits allowed
    static final int[] K = {63, 40};

    private long index;
    private double[] x;
    private double[][] q;
    private int[][] d;

    HaltonSequence(long startindex) {
      index = startindex;
      x = new double[K.length];
      q = new double[K.length][];
      d = new int[K.length][];
      for (int i = 0; i < K.length; i++) {
        q[i] = new double[K[i]];
        d[i] = new int[K[i]];
      }
      for (int i = 0; i < K.length; i++) {
        long k = index;
        x[i] = 0;
        for (int j = 0; j < K[i]; j++) {
          q[i][j] = (j == 0 ? 1.0 : q[i][j-1]) / P[i];
          d[i][j] = (int)(k % P[i]);
          k = (k - d[i][j]) / P[i];
          x[i] += d[i][j] * q[i][j];
        }
      }
    }

    double[] nextPoint() {
      index++;
      for (int i = 0; i < K.length; i++) {
        for (int j = 0; j < K[i]; j++) {
          d[i][j]++;
          x[i] += q[i][j];
          if (d[i][j] < P[i]) {
            break;
          }
          d[i][j] = 0;
          x[i] -= (j == 0 ? 1.0 : q[i][j-1]);
        }
      }
      return x;
    }
  }

  // Mapper class for the new API.
  public static class PiMapper
      extends Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {

    public void map(LongWritable offset, LongWritable size, Context context)
        throws IOException, InterruptedException {
      final HaltonSequence hs = new HaltonSequence(offset.get());
      long nInside = 0;
      long nOutside = 0;

      for (long i = 0; i < size.get(); i++) {
        final double[] point = hs.nextPoint();
        // Here the quarter circle has radius 1 and is centered at the origin,
        // so a point in [0,1) x [0,1) lies outside it when x*x + y*y > 1.
        if (point[0]*point[0] + point[1]*point[1] > 1) {
          nOutside++;
        } else {
          nInside++;
        }
      }

      // Emit the two counts once, after all points have been generated:
      // key 1 carries the number of points outside, key 2 the number inside.
      context.write(new LongWritable(1), new LongWritable(nOutside));
      context.write(new LongWritable(2), new LongWritable(nInside));
    }
  }

  // Reducer class for the new API.
  public static class PiReducer
      extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

    long nInside = 0;
    long nOutside = 0;

    public void reduce(LongWritable isInside, Iterable<LongWritable> values,
                       Context context) throws IOException, InterruptedException {
      if (isInside.get() == 2) {
        for (LongWritable val : values) {
          nInside += val.get();
        }
      } else {
        for (LongWritable val : values) {
          nOutside += val.get();
        }
      }
      LOG.info("reduce-log: isInside = " + isInside.get()
          + ", nInside = " + nInside + ", nOutside = " + nOutside);
    }

    // The Reducer runs cleanup() before it finishes, so this is where the
    // accumulated nInside and nOutside are written to a file.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      Path outDir = new Path(TMP_DIR, "out");
      Path outFile = new Path(outDir, "reduce-out");
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, outFile, LongWritable.class, LongWritable.class, CompressionType.NONE);
      writer.append(new LongWritable(nInside), new LongWritable(nOutside));
      writer.close();
    }
  }

  public static BigDecimal estimate(int nMaps, int nSamples, Job job) throws Exception {
    LOG.info("\n\n estimate \n\n");

    // Set the job's jar, mapper, reducer and so on.
    job.setJarByClass(NewPiEst.class);
    job.setMapperClass(PiMapper.class);
    job.setReducerClass(PiReducer.class);
    job.setNumReduceTasks(1);

    // Input and output are in sequence file format.
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    // Set the types of the output key and value.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setSpeculativeExecution(false);

    Path inDir = new Path(TMP_DIR, "in");
    Path outDir = new Path(TMP_DIR, "out");

    // Set the directory of the input files and the directory of the output.
    FileInputFormat.addInputPath(job, inDir);
    FileOutputFormat.setOutputPath(job, outDir);

    // Check the temporary directory.
    FileSystem fs = FileSystem.get(job.getConfiguration());
    if (fs.exists(TMP_DIR)) {
      throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
          + " already exists, please remove it.");
    }
    // Create the input directory.
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Cannot create input directory " + inDir);
    }

    try {
      // Write one sequence file per map task; each file holds two numbers.
      for (int i = 0; i < nMaps; i++) {
        final Path file = new Path(inDir, "part" + i);
        final LongWritable offset = new LongWritable(i * nSamples);
        final LongWritable size = new LongWritable(nSamples);
        final SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, job.getConfiguration(), file,
            LongWritable.class, LongWritable.class, CompressionType.NONE);
        writer.append(offset, size);
        writer.close();
        System.out.println("wrote input for Map #" + i);
      }

      // Run the MapReduce job.
      System.out.println("starting mapreduce job");
      final long startTime = System.currentTimeMillis();
      boolean ret = job.waitForCompletion(true);
      final double duration = (System.currentTimeMillis() - startTime) / 1000.0;
      System.out.println("Job finished in " + duration + " seconds.");

      // Read the MapReduce result back from HDFS.
      Path inFile = new Path(outDir, "reduce-out");
      LongWritable nInside = new LongWritable();
      LongWritable nOutside = new LongWritable();
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, job.getConfiguration());
      reader.next(nInside, nOutside);
      reader.close();

      LOG.info("estimate-log: nInside = " + nInside.get()
          + ", nOutside = " + nOutside.get());

      // Compute the Pi value and return it.
      return BigDecimal.valueOf(4).multiply(
          BigDecimal.valueOf(nInside.get())
      ).divide(
          BigDecimal.valueOf(nInside.get() + nOutside.get()),
          20, BigDecimal.ROUND_HALF_DOWN
      );
    } finally {
      fs.delete(TMP_DIR, true);
    }
  }

  public int run(String[] args) throws Exception {
    LOG.info("\n\n run \n\n");

    if (args.length != 2) {
      System.err.println("Use: NewPiEst 10 10000");
      System.exit(1);
    }

    // Parse the arguments.
    int nMaps = Integer.parseInt(args[0]);
    int nSamples = Integer.parseInt(args[1]);

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    Job job = new Job(conf, "Pi estimating job");

    System.out.println("Pi = " + estimate(nMaps, nSamples, job));
    return 0;
  }

  public static void main(String[] argv) throws Exception {
    LOG.info("\n\n main \n\n");
    System.exit(ToolRunner.run(null, new NewPiEst(), argv));
  }
}
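Two differences from the old-API version are worth pointing out. First, the result file is written in the reducer's cleanup() method, which the new API calls once at the end of the reduce task and which here plays the role of close() in the old API. Second, instead of a BooleanWritable key, the counts are keyed by the LongWritable values 1 (outside the quarter circle) and 2 (inside), which is why the map method writes them out once after its loop finishes.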
3.2 Compiling and running
The directory and file layout of the project is similar to the previous chapters and is left as an exercise; the commands are roughly as follows.
Compile: "javac -cp /home/brian/usr/hadoop/hadoop-1.2.1/hadoop-core-1.2.1.jar:/home/brian/usr/hadoop/hadoop-1.2.1/lib/commons-logging-1.1.1.jar:/home/brian/usr/hadoop/hadoop-1.2.1/lib/commons-cli-1.2.jar -d ./classes/ src/*.java"
Package: "jar -cvf newpiest.jar -C ./classes/ ."
Run: "./bin/hadoop jar ~/all/work/ifitus/ifitus-dev-svn/hadoop_from_zero/src/chp08/newpiest/newpiest.jar com.brianchen.hadoop.NewPiEst 10 1000"