
MapReduce Case 11: Movie Rating Analysis 1 (Two-Table Join)

There are two common ways to join tables in MapReduce: the reduce join and the map join. A reduce join easily causes data skew, so for data files processed in parallel the map join is usually preferred: the join is completed in the mapper stage, which generally avoids skew, and even when skew does occur the affected data volume is small.
The map join applies when one table is large and the other is small: the small table is loaded into memory on every node ahead of time, and during the map phase each big-table record is combined with its match via an inner join.
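For contrast, here is a minimal sketch of the reduce-join approach the paragraph warns about. It is illustrative only: the class names, the field-count-based source tagging, and the omitted driver are not from the original post. Every rating for a popular movie lands in a single reduce call and must be buffered there, which is exactly where the skew comes from.

package lpj.filmCritic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative reduce join: the mapper tags each record with its source table,
// and the reducer joins everything sharing a movie id.
public class ReduceJoinSketch {

    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().trim().split("::");
            if (fields.length == 3) {
                // movies.dat record: movieid::title::genres
                outKey.set(fields[0]);
                outValue.set("M::" + fields[1] + "::" + fields[2]);
            } else {
                // ratings.dat record: userid::movieid::rating::timestamp
                outKey.set(fields[1]);
                outValue.set("R::" + fields[0] + "::" + fields[2] + "::" + fields[3]);
            }
            context.write(outKey, outValue);
        }
    }

    public static class JoinReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String movieInfo = null;
            List<String> ratings = new ArrayList<>();
            // All records for one movie id arrive together; the ratings must be
            // buffered in memory, which is the skew hazard for hot movies
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("M::")) {
                    movieInfo = s.substring(3);
                } else {
                    ratings.add(s.substring(3));
                }
            }
            if (movieInfo == null) {
                return;// inner join: drop ratings with no matching movie
            }
            for (String r : ratings) {
                context.write(new Text(r + "::" + movieInfo), NullWritable.get());
            }
        }
    }
}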
Problem: we are given the following three datasets.
1. users.dat, record format: 2::M::56::16::70072
Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
Meaning: user ID, gender, age, occupation, zip code

2. movies.dat, record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
Fields: MovieID BigInt, Title String, Genres String
Meaning: movie ID, movie title, movie genres

3. ratings.dat, record format: 1::1193::5::978300760
Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
Meaning: user ID, movie ID, rating, rating timestamp

The fully joined record contains: user ID, movie ID, rating, rating timestamp, gender, age, occupation, zip code, movie title, movie genres:
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType

Task:
(1) Find the 10 movies rated the most times, and report the rating count for each (movie title, rating count).

In this case users.dat and movies.dat are small while ratings.dat is large, so the first step is to join movies.dat with ratings.dat.
Map-join code:

/**
 * @author: lpj
 * @date: 2018-03-16 19:16:47
 * @Description:
 */
package lpj.filmCritic;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*
*/
public class FilmCriticMR {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");// must run against the cluster
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(FilmCriticMR.class);
        job.setMapperClass(FilmCriticMR_Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);// map-only job: no reduce tasks needed

        String maxinputpath = args[0];// path of the big file (ratings.dat)
        String mininputpath = args[1];// path of the small file (movies.dat)
        String outpath = args[2];// output path

        Path maxPath = new Path(maxinputpath);
        URI uri = new URI(mininputpath);// URI of the small file

        job.addCacheFile(uri);// register the small file with the distributed cache
        Path outputPath = new Path(outpath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, maxPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean isdone = job.waitForCompletion(true);
        System.exit(isdone ? 0 : 1);
    }

    public static class FilmCriticMR_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
        Text kout = new Text();
        // The small file is loaded into moivemap before any map() call runs
        private static Map<String, String> moivemap = new HashMap<>();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // movies.dat line format: 9::Sudden Death (1995)::Action
            // Obtain the cached file's local path through the context
            // URI[] files = context.getCacheFiles();// not usable in this setup
            // String strpath = files[0].toString();
            Path[] localCacheFiles = context.getLocalCacheFiles();
            String strpath = localCacheFiles[0].toUri().toString();
            // Read the small file with a character stream; use the join field
            // (movie id) as the key and the remaining fields as the value
            BufferedReader bf = new BufferedReader(new FileReader(new File(strpath)));
            String readLine = null;
            while((readLine = bf.readLine()) != null){
                String[] reads = readLine.split("::");
                String movieid = reads[0];
                String moviename = reads[1];
                String movietype = reads[2];

                moivemap.put(movieid, moviename + "::" + movietype);
            }
            IOUtils.closeStream(bf);// release the reader
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // ratings.dat line format: 1::1193::5::978300760
            String[] reads = value.toString().trim().split("::");
            String userid = reads[0];
            String movieid = reads[1];
            int rate = Integer.parseInt(reads[2]);
            long ts = Long.parseLong(reads[3]);
            // Look up the movie title and genres by movieid in moivemap
            String moiveInfo = moivemap.get(movieid);
            if (moiveInfo == null) {
                return;// defensive: skip ratings whose movie id is absent from movies.dat
            }
            String[] moiveInfos = moiveInfo.split("::");
            String moiveName = moiveInfos[0];
            String moiveType = moiveInfos[1];
            // Emit the combined record
            String kk = userid + "::" + movieid + "::" + rate + "::" + ts + "::" + moiveName + "::" + moiveType;
            kout.set(kk);
            context.write(kout, NullWritable.get());
        }
    }

}

To run the job, package the project into a jar, upload it to the cluster together with the data files, keep the mapping between arguments and files straight, and then run it. Partial results:

1::1193::5::978300760::One Flew Over the Cuckoo's Nest (1975)::Drama
1::661::3::978302109::James and the Giant Peach (1996)::Animation|Children's|Musical
1::914::3::978301968::My Fair Lady (1964)::Musical|Romance
1::3408::4::978300275::Erin Brockovich (2000)::Drama
1::2355::5::978824291::Bug's Life, A (1998)::Animation|Children's|Comedy
1::1197::3::978302268::Princess Bride, The (1987)::Action|Adventure|Comedy|Romance
1::1287::5::978302039::Ben-Hur (1959)::Action|Adventure|Drama
1::2804::5::978300719::Christmas Story, A (1983)::Comedy|Drama
1::594::4::978302268::Snow White and the Seven Dwarfs (1937)::Animation|Children's|Musical
1::919::4::978301368::Wizard of Oz, The (1939)::Adventure|Children's|Drama|Musical
1::595::5::978824268::Beauty and the Beast (1991)::Animation|Children's|Musical
1::938::4::978301752::Gigi (1958)::Musical

The join is now complete, and the remaining steps operate on the joined data according to the business requirements. For convenient testing, the joined data can be downloaded to the local machine (if it is very large, keep running on the cluster); a sketch of the download step follows.
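For example, the joined output could be pulled down with the HDFS client API. A minimal sketch, assuming a hypothetical /movie/output output directory for the join job and the same local path the next listing reads from; the class name FetchJoinResult is also illustrative:

package lpj.filmCritic;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal helper to copy the joined output to the local disk for testing.
// Substitute the output directory that was passed as args[2] to FilmCriticMR.
public class FetchJoinResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
        // A map-only job writes one part-m-* file per map task
        fs.copyToLocalFile(new Path("/movie/output/part-m-00000"),
                new Path("d:/a/homework11.txt"));
        fs.close();
    }
}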
Task (1): find the 10 movies rated the most times, and report the rating count for each (movie title, rating count). This is a top-N problem: use the movie title (or movie id) as the key, count the ratings per movie, sort in descending order of rating count, and take the first 10 records.
This takes two steps: first, count the number of ratings per movie across the whole file; second, sort that result in descending order and keep the top 10 rows. The sort relies on a custom key type, MoiveRate, sketched below.
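The listing below imports lpj.filmBean.MoiveRate, which the original post does not show. For the code to compile, and for the key sort to run in descending order, it needs to be a WritableComparable whose compareTo() orders by rating count and whose toString() matches the tab-separated output seen in the results. A minimal sketch under those assumptions (field names inferred from the setters used below):

package lpj.filmBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Composite key for the sorting job: MapReduce sorts map output keys with
// compareTo(), so ordering by rateNum descending yields the most-rated movies first.
public class MoiveRate implements WritableComparable<MoiveRate> {
    private String movieid;
    private String moviename;
    private int rateNum;

    public void setMovieid(String movieid) { this.movieid = movieid; }
    public void setMoviename(String moviename) { this.moviename = moviename; }
    public void setRateNum(int rateNum) { this.rateNum = rateNum; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(movieid);
        out.writeUTF(moviename);
        out.writeInt(rateNum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        movieid = in.readUTF();
        moviename = in.readUTF();
        rateNum = in.readInt();
    }

    @Override
    public int compareTo(MoiveRate other) {
        // Descending by rating count; ties broken by movie id so distinct
        // movies with equal counts are not grouped into one reduce call
        int diff = other.rateNum - this.rateNum;
        return diff != 0 ? diff : this.movieid.compareTo(other.movieid);
    }

    @Override
    public String toString() {
        // Matches the final output format: id, title, rating count
        return movieid + "\t" + moviename + "\t" + rateNum;
    }
}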
Code:

/**
 * @author: lpj
 * @date: 2018-03-16 19:16:47
 * @Description:
 */
package lpj.filmCritic;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import lpj.filmBean.MoiveRate;
/**
*
*/
public class MovRatTop10MR {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);// defaults to the local file system

        Job job = Job.getInstance(conf);
        job.setJarByClass(MovRatTop10MR.class);
        job.setMapperClass(MovRatTop10MR_Mapper.class);
        job.setReducerClass(MovRatTop10MR_Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Path inputPath = new Path("d:/a/homework11.txt");
        Path outputPath = new Path("d:/a/homework11");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        //--------------------------------------------------
        FileSystem fs2 = FileSystem.get(conf);// defaults to the local file system

        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(MovRatTop10MR.class);
        job2.setMapperClass(MovRatTop10MR2_Mapper.class);
        job2.setReducerClass(MovRatTop10MR2_Reducer.class);

        job2.setOutputKeyClass(MoiveRate.class);
        job2.setOutputValueClass(NullWritable.class);
        Path inputPath2 = new Path("d:/a/homework11");
        Path outputPath2 = new Path("d:/a/homework11_1");
        if (fs2.exists(outputPath2)) {
            fs2.delete(outputPath2, true);
        }
        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        // Chain the two jobs with JobControl: job2 consumes job's output
        ControlledJob aJob = new ControlledJob(job.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());
        aJob.setJob(job);
        bJob.setJob(job2);

        JobControl jc = new JobControl("JC");
        jc.addJob(aJob);
        jc.addJob(bJob);
        bJob.addDependingJob(aJob);// job2 must wait for job to finish

        Thread thread = new Thread(jc);
        thread.start();
        while (!jc.allFinished()) {
            Thread.sleep(1000);// poll until both jobs complete
        }
        jc.stop();
    }

    public static class MovRatTop10MR_Mapper extends Mapper<LongWritable, Text, Text, Text>{
        Text kout = new Text();
        Text valueout = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // input line format: 1::938::4::978301752::Gigi (1958)::Musical
            String[] reads = value.toString().trim().split("::");
            String kk = reads[1];// movie id
            String vv = reads[4];// movie title
            kout.set(kk);
            valueout.set(vv);
            context.write(kout, valueout);
        }
    }
    public static class MovRatTop10MR_Reducer extends Reducer<Text, Text, Text, NullWritable>{
        Text kout = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Count the ratings for one movie; every value carries the title
            int rateNum = 0;
            String moiveName = "";
            for (Text text : values) {
                rateNum++;
                moiveName = text.toString();
            }
            String kk = key.toString() + "\t" + moiveName + "\t" + rateNum;
            kout.set(kk);
            context.write(kout, NullWritable.get());
        }
    }
    // The first job's reduce stage cannot sort by rating count, so a second
    // MapReduce job sorts the counts and takes the top 10
    public static class MovRatTop10MR2_Mapper extends Mapper<LongWritable, Text, MoiveRate, NullWritable>{
        MoiveRate mm = new MoiveRate();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // input line format (tab-separated): 981  Dangerous Ground (1997)  4
            String[] reads = value.toString().trim().split("\t");
            mm.setMovieid(reads[0]);
            mm.setMoviename(reads[1]);
            mm.setRateNum(Integer.parseInt(reads[2]));
            context.write(mm, NullWritable.get());// MoiveRate.compareTo() sorts keys by rating count, descending
        }
    }
    public static class MovRatTop10MR2_Reducer extends Reducer<MoiveRate, NullWritable, MoiveRate, NullWritable>{
        int count = 0;// persists across reduce() calls within the single reducer
        @Override
        protected void reduce(MoiveRate key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Keys arrive already sorted by rating count descending,
            // so the first 10 records written are the top 10
            for (NullWritable inv : values) {
                count++;
                if (count <= 10) {
                    context.write(key, NullWritable.get());
                } else {
                    return;
                }
            }
        }
    }

}

Partial output of the first MapReduce job:

1 Toy Story (1995) 2077
10 GoldenEye (1995) 888
100 City Hall (1996) 128
1000 Curdled (1996) 20
1002 Ed's Next Move (1996) 8
1003 Extreme Measures (1996) 121
1004 Glimmer Man, The (1996) 101
1005 D3: The Mighty Ducks (1996) 142
1006 Chamber, The (1996) 78
1007 Apple Dumpling Gang, The (1975) 232
1008 Davy Crockett, King of the Wild Frontier (1955) 97
1009 Escape to Witch Mountain (1975) 291
101 Bottle Rocket (1996) 253
1010 Love Bug, The (1969) 242
1011 Herbie Rides Again (1974) 135
1012 Old Yeller (1957) 301
1013 Parent Trap, The (1961) 258
1014 Pollyanna (1960) 136
1015 Homeward Bound: The Incredible Journey (1993) 234
1016 Shaggy Dog, The (1959) 156
1017 Swiss Family Robinson (1960) 276
1018 That Darn Cat! (1965) 123
1019 20,000 Leagues Under the Sea (1954) 575
102 Mr. Wrong (1996) 60
1020 Cool Runnings (1993) 392
1021 Angels in the Outfield (1994) 247
1022 Cinderella (1950) 577
1023 Winnie the Pooh and the Blustery Day (1968) 221
1024 Three Caballeros, The (1945) 126
1025 Sword in the Stone, The (1963) 293

Output of the second MapReduce job (top 10):

2858 American Beauty (1999) 3428
260 Star Wars: Episode IV - A New Hope (1977) 2991
1196 Star Wars: Episode V - The Empire Strikes Back (1980) 2990
1210 Star Wars: Episode VI - Return of the Jedi (1983) 2883
480 Jurassic Park (1993) 2672
2028 Saving Private Ryan (1998) 2653
589 Terminator 2: Judgment Day (1991) 2649
2571 Matrix, The (1999) 2590
1270 Back to the Future (1985) 2583
593 Silence of the Lambs, The (1991) 2578