您的位置:首页 > 移动开发

[Hadoop] Hadoop 链式任务 : ChainMapper and ChainReducer的使用

2015-12-26 13:11 489 查看
注意:

1. 本人目前使用的版本是1.2.1,因此ChainMapper使用的还是old api。 

2. 老的API之中,只支持 N-Mapper + 1-Reducer的模式。 Reducer不在链式任务最开始即可。

比如:

Map1 -> Map2 -> Reducer -> Map3 -> Map4

 

(不确定在新版的API之中是否支持 N-Reducer的模式。不过new api 确实要简单简洁很多)

 

 

任务介绍:

这个任务需要两步完成:

1. 对一篇文章进行WordCount

2. 统计出现次数超过5词的单词

 

WordCount我们很熟悉,因为版本限制,先使用old api 实现一次:

package hadoop_in_action_exersice;  

  

import java.io.IOException;  

import java.util.Iterator;  

import java.util.StringTokenizer;  

  

import org.apache.hadoop.fs.FileSystem;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.LongWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapred.FileInputFormat;  

import org.apache.hadoop.mapred.FileOutputFormat;  

import org.apache.hadoop.mapred.JobClient;  

import org.apache.hadoop.mapred.JobConf;  

import org.apache.hadoop.mapred.MapReduceBase;  

import org.apache.hadoop.mapred.Mapper;  

import org.apache.hadoop.mapred.OutputCollector;  

import org.apache.hadoop.mapred.Reducer;  

import org.apache.hadoop.mapred.Reporter;  

import org.apache.hadoop.mapred.TextInputFormat;  

import org.apache.hadoop.mapred.TextOutputFormat;  

  

public class ChainedJobs {  

  

    public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {  

  

        private final static IntWritable one = new IntWritable(1);  

        public static final int LOW_LIMIT = 5;  

        @Override  

        public void map(LongWritable key, Text value,  

                OutputCollector<Text, IntWritable> output, Reporter reporter)  

                throws IOException {  

            String line = value.toString();  

            StringTokenizer st = new StringTokenizer(line);  

            while(st.hasMoreTokens())  

                output.collect(new Text(st.nextToken()), one);  

              

        }  

          

    }  

      

    public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {  

  

        @Override  

        public void reduce(Text key, Iterator<IntWritable> values,  

                OutputCollector<Text, IntWritable> output, Reporter reporter)  

                throws IOException {  

            int sum = 0;  

            while(values.hasNext()) {  

                sum += values.next().get();  

            }  

            output.collect(key, new IntWritable(sum));  

        }  

          

    }  

      

      

    public static void main(String[] args) throws IOException {  

          

          

        JobConf conf = new JobConf(ChainedJobs.class);  

        conf.setJobName("wordcount");           //设置一个用户定义的job名称  

        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类  

        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类  

        conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类  

        conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类  

        conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类  

        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类  

        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类  

  

        // Remove output folder before run job(s)  

        FileSystem fs=FileSystem.get(conf);  

        String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";  

        Path op=new Path(outputPath);          

        if (fs.exists(op)) {  

            fs.delete(op, true);  

            System.out.println("存在此输出路径,已删除!!!");  

        }  

          

        FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));  

        FileOutputFormat.setOutputPath(conf, new Path(outputPath));  

        JobClient.runJob(conf);         //运行一个job  

    }  

      

}  

 

上面是独立的一个Job,完成第一步。为了能紧接着完成第二步,我们需要在原来的基础上进行修改。

为了方便理解,上面的输入的例子如下:

Java代码  


accessed    3  

accessible  4  

accomplish  1  

accounting  7  

accurately  1  

acquire 1  

across  1  

actual  1  

actually    1  

add 3  

added   2  

addition    1  

additional  4  

 

old api 的实现方式并不支持 setup() / cleanup() 操作这一点非常不好,因此在有可能的情况下最好还是要迁移到Hadoop 2.X 

新的API会方便简洁很多

 

下面是增加了一个Mapper 来过滤

Java代码  


public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {  

  

    @Override  

    public void map(Text key, IntWritable value,  

            OutputCollector<Text, IntWritable> output, Reporter reporter)  

            throws IOException {  

          

        if(value.get() >= LOW_LIMIT) {  

            output.collect(key, value);  

        }  

          

    }  

}  

 这个Mapper做的事情很简单,就是针对每个key,如果他的value > LOW_LIMIT 那么就输出

所以,目前为止,任务链如下:

TokenizerMapper -> TokenizeReducer -> RangeFilterMapper 

 

所以我们的main函数改成下面的样子:

Java代码  


public static void main(String[] args) throws IOException {  

      

      

    JobConf conf = new JobConf(ChainedJobs.class);  

    conf.setJobName("wordcount");           //设置一个用户定义的job名称  

//        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类  

//        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类  

//        conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类  

//        conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类  

//        conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类  

//        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类  

//        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类  

  

    // Step1 : mapper forr word count   

    JobConf wordCountMapper  = new JobConf(false);  

    ChainMapper.addMapper(conf,   

            TokenizeMapper.class,   

            LongWritable.class,     // input key type   

            Text.class,             // input value type  

            Text.class,             // output key type  

            IntWritable.class,      // output value type  

            false,                  //byValue or byRefference 传值还是传引用  

            wordCountMapper);  

      

    // Step2: reducer for word count  

    JobConf wordCountReducer  = new JobConf(false);  

    ChainReducer.setReducer(conf,   

            TokenizeReducer.class,   

            Text.class,   

            IntWritable.class,   

            Text.class,   

            IntWritable.class,   

            false,   

            wordCountReducer);  

      

        // Step3: mapper used as filter  

    JobConf rangeFilterMapper  = new JobConf(false);  

    ChainReducer.addMapper(conf,   

            RangeFilterMapper.class,   

            Text.class,   

            IntWritable.class,   

            Text.class,   

            IntWritable.class,   

            false,   

            rangeFilterMapper);  

      

      

    // Remove output folder before run job(s)  

    FileSystem fs=FileSystem.get(conf);  

    String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";  

    Path op=new Path(outputPath);          

    if (fs.exists(op)) {  

        fs.delete(op, true);  

        System.out.println("存在此输出路径,已删除!!!");  

    }  

      

    FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));  

    FileOutputFormat.setOutputPath(conf, new Path(outputPath));  

    JobClient.runJob(conf);         //运行一个job  

}  

  下面是运行结果的一部分:

Java代码  


a   40  

and 26  

are 12  

as  6  

be  7  

been    8  

but 5  

by  5  

can 12  

change  5  

data    5  

files   7  

for 28  

from    5  

has 7  

have    8  

if  6  

in  27  

is  16  

it  13  

more    8  

not 5  

of  23  

on  5  

outputs 5  

see 6  

so  11  

that    11  

the 54  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: