
MapReduce Case 11: Movie Review Analysis 2 (Three-Table Join)

2018-03-18 11:51
Problem statement (building on MapReduce Case 11: Movie Review Analysis 1). Given the following three data sets:

1. users.dat, record format: 2::M::56::16::70072
Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
Meaning: user ID, gender, age, occupation, zip code

2. movies.dat, record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
Fields: MovieID BigInt, Title String, Genres String
Meaning: movie ID, movie title, movie genres

3. ratings.dat, record format: 1::1193::5::978300760
Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
Meaning: user ID, movie ID, rating, rating timestamp

The joined record layout is: user ID, movie ID, rating, timestamp, gender, age, occupation, zip code, movie name, movie genres
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType

Task (2): for male and female users separately, find the 10 highest-rated movies (gender, movie name, rating). The data are spread across three tables. The user and movie tables are small, so both can be loaded into memory and joined against the ratings with a map-side join; the remaining logic is the same top-N problem as in Movie Review Analysis 1.

First, join the three tables to build the base data set:

/**
* @author: lpj
* @date: 2018-03-16 19:16:47
* @Description:
*/
package lpj.filmCritic;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*
*/
public class MapjoinThreeTables {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");// run against the cluster as user "hadoop"
FileSystem fs = FileSystem.get(conf);

Job job = Job.getInstance(conf);
job.setJarByClass(MapjoinThreeTables.class);
job.setMapperClass(MapjoinThreeTables_Mapper.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
String inputpath = args[0];
String outpath = args[1];
// URIs of the two small files (movies.dat first, then users.dat)
URI uri1 = new URI(args[2]);
URI uri2 = new URI(args[3]);
job.addCacheFile(uri1);// do not forget to add the cache files!
job.addCacheFile(uri2);
Path inputPath = new Path(inputpath);
Path outputPath = new Path(outpath);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}

FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
boolean isdone = job.waitForCompletion(true);
System.exit(isdone ? 0 : 1);
}

public static class MapjoinThreeTables_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
private static Map<String, String> moivemap = new HashMap<>();
private static Map<String, String> usersmap = new HashMap<>();
@SuppressWarnings("deprecation")
@Override
protected void setup(Context context)throws IOException, InterruptedException {

Path[] paths = context.getLocalCacheFiles();
//1::Toy Story (1995)::Animation|Children's|Comedy
// read the movie data from the first cached file (movies.dat)
String strmoive = paths[0].toUri().toString();
BufferedReader bf1 = new BufferedReader(new FileReader(new File(strmoive)));
String stringLine = null;
while((stringLine = bf1.readLine()) != null){
String[] reads = stringLine.split("::");
String moiveid = reads[0];
String moiveInfo = reads[1] + "::" + reads[2];
moivemap.put(moiveid, moiveInfo);
}
//1::F::1::10::48067
// read the user data from the second cached file (users.dat)
String struser = paths[1].toUri().toString();
BufferedReader bf2 = new BufferedReader(new FileReader(new File(struser)));
String stringLine2 = null;
while((stringLine2 = bf2.readLine()) != null){
String[] reads = stringLine2.split("::");
String userid = reads[0];
String userInfo = reads[1] + "::" + reads[2] + "::" + reads[3] + "::" + reads[4];
usersmap.put(userid, userInfo);
}
// close the readers
IOUtils.closeStream(bf1);
IOUtils.closeStream(bf2);

}
Text kout = new Text();
@Override
protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
String [] reads1 = value.toString().trim().split("::");
//1::1193::5::978300760 : user ID, movie ID, rating, timestamp
// look up the user and movie info by ID; with proper foreign keys every rating
// should resolve, but guard the lookups anyway before using them
String struser = usersmap.get(reads1[0]);
String strmoive = moivemap.get(reads1[1]);
if (struser == null || strmoive == null) {
return;// skip dangling ratings that reference a missing user or movie
}
// join the three tables; output format: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
String [] userinfo = struser.split("::");// gender, age, occupation, zipcode
String [] moiveinfo = strmoive.split("::");// movieName, movieType
String kk = reads1[0] + "::" + reads1[1] + "::" + reads1[2] + "::" + reads1[3] + "::"
+ userinfo[0] + "::" + userinfo[1] + "::" + userinfo[2] + "::" + userinfo[3] + "::"
+ moiveinfo[0] + "::" + moiveinfo[1];
kout.set(kk);
context.write(kout, NullWritable.get());

}
}

}
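The driver takes four arguments: the ratings input path, the output path, and the URIs of the two cached files (movies.dat first, then users.dat, matching the read order in setup()). A hypothetical submission, with an illustrative jar name and HDFS paths:

hadoop jar filmCritic.jar lpj.filmCritic.MapjoinThreeTables /movie/in/ratings.dat /movie/out/join /movie/in/movies.dat /movie/in/users.dat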
Join result (excerpt):

1::1193::5::978300760::F::1::10::48067::One Flew Over the Cuckoo's Nest (1975)::Drama
1::661::3::978302109::F::1::10::48067::James and the Giant Peach (1996)::Animation|Children's|Musical
1::914::3::978301968::F::1::10::48067::My Fair Lady (1964)::Musical|Romance
1::3408::4::978300275::F::1::10::48067::Erin Brockovich (2000)::Drama
1::2355::5::978824291::F::1::10::48067::Bug's Life, A (1998)::Animation|Children's|Comedy
1::1197::3::978302268::F::1::10::48067::Princess Bride, The (1987)::Action|Adventure|Comedy|Romance
1::1287::5::978302039::F::1::10::48067::Ben-Hur (1959)::Action|Adventure|Drama
1::2804::5::978300719::F::1::10::48067::Christmas Story, A (1983)::Comedy|Drama
1::594::4::978302268::F::1::10::48067::Snow White and the Seven Dwarfs (1937)::Animation|Children's|Musical
1::919::4::978301368::F::1::10::48067::Wizard of Oz, The (1939)::Adventure|Children's|Drama|Musical
1::595::5::978824268::F::1::10::48067::Beauty and the Beast (1991)::Animation|Children's|Musical
1::938::4::978301752::F::1::10::48067::Gigi (1958)::Musical

Requirement: for male and female users separately, find the 10 highest-rated movies (gender, movie name, rating).
Approach: a movie's score is its total rating (the sum of all ratings it received) divided by the number of raters.
Step 1: group by (gender, movie name) as the key and compute each movie's average rating.
Step 2: group by gender, sorting each group by rating in descending order.
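For example, if a movie is rated 5, 3, and 4 by three female users, the key (F, movieName) accumulates a total of 12 over 3 raters, and the first job outputs an average of 12 / 3 = 4.0 for that key.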
An entity class holding gender, movie name, and rating:

/**
* @author: lpj
* @date: 2018-03-18 11:15:58
* @Description:
*/
package lpj.filmBean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
*
*/
public class FMRbean implements WritableComparable<FMRbean>{
private String sex;
private String mname;
private double rate;
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public String getMname() {
return mname;
}
public void setMname(String mname) {
this.mname = mname;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
/**
* @param sex
* @param mname
* @param rate
*/
public FMRbean(String sex, String mname, double rate) {
super();
this.sex = sex;
this.mname = mname;
this.rate = rate;
}
/**
*
*/
public FMRbean() {
super();
}
@Override
public String toString() {
return sex + "\t" + mname + "\t" + rate;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(sex);
out.writeUTF(mname);
out.writeDouble(rate);
}
@Override
public void readFields(DataInput in) throws IOException {
sex = in.readUTF();
mname = in.readUTF();
rate = in.readDouble();
}
@Override
public int compareTo(FMRbean o) {
int diff = this.sex.compareTo(o.sex);
if (diff != 0) {
return diff;
}
// within the same gender, sort by rating in descending order
return Double.compare(o.rate, this.rate);
}

}

The grouping comparator. Sorting still uses FMRbean.compareTo (gender, then rating descending), while grouping compares gender only, so each reduce call sees all of one gender's records already ordered by rating — the standard secondary-sort pattern:

/**
* @author: lpj
* @date: 2018-03-18 11:21:57
* @Description:
*/
package lpj.filmBean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
*
*/
public class FMRgroup extends WritableComparator{
/**
*
*/
public FMRgroup() {
super(FMRbean.class,true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
FMRbean f1 = (FMRbean)a;
FMRbean f2 = (FMRbean)b;
return f1.getSex().compareTo(f2.getSex());
}

}
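To make the grouping concrete, here is an illustrative (made-up) view of what the second job's reducer receives. Keys arrive sorted by FMRbean.compareTo, and the grouping comparator folds them into one reduce call per gender:

sorted map output: (F, MovieA, 4.8), (F, MovieB, 4.5), (M, MovieC, 4.9), (M, MovieD, 4.2)
reduce call 1: gender F, records ordered 4.8, 4.5, ...
reduce call 2: gender M, records ordered 4.9, 4.2, ...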

The main driver chains the two jobs with JobControl:

/**
* @author: lpj
* @date: 2018-03-16 19:16:47
* @Description:
*/
package lpj.filmCritic;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import lpj.filmBean.FMRbean;
import lpj.filmBean.FMRgroup;
/**
*
*/
public class FMTop10MR {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(conf);// defaults to the local file system
Job job = Job.getInstance(conf);
job.setJarByClass(FMTop10MR.class);
job.setMapperClass(FMTop10MR_Mapper.class);
job.setReducerClass(FMTop10MR_Reducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

Path inputPath = new Path("d:/a/homework11_2.txt");
Path outputPath = new Path("d:/a/homework11_2_1");
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
//-----------------------------------------------
FileSystem fs2 = FileSystem.get(conf);// defaults to the local file system
Job job2 = Job.getInstance(conf);
job2.setJarByClass(FMTop10MR.class);
job2.setMapperClass(FMTop10MR2_Mapper.class);
job2.setReducerClass(FMTop10MR2_Reducer.class);

job2.setMapOutputKeyClass(FMRbean.class);
job2.setMapOutputValueClass(NullWritable.class);
job2.setOutputKeyClass(FMRbean.class);
job2.setOutputValueClass(NullWritable.class);
job2.setGroupingComparatorClass(FMRgroup.class);// group reducer input by gender only
Path inputPath2 = new Path("d:/a/homework11_2_1");
Path outputPath2 = new Path("d:/a/homework11_2_2");
if (fs2.exists(outputPath2)) {
fs2.delete(outputPath2, true);
}
FileInputFormat.setInputPaths(job2, inputPath2);
FileOutputFormat.setOutputPath(job2, outputPath2);

ControlledJob aJob = new ControlledJob(job.getConfiguration());
ControlledJob bJob = new ControlledJob(job2.getConfiguration());
aJob.setJob(job);
bJob.setJob(job2);

JobControl jc = new JobControl("jc");
jc.addJob(aJob);
jc.addJob(bJob);
bJob.addDependingJob(aJob);

Thread thread = new Thread(jc);
thread.start();
while (!jc.allFinished()) {
Thread.sleep(1000);// poll until both jobs have finished
}
jc.stop();

}

public static class FMTop10MR_Mapper extends Mapper<LongWritable, Text, Text, Text>{
Text kout = new Text();
Text valueout = new Text();
@Override
protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
String [] reads = value.toString().trim().split("::");
//1::1193::5::978300760::F::1::10::48067::One Flew Over the Cuckoo's Nest (1975)::Drama
// extract gender, movie name, and rating
String sex = reads[4];
String mName = reads[8];
// the score is the rating total divided by the number of raters,
// so group by (gender, movie name) and emit the raw rating as the value
String kk = sex + "\t" + mName;
String vv = reads[2];
kout.set(kk);
valueout.set(vv);
context.write(kout, valueout);
}
}
public static class FMTop10MR_Reducer extends Reducer<Text, Text, Text, DoubleWritable>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
int totalRate = 0;
int rateNum = 0;
double avgRate = 0;
for(Text text : values){
int rate = Integer.parseInt(text.toString());
totalRate += rate;
rateNum ++;
}
avgRate = 1.0 * totalRate / rateNum;
DoubleWritable vv = new DoubleWritable(avgRate);
context.write(key, vv);
}

}
//--------------------------------------------------
public static class FMTop10MR2_Mapper extends Mapper<LongWritable, Text, FMRbean, NullWritable>{
FMRbean fmr = new FMRbean();
@Override
protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
String [] reads = value.toString().trim().split("\t");// input line: sex \t movieName \t avgRate
fmr.setSex(reads[0]);
fmr.setMname(reads[1]);
fmr.setRate(Double.parseDouble(reads[2]));
context.write(fmr, NullWritable.get());
}
}
public static class FMTop10MR2_Reducer extends Reducer<FMRbean, NullWritable, FMRbean, NullWritable>{
@Override
protected void reduce(FMRbean key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
int count = 0;
// emit only the first 10 records of each gender group (values arrive sorted by rating, descending)
for(NullWritable inv : values){
count ++;
if (count <= 10) {
context.write(key, NullWritable.get());
}else {
return;
}
}
}

}

}

Intermediate output of the first job (excerpt):

F Young and Innocent (1937) 2.5
F Your Friends and Neighbors (1998) 2.888888888888889
F Zed & Two Noughts, A (1985) 3.5
F Zero Effect (1998) 3.864406779661017
F Zeus and Roxanne (1997) 2.7777777777777777
F eXistenZ (1999) 3.0985915492957745
M $1,000,000 Duck (1971) 2.761904761904762
M 'Night Mother (1986) 3.3529411764705883
M 'Til There Was You (1997) 2.7333333333333334
M 'burbs, The (1989) 2.962085308056872
M ...And Justice for All (1979) 3.6890243902439024
M 1-900 (1994) 3.0
M 10 Things I Hate About You (1999) 3.3119658119658117
M 101 Dalmatians (1961) 3.5
M 101 Dalmatians (1996) 2.911214953271028
M 12 Angry Men (1957) 4.328421052631579

Final result:

F One Little Indian (1973) 5.0
F Clean Slate (Coup de Torchon) (1981) 5.0
F Battling Butler (1926) 5.0
F Other Side of Sunday, The (Søndagsengler) (1996) 5.0
F 24 7: Twenty Four Seven (1997) 5.0
F Belly (1998) 5.0
F Country Life (1994) 5.0
F For the Moment (1994) 5.0
F Saltmen of Tibet, The (1997) 5.0
F Dancemaker (1998) 5.0
M Lured (1947) 5.0
M Bells, The (1926) 5.0
M Gate of Heavenly Peace, The (1995) 5.0
M Dangerous Game (1993) 5.0
M Baby, The (1973) 5.0
M Angela (1995) 5.0
M Schlafes Bruder (Brother of Sleep) (1995) 5.0
M Smashing Time (1967) 5.0
M Follow the Bitch (1998) 5.0
M Ulysses (Ulisse) (1954) 5.0

Summary: the scoring standard has a loophole: when a movie has only a single rater, that one rating becomes the movie's entire score, which is likely why the top-10 lists above consist entirely of 5.0 averages. A follow-up improvement is to give the number of raters a weight and make it part of the ranking criterion.
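As a minimal sketch of that weighting idea (standalone code, not part of the original jobs; the minimum-vote threshold m and the global mean C below are assumed, illustrative parameters), an IMDb-style weighted rating pulls thinly-rated movies toward the overall mean:

/**
* Standalone sketch: WR = v/(v+m)*R + m/(v+m)*C, where R is the movie's average
* rating, v its number of ratings, m an assumed minimum-vote threshold, and C
* the assumed mean rating across all movies.
*/
public class WeightedRating {
public static double weightedRating(double avgRate, long rateNum, long minVotes, double globalMean) {
return (double) rateNum / (rateNum + minVotes) * avgRate
+ (double) minVotes / (rateNum + minVotes) * globalMean;
}
public static void main(String[] args) {
// a movie rated 5.0 by one user no longer outranks one rated 4.5 by 200 users
// (m = 10 and C = 3.5 are assumed values that should be tuned on the real data)
System.out.println(weightedRating(5.0, 1, 10, 3.5));// ~3.64
System.out.println(weightedRating(4.5, 200, 10, 3.5));// ~4.45
}
}

Since the first job's reducer already tracks totalRate and rateNum, switching to such a score would only mean emitting weightedRating(avgRate, rateNum, m, C) instead of the plain average.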