您的位置:首页 > 编程语言

Hadoop那些事儿(四)---MapReduce编程实例(基础)

2017-02-17 11:33 381 查看

前言

上一篇文章,以WordCount为例讲了一下MapReduce的代码结构及运行机制,这篇文章将通过几个简单的例子进一步认识MapReduce。

1.数据检索

问题描述

假设有很多条数据,我们从中查找包含某个字符串的语句。

解决方案

这个问题比较简单,首先在Map中获取当前读取的文件的文件名作为key,将要解析的数据按句号分割,逐句判断,如果包含指定的字符串则作为value输出。在Reduce中对属于同一文件的语句进行合并,然后输出。

测试数据

输入:

in1.txt:

浔阳江头夜送客,枫叶荻花秋瑟瑟。主人下马客在船,举酒欲饮无管弦。醉不成欢惨将别,别时茫茫江浸月。忽闻水上琵琶声,主人忘归客不发。寻声暗问弹者谁?琵琶声停欲语迟。移船相近邀相见,添酒回灯重开宴。千呼万唤始出来,犹抱琵琶半遮面。转轴拨弦三两声,未成曲调先有情。弦弦掩抑声声思,似诉平生不得志。低眉信手续续弹,说尽心中无限事。轻拢慢捻抹复挑,初为霓裳后六幺。大弦嘈嘈如急雨,小弦切切如私语。嘈嘈切切错杂弹,大珠小珠落玉盘。间关莺语花底滑,幽咽泉流冰下难。冰泉冷涩弦凝绝,凝绝不通声暂歇。别有幽愁暗恨生,此时无声胜有声。银瓶乍破水浆迸,铁骑突出刀枪鸣。曲终收拨当心画,四弦一声如裂帛。东船西舫悄无言,唯见江心秋月白。
沉吟放拨插弦中,整顿衣裳起敛容。自言本是京城女,家在虾蟆陵下住。十三学得琵琶成,名属教坊第一部。曲罢曾教善才服,妆成每被秋娘妒。五陵年少争缠头,一曲红绡不知数。钿头银篦击节碎,血色罗裙翻酒污。今年欢笑复明年,秋月春风等闲度。弟走从军阿姨死,暮去朝来颜色故。门前冷落鞍马稀,老大嫁作商人妇。商人重利轻别离,前月浮梁买茶去。去来江口守空船,绕船月明江水寒。夜深忽梦少年事,梦啼妆泪红阑干。
我闻琵琶已叹息,又闻此语重唧唧。同是天涯沦落人,相逢何必曾相识!我从去年辞帝京,谪居卧病浔阳城。浔阳地僻无音乐,终岁不闻丝竹声。住近湓江地低湿,黄芦苦竹绕宅生。其间旦暮闻何物?杜鹃啼血猿哀鸣。春江花朝秋月夜,往往取酒还独倾。岂无山歌与村笛?呕哑嘲哳难为听。今夜闻君琵琶语,如听仙乐耳暂明。莫辞更坐弹一曲,为君翻作《琵琶行》。感我此言良久立,却坐促弦弦转急。凄凄不似向前声,满座重闻皆掩泣。座中泣下谁最多?江州司马青衫湿。


in2.txt:

汉皇重色思倾国,御宇多年求不得。
杨家有女初长成,养在深闺人未识。
天生丽质难自弃,一朝选在君王侧。
回眸一笑百媚生,六宫粉黛无颜色。
春寒赐浴华清池,温泉水滑洗凝脂。
侍儿扶起娇无力,始是新承恩泽时。
云鬓花颜金步摇,芙蓉帐暖度春宵。
春宵苦短日高起,从此君王不早朝。
承欢侍宴无闲暇,春从春游夜专夜。
后宫佳丽三千人,三千宠爱在一身。
金屋妆成娇侍夜,玉楼宴罢醉和春。
姊妹弟兄皆列土,可怜光彩生门户。
遂令天下父母心,不重生男重生女。
骊宫高处入青云,仙乐风飘处处闻。
缓歌谩舞凝丝竹,尽日君王看不足。
渔阳鼙鼓动地来,惊破霓裳羽衣曲。
九重城阙烟尘生,千乘万骑西南行。
翠华摇摇行复止,西出都门百余里。
六军不发无奈何,宛转蛾眉马前死。
花钿委地无人收,翠翘金雀玉搔头。
君王掩面救不得,回看血泪相和流。
黄埃散漫风萧索,云栈萦纡登剑阁。
峨嵋山下少人行,旌旗无光日色薄。
蜀江水碧蜀山青,圣主朝朝暮暮情。
行宫见月伤心色,夜雨闻铃肠断声。
天旋地转回龙驭,到此踌躇不能去。
马嵬坡下泥土中,不见玉颜空死处。
君臣相顾尽沾衣,东望都门信马归。
归来池苑皆依旧,太液芙蓉未央柳。
芙蓉如面柳如眉,对此如何不泪垂。
春风桃李花开日,秋雨梧桐叶落时。
西宫南内多秋草,落叶满阶红不扫。
梨园弟子白发新,椒房阿监青娥老。
夕殿萤飞思悄然,孤灯挑尽未成眠。
迟迟钟鼓初长夜,耿耿星河欲曙天。
鸳鸯瓦冷霜华重,翡翠衾寒谁与共。
悠悠生死别经年,魂魄不曾来入梦。
临邛道士鸿都客,能以精诚致魂魄。
为感君王辗转思,遂教方士殷勤觅。
排空驭气奔如电,升天入地求之遍。
上穷碧落下黄泉,两处茫茫皆不见。
忽闻海上有仙山,山在虚无缥渺间。
楼阁玲珑五云起,其中绰约多仙子。
中有一人字太真,雪肤花貌参差是。
金阙西厢叩玉扃,转教小玉报双成。
闻道汉家天子使,九华帐里梦魂惊。
揽衣推枕起徘徊,珠箔银屏迤逦开。
云鬓半偏新睡觉,花冠不整下堂来。
风吹仙袂飘飘举,犹似霓裳羽衣舞。
玉容寂寞泪阑干,梨花一枝春带雨。
含情凝睇谢君王,一别音容两渺茫。
昭阳殿里恩爱绝,蓬莱宫中日月长。
回头下望人寰处,不见长安见尘雾。
惟将旧物表深情,钿合金钗寄将去。
钗留一股合一扇,钗擘黄金合分钿。
但教心似金钿坚,天上人间会相见。
临别殷勤重寄词,词中有誓两心知。
七月七日长生殿,夜半无人私语时。
在天愿作比翼鸟,在地愿为连理枝。
天长地久有时尽,此恨绵绵无绝期。


in3.txt:

春江潮水连海平,海上明月共潮生。
滟滟随波千万里,何处春江无月明!
江流宛转绕芳甸,月照花林皆似霰;
空里流霜不觉飞,汀上白沙看不见。
江天一色无纤尘,皎皎空中孤月轮。
江畔何人初见月?江月何年初照人?
人生代代无穷已,江月年年只相似。
不知江月待何人,但见长江送流水。
白云一片去悠悠,青枫浦上不胜愁。
谁家今夜扁舟子?何处相思明月楼?
可怜楼上月徘徊,应照离人妆镜台。
玉户帘中卷不去,捣衣砧上拂还来。
此时相望不相闻,愿逐月华流照君。
鸿雁长飞光不度,鱼龙潜跃水成文。
昨夜闲潭梦落花,可怜春半不还家。
江水流春去欲尽,江潭落月复西斜。
斜月沉沉藏海雾,碣石潇湘无限路。
不知乘月几人归,落月摇情满江树。


预期结果:

in1.txt 春江花朝秋月夜,往往取酒还独倾---|---去来江口守空船,绕船月明江水寒---|---商人重利轻别离,前月浮梁买茶去---|---今年欢笑复明年,秋月春风等闲度---|---东船西舫悄无言,唯见江心秋月白---|---醉不成欢惨将别,别时茫茫江浸月---|---
in2.txt 七月七日长生殿,夜半无人私语时---|---昭阳殿里恩爱绝,蓬莱宫中日月长---|---行宫见月伤心色,夜雨闻铃肠断声---|---
in3.txt 不知乘月几人归,落月摇情满江树---|---斜月沉沉藏海雾,碣石潇湘无限路---|---江水流春去欲尽,江潭落月复西斜---|---此时相望不相闻,愿逐月华流照君---|---可怜楼上月徘徊,应照离人妆镜台---|---谁家今夜扁舟子?何处相思明月楼?---|---不知江月待何人,但见长江送流水---|---人生代代无穷已,江月年年只相似---|---江畔何人初见月?江月何年初照人?---|---江天一色无纤尘,皎皎空中孤月轮---|---江流宛转绕芳甸,月照花林皆似霰;---|---滟滟随波千万里,何处春江无月明!---|---春江潮水连海平,海上明月共潮生---|---


以上例子是检索文件中包含“”字的诗句。

看图说话

通过下面的图来看具体的流程:



代码

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import train.InvertedIndex.Combine;
import train.InvertedIndex.Map;
import train.InvertedIndex.Reduce;

/**
* 查找包含指定字符串的句子
* @author hadoop
*
*/
public class Search {
public static class Map extends Mapper<Object,Text,Text,Text>{
private static final String word = "月";
private FileSplit fileSplit;
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName().toString();
//按句号分割
StringTokenizer st = new StringTokenizer(value.toString(),"。");
while(st.hasMoreTokens()){
String line = st.nextToken().toString();
if(line.indexOf(word)>=0){
context.write(new Text(fileName),new Text(line));
}
}
}
}

public static class Reduce extends Reducer<Text,Text,Text,Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
String lines = "";
for(Text value:values){
lines += value.toString()+"---|---";
}
context.write(key, new Text(lines));
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "localhost:9001");
args = new String[]{"hdfs://localhost:9000/user/hadoop/input/search_in","hdfs://localhost:9000/user/hadoop/output/search_out"};
//检查运行命令
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage search <int> <out>");
System.exit(2);
}
//配置作业名
Job job = new Job(conf,"search");
//配置作业各个类
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}


在map中,通过context.getInputSplit()获取到数据所在的文件,然后将读取的数据按句号分隔,并遍历,如果包含指定字符“月”,则将文件名作为key,该句作value写出。

在reduce中是一个简单的合并的过程。

2.最大值 最小值 平均数

问题描述

给定一批数字,获取其中的最大值 最小值 以及求得平均数

解决方案

这个问题也很简单,首先在map中读取数据并进行切割,定义一个递增的数字作key,切下来的数字作为value.在reduce中遍历value,计算数量并求和同时比较大小获取最大最小值,最后求其平均数

测试数据

输入

in1.txt

1 1 1 1 1 1 1 1 1 1
5 5 5 5 5 5 5 5 5 5


in2.txt

5 8 10 17 32
8 9 13 32 21


预期结果

平均数  11
最大值  32
最小值  1


看图说话



代码

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import test.WordCount;

/**
* 计算平均数
* @author hadoop
*
*/
public class Average1 {

public static class Map extends Mapper<Object,Text,IntWritable,IntWritable>{
private static IntWritable no = new IntWritable(1);  //计数作为key
private Text number = new Text();  //存储切下的数字
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
StringTokenizer st = new StringTokenizer(value.toString());
while(st.hasMoreTokens()){
number.set(st.nextToken());
context.write(no, new IntWritable(Integer.parseInt(number.toString())));
}
}
}
public static class Reduce extends Reducer<IntWritable,IntWritable,Text,IntWritable>{
//定义全局变量
int count = 0;   //数字的数量
int sum = 0;     //数字的总和
int max = -2147483648;
int min = 2147483647;
public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
for(IntWritable val:values){
if(val.get()>max){
max = val.get();
}
if(val.get()<min){
min = val.get();
}
count++;
sum+=val.get();
}
int average = (int)sum/count;  //计算平均数
//System.out.println(sum+"--"+count+"--"+average);
context.write(new Text("平均数"), new IntWritable(average));
context.write(new Text("最大值"), new IntWritable(max));
context.write(new Text("最小值"), new IntWritable(min));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
//conf.set("mapred.job.tracker", "localhost:9001");
conf.addResource("config.xml");
args = new String[]{"hdfs://localhost:9000/user/hadoop/input/average1_in","hdfs://localhost:9000/user/hadoop/output/average1_out"};
//检查运行命令
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage WordCount <int> <out>");
System.exit(2);
}
//配置作业名
Job job = new Job(conf,"average1 ");
//配置作业各个类
job.setJarByClass(Average1.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//Mapper的输出类型
*强调内容*   job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}


3.平均成绩

问题描述

给定三个输入文件,每个文件中分别写有多个学生的数学 英语 语文成绩,求每个学生三科的平均成绩。

解决方案

这个问题同样很简单,在map中解析数据并以学生名字作为key,成绩作为value输出。

测试数据

输入:

in1.txt

张三 80
李四 83
王五 91
赵六 88


in2.txt

张三 92
李四 100
王五 94
赵六 88


