
Hadoop's "Global Variable"

2012-09-27 16:29
I had toyed with the idea of using a global variable in a Hadoop program before, but never implemented it; searching online only turned up claims that Hadoop does not support global variables. Yet the need does come up in practice: when writing the k-means algorithm, for example, it would be very handy to have a global variable that holds the cluster centers. Hadoop does in fact offer a way to get this effect: the driver registers a small file, and each mapper reads it back in its setup() method and takes the "global" values from that file.
So how is this done concretely? It is easiest to pose a problem first and then solve it with this idea. Here is the problem I want to solve. I have the following input data:

0.0	0.2	0.4
0.3	0.2	0.4
0.4	0.2	0.4
0.5	0.2	0.4
5.0	5.2	5.4
6.0	5.2	6.4
4.0	5.2	4.4
10.3	10.4	10.5
10.3	10.4	10.5
10.3	10.4	10.5
In addition there is a small data file holding the centers:

0	0	0
5	5	5
10	10	10
What I want to do is average the input data per center. First the map classifies each input row: for example, the last three rows all belong to the center (10, 10, 10), so the map emits key 2 for each of them and keeps the row itself as the value. In the reduce phase I add up the values that share a key, count how many rows there are (count), and finally divide sum by count to get the per-center averages I am after.
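Just to sanity-check the assignment step, here is a tiny stand-alone snippet (nothing Hadoop-specific, only the distance math; the class name NearestCenterCheck is just a throwaway name of mine). For the last row above it prints index 2, which is why those rows get key 2:

public class NearestCenterCheck {
	static double distance(double[] a, double[] b) {
		double d = 0.0;
		for (int i = 0; i < a.length; i++) {
			d += (a[i] - b[i]) * (a[i] - b[i]);
		}
		return Math.sqrt(d);
	}
	public static void main(String[] args) {
		double[][] centers = { { 0, 0, 0 }, { 5, 5, 5 }, { 10, 10, 10 } };
		double[] row = { 10.3, 10.4, 10.5 };
		int index = 0;
		double best = Double.MAX_VALUE;
		for (int i = 0; i < centers.length; i++) {
			double d = distance(row, centers[i]);
			System.out.println("distance to center " + i + " = " + d);
			if (d < best) {
				best = d;
				index = i;
			}
		}
		System.out.println("nearest center index = " + index);  // prints 2 for this row
	}
}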
Here is the code:
KmeansDriver:

package org.fansy.date927;

import java.io.IOException;

//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
//import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class KmeansDriver {
	/**
	 *   k-means algorithm program  
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf=new Configuration();
		// set the centers data file
		Path centersFile=new Path("hdfs://fansyPC:9000/user/fansy/input/centers");
		DistributedCache.addCacheFile(centersFile.toUri(), conf);
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: KmeansDriver <in> <out>");
	      System.exit(2);
	    }
	    Job job = new Job(conf, "kmeans job");
	    job.setJarByClass(KmeansDriver.class);
	    job.setMapperClass(KmeansM.class);
	    job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(DataPro.class);
	    job.setNumReduceTasks(2);
	    //  job.setCombinerClass(KmeansC.class);
	    job.setReducerClass(KmeansR.class);
	    job.setOutputKeyClass(NullWritable.class);
	    job.setOutputValueClass(Text.class);
	    
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    
	    if(!job.waitForCompletion(true)){
	    	System.exit(1); // run error then exit
	    }	
	}

}
The key line in the driver above is DistributedCache.addCacheFile(centersFile.toUri(), conf): it registers the centers file so that the mapper's setup() method can read its data later.
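For reference, a driver like this would typically be launched along the following lines (the jar name and the input/output paths are just placeholders, not from the original post):

hadoop jar kmeans.jar org.fansy.date927.KmeansDriver /user/fansy/input/data /user/fansy/output/date927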
Mapper:

package org.fansy.date927;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KmeansM extends Mapper<LongWritable,Text,IntWritable,DataPro>{
	private static Log log=LogFactory.getLog(KmeansM.class);
	
	private double[][] centers;
	private int dimention_m;  //  this is the k 
	private int dimention_n;   //  this is the features 

	
    static enum Counter{Fansy_Miss_Records};
	@Override
	public void setup(Context context) throws IOException,InterruptedException{
		Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
		if(caches==null||caches.length<=0){
			log.error("center file does not exist");
			System.exit(1);
		}
		BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
		String line;
		List<ArrayList<Double>> temp_centers=new ArrayList<ArrayList<Double>>();
		ArrayList<Double> center=null;
		//  read the cached centers file line by line
		while((line=br.readLine())!=null){
			center=new ArrayList<Double>();
			String[] str=line.split("\t");
			for(int i=0;i<str.length;i++){
				center.add(Double.parseDouble(str[i]));
			}
			temp_centers.add(center);
		}
		br.close();
		//  fill the centers array
		@SuppressWarnings("unchecked")
		ArrayList<Double>[] newcenters=temp_centers.toArray(new ArrayList[]{});
		 dimention_m=temp_centers.size();
		 dimention_n=newcenters[0].size();
		centers=new double[dimention_m][dimention_n];
		for(int i=0;i<dimention_m;i++){
			Double[] temp_double=newcenters[i].toArray(new Double[]{});
			for(int j=0;j<dimention_n;j++){
				centers[i][j]=temp_double[j];
		//		System.out.print(temp_double[j]+",");
			}
	//		System.out.println();
		}
	}
			
	public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
		String[] values=value.toString().split("\t");
		
		if(values.length!=dimention_n){
			context.getCounter(Counter.Fansy_Miss_Records).increment(1);
			return;
		}
		double[] temp_double=new double[values.length];
		for(int i=0;i<values.length;i++){
			temp_double[i]=Double.parseDouble(values[i]);
		}
		//  set the index
		double distance=Double.MAX_VALUE;
		double temp_distance=0.0;
		int index=0;
		for(int i=0;i<dimention_m;i++){
			double[] temp_center=centers[i];
			temp_distance=getEnumDistance(temp_double,temp_center);
			if(temp_distance<distance){
				 index=i;
				distance=temp_distance;
			}
		}
		DataPro newvalue=new DataPro();
		newvalue.set(value, new IntWritable(1));
		context.write(new IntWritable(index), newvalue);
		
	}
	public static double getEnumDistance(double[] source,double[] other){  //  get the distance
		double distance=0.0;
		if(source.length!=other.length){
			return Double.MAX_VALUE;
		}
		for(int i=0;i<source.length;i++){
			distance+=(source[i]-other[i])*(source[i]-other[i]);
		}
		distance=Math.sqrt(distance);
		return distance;
	}
}
The setup() method above reads the values of the cached centers file and stores them in the centers field, which then serves as this job's "global variable": every map() call in the task can use centers as if it were global (and centers holds exactly the values from the centers file).
Reducer:

package org.fansy.date927;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KmeansR extends Reducer<IntWritable,DataPro,NullWritable,Text> {
	
	public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{
		// note: the values Iterable can only be traversed once, so the dimension
		// is taken from the first value inside this single pass
		double[] sum=null;
		int sumCount=0;
		for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
			if(sum==null){
				sum=new double[datastr.length];
			}
			sumCount+=val.getCount().get();
			for(int i=0;i<datastr.length;i++){
				sum[i]+=Double.parseDouble(datastr[i]);
			}
		}
		if(sum==null){  // no values for this key
			return;
		}
		//  calculate the new center
		StringBuffer sb=new StringBuffer();
		for(int i=0;i<sum.length;i++){
			sb.append(sum[i]/sumCount+"\t");
		}
		context.write(NullWritable.get(), new Text(sb.toString()));
	}
}

DataPro:

package org.fansy.date927;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class DataPro implements WritableComparable<DataPro>{

	private Text center;
	private IntWritable count;
	
	public DataPro(){
		set(new Text(),new IntWritable());
	}
	public void set(Text text, IntWritable intWritable) {
		this.center=text;
		this.count=intWritable;
	}
	
	public Text getCenter(){
		return center;
	}
	public IntWritable getCount(){
		return count;
	}
	
	
	@Override
	public void readFields(DataInput arg0) throws IOException {
		center.readFields(arg0);
		count.readFields(arg0);
	}

	@Override
	public void write(DataOutput arg0) throws IOException {
		center.write(arg0);
		count.write(arg0);
	}

	@Override
	public int compareTo(DataPro o) {
		int cmp=count.compareTo(o.count);
		if(cmp!=0){
			return cmp;
		}
		return center.compareTo(o.center);
	}

}
The custom DataPro type is defined here mainly to prepare for using a combiner when the real k-means algorithm gets written later; the underlying idea is the one described in the previous post on the combine operation.
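For reference, a combiner matching the commented-out job.setCombinerClass(KmeansC.class) line in the driver could look roughly like the sketch below. This is my own sketch, not code from the original post: per key it emits one DataPro whose Text holds the partial coordinate sums and whose count records how many rows they cover, so the reducer's sum/count average stays correct after combining.

A possible KmeansC:

package org.fansy.date927;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KmeansC extends Reducer<IntWritable,DataPro,IntWritable,DataPro> {

	public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{
		double[] sum=null;
		int count=0;
		for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
			if(sum==null){
				sum=new double[datastr.length];
			}
			for(int i=0;i<datastr.length;i++){
				sum[i]+=Double.parseDouble(datastr[i]);
			}
			count+=val.getCount().get();
		}
		if(sum==null){
			return;
		}
		//  pack the partial sums back into the same Text + count layout the reducer expects
		StringBuffer sb=new StringBuffer();
		for(int i=0;i<sum.length;i++){
			sb.append(sum[i]+"\t");
		}
		DataPro partial=new DataPro();
		partial.set(new Text(sb.toString()), new IntWritable(count));
		context.write(key, partial);
	}
}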
The output file is as follows:

0.39999999999999997	0.20000000000000004	0.4000000000000001	
5.0	5.2	5.4	
10.3	10.4	10.5
This post borrows part of its implementation from http://www.cnblogs.com/zhangchaoyang/articles/2634365.html. In that article the main idea of the k-means step is the reverse of mine: the map reads the centers file as its input, the data file is treated as the "global" quantity, and the reduce then computes the new centers (though perhaps I misread it). With this step in place, most of a k-means implementation is already done; what remains is to set up the input and output paths and iterate.
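To give an idea of what that iteration could look like, here is a rough sketch of a driver loop. The class name KmeansIterDriver, the command-line layout and the fixed iteration count are placeholders of mine rather than part of the original post; it runs a single reduce task so that each round's new centers land in one part-r-00000 file, and it has no convergence test:

package org.fansy.date927;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KmeansIterDriver {

	public static void main(String[] args) throws Exception {
		if(args.length!=3){
			System.err.println("Usage: KmeansIterDriver <data in> <initial centers> <output base>");
			System.exit(2);
		}
		String centers=args[1];
		for(int iter=0;iter<10;iter++){   // fixed number of rounds, no convergence test
			Configuration conf=new Configuration();
			//  the current centers file becomes this round's "global variable"
			DistributedCache.addCacheFile(new Path(centers).toUri(), conf);
			Job job=new Job(conf, "kmeans iteration "+iter);
			job.setJarByClass(KmeansIterDriver.class);
			job.setMapperClass(KmeansM.class);
			job.setMapOutputKeyClass(IntWritable.class);
			job.setMapOutputValueClass(DataPro.class);
			job.setReducerClass(KmeansR.class);
			job.setNumReduceTasks(1);     // keep all new centers in a single part file
			job.setOutputKeyClass(NullWritable.class);
			job.setOutputValueClass(Text.class);
			String out=args[2]+"-"+iter;
			FileInputFormat.addInputPath(job, new Path(args[0]));
			FileOutputFormat.setOutputPath(job, new Path(out));
			if(!job.waitForCompletion(true)){
				System.exit(1);
			}
			centers=out+"/part-r-00000";  // this round's output is the next round's centers file
		}
	}
}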