您的位置:首页 > 编程语言 > Java开发

将eclipse下编译的class文件打包成hadoop集群可以运行的jar包的过程

2012-12-06 20:29 661 查看

把eclipse下编译的class文件打包成hadoop集群可用文件的过程

首先要在eclipse下运行成功,在工程项目下打bin文件夹为jar文件,

                          压缩方式为: jar -cvf sort.jar -C bin/ .

      这个是一个排序程序,代码如下

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

   public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{

    private static IntWritable data= new IntWritable();

    public void map(Object key, Text value,Context context)throws IOException, InterruptedException{

     String line = value.toString();

     data.set(Integer.parseInt(line));

     context.write(data, new IntWritable(1));

    }

   }

  

   public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{

    private static IntWritable linenum = new IntWritable(1);

   

    public void reduce (IntWritable key, Iterable<IntWritable> value, Context context)throws IOException , InterruptedException{

     for(IntWritable val:value){

      context.write(linenum, key);

     }

     linenum = new IntWritable(linenum.get()+1);

    }

   }

  

   public static void main(String[] args)throws Exception{

    Configuration conf = new Configuration();

    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    if(otherArgs.length!=2){

     System.err.println("Usage: wordcount <in> <out>");

     System.exit(2);

    }

    Job job=new Job(conf,"Sort");

    job.setJarByClass(Sort.class);

    job.setMapperClass(Map.class);

    job.setReducerClass(Reduce.class);

    job.setOutputKeyClass(IntWritable.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0:1 );

   }

}

压缩完以后就可以在集群上运行该程序了,运行方式为:

hadoop jar  sort.jar  Sort /tmp/sort/inputfile/ /tmp/sort/outputfile

然后在项目中刷新后即可看到输出结果:



顺便再加一些关于添加bash程序和python程序到hadoop流中的方法:

首先说一下bash文件,例如是reduce.sh,内容很简单就是grep xxx

要调用它的方式为:

hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat -reducer reduce.sh -file reduce.sh

再说一个python的例子,假如文件叫reduce.py

调用方式:

hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper reduce.py -reducer aggregate -file reduce.py

这里解释一下,aggregate这个是hadoop提供的一个包,它提供的是一个reduce函数以及一个combine函数。它的功能是类似求和、取极值等功能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