您的位置:首页 > 运维架构

Hadoop之——以1.x版本和0.x版本分别实现单词统计功能

2015-05-25 23:45 375 查看
转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45998833

本文提供一个以Hadoop MapReduce方式统计文本中每个单词的数量的例子,包含1.x版本和0.x版本的实现,同时简要说明了两个版本的不同,不多说,直接上代码

一、Hadoop 1.x版本的实现

package com.lyz.hadoop.count;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
* 利用Hadoop MapReduce统计文本中每个单词的数量
* @author liuyazhuang
*/
public class WordCount {
//要统计的文件位置
static final String INPUT_PATH = "hdfs://liuyazhuang:9000/d1/hello";
//统计结果输出的位置
static final String OUT_PATH = "hdfs://liuyazhuang:9000/out";

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUT_PATH);
//如果已经存在输出文件,则先删除已存在的输出文件
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

final Job job = new Job(conf , WordCount.class.getSimpleName());
//1.1指定读取的文件位于哪里
FileInputFormat.setInputPaths(job, INPUT_PATH);
//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
job.setInputFormatClass(TextInputFormat.class);

//1.2 指定自定义的map类
job.setMapperClass(MyMapper.class);
//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,下面两行代码可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

//1.3 分区
job.setPartitionerClass(HashPartitioner.class);
//有一个reduce任务运行
job.setNumReduceTasks(1);

//1.4 TODO 排序、分组

//1.5 TODO 规约

//2.2 指定自定义reduce类
job.setReducerClass(MyReducer.class);
//指定reduce的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

//2.3 指定写出到哪里
FileOutputFormat.setOutputPath(job, outPath);
//指定输出文件的格式化类
job.setOutputFormatClass(TextOutputFormat.class);

//把job提交给JobTracker运行
job.waitForCompletion(true);
}

/**
* KEYIN	即k1		表示行的偏移量
* VALUEIN	即v1		表示行文本内容
* KEYOUT	即k2		表示行中出现的单词
* VALUEOUT	即v2		表示行中出现的单词的次数,固定值1
*/
static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
final String[] splited = v1.toString().split("\t");
for (String word : splited) {
context.write(new Text(word), new LongWritable(1));
}
};
}

/**
* KEYIN	即k2		表示行中出现的单词
* VALUEIN	即v2		表示行中出现的单词的次数
* KEYOUT	即k3		表示文本中出现的不同单词
* VALUEOUT	即v3		表示文本中出现的不同单词的总次数
*
*/
static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
long times = 0L;
for (LongWritable count : v2s) {
times += count.get();
}
ctx.write(k2, new LongWritable(times));
};
}
}
控制台打印信息



运行结果



二、Hadoop 0.x版本的实现

package com.lyz.hadoop.old;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;

import com.lyz.hadoop.count.WordCount;

/**
* Hadoop中的一些老的API用法
* Hadoop版本1.x的包一般是mapreduce
* Hadoop版本0.x的包一般是mapred
* @author liuyazhuang
*
*/
public class OldApp {
//要统计的文件位置
static final String INPUT_PATH = "hdfs://liuyazhuang:9000/d1/hello";
//统计结果输出的位置
static final String OUT_PATH = "hdfs://liuyazhuang:9000/out";
/**
* Hadoop老版本与新版本相比,不同点是:
*  1:不再使用Job,而是使用JobConf
*  2、包名是mapred而不是mapreduce
*  3、不使用job.waitForCompletion(true)提交作业,而是使用JobClient.runJob(JobConf对象)
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUT_PATH);
//如果已经存在输出文件,则先删除已存在的输出文件
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

final JobConf job = new JobConf(conf , WordCount.class);
//1.1指定读取的文件位于哪里
FileInputFormat.setInputPaths(job, INPUT_PATH);
//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
job.setInputFormat(TextInputFormat.class);

//1.2 指定自定义的map类
job.setMapperClass(MyMapper.class);
//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,下面两行代码可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

//1.3 分区
job.setPartitionerClass(HashPartitioner.class);
//有一个reduce任务运行
job.setNumReduceTasks(1);

//1.4 TODO 排序、分组

//1.5 TODO 规约

//2.2 指定自定义reduce类
job.setReducerClass(MyReducer.class);
//指定reduce的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

//2.3 指定写出到哪里
FileOutputFormat.setOutputPath(job, outPath);
//指定输出文件的格式化类
job.setOutputFormat(TextOutputFormat.class);

//把job提交给JobTracker运行
JobClient.runJob(job);
}

/**
* 新api extends Mapper
* 老api extends MapReduceBase implements Mapper
* @author liuyazhuang
*
*/
static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable>{

@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
String[] splited = value.toString().split("\t");
for (String word : splited) {
output.collect(new Text(word), new LongWritable(1));
}
}
}

/**
* 新api extends Reducer
* 老api extends MapReduceBase implements Reducer
* @author liuyazhuang
*
*/
static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable>{
@Override
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
long times = 0;
while (values.hasNext()) {
times += values.next().get();
}
output.collect(key, new LongWritable(times));
}
}
}
控制台打印信息



运行结果



总结:

1、包名不同

Hadoop版本1.x的包一般是mapreduce

Hadoop版本0.x的包一般是mapred

2、作业处理类不同

Hadoop版本1.x用Job,

Hadoop版本0.x使用JobConf

3、 作业提交方式不同

Hadoop版本1.x使用job.waitForCompletion(true)提交作业

Hadoop版本0.x使用JobClient.runJob(JobConf对象)提交作业

4、Mapper实现不同

Hadoop版本1.x api extends Mapper

Hadoop版本1.x api extends MapReduceBase implements Mapper

5、Reducer实现不同

Hadoop版本1.x api extends Reducer

Hadoop版本1.x api extends MapReduceBase implements Reducer
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: