
Hadoop Learning Path (26): Using the MapReduce API (3)

2018-03-24 16:01

 

Movie Rating Case Study

Data and Requirements

Data Format

movies.dat (3884 records)

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller

users.dat (6041 records)

1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370

ratings.dat (1000210 records)

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368

Data Description

1. users.dat record format: 2::M::56::16::70072
Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
Meaning: user ID, gender, age, occupation, zip code

2. movies.dat record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
Fields: MovieID BigInt, Title String, Genres String
Meaning: movie ID, movie title, movie genres

3. ratings.dat record format: 1::1193::5::978300760
Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamp String
Meaning: user ID, movie ID, rating, rating timestamp

Fields of the record obtained by joining the three files:
user ID, movie ID, rating, rating timestamp, gender, age, occupation, zip code, movie title, movie genres
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
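To make the ten-field joined layout concrete, here is a minimal plain-Java sketch that splits one joined line on "::". The class name JoinedRecord is hypothetical and not part of the original post. The sample line is the Die Hard record from the code comment later in this post, reordered to match the field order listed above (the joined file that the later code reads puts the movie title and genres before the user fields).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical holder for one joined line; field order is an assumption
// taken from the list: userid, movieId, rate, ts, gender, age,
// occupation, zipcode, movieName, movieType.
final class JoinedRecord {
    final long userId;
    final long movieId;
    final double rate;
    final String ts;
    final String gender;
    final int age;
    final String occupation;
    final String zipcode;          // kept as a string so leading zeros survive
    final String movieName;
    final List<String> movieTypes; // genres are separated by '|'

    private JoinedRecord(String[] f) {
        userId = Long.parseLong(f[0]);
        movieId = Long.parseLong(f[1]);
        rate = Double.parseDouble(f[2]);
        ts = f[3];
        gender = f[4];
        age = Integer.parseInt(f[5]);
        occupation = f[6];
        zipcode = f[7];
        movieName = f[8];
        movieTypes = Arrays.asList(f[9].split("\\|"));
    }

    // Parses one "::"-separated line; rejects lines without exactly 10 fields.
    static JoinedRecord parse(String line) {
        String[] f = line.split("::");
        if (f.length != 10) {
            throw new IllegalArgumentException("expected 10 fields, got " + f.length);
        }
        return new JoinedRecord(f);
    }

    public static void main(String[] args) {
        JoinedRecord r = JoinedRecord.parse(
            "1000::1036::4::975040964::F::25::6::90027::Die Hard (1988)::Action|Thriller");
        System.out.println(r.movieName + " rated " + r.rate + " by user " + r.userId);
    }
}
```

Keeping the zip code as a String matters for values such as 02460 in users.dat, which would lose the leading zero if parsed as a number.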

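Requirements

(1) Find the 10 movies with the most ratings and give their rating counts (movie title, rating count)
(2) Find the 10 highest-rated movies among male users and among female users, separately (gender, movie title, rating)
(3) For the movie with movieid = 2116, find the average rating in each age group (there are only 7 distinct age values, so group by those 7) (age group, rating)
(4) For the woman who watches the most movies (has the most ratings), find the average rating of the 10 movies she rated highest (person, movie title, rating)
(5) Find the 10 best movies of the year that has the most good movies (rating >= 4.0)
(6) Among movies released in 1997, find the 10 highest-rated Comedy movies
(7) For each genre in the rating library, find the 5 highest-rated movies (genre, movie title, average rating)
(8) Find the highest-rated genre for each year (year, genre, rating)
(9) Find the highest-rated movie in each region and store the result in HDFS (region, movie title, rating)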
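As a small illustration of what requirement (1) computes, the following plain-Java sketch counts ratings per movie ID and keeps the N most-rated movies. It is an in-memory sketch, not the MapReduce implementation from this post: the class name TopRatedCount is hypothetical, the tie-break by ascending movie ID is an assumption, and only the first two input lines in main are real rows from ratings.dat (the rest are made up for illustration). Turning the movie IDs into titles would still require a join with movies.dat.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of "top N most-rated movies".
final class TopRatedCount {

    // Counts ratings per movie ID (field 1 of each ratings.dat line) and
    // returns the n entries with the highest counts, descending.
    static List<Map.Entry<Long, Integer>> topN(List<String> ratingLines, int n) {
        Map<Long, Integer> counts = new HashMap<>();
        for (String line : ratingLines) {
            String[] f = line.split("::");
            if (f.length < 4) {
                continue; // skip malformed lines
            }
            counts.merge(Long.parseLong(f[1]), 1, Integer::sum);
        }
        List<Map.Entry<Long, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            // Assumption: break ties by ascending movie ID for a stable result.
            return byCount != 0 ? byCount : Long.compare(a.getKey(), b.getKey());
        });
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "1::1193::5::978300760",  // real sample row
            "1::661::3::978302109",   // real sample row
            "2::1193::4::978300000",  // illustrative row
            "3::1193::3::978300001",  // illustrative row
            "2::661::5::978300002");  // illustrative row
        for (Map.Entry<Long, Integer> e : topN(lines, 10)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```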

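Code Implementation

1. Find the 10 movies with the most ratings and give their rating counts (movie title, rating count)

Analysis: this question involves two files, ratings.dat and movies.dat. Their sizes are heavily imbalanced (1000210 records versus 3884), so a map-side join (mapjoin) is the appropriate approach: load the smaller file into memory first, then join each record of the larger file against it.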
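The core of a map-side join can be sketched in plain Java without any Hadoop classes: build a lookup table from the small file, then enrich each line of the large file with a lookup. The class and method names below (MapJoinSketch, loadMovies, join) are hypothetical, and the first rating line in main is illustrative rather than taken from the sample above. In an actual Hadoop job the lookup table would typically be built once per task in the mapper's setup() method from a file distributed with the job; that wiring is omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the map-side join idea: small side in memory,
// large side streamed line by line.
final class MapJoinSketch {

    // Builds movieId -> "title::genres" from movies.dat lines (the small side).
    static Map<String, String> loadMovies(List<String> movieLines) {
        Map<String, String> movies = new HashMap<>();
        for (String line : movieLines) {
            String[] f = line.split("::", 2); // f[0] = id, f[1] = rest of the line
            if (f.length == 2) {
                movies.put(f[0], f[1]);
            }
        }
        return movies;
    }

    // Appends the movie fields to each rating line; ratings whose movie ID
    // is not in the lookup table are dropped (inner-join behavior).
    static List<String> join(List<String> ratingLines, Map<String, String> movies) {
        List<String> out = new ArrayList<>();
        for (String line : ratingLines) {
            String[] f = line.split("::");
            if (f.length < 4) {
                continue; // skip malformed lines
            }
            String movie = movies.get(f[1]);
            if (movie != null) {
                out.add(line + "::" + movie);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> movies = loadMovies(Arrays.asList(
            "1::Toy Story (1995)::Animation|Children's|Comedy",
            "2::Jumanji (1995)::Adventure|Children's|Fantasy"));
        List<String> joined = join(Arrays.asList(
            "1::1::5::978824268",      // illustrative rating of movie 1
            "1::1193::5::978300760"),  // movie 1193 is not in the lookup above
            movies);
        joined.forEach(System.out::println);
    }
}
```

Because the join happens while reading the large file, no shuffle of the movie data is needed; the trade-off is that the small file must fit in each task's memory.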

MovieMR1_1.java

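import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MoviesDemo4 {

    public static void main(String[] args) throws Exception {

        // Job 1: count the ratings made by each female user.
        Configuration conf1 = new Configuration();
        FileSystem fs1 = FileSystem.get(conf1);
        Job job1 = Job.getInstance(conf1);

        job1.setJarByClass(MoviesDemo4.class);
        job1.setMapperClass(MoviesDemo4Mapper1.class);
        job1.setReducerClass(MoviesDemo4Reducer1.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        // The reducer emits IntWritable counts, so declare the same type here.
        job1.setOutputValueClass(IntWritable.class);

        // Job 2: sort the counts in descending order and keep the top entry.
        Configuration conf2 = new Configuration();
        FileSystem fs2 = FileSystem.get(conf2);
        Job job2 = Job.getInstance(conf2);

        job2.setJarByClass(MoviesDemo4.class);
        job2.setMapperClass(MoviesDemo4Mapper2.class);
        job2.setReducerClass(MoviesDemo4Reducer2.class);

        job2.setMapOutputKeyClass(Moviegoers.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setOutputKeyClass(Moviegoers.class);
        job2.setOutputValueClass(NullWritable.class);

        Path inputPath1 = new Path("D:\\MR\\hw\\movie\\3he1");
        Path outputPath1 = new Path("D:\\MR\\hw\\movie\\outpu4_1");

        // Remove a stale output directory, otherwise the job refuses to start.
        if (fs1.exists(outputPath1)) {
            fs1.delete(outputPath1, true);
        }

        FileInputFormat.setInputPaths(job1, inputPath1);
        FileOutputFormat.setOutputPath(job1, outputPath1);

        // Job 2 reads the output directory of job 1.
        Path inputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_1");
        Path outputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_2");

        if (fs2.exists(outputPath2)) {
            fs2.delete(outputPath2, true);
        }

        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        JobControl control = new JobControl("MoviesDemo4");

        ControlledJob ajob = new ControlledJob(job1.getConfiguration());
        ControlledJob bjob = new ControlledJob(job2.getConfiguration());

        // Job 2 must not start until job 1 has finished.
        bjob.addDependingJob(ajob);

        control.addJob(ajob);
        control.addJob(bjob);

        Thread thread = new Thread(control);
        thread.start();

        // Poll until both jobs are done; sleep is a static method of Thread.
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        // Stop the JobControl thread and report failure through the exit code.
        control.stop();
        System.exit(control.getFailedJobList().isEmpty() ? 0 : 1);
    }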

    /**
     * Sample input line:
     * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027
     *
     * user ID::movie ID::rating::timestamp::movie title::genres::gender::age::occupation::zip code
     *    0        1        2        3           4          5       6      7       8          9
     *
     * 1. key: user ID
     * 2. value: movie title + rating
     */
    public static class MoviesDemo4Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("::");
            // Skip malformed lines instead of failing on split[6].
            if (split.length < 10) {
                return;
            }

            String strKey = split[0];
            String strValue = split[4] + "\t" + split[2];

            // Only records of female users are emitted.
            if (split[6].equals("F")) {
                outKey.set(strKey);
                outValue.set(strValue);
                context.write(outKey, outValue);
            }
        }
    }

    // Count the total number of ratings made by each female user.
    public static class MoviesDemo4Reducer1 extends Reducer<Text, Text, Text, IntWritable> {

        private final IntWritable outValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // Each value is one rating; only the number of values matters.
            int count = 0;
            for (Text value : values) {
                count++;
            }
            outValue.set(count);
            context.write(key, outValue);
        }
    }

    // Sort the output of the first MapReduce job in descending order.
    // The ordering comes from the compareTo method of the Moviegoers key class.
    public static class MoviesDemo4Mapper2 extends Mapper<LongWritable, Text, Moviegoers, NullWritable> {

        private final Moviegoers outKey = new Moviegoers();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Job 1 wrote lines of the form: userId <TAB> count
            String[] split = value.toString().split("\t");

            outKey.setName(split[0]);
            outKey.setCount(Integer.parseInt(split[1]));
            context.write(outKey, NullWritable.get());
        }
    }

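    // After sorting, keep only the first record: the ID of the woman with the
    // most ratings and her rating count. This relies on the job running with a
    // single reduce task, which is the default.
    public static class MoviesDemo4Reducer2 extends Reducer<Moviegoers, NullWritable, Moviegoers, NullWritable> {

        // Instance field on purpose: it counts records across all reduce() calls.
        private int count = 0;

        @Override
        protected void reduce(Moviegoers key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            for (NullWritable nvl : values) {
                count++;
                // Everything after the first record is ignored.
                if (count > 1) {
                    return;
                }
                context.write(key, nvl);
            }
        }
    }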

}
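The Moviegoers key class used by the second job is not shown in this post. The following is a minimal sketch of what such a key might look like, written against java.io only so that it runs without Hadoop. The class name MoviegoersSketch, the serialization layout, the tie-break by name, the tab-separated toString and the copyOf helper are all assumptions. To use it as a real Hadoop key, the class would additionally declare that it implements org.apache.hadoop.io.WritableComparable, whose write, readFields and compareTo methods have the same shapes as the ones below.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the Moviegoers key: a user ID plus a rating
// count, ordered by count descending.
final class MoviegoersSketch implements Comparable<MoviegoersSketch> {
    private String name = "";
    private int count;

    public void setName(String name) { this.name = name; }
    public void setCount(int count) { this.count = count; }
    public String getName() { return name; }
    public int getCount() { return count; }

    // Serializes the fields; readFields must read them in the same order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(count);
    }

    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        count = in.readInt();
    }

    // Larger counts sort first; ties fall back to the name (an assumption).
    @Override
    public int compareTo(MoviegoersSketch other) {
        int byCount = Integer.compare(other.count, count);
        return byCount != 0 ? byCount : name.compareTo(other.name);
    }

    @Override
    public String toString() {
        return name + "\t" + count;
    }

    // Demo helper: copies a key through its serialized form.
    static MoviegoersSketch copyOf(MoviegoersSketch src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            MoviegoersSketch copy = new MoviegoersSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MoviegoersSketch a = new MoviegoersSketch();
        a.setName("userA"); // illustrative values, not real results
        a.setCount(3);
        MoviegoersSketch b = new MoviegoersSketch();
        b.setName("userB");
        b.setCount(7);
        System.out.println(b.compareTo(a) < 0 ? b : a); // key that sorts first
        System.out.println(copyOf(a));
    }
}
```

With an ordering like this, the shuffle phase delivers the key with the highest count to the reducer first, which is why the second reducer can simply keep the first record it sees.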

 

(2)

 
