Hadoop的“全局变量”
2012-09-27 16:29
134 查看
以前有做过在Hadoop编写程序时使用全局变量的想法,但是最后却没有实现,上网查才看到说Hadoop不支持全局变量。但是有时候编程的时候又会用到,比如编写k-means算法的时候,如果可以有个全局变量存储中心点该多好呀。其实在hadoop中确实是有相关的实现的,比如可以在mapper中的setup函数中读取一个小文件,然后从这个文件中取出全局变量的值。
那具体如何实现呢?首先提出一个问题,然后利用这种思想去解决会比较好。首先说下我要实现的问题:我现在有输入数据如下:
下面贴代码:
KmeansDriver:
Mapper:
Reducer:
DataPro:
输出文件如下:
那具体如何实现呢?首先提出一个问题,然后利用这种思想去解决会比较好。首先说下我要实现的问题:我现在有输入数据如下:
0.0 0.2 0.4 0.3 0.2 0.4 0.4 0.2 0.4 0.5 0.2 0.4 5.0 5.2 5.4 6.0 5.2 6.4 4.0 5.2 4.4 10.3 10.4 10.5 10.3 10.4 10.5 10.3 10.4 10.5而且还有一个小数据文件(中心点)如下:
0 0 0 5 5 5 10 10 10我想做的事情就是把输入数据按照中心点求平均值,即首先我把输入数据分类,比如倒数三行应该都是属于(10,10,10)这个中心点的,那么我的map就把倒数三行的key都赋值为2,然后value值还是保持这三行不变。在reduce阶段,我求出相同key的sum值,同时求出一共的行数count,最后我用sum/count得到我想要的按照中心点求出的平均值了。
下面贴代码:
KmeansDriver:
package org.fansy.date927; import java.io.IOException; //import org.apache.commons.logging.Log; //import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; //import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class KmeansDriver { /** * k-means algorithm program */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // TODO Auto-generated method stub Configuration conf=new Configuration(); // set the centers data file Path centersFile=new Path("hdfs://fansyPC:9000/user/fansy/input/centers"); DistributedCache.addCacheFile(centersFile.toUri(), conf); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: KmeansDriver <in> <out>"); System.exit(2); } Job job = new Job(conf, "kmeans job"); job.setJarByClass(KmeansDriver.class); job.setMapperClass(KmeansM.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(DataPro.class); job.setNumReduceTasks(2); // job.setCombinerClass(KmeansC.class); job.setReducerClass(KmeansR.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); if(!job.waitForCompletion(true)){ System.exit(1); // run error then exit } } }上面代码中加红的部分比较重要,是mapper的setup函数实现读取文件数据的关键;
Mapper:
package org.fansy.date927; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class KmeansM extends Mapper<LongWritable,Text,IntWritable,DataPro>{ private static Log log=LogFactory.getLog(KmeansM.class); private double[][] centers; private int dimention_m; // this is the k private int dimention_n; // this is the features static enum Counter{Fansy_Miss_Records}; @Override public void setup(Context context) throws IOException,InterruptedException{ Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration()); if(caches==null||caches.length<=0){ log.error("center file does not exist"); System.exit(1); } @SuppressWarnings("resource") BufferedReader br=new BufferedReader(new FileReader(caches[0].toString())); String line; List<ArrayList<Double>> temp_centers=new ArrayList<ArrayList<Double>>(); ArrayList<Double> center=null; // get the file data while((line=br.readLine())!=null){ center=new ArrayList<Double>(); String[] str=line.split("\t"); for(int i=0;i<str.length;i++){ center.add(Double.parseDouble(str[i])); } temp_centers.add(center); } // fill the centers @SuppressWarnings("unchecked") ArrayList<Double>[] newcenters=temp_centers.toArray(new ArrayList[]{}); dimention_m=temp_centers.size(); dimention_n=newcenters[0].size(); centers=new double[dimention_m][dimention_n]; for(int i=0;i<dimention_m;i++){ Double[] temp_double=newcenters[i].toArray(new Double[]{}); for(int j=0;j<dimention_n;j++){ centers[i][j]=temp_double[j]; // System.out.print(temp_double[j]+","); } // System.out.println(); } } public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{ String[] values=value.toString().split("\t"); if(values.length!=dimention_n){ context.getCounter(Counter.Fansy_Miss_Records).increment(1); return; } double[] temp_double=new double[values.length]; for(int i=0;i<values.length;i++){ temp_double[i]=Double.parseDouble(values[i]); } // set the index double distance=Double.MAX_VALUE; double temp_distance=0.0; int index=0; for(int i=0;i<dimention_m;i++){ double[] temp_center=centers[i]; temp_distance=getEnumDistance(temp_double,temp_center); if(temp_distance<distance){ index=i; distance=temp_distance; } } DataPro newvalue=new DataPro(); newvalue.set(value, new IntWritable(1)); context.write(new IntWritable(index), newvalue); } public static double getEnumDistance(double[] source,double[] other){ // get the distance double distance=0.0; if(source.length!=other.length){ return Double.MAX_VALUE; } for(int i=0;i<source.length;i++){ distance+=(source[i]-other[i])*(source[i]-other[i]); } distance=Math.sqrt(distance); return distance; } }红色代码部分是读取文件值,然后赋值为这个job的全局变量值,这样这个map任务就可以把 centers当作全局变量来使用了(同时centers里面存放了centers文件中的值)
Reducer:
package org.fansy.date927; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class KmeansR extends Reducer<IntWritable,DataPro,NullWritable,Text> { public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{ // get dimension first int dimension=0; for(DataPro val:values){ String[] datastr=val.getCenter().toString().split("\t"); dimension=datastr.length; break; } double[] sum=new double[dimension]; int sumCount=0; for(DataPro val:values){ String[] datastr=val.getCenter().toString().split("\t"); sumCount+=val.getCount().get(); for(int i=0;i<dimension;i++){ sum[i]+=Double.parseDouble(datastr[i]); } } // calculate the new centers // double[] newcenter=new double[dimension]; StringBuffer sb=new StringBuffer(); for(int i=0;i<dimension;i++){ sb.append(sum[i]/sumCount+"\t"); } context.write(null, new Text(sb.toString())); } }
DataPro:
package org.fansy.date927; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class DataPro implements WritableComparable<DataPro>{ private Text center; private IntWritable count; public DataPro(){ set(new Text(),new IntWritable()); } public void set(Text text, IntWritable intWritable) { // TODO Auto-generated method stub this.center=text; this.count=intWritable; } public Text getCenter(){ return center; } public IntWritable getCount(){ return count; } @Override public void readFields(DataInput arg0) throws IOException { // TODO Auto-generated method stub center.readFields(arg0); count.readFields(arg0); } @Override public void write(DataOutput arg0) throws IOException { // TODO Auto-generated method stub center.write(arg0); count.write(arg0); } @Override public int compareTo(DataPro o) { // TODO Auto-generated method stub int cmp=count.compareTo(o.count); if(cmp!=0){ return cmp; } return center.compareTo(o.center); } }这里自定义了一个DataPro数据类型,主要是为了为以后编写真正的k-means算法时使用combiner做准备,具体思想可以参考上篇combine操作。
输出文件如下:
0.39999999999999997 0.20000000000000004 0.4000000000000001 5.0 5.2 5.4 10.3 10.4 10.5这篇文章参考了 http://www.cnblogs.com/zhangchaoyang/articles/2634365.html部分实现,在那篇文章中的k-means思想的主要思想是:使用map读入centers文件值,然后把数据文件data作为一个全局量,然后reduce在进行求中心点的操作。(或许我理解错了也说不定) 做完这一步后,如果要编写K-means算法就可以说是已经实现了大半了,剩下的就是设置下输入和输出路径,然后进行迭代了。
相关文章推荐
- hadoop全局变量问题
- Hadoop学习笔记八之 combine 以及常用命令行 和全局变量
- Hadoop中的全局变量
- hadoop全局变量与数据传递
- Hadoop 学习研究(八): 多Job任务和hadoop中的全局变量
- Hadoop 全局变量与数据传递
- Hadoop 学习笔记 (十) MapReduce实现排序 全局变量
- javaScript中的全局变量和局部变量;及javaScript函数作用域;
- VC中创建DLL,导出全局变量,函数和类
- android 全局变量 (application) 心得
- [Python学习] 专题六.局部变量、全局变量global、导入模块变量
- 正确得到线程退出信息的方法详解-变量存储退出信息结构、使用动态存储的方式退出信息结构、使用全局变量方式推相互信息结构、使用main函数中的局部变量存储退出信息结构
- ios 全局变量的 定义使用
- python的局部变量和全局变量区别
- Linux设置JAVA的全局环境变量时的大坑
- C++中的内存!(转载)堆 栈 全局/static变量区 常量区
- 全局变量、局部变量、静态全局变量、静态局部变量的区别
- php 全局变量 ,局部变量和global
- android中context及全局变量小析 .
- freemark 配置全局变量的方法