in3.txt

张三 89
李四 98
王五 84
赵六 93


预期结果

张三  87
李四  93
王五  89
赵六  89


看图说话



代码

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import train.Average1.Map;
import train.Average1.Reduce;
/**
* 计算每个学生的平均成绩
* @author hadoop
*
*/
public class Average2 {

public static class Map extends Mapper<Object,Text,Text,IntWritable>{

public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
//按行分割数据
StringTokenizer st = new StringTokenizer(value.toString(),"\n");
while(st.hasMoreTokens()){
//按空格分割每行数据
StringTokenizer stl = new StringTokenizer(st.nextToken());
String name = stl.nextToken();
String score = stl.nextToken();
//名字 分数
context.write(new Text(name), new IntWritable(Integer.parseInt(score)));
}
}
}
public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{

public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
int count = 0;   //数量
int sum = 0;     //总和
for(IntWritable val:values){
count++;
sum+=val.get();
}
int average = (int)sum/count;  //计算平均数
System.out.println(sum+"--"+count+"--"+average);
context.write(key, new IntWritable(average));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
//conf.set("mapred.job.tracker", "localhost:9001");
conf.addResource("config.xml");
args = new String[]{"hdfs://localhost:9000/user/hadoop/input/average2_in","hdfs://localhost:9000/user/hadoop/output/average2_out"};
//检查运行命令
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage WordCount <int> <out>");
System.exit(2);
}
//配置作业名
Job job = new Job(conf,"average1 ");
//配置作业各个类
job.setJarByClass(Average2.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//Mapper的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}


4.数据去重

问题描述

给定几组数据,对数据进行去重操作并输出

解决方案

在shuffing洗牌阶段时,会按照key进行归类,所以数据到达reduce方法时,key值是唯一的,只要将从文件中读取的数据作为key值输出即可,而value值置空即可。

测试数据

输入

in1.txt

Etoak-001
Etoak-002
Etoak-003
Etoak-002
Etoak-004
Etoak-005
Etoak-006
Etoak-001
Etoak-007
Etoak-008


in2.txt

Etoak-009
Etoak-010
Etoak-011
Etoak-012
Etoak-013
Etoak-009
Etoak-014
Etoak-015
Etoak-011
Etoak-016


预期结果:

Etoak-001
Etoak-002
Etoak-003
Etoak-004
Etoak-005
Etoak-006
Etoak-007
Etoak-008
Etoak-009
Etoak-010
Etoak-011
Etoak-012
Etoak-013
Etoak-014
Etoak-015
Etoak-016


看图说话



代码

package train;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import test.WordCount;

/**
*  数据去重
* @author hadoop
*
*/
public class Duplicate {

//输出键Text,输出值为Text
public static class Map extends Mapper<Object,Text,Text,Text>{
//在Map中直接将从文件中接收到的数据的value作为key写到输出中,value为空即可
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
context.write(value, new Text(""));
}
}
//上面map的阶段的结果经过shuffle洗牌后将传递给reduce

//在reduce阶段,直接将获取到的数据的key作为输出key,value置空
public static class Reduce extends Reducer<Text,Text,Text,Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
context.write(key, new Text(""));
System.out.println(key);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "localhost:9001");
args = new String[]{"hdfs://localhost:9000/user/hadoop/input/duplicate_in","hdfs://localhost:9000/user/hadoop/output/duplicate_out"};
//检查运行命令
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage Duplicate <int> <out>");
System.exit(2);
}
//配置作业名
Job job = new Job(conf,"duplicate");
//配置作业各个类
job.setJarByClass(Duplicate.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}


5.排序

问题描述

将给定的一组数据按升序进行排序,并给出每个数字的次序

解决方案

使用mapreduce默认的排序规则,对于Intwritable类型的数据按照key值大小进行排序

测试数据

输入:

in1.txt:

9
0
14
999
15
88
9


in2.txt:

65
54
32
21
10


in3.txt:

1
0
9
21
8


预期结果:

1   0
1   0
2   1
3   8
4   9
4   9
4   9
5   10
6   14
7   15
8   21
8   21
9   32
10  54
11  65
12  88
13  999


看图说话



代码

package train;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import train.Duplicate.Map;
import train.Duplicate.Reduce;

