Hadoop学习之路(二十六)MapReduce的API使用(三)
影评案例
数据及需求
数据格式
movies.dat 3884条数据
1::Toy Story (1995)::Animation|Children's|Comedy 2::Jumanji (1995)::Adventure|Children's|Fantasy 3::Grumpier Old Men (1995)::Comedy|Romance 4::Waiting to Exhale (1995)::Comedy|Drama 5::Father of the Bride Part II (1995)::Comedy 6::Heat (1995)::Action|Crime|Thriller 7::Sabrina (1995)::Comedy|Romance 8::Tom and Huck (1995)::Adventure|Children's 9::Sudden Death (1995)::Action 10::GoldenEye (1995)::Action|Adventure|Thriller
users.dat 6041条数据
1::F::1::10::48067 2::M::56::16::70072 3::M::25::15::55117 4::M::45::7::02460 5::M::25::20::55455 6::F::50::9::55117 7::M::35::1::06810 8::M::25::12::11413 9::M::25::17::61614 10::F::35::1::95370
ratings.dat 1000210条数据
1::1193::5::978300760 1::661::3::978302109 1::914::3::978301968 1::3408::4::978300275 1::2355::5::978824291 1::1197::3::978302268 1::1287::5::978302039 1::2804::5::978300719 1::594::4::978302268 1::919::4::978301368
数据解释
1、users.dat 数据格式为: 2::M::56::16::70072
对应字段为:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
对应字段中文解释:用户id,性别,年龄,职业,邮政编码
2、movies.dat 数据格式为: 2::Jumanji (1995)::Adventure|Children's|Fantasy
对应字段为:MovieID BigInt, Title String, Genres String
对应字段中文解释:电影ID,电影名字,电影类型
3、ratings.dat 数据格式为: 1::1193::5::978300760
对应字段为:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
对应字段中文解释:用户ID,电影ID,评分,评分时间戳
用户ID,电影ID,评分,评分时间戳,性别,年龄,职业,邮政编码,电影名字,电影类型
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
需求统计
(1)求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)
(2)分别求男性,女性当中评分最高的10部电影(性别,电影名,评分)
(3)求movieid = 2116这部电影各年龄段(因为年龄就只有7个,就按这个7个分就好了)的平均影评(年龄段,评分)
(4)求最喜欢看电影(影评次数最多)的那位女性评最高分的10部电影的平均影评分(人,电影名,影评)
(5)求好片(评分>=4.0)最多的那个年份的最好看的10部电影
(6)求1997年上映的电影中,评分最高的10部Comedy类电影
(7)该影评库中各种类型电影中评价最高的5部电影(类型,电影名,平均影评分)
(8)各年评分最高的电影类型(年份,类型,影评分)
(9)每个地区最高评分的电影名,把结果存入HDFS(地区,电影名,电影评分)
代码实现
1、求被评分次数最多的10部电影,并给出评分次数(电影名,评分次数)
分析:此问题涉及到2个文件,ratings.dat和movies.dat,2个文件数据量倾斜比较严重,此处应该使用mapjoin方法,先将数据量较小的文件预先加载到内存中
MovieMR1_1.java
public class MoviesDemo4 { public static void main(String[] args) throws Exception { Configuration conf1 = new Configuration(); FileSystem fs1 = FileSystem.get(conf1); Job job1 = Job.getInstance(conf1); job1.setJarByClass(MoviesDemo4.class); job1.setMapperClass(MoviesDemo4Mapper1.class); job1.setReducerClass(MoviesDemo4Reducer1.class); job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(Text.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(DoubleWritable.class); Configuration conf2 = new Configuration(); FileSystem fs2 = FileSystem.get(conf2); Job job2 = Job.getInstance(conf2); job2.setJarByClass(MoviesDemo4.class); job2.setMapperClass(MoviesDemo4Mapper2.class); job2.setReducerClass(MoviesDemo4Reducer2.class); job2.setMapOutputKeyClass(Moviegoers.class); job2.setMapOutputValueClass(NullWritable.class); job2.setOutputKeyClass(Moviegoers.class); job2.setOutputValueClass(NullWritable.class); Path inputPath1 = new Path("D:\\MR\\hw\\movie\\3he1"); Path outputPath1 = new Path("D:\\MR\\hw\\movie\\outpu4_1"); if(fs1.exists(outputPath1)) { fs1.delete(outputPath1,true); } FileInputFormat.setInputPaths(job1, inputPath1); FileOutputFormat.setOutputPath(job1, outputPath1); Path inputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_1"); Path outputPath2 = new Path("D:\\MR\\hw\\movie\\outpu4_2"); if(fs2.exists(outputPath2)) { fs2.delete(outputPath2,true); } FileInputFormat.setInputPaths(job2, inputPath2); FileOutputFormat.setOutputPath(job2, outputPath2); JobControl control = new JobControl("MoviesDemo4"); ControlledJob ajob = new ControlledJob(job1.getConfiguration()); ControlledJob bjob = new ControlledJob(job2.getConfiguration()); bjob.addDependingJob(ajob); control.addJob(ajob); control.addJob(bjob); Thread thread = new Thread(control); thread.start(); while(!control.allFinished()) { thread.sleep(1000); } System.exit(0); } /** * 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027 * * 用户ID::电影ID::评分::评分时间戳::电影名字::电影类型::性别::年龄::职业::邮政编码 * 0 1 2 3 4 5 6 7 8 9 * * 1、key:用户ID * 2、value:电影名+评分 * * */ public static class MoviesDemo4Mapper1 extends Mapper<LongWritable, Text, Text, Text>{ Text outKey = new Text(); Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("::"); String strKey = split[0]; String strValue = split[4]+"\t"+split[2]; if(split[6].equals("F")) { outKey.set(strKey); outValue.set(strValue); context.write(outKey, outValue); } } } //统计每位女性的评论总数 public static class MoviesDemo4Reducer1 extends Reducer<Text, Text, Text, IntWritable>{ IntWritable outValue = new IntWritable(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int count = 0; for(Text value : values) { count++; } outValue.set(count); context.write(key, outValue); } } //对第一次MapReduce的输出结果进行降序排序 public static class MoviesDemo4Mapper2 extends Mapper<LongWritable, Text,Moviegoers,NullWritable>{ Moviegoers outKey = new Moviegoers(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\t"); outKey.setName(split[0]); outKey.setCount(Integer.parseInt(split[1])); context.write(outKey, NullWritable.get()); } } //排序之后取第一个值(评论最多的女性ID和评论次数) public static class MoviesDemo4Reducer2 extends Reducer<Moviegoers,NullWritable, Moviegoers,NullWritable>{ int count = 0; @Override protected void reduce(Moviegoers key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException { for(NullWritable nvl : values) { count++; if(count > 1) { return; } context.write(key, nvl); } } } }View Code
(2)
- Hadoop学习之路(二十七)MapReduce的API使用(四)
- Hadoop学习之路(二十八)MapReduce的API使用(五)
- Hadoop学习之路(二十五)MapReduce的API使用(二)
- Hadoop学习记录(4)|MapReduce原理|API操作使用
- hadoop生态系统学习之路(十二)cloudera manager的简单使用
- hadoop生态系统学习之路(五)hbase的简单使用
- hadoop学习之HDFS(2.8):hdfs的javaAPI使用及示例
- hadoop学习笔记:创建maven项目与使用hdfs的读写API
- Hadoop学习笔记:HDFS的java API使用
- Hadoop学习之路(十七)MapReduce框架Partitoner分区
- Hadoop学习之路(二十二)MapReduce的输入和输出
- 阿里封神谈hadoop学习之路 封神 2016-04-14 16:03:51 浏览3283 评论3 发表于: 阿里云E-MapReduce >> 开源大数据周刊 hadoop 学生 spark
- Hadoop学习之路(二十三)MapReduce中的shuffle详解
- Hadoop学习之路(十八)MapReduce框架Combiner分区
- Hadoop学习之路(十三)MapReduce的初识
- jmeter学习之路一:学会使用jmeter API
- Hadoop学习--HBase与MapReduce的使用
- hadoop2.7.2学习笔记05-hadoop文件系统API定义-本文档使用到的专用符号
- Hadoop学习之路(十四)MapReduce的核心运行机制
- hadoop生态系统学习之路(七)impala的简单使用以及与hive的区别