/**
* 升序排序(使用mapreduce提供的默认排序规则)
* 对于IntWritable类型的数据,按key值大小进行排序
* @author hadoop
*
*/
public class Sort {
//将输入数据的value装换为int类型并作为key输出
public static class Map extends Mapper<Object,Text,IntWritable,IntWritable>{
private static IntWritable numble = new IntWritable();
private static final IntWritable one = new IntWritable(1);
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
String line  = value.toString();
numble.set(Integer.parseInt(line));
context.write(numble,  one);
}
}
//全局num确定每个数字的顺序位次
//遍历values来确定每个数字输出的次数
public static class Reduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
private static IntWritable num = new IntWritable(1);
public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{

//System.out.println(key+"  "+num);
for(IntWritable value:values){
context.write(num, key);
System.out.println(key+"--"+value+"--"+num);
}
num = new IntWritable(num.get()+1);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "localhost:9001");
args = new String[]{"hdfs://localhost:9000/user/hadoop/input/sort_in","hdfs://localhost:9000/user/hadoop/output/sort_out"};
//检查运行命令
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage Sort <int> <out>");
System.exit(2);
}
//配置作业名
Job job = new Job(conf,"sort");
//配置作业各个类
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}


注意,这个代码不需要设置combin,否则结果会不一致,因为会多一次合并

6.倒排索引

问题描述

有多条数据,对数据按照属性值进行分组,比如对于多条语句,按所含的单词进行分组

测试数据

输入:

in1.txt

Life is brief , and then you die, you know ?


in2.txt:

Innovation distinguishes between a leader and a follower


in3.txt

We're here to put a dent in the universe . Otherwise why else even be here ?


预期结果:

,   in1.txt:1;
.   in3.txt:1;
?   in3.txt:1;
Innovation  in2.txt:1;
Life    in1.txt:1;
Otherwise   in3.txt:1;
We're   in3.txt:1;
a   in3.txt:1;in2.txt:2;
and in2.txt:1;in1.txt:1;
be  in3.txt:1;
between in2.txt:1;
brief   in1.txt:1;
dent    in3.txt:1;
die,    in1.txt:1;
distinguishes   in2.txt:1;
else    in3.txt:1;
even    in3.txt:1;
follower    in2.txt:1;
here    in3.txt:2;
in  in3.txt:1;
is  in1.txt:1;
know    in1.txt:1;
leader  in2.txt:1;
put in3.txt:1;
the in3.txt:1;
then    in1.txt:1;
to  in3.txt:1;
universe    in3.txt:1;
why in3.txt:1;
you in1.txt:2;
?   in1.txt:1;


看图说话



代码

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
* 倒排索引
* @author hadoop
*
*/
public class InvertedIndex {

//输出值:key为单词+文件地址  value为频数,均指定1
public static class Map extends Mapper<Object,Text,Text,Text>{
private Text keyStr = new Text();
private Text valueStr = new Text();
private FileSplit fileSplit;
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
//获取输入文件信息
fileSplit = (FileSplit)context.getInputSplit();
//按空格切割
StringTokenizer st = new StringTokenizer(value.toString().trim());
while(st.hasMoreTokens()){
String filePath = fileSplit.getPath().getName().toString();
keyStr.set(st.nextToken()+":"+filePath);
valueStr.set("1");
context.write(keyStr,valueStr);
}
}
}
//合并频数
//输出:key为单词  value为文件地址+频数
public static class Combine extends Reducer<Text,Text,Text,Text>{
private Text newValue = new Text();
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
int sum = 0;
//合并频数
for(Text value:values){
sum += Integer.parseInt(value.toString());
}
//拆分原有key,将单词作为新key,文件地址+频数 作为value
int index = key.toString().indexOf(":");
String word = key.toString().substring(0,index);
String filePath = key.toString().substring(index+1,key.toString().length());
key.set(word);
newValue.set(filePath+":"+sum);
context.write(key,newValue);
}
}
//将每个单词对应的多个文件及频数整合到一行
public static class Reduce extends Reducer<Text,Text,Text,Text>{
Text newValue = new Text();
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
String files = "";
for(Text value:values){
files += value+";";
}
newValue.set(files);
context.write(key,newValue);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "localhost:9001");
args = new String[]{"hdfs://localhost:9000/user/hadoop/input/invertedIndex_in","hdfs://localhost:9000/user/hadoop/output/invertedIndex_out"};
//检查运行命令
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage invertedIndex <int> <out>");
System.exit(2);
}
//配置作业名
Job job = new Job(conf,"invertedIndex");
//配置作业各个类
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combine.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop mapreduce 编程