A MapReduce Implementation of a Goods Recommendation Algorithm (User Purchase Vector × Goods Co-occurrence Matrix)
A Hadoop-based goods recommendation system.
Recommendation result = user purchase vector × item similarity matrix
Item similarity: the co-occurrence count of items (other measures, such as Euclidean distance, could also be used)
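Before diving into the MapReduce jobs, the core arithmetic can be sketched in plain Java on a toy example (the matrix and vector values below are illustrative, not taken from the data set): each recommendation score is the dot product of a row of the co-occurrence matrix with the user's purchase vector.

```java
import java.util.Arrays;

public class RecommendSketch {
    // score = co-occurrence matrix (items x items) * purchase vector (items)
    static long[] recommend(long[][] cooccurrence, long[] purchases) {
        long[] score = new long[cooccurrence.length];
        for (int i = 0; i < cooccurrence.length; i++) {
            for (int j = 0; j < purchases.length; j++) {
                score[i] += cooccurrence[i][j] * purchases[j];
            }
        }
        return score;
    }

    public static void main(String[] args) {
        // 3 items; entry [i][j] = how often items i and j were bought together
        long[][] c = {{2, 1, 0}, {1, 3, 1}, {0, 1, 1}};
        long[] v = {1, 0, 1}; // the user bought items 0 and 2
        System.out.println(Arrays.toString(recommend(c, v))); // [2, 2, 1]
    }
}
```

The MapReduce jobs below compute exactly these two operands (the matrix and the vectors) from the raw purchase log, then perform this multiplication in a distributed way.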
Preliminary setup
1. Project name: GRMS
2. Add the Maven dependencies: pom.xml
3. Create the packages:
com.briup.bigdata.project.grms
|--step1
|--step2
|--...
|--utils
4. Copy the four XML configuration files from the cluster into the resources directory.
5. Create the following directories under the root of the HDFS cluster:
/grms
|--rawdata/matrix.txt
|--step1
|--...
6. Initial data: matrix.txt
10001 20001 1
10001 20002 1
10001 20005 1
10001 20006 1
10001 20007 1
10002 20003 1
10002 20004 1
10002 20006 1
10003 20002 1
10003 20007 1
10004 20001 1
10004 20002 1
10004 20005 1
10004 20006 1
10005 20001 1
10006 20004 1
10006 20007 1
Here the IDs beginning with 1000 are user IDs, those beginning with 2000 are product IDs, and the last column is the number of purchases.

pom.xml (the Maven dependencies referenced in step 2):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.briup.bigdata.project.grms</groupId>
    <artifactId>GRMS</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.9</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>grms</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Adjust the versions above to match the versions you use.
8. Compute each user's list of purchased goods
File: UserBuyGoodsList.java
Classes:
UserBuyGoodsList
UserBuyGoodsListMapper
UserBuyGoodsListReducer
Implementation:
package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
import java.io.IOException;
import java.util.Iterator;

public class UserBuyGoodsList extends Configured implements Tool {
    static class UserBuyGoodsListMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tuple = value.toString().split("\t");
            context.write(new Text(tuple[0]), new Text(tuple[1]));
        }
    }

    static class UserBuyGoodsListReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            StringBuilder builder = new StringBuilder();
            while (iterator.hasNext()) {
                builder.append(iterator.next().toString()).append(",");
            }
            String result = builder.substring(0, builder.length() - 1);
            context.write(key, new Text(result));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(UserBuyGoodsList.class);
        job.setMapperClass(UserBuyGoodsListMapper.class);
        job.setReducerClass(UserBuyGoodsListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        // The originally posted code mistakenly ran GoodsConcurrenceList here.
        System.exit(ToolRunner.run(new UserBuyGoodsList(), args));
    }
}

The idea: the Map side reads the initial data line by line and splits each line on "\t" into an array, so the user ID goes into tuple[0] and the product ID into tuple[1]; these are handed to the reduce side. There, an iterator appends all values sharing the same key to a StringBuilder, separated by commas (note: a plain String is not recommended here, because concatenation with + creates a new string each time, which wastes memory at big-data scale). When writing to the context, the last character is dropped with substring, since that trailing comma carries no meaning.
The run method configures the job. Doing this by hand for every new class is tedious, since each one needs to be configured again from scratch; at the end of this article all of the job configurations are collected into a single class, which makes reconfiguring easier and more readable.
Result data:
10001 20001,20005,20006,20007,20002
10002 20006,20003,20004
10003 20002,20007
10004 20001,20002,20005,20006
10005 20001
10006 20004,20007
9. Compute the co-occurrence of goods
File: GoodsConcurrenceList.java
Classes:
GoodsConcurrenceList
GoodsConcurrenceListMapper
GoodsConcurrenceListReducer
Input: the result of step 8 (the user purchase lists)
Implementation:
package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

public class GoodsConcurrenceList extends Configured implements Tool {
    private final static Text K = new Text();
    private final static IntWritable V = new IntWritable(1);

    static class GoodsConcurrenceListMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\t");
            String[] items = tokens[1].split(",");
            for (int i = 0; i < items.length; i++) {
                String itemA = items[i];
                for (int j = 0; j < items.length; j++) {
                    String itemB = items[j];
                    K.set(itemA + "\t" + itemB); // the original had "/t", a typo
                    context.write(K, V);
                }
            }
        }
    }

    static class GoodsConcurrenceListReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            V.set(sum);
            context.write(key, V);
        }
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        job.setJarByClass(GoodsConcurrenceList.class);
        job.setMapperClass(GoodsConcurrenceListMapper.class);
        job.setReducerClass(GoodsConcurrenceListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class); // was Text.class, which does not match the IntWritable values
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GoodsConcurrenceList(), args));
    }
}

The idea: for convenience two static final fields, K and V, are defined (the results could just as well be written to the context directly). Each line of the UserBuyGoodsList output is split on "\t"; the co-occurrence matrix does not need the user ID, so only tokens[1] is split again on "," into items[]. A nested for loop then pairs every itemA with every itemB (set as the key K) and emits a count of 1 (the value V). Since both loops start at 0, an item is also paired with itself; these self-pairs can be removed later (or simply ignored during the matrix multiplication). At this point the map output looks like "20001 20001 1", "20001 20001 1", ..., so the reduce side only has to sum the counts for each pair: a for loop adds each value to sum, which is then written out, forming the co-occurrence matrix.
Result data:
20001 20001 3
20001 20002 2
(remaining rows omitted for brevity)
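The pairing and counting logic of this job can be simulated in plain Java, outside Hadoop, to sanity-check the counts. This is a sketch with illustrative purchase lists; the class and method names are not part of the project:

```java
import java.util.*;

public class CooccurrenceSketch {
    // Count how often each ordered pair of items appears in the same purchase
    // list (self-pairs included, exactly as in the mapper's nested loop).
    static Map<String, Integer> cooccurrence(List<String[]> purchaseLists) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] items : purchaseLists) {
            for (String a : items) {
                for (String b : items) {
                    counts.merge(a + "\t" + b, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> lists = Arrays.asList(
                new String[]{"20001", "20002"},
                new String[]{"20001", "20002", "20005"},
                new String[]{"20001"});
        System.out.println(cooccurrence(lists).get("20001\t20001")); // 3
        System.out.println(cooccurrence(lists).get("20001\t20002")); // 2
    }
}
```

The `merge` call plays the role of the reducer: it accumulates the per-pair 1s that the nested loop (the mapper) emits.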
10. Compute the goods co-occurrence matrix
File: GoodsConcurrenceMatrix.java
Classes:
GoodsConcurrenceMatrix
GoodsConcurrenceMatrixMapper
GoodsConcurrenceMatrixReducer
The co-occurrence counts are assembled into matrix rows for the multiplication.
Implementation:
package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

public class GoodsConcurrenceMatrix extends Configured implements Tool {
    static class GoodsConcurrenceMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            String[] s = value.toString().split("\t");
            sb.append(s[1]).append(":").append(s[2]);
            context.write(new Text(s[0]), new Text(sb.toString()));
        }
    }

    static class GoodsConcurrenceMatrixReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder builder = new StringBuilder();
            // A loop is required here: the originally posted code used
            // if(iterator.hasNext()), which keeps only the first value.
            for (Text value : values) {
                builder.append(value).append(",");
            }
            context.write(key, new Text(builder.substring(0, builder.length() - 1)));
        }
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        job.setJarByClass(GoodsConcurrenceMatrix.class);
        job.setMapperClass(GoodsConcurrenceMatrixMapper.class);
        job.setReducerClass(GoodsConcurrenceMatrixReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GoodsConcurrenceMatrix(), args));
    }
}

The idea: since the co-occurrence counts have already been computed, the mapper only has to append the last two fields of each line, joined by ":", and hand them to the reduce side; the reducer then joins all values with the same key using "," (this must iterate over all values, not just the first one). Result:
20001 20001:3,20002:2,20005:2,20006:2,20007:1
20002 20001:2,20002:3,20005:2,20006:2,20007:2
20003 20003:1,20004:1,20006:1
20004 20003:1,20004:2,20006:1,20007:1
20005 20001:2,20002:2,20005:2,20006:2,20007:1
20006 20001:2,20002:2,20003:1,20004:1,20005:2,20006:3,20007:1
20007 20001:1,20002:2,20004:1,20005:1,20006:1,20007:3
11. Compute each user's purchase vector
File: UserBuyGoodsVector.java
Classes:
UserBuyGoodsVector
UserBuyGoodsVectorMapper
UserBuyGoodsVectorReducer
Input: the result of step 8 (or the raw data).
10001 20001,20005,20006,20007,20002
10002 20006,20003,20004
10003 20002,20007
10004 20001,20002,20005,20006
10005 20001
10006 20004,20007
Implementation:
package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;
import java.io.IOException;

public class UserBuyGoodsVector extends Configured implements Tool {
    // The input is the result of UserBuyGoodsList.
    static class UserBuyGoodsVectorMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            // The originally posted code split s.toString(), which is wrong:
            // it is the product list s[1] that must be split on ",".
            String[] vs = s[1].split(",");
            for (String v : vs) {
                context.write(new Text(v), new Text(s[0] + ":1"));
            }
        }
    }

    static class UserBuyGoodsVectorReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString()).append(",");
            }
            context.write(key, new Text(sb.substring(0, sb.length() - 1)));
        }
    }

    public int run(String[] strings) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(UserBuyGoodsVector.class);
        job.setMapperClass(UserBuyGoodsVectorMapper.class);
        job.setReducerClass(UserBuyGoodsVectorReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new UserBuyGoodsVector(), args));
    }
}

The idea: if this cannot be done in one pass, two rounds of MapReduce would also reach the goal; the code here is simplified as far as possible. Two string arrays hold the user ID and the product IDs: s[1] is split on "," into vs, and each product ID becomes a key, with s[0] recorded once as the value ("user:1"). In other words, every product counts each user who bought it; the reduce side then merges these counts by creating a StringBuilder and joining all values with the same key using ",".
Result:
20001 10001:1,10004:1,10005:1
20002 10001:1,10003:1,10004:1
20003 10002:1
20004 10002:1,10006:1
20005 10001:1,10004:1
20006 10001:1,10002:1,10004:1
20007 10001:1,10003:1,10006:1
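The inversion performed by this job (user → items becomes item → users) can be checked in plain Java; the sketch below uses illustrative IDs and hypothetical helper names:

```java
import java.util.*;

public class PurchaseVectorSketch {
    // Invert user -> purchased items into item -> "user:1" entries,
    // mirroring the map/reduce pair above.
    static Map<String, List<String>> invert(Map<String, List<String>> userToItems) {
        Map<String, List<String>> itemToUsers = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : userToItems.entrySet()) {
            for (String item : e.getValue()) {
                itemToUsers.computeIfAbsent(item, k -> new ArrayList<>())
                           .add(e.getKey() + ":1");
            }
        }
        return itemToUsers;
    }

    public static void main(String[] args) {
        Map<String, List<String>> buys = new TreeMap<>();
        buys.put("10001", Arrays.asList("20001", "20002"));
        buys.put("10004", Arrays.asList("20001"));
        System.out.println(invert(buys).get("20001")); // [10001:1, 10004:1]
    }
}
```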
12. Multiply the goods co-occurrence matrix by the user purchase vectors, producing intermediate recommendation results.
File: MultiplyGoodsMatrixAndUserVector.java
Classes:
MultiplyGoodsMatrixAndUserVectorFirstMapper
MultiplyGoodsMatrixAndUserVectorSecondMapper
MultiplyGoodsMatrixAndUserVectorReducer
Note: the input comes from two files. The first is the result of step 10 (the goods co-occurrence matrix) and the second is the result of step 11 (the user purchase vectors). A single MR job therefore needs two custom Mappers, one per input, and one custom Reducer that joins their intermediate results.
1. Both Mappers must emit the same keys.
2. Both Mappers must emit the same Key and Value data types.
3. In the job configuration, each Mapper is registered with MultipleInputs.addInputPath(job, input path, input format class, Mapper class).
Input: the result data of steps 10 and 11.
Implementation:
package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

public class MultiplyGoodsMatrixAndUserVector extends Configured implements Tool {
    static class MultiplyGoodsMatrixAndUserVectorFirstMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            context.write(new Text(s[0]), new Text("m" + s[1])); // handles the co-occurrence matrix
        }
    }

    static class MultiplyGoodsMatrixAndUserVectorSecondMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            context.write(new Text(s[0]), new Text("v" + s[1])); // handles the user purchase vectors
        }
    }

    static class MultiplyGoodsMatrixAndUserVectorReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String[] ms = null;
            String[] vs = null;
            for (Text value : values) {
                String str = value.toString();
                if (str.charAt(0) == 'm') {
                    ms = str.substring(1).split(",");
                }
                if (str.charAt(0) == 'v') {
                    vs = str.substring(1).split(",");
                }
            }
            // Guard: a key present on only one side would otherwise cause a NullPointerException.
            if (ms == null || vs == null) return;
            for (String m : ms) {
                for (String v : vs) {
                    String[] mss = m.split(":");
                    String[] vss = v.split(":");
                    long vv = Long.parseLong(vss[1]);
                    long mm = Long.parseLong(mss[1]);
                    context.write(new Text(vss[0] + "," + mss[0]), new Text((vv * mm) + ""));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MultiplyGoodsMatrixAndUserVector(), args));
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path in1 = new Path(conf.get("in1"));
        Path in2 = new Path(conf.get("in2"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        MultipleInputs.addInputPath(job, in1, TextInputFormat.class, MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
        MultipleInputs.addInputPath(job, in2, TextInputFormat.class, MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MultiplyGoodsMatrixAndUserVectorReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class); // was LongWritable.class, which does not match the Text values the reducer writes
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

The idea: to multiply the two matrices, two Mappers are needed; both pull the product ID out as the key and hand the rest of the line to the reduce side as the value. To tell the two streams apart, the values are tagged with "m" (matrix) and "v" (vector). In the reducer, for strings starting with 'm', the data after the 'm' is split on ',' into the array ms; vs is built the same way from the 'v' strings. Each entry still contains a ':', and the part before it (a product or user ID) takes no part in the multiplication; a nested for loop therefore splits every pair of entries on ':', converts the parts after it to Long and multiplies them, and joins the parts before it into the output key.
Because this class takes two map inputs, note that the job configuration in the run method must use the MultipleInputs.addInputPath approach.
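The tag-and-join pattern used by the reducer can be checked in plain Java. The sketch below (illustrative values and a hypothetical helper name, not the job's real intermediate data) mimics one reduce call for a single product key:

```java
import java.util.*;

public class TaggedJoinSketch {
    // One reduce() call: values for a single product key, tagged "m" (a matrix
    // row) or "v" (user vector entries); emits "user,item \t score" pairs.
    static List<String> reduceOneKey(List<String> values) {
        String[] ms = null, vs = null;
        for (String str : values) {
            if (str.charAt(0) == 'm') ms = str.substring(1).split(",");
            if (str.charAt(0) == 'v') vs = str.substring(1).split(",");
        }
        List<String> out = new ArrayList<>();
        for (String m : ms) {
            for (String v : vs) {
                String[] mss = m.split(":");
                String[] vss = v.split(":");
                long product = Long.parseLong(mss[1]) * Long.parseLong(vss[1]);
                out.add(vss[0] + "," + mss[0] + "\t" + product);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("m20001:3,20002:2", "v10001:1,10004:1");
        // Each user entry is multiplied against each matrix entry: 4 output pairs.
        System.out.println(reduceOneKey(values));
    }
}
```

Each matrix entry is crossed with each vector entry, so a key with m matrix entries and v vector entries produces m × v scored pairs.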
Result:
10001,20001 2
10001,20001 2
10001,20001 3
10001,20001 1
10001,20001 2
10001,20002 3
10001,20002 2
10001,20002 2
(remaining rows omitted for brevity)
13. Sum the scattered recommendation scores computed in step 12.
File: MakeSumForMultiplication.java
Classes:
MakeSumForMultiplicationMapper
MakeSumForMultiplicationReducer
Input: the result of step 12.
Implementation:
package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

public class MakeSumForMultiplication extends Configured implements Tool {
    static class MakeSumForMultiplicationMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            // Emit the score as an IntWritable so the reducer can sum it.
            // (The originally posted code emitted Text, which does not match
            // the reducer's IntWritable value type.)
            context.write(new Text(s[0]), new IntWritable(Integer.parseInt(s[1])));
        }
    }

    static class MakeSumForMultiplicationReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get(); // was sum = value.get(), which kept only the last value
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path in = new Path(conf.get("in"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(MakeSumForMultiplication.class);
        job.setMapperClass(MakeSumForMultiplicationMapper.class);
        job.setReducerClass(MakeSumForMultiplicationReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class); // was Text.class
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, in);
        TextOutputFormat.setOutputPath(job, out);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MakeSumForMultiplication(), args));
    }
}

The idea: this is just a simple word count. Each line is split on "\t": the "user,item" pair becomes the key and the score the value, and the reduce side adds up the values with the same key.
Result:
10001,20001 10
10001,20002 11
10001,20003 1
10001,20004 2
10001,20005 9
(remaining rows omitted for brevity)
14. Deduplicate: remove the goods a user has already bought from the recommendation results.
File: DuplicateDataForResult.java
Classes:
DuplicateDataForResultFirstMapper
DuplicateDataForResultSecondMapper
DuplicateDataForResultReducer
Input:
1. The FirstMapper processes the user purchase lists (step 8).
2. The SecondMapper processes the summed recommendation results from MakeSumForMultiplication.
Implementation:
package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;
import java.io.IOException;
import java.util.*;

public class DuplicateDataForResult extends Configured implements Tool {
    static class DuplicateDataForResultFirstMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("[\t]");
            for (String s : strs[1].split(",")) {
                context.write(new Text(strs[0] + "," + s), new Text("r" + value.toString()));
            }
        }
    }

    static class DuplicateDataForResultSecondMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("[\t]");
            context.write(new Text(strs[0]), new Text("u" + value.toString()));
        }
    }

    static class DuplicateDataForResultReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> map = new HashMap<String, String>();
            List<String> list = new ArrayList<String>();
            for (Text value : values) {
                String val = value.toString();
                if (val.charAt(0) == 'r') list.add(key.toString());
                if (val.charAt(0) == 'u') map.put(key.toString(), val.substring(1));
            }
            for (String str : list) {
                map.remove(str);
            }
            for (String str : map.keySet()) {
                String[] strs = map.get(str).split(",");
                context.write(new Text(strs[0]), new Text(strs[1]));
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path in1 = new Path(conf.get("in1"));
        Path in2 = new Path(conf.get("in2"));
        Path out = new Path(conf.get("out"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        MultipleInputs.addInputPath(job, in1, TextInputFormat.class, DuplicateDataForResultFirstMapper.class);
        MultipleInputs.addInputPath(job, in2, TextInputFormat.class, DuplicateDataForResultSecondMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(DuplicateDataForResultReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, out);
        job.setNumReduceTasks(1);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

The idea: for the data coming from UserBuyGoodsList, each user ID is joined to each product ID it bought with "," before being emitted, so the keys match those coming from MakeSumForMultiplication. Because two Mappers feed one reducer, the values are tagged to distinguish them: "r" for already-purchased pairs and "u" for recommendations. In the reducer, keys of already-purchased goods are collected into a List and the recommendations into a HashMap keyed by the same "uid,gid" strings, so the List acts as an index into the Map: everything in the List has already been bought and must not be recommended, so the entries with matching keys are removed from the Map. What remains in the Map is the data we want. Since the next step stores the data in a database, the "," is no longer needed in the output, so a final loop over map.keySet() splits each remaining value and writes out the corresponding user ID and recommendation.
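The filtering step (dropping recommended pairs the user has already bought) can be sketched without Hadoop; IDs below are illustrative and the helper name is hypothetical:

```java
import java.util.*;

public class DedupSketch {
    // Remove from the recommendations every (user, item) pair that appears
    // in the purchase set, mirroring the List/Map logic of the reducer.
    static Map<String, Integer> filter(Map<String, Integer> recommendations, Set<String> purchased) {
        Map<String, Integer> result = new TreeMap<>(recommendations);
        result.keySet().removeAll(purchased);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> rec = new TreeMap<>();
        rec.put("10001,20001", 10); // already bought -> must be dropped
        rec.put("10001,20004", 2);  // not bought -> kept
        Set<String> bought = new HashSet<>(Collections.singleton("10001,20001"));
        System.out.println(filter(rec, bought)); // {10001,20004=2}
    }
}
```

Using a Set's `removeAll` against the map's key view is the same idea as the reducer's loop that calls `map.remove(str)` for each already-purchased key.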
Result:
10001 20004 2
10001 20003 1
10002 20002 2
10002 20007 2
10002 20001 2
10002 20005 2
10003 20006 3
10003 20005 3
10003 20001 3
10003 20004 1
10004 20007 5
10004 20004 1
10004 20003 1
10005 20006 2
10005 20002 2
10005 20005 2
10005 20007 1
10006 20006 2
10006 20002 2
10006 20005 1
10006 20003 1
10006 20001 1
15. Save the recommendation results to a MySQL database
Notes:
a. Make sure the table exists in advance:
grms.results(uid varchar(20),
gid varchar(20),
exp int)
b. When an MR program saves data from the HDFS cluster into a MySQL database, only the final output Key can be written to the database.
c. Define a custom data type for the final output Key. The custom class implements WritableComparable<the custom class>; since it also serves as the Key written from HDFS into MySQL, it must implement the DBWritable interface as well:
readFields(ResultSet rs)
write(PreparedStatement ps)
A impl WC, DBW {
private String uid;
private String gid;
private int exp;
readFields(ResultSet rs){
uid=rs.getString(1);
}
write(PreparedStatement ps){
ps.setString(1,uid);
ps.setString(2,gid);
ps.setInt(3,exp);
}
}
(The sketch originally wrote ps.setInt(1,exp); the parameter index must be 3, as in the full RecommendResultDB implementation.)
d. In the job configuration, DBConfiguration.configureDB() specifies the database connection parameters:
parameter 1: the Configuration object of the current job, obtained through the Job object;
parameter 2: "com.mysql.jdbc.Driver"
parameter 3: "jdbc:mysql://ip:port/grms"
parameters 4 and 5: the username and the password.
e. The output format is controlled with DBOutputFormat.
DBOutputFormat.setOutput() takes three parameters:
parameter 1: the Job object.
parameter 2: the database table name.
parameter 3: a varargs list of the column names to insert into, i.e.
insert into <table name> values(?,?,?);
File: SaveRecommendResultToDB.java
Classes: SaveRecommendResultToDBMapper<LongWritable,Text,Text,Text>
SaveRecommendResultToDBReducer<Text,Text,custom Key,NullWritable>
Input: the result data of step 14.
Output: the MySQL database, grms.results
package com.briup.bigdata.project.grms;

import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class SaveRecommendResultToDB extends Configured implements Tool {
    static class SaveRecommendResultToDBMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            context.write(new Text(strs[0] + "\t" + strs[1]), new Text(strs[2]));
        }
    }

    static class SaveRecommendResultToDBReducer extends Reducer<Text, Text, RecommendResultDB, NullWritable> {
        private RecommendResultDB rrdb = new RecommendResultDB();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String[] strs = key.toString().split("\t");
            rrdb.setUid(strs[0]);
            rrdb.setGid(strs[1]);
            rrdb.setExp(Integer.parseInt(values.iterator().next().toString()));
            context.write(rrdb, NullWritable.get());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path in = new Path(conf.get("in"));
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        job.setMapperClass(SaveRecommendResultToDBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, in);
        job.setReducerClass(SaveRecommendResultToDBReducer.class);
        job.setOutputKeyClass(RecommendResultDB.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        Properties prop = new Properties();
        prop.load(this.getClass().getResourceAsStream("/db.properties"));
        DBConfiguration.configureDB(job.getConfiguration(),
                prop.getProperty("grms.driver"),
                prop.getProperty("grms.url"),
                prop.getProperty("grms.username"),
                prop.getProperty("grms.password"));
        DBOutputFormat.setOutput(job, prop.getProperty("grms.tblname"), "uid", "gid", "exp");
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package com.briup.bigdata.project.grms;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class RecommendResultDB implements DBWritable, WritableComparable<RecommendResultDB> {
    private String uid;
    private String gid;
    private int exp;

    public RecommendResultDB() {
    }

    public RecommendResultDB(String uid, String gid, int exp) {
        this.uid = uid;
        this.gid = gid;
        this.exp = exp;
    }

    @Override
    public int compareTo(RecommendResultDB o) {
        int uidComp = this.uid.compareTo(o.uid);
        int gidComp = this.gid.compareTo(o.gid);
        int indexComp = this.exp - o.exp;
        return uidComp == 0 ? (gidComp == 0 ? indexComp : gidComp) : uidComp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(uid);
        out.writeUTF(gid);
        out.writeInt(exp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        uid = in.readUTF();
        gid = in.readUTF();
        exp = in.readInt();
    }

    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, uid);
        preparedStatement.setString(2, gid);
        preparedStatement.setInt(3, exp);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        if (resultSet == null) return;
        uid = resultSet.getString(1);
        gid = resultSet.getString(2);
        exp = resultSet.getInt(3);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RecommendResultDB)) return false;
        RecommendResultDB that = (RecommendResultDB) o;
        return getExp() == that.getExp()
                && Objects.equals(getUid(), that.getUid())
                && Objects.equals(getGid(), that.getGid());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getUid(), getGid(), getExp());
    }

    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
    public String getGid() { return gid; }
    public void setGid(String gid) { this.gid = gid; }
    public int getExp() { return exp; }
    public void setExp(int exp) { this.exp = exp; }

    @Override
    public String toString() {
        return "RecommendResultDB{" + "uid='" + uid + '\'' + ", gid='" + gid + '\'' + ", exp=" + exp + '}';
    }
}

The idea: a RecommendResultDB class is designed so that, in the reduce side of SaveRecommendResultToDB, the three strings split out by the map are stored into the database in the order uid, gid, exp.
16. Build a job-flow object (JobControl) so the program submits the jobs by itself.
File: GoodsRecommendationManagementSystemJobController.java
Class: GoodsRecommendationManagementSystemJobController
1. Nine classes were designed above, one of which (RecommendResultDB) has no input or output of its own. To configure all the jobs in one place, Job objects are created for step 1 through step 8 and each is configured individually.
2. Create 8 ControlledJob objects, wrapping the Job objects as controllable jobs.
3. Add the dependency relationships between the controlled jobs.
4. Build a JobControl object and add the 8 controlled jobs one by one.
5. Build a thread object, start it, and run the jobs.
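Steps 2 to 5 above can be sketched with Hadoop's ControlledJob/JobControl API. This is a sketch only: it assumes the Job objects have already been configured as in the run() method below, and the sequential dependency wiring shown is illustrative (the real flow has fan-in dependencies, e.g. step 5 depends on both step 3 and step 4):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobFlowSketch {
    // Steps 2-5: wrap Jobs, wire dependencies, run them via JobControl on a thread.
    static void runFlow(Job[] jobs) throws Exception {
        // Step 2: wrap each configured Job as a ControlledJob
        ControlledJob[] cjs = new ControlledJob[jobs.length];
        for (int i = 0; i < jobs.length; i++) {
            cjs[i] = new ControlledJob(jobs[i].getConfiguration());
            cjs[i].setJob(jobs[i]);
        }
        // Step 3: illustrative wiring only - each job waits for the previous one
        for (int i = 1; i < cjs.length; i++) {
            cjs[i].addDependingJob(cjs[i - 1]);
        }
        // Step 4: build the JobControl and register the controlled jobs
        JobControl jc = new JobControl("grms");
        for (ControlledJob cj : cjs) {
            jc.addJob(cj);
        }
        // Step 5: JobControl implements Runnable, so run it on its own thread
        Thread t = new Thread(jc);
        t.start();
        while (!jc.allFinished()) {
            Thread.sleep(500);
        }
        jc.stop();
    }
}
```

JobControl only submits a job once all the jobs it depends on have succeeded, which is exactly the ordering the pipeline above needs.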
package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
// 注意:要使用mapreduce(新API)包下的DBConfiguration和DBOutputFormat,
// mapred(旧API)包下的版本与Job.setOutputFormatClass不兼容
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Properties;

public class GoodsRecommendationManagementSystemJobController extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path in1 = new Path(conf.get("in1"));
        Path out1 = new Path(conf.get("out1"));
        Path out2 = new Path(conf.get("out2"));
        Path out3 = new Path(conf.get("out3"));
        Path out4 = new Path(conf.get("out4"));
        Path out5 = new Path(conf.get("out5"));
        Path out6 = new Path(conf.get("out6"));
        Path out7 = new Path(conf.get("out7"));

        // --step1-- 计算用户购买商品的列表
        Job job1 = Job.getInstance(conf, UserBuyGoodsList.class.getSimpleName());
        job1.setJarByClass(this.getClass());
        job1.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job1, in1);
        job1.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        job1.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job1, out1);

        // --step2-- 计算商品的共现关系列表
        Job job2 = Job.getInstance(conf, GoodsConcurrenceList.class.getSimpleName());
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(GoodsConcurrenceList.GoodsConcurrenceListMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job2, out1);
        job2.setReducerClass(GoodsConcurrenceList.GoodsConcurrenceListReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job2, out2);

        // --step3-- 计算商品的共现矩阵
        Job job3 = Job.getInstance(conf, GoodsConcurrenceMatrix.class.getSimpleName());
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(GoodsConcurrenceMatrix.GoodsConcurrenceMatrixMapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job3, out2);
        job3.setReducerClass(GoodsConcurrenceMatrix.GoodsConcurrenceMatrixReducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job3, out3);

        // --step4-- 计算用户的购买向量
        Job job4 = Job.getInstance(conf, UserBuyGoodsVector.class.getSimpleName());
        job4.setJarByClass(this.getClass());
        job4.setMapperClass(UserBuyGoodsVector.UserBuyGoodsVectorMapper.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(Text.class);
        job4.setInputFormatClass(TextInputFormat.class);
        // 数据来源:第1步的计算结果或者原始数据
        TextInputFormat.addInputPath(job4, out1);
        job4.setReducerClass(UserBuyGoodsVector.UserBuyGoodsVectorReducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(Text.class);
        job4.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job4, out4);

        // --step5-- 共现矩阵与购买向量相乘
        Job job5 = Job.getInstance(conf, MultiplyGoodsMatrixAndUserVector.class.getSimpleName());
        job5.setJarByClass(this.getClass());
        // 数据来源:第3步和第4步的计算结果,两个输入分别走不同的Mapper
        MultipleInputs.addInputPath(job5, out3, TextInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
        MultipleInputs.addInputPath(job5, out4, TextInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);
        job5.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);
        job5.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job5, out5);

        // --step6-- 对相乘的结果求和
        Job job6 = Job.getInstance(conf, MakeSumForMultiplication.class.getSimpleName());
        job6.setJarByClass(this.getClass());
        job6.setMapperClass(MakeSumForMultiplication.MakeSumForMultiplicationMapper.class);
        job6.setMapOutputKeyClass(Text.class);
        job6.setMapOutputValueClass(LongWritable.class);
        job6.setInputFormatClass(TextInputFormat.class);
        // 数据来源:第5步的计算结果
        TextInputFormat.addInputPath(job6, out5);
        job6.setReducerClass(MakeSumForMultiplication.MakeSumForMultiplicationReducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(LongWritable.class);
        job6.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job6, out6);

        // --step7-- 去重,整理最终的推荐结果
        Job job7 = Job.getInstance(conf, DuplicateDataForResult.class.getSimpleName());
        job7.setJarByClass(this.getClass());
        // 数据来源:第1步和第6步的计算结果
        MultipleInputs.addInputPath(job7, out1, TextInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultFirstMapper.class);
        MultipleInputs.addInputPath(job7, out6, TextInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultSecondMapper.class);
        job7.setMapOutputKeyClass(Text.class);
        job7.setMapOutputValueClass(Text.class);
        job7.setReducerClass(DuplicateDataForResult.DuplicateDataForResultReducer.class);
        job7.setOutputKeyClass(Text.class);
        job7.setOutputValueClass(Text.class);
        job7.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job7, out7);
        job7.setNumReduceTasks(1);

        // --step8-- 将推荐结果存入数据库
        Job job8 = Job.getInstance(conf, SaveRecommendResultToDB.class.getSimpleName());
        job8.setJarByClass(this.getClass());
        job8.setMapperClass(SaveRecommendResultToDB.SaveRecommendResultToDBMapper.class);
        job8.setMapOutputKeyClass(Text.class);
        job8.setMapOutputValueClass(Text.class);
        job8.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job8, out7);
        job8.setReducerClass(SaveRecommendResultToDB.SaveRecommendResultToDBReducer.class);
        job8.setOutputKeyClass(RecommendResultDB.class);
        job8.setOutputValueClass(NullWritable.class);
        job8.setOutputFormatClass(DBOutputFormat.class);
        Properties prop = new Properties();
        prop.load(this.getClass().getResourceAsStream("/db.properties"));
        DBConfiguration.configureDB(job8.getConfiguration(),
                prop.getProperty("grms.driver"), prop.getProperty("grms.url"),
                prop.getProperty("grms.username"), prop.getProperty("grms.password"));
        DBOutputFormat.setOutput(job8, prop.getProperty("grms.tblname"), "uid", "gid", "exp");

        // 将各作业包装为ControlledJob
        ControlledJob cj1 = new ControlledJob(job1.getConfiguration());
        cj1.setJob(job1);
        ControlledJob cj2 = new ControlledJob(job2.getConfiguration());
        cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(job3.getConfiguration());
        cj3.setJob(job3);
        ControlledJob cj4 = new ControlledJob(job4.getConfiguration());
        cj4.setJob(job4);
        ControlledJob cj5 = new ControlledJob(job5.getConfiguration());
        cj5.setJob(job5);
        ControlledJob cj6 = new ControlledJob(job6.getConfiguration());
        cj6.setJob(job6);
        ControlledJob cj7 = new ControlledJob(job7.getConfiguration());
        cj7.setJob(job7);
        ControlledJob cj8 = new ControlledJob(job8.getConfiguration());
        cj8.setJob(job8);

        // 添加作业之间的依赖关系
        cj2.addDependingJob(cj1);
        cj3.addDependingJob(cj2);
        cj4.addDependingJob(cj1);
        cj5.addDependingJob(cj3);
        cj5.addDependingJob(cj4);
        cj6.addDependingJob(cj5);
        cj7.addDependingJob(cj1);
        cj7.addDependingJob(cj6);
        cj8.addDependingJob(cj7);

        // 创建JobControl对象,添加ControlledJob
        JobControl jc = new JobControl(this.getClass().getSimpleName());
        jc.addJob(cj1);
        jc.addJob(cj2);
        jc.addJob(cj3);
        jc.addJob(cj4);
        jc.addJob(cj5);
        jc.addJob(cj6);
        jc.addJob(cj7);
        jc.addJob(cj8);

        // 构建线程类对象,启动线程执行作业
        Thread thread = new Thread(jc);
        thread.start();
        do {
            for (ControlledJob cj : jc.getRunningJobList()) {
                cj.getJob().monitorAndPrintJob();
            }
        } while (!jc.allFinished());
        jc.stop();  // 全部作业完成后停止JobControl线程
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GoodsRecommendationManagementSystemJobController(), args));
    }
}

到了这里我们就可以把上面所有类中的main函数都注释掉,打包jar,然后在YARN集群中执行GoodsRecommendationManagementSystemJobController程序了,注意-Din1和-Dout1~-Dout7这些参数的使用。
我们的商品推荐算法已经基本上完成了。
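在把作业链提交到集群之前,也可以先用一小段纯本地的Java代码,按同样的思路(商品共现矩阵×用户购买向量)手工算一遍期望结果,用来核对各步MapReduce的输出。下面是一个示意(数据就是前文matrix.txt里的购买记录,类名LocalRecommendCheck是随手起的,与集群代码无关):

```java
import java.util.*;

public class LocalRecommendCheck {
    public static void main(String[] args) {
        // 原始购买记录:用户 -> 购买的商品列表(与matrix.txt一致,购买次数均为1)
        Map<String, List<String>> buy = new LinkedHashMap<>();
        buy.put("10001", Arrays.asList("20001", "20002", "20005", "20006", "20007"));
        buy.put("10002", Arrays.asList("20003", "20004", "20006"));
        buy.put("10003", Arrays.asList("20002", "20007"));
        buy.put("10004", Arrays.asList("20001", "20002", "20005", "20006"));
        buy.put("10005", Arrays.asList("20001"));
        buy.put("10006", Arrays.asList("20004", "20007"));

        // 商品共现矩阵:同一用户的购买列表内两两组合(含自身)计数
        Map<String, Map<String, Integer>> cooc = new HashMap<>();
        for (List<String> goods : buy.values())
            for (String g1 : goods)
                for (String g2 : goods)
                    cooc.computeIfAbsent(g1, k -> new HashMap<>())
                        .merge(g2, 1, Integer::sum);

        // 推荐得分 = 共现矩阵的对应行 与 用户购买向量 的点积
        Map<String, Map<String, Integer>> score = new HashMap<>();
        for (Map.Entry<String, List<String>> e : buy.entrySet())
            for (String g : cooc.keySet()) {
                int s = 0;
                for (String b : e.getValue())
                    s += cooc.get(g).getOrDefault(b, 0);
                score.computeIfAbsent(e.getKey(), k -> new HashMap<>()).put(g, s);
            }

        // 例如:用户10005只买过20001,与20001共现次数多的商品得分就高
        System.out.println("用户10005的各商品推荐得分:" + score.get("10005"));
    }
}
```

这样每一步MapReduce跑完后,都可以拿它的输出和这里的中间结果(共现矩阵、得分)对一对,定位问题会快很多。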
有条件的话可以写一个脚本,在YARN集群下提交作业时自动拼好这些-D参数,免得一个一个手动输入,挺累的。
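比如可以写一个类似下面的提交脚本(这里的jar包名、主类名和HDFS路径都是按前文约定取的示例值,请按自己的环境修改;脚本先把命令拼出来打印确认,不直接执行):

```shell
#!/bin/bash
# GRMS作业链提交脚本示例(路径与包名均为假设值)
JAR=grms.jar
MAIN=com.briup.bigdata.project.grms.GoodsRecommendationManagementSystemJobController

# 输入:原始数据;输出:step1~step7的各步结果目录
ARGS="-Din1=/grms/rawdata/matrix.txt"
for i in 1 2 3 4 5 6 7; do
    ARGS="$ARGS -Dout$i=/grms/step$i"
done

CMD="yarn jar $JAR $MAIN $ARGS"
echo "$CMD"
# 确认无误后,把上面的echo改成直接执行:eval "$CMD"
```
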
附加:db.properties
grms.driver=com.mysql.jdbc.Driver
grms.url=jdbc:mysql://ud2:5721/grms
grms.username=root
grms.password=root
grms.tblname=results

JobUtil类,用于简化所有作业的配置(除了那些需要导入多个Mapper的作业),请根据代码自行取舍:
package com.briup.bigdata.project.grms.utils;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class JobUtil {
    private static Job job;
    private static String in;
    private static String out;
    private static Configuration configuration;

    // 定义setConf方法
    public static void setConf(
            Configuration c,  // MapReduce作业传递的整个作业的配置对象
            Class cz,         // MapReduce作业执行的jar包中包含的主类的镜像
            String name,      // 作业的名字
            String vin,       // 数据的输入路径
            String vout       // 数据的输出路径
    ) {
        try {
            if (c == null) {
                throw new Exception("配置信息不能为null。");
            }
            job = Job.getInstance(c, name);  // 构建Job对象,设置配置对象和作业名
            job.setJarByClass(cz);           // 提供执行的作业的主类的镜像
            in = vin;                        // 将数据的输入路径传递给全局变量
            out = vout;                      // 将数据的输出路径传递给全局变量
            configuration = c;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 定义setMapper方法
    public static void setMapper(
            Class<? extends Mapper> x,      // 作业中运行的Mapper类的镜像
            Class<? extends Writable> y,    // Mapper输出的Key的数据类型
            Class<? extends Writable> z,    // Mapper输出的Value的数据类型
            Class<? extends InputFormat> o  // 作业中数据输入的格式
    ) {
        try {
            job.setMapperClass(x);
            job.setMapOutputKeyClass(y);
            job.setMapOutputValueClass(z);
            job.setInputFormatClass(o);
            // 反射调用输入格式类的静态方法,如TextInputFormat.addInputPath(job, path)
            o.getMethod("addInputPath", Job.class, Path.class).invoke(null, job, new Path(in));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 定义setReducer方法
    public static void setReducer(
            Class<? extends Reducer> a,       // 作业中运行的Reducer类的镜像
            Class<? extends Writable> b,      // Reducer输出的Key的数据类型
            Class<? extends Writable> c,      // Reducer输出的Value的数据类型
            Class<? extends OutputFormat> d,  // 作业中数据输出的格式
            int rnum                          // Reducer的个数
    ) {
        try {
            job.setReducerClass(a);
            job.setOutputKeyClass(b);
            job.setOutputValueClass(c);
            job.setOutputFormatClass(d);
            // 反射调用输出格式类的静态方法,如TextOutputFormat.setOutputPath(job, path)
            d.getMethod("setOutputPath", Job.class, Path.class).invoke(null, job, new Path(out));
            job.setNumReduceTasks(rnum);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 配置全排序:TotalOrderPartitioner + 随机采样
    public static void setTotalSort(float a, int b, int c)
            throws InterruptedException, IOException, ClassNotFoundException, URISyntaxException {
        job.setPartitionerClass(TotalOrderPartitioner.class);
        InputSampler.writePartitionFile(job, new RandomSampler(a, b, c));
        job.addCacheFile(new URI(TotalOrderPartitioner.getPartitionFile(getConfiguration())));
    }

    // 配置二次排序:分组比较器、排序比较器和自定义分区器
    public static void setSecondarySort(Class<? extends WritableComparator> g,
                                        Class<? extends WritableComparator> s,
                                        Class<? extends Partitioner> p) throws ClassNotFoundException {
        job.setPartitionerClass(p);
        job.setGroupingComparatorClass(g);
        job.setSortComparatorClass(s);
    }

    public static void setCombiner(boolean flag, Class<? extends Reducer> combiner) {
        if (flag && combiner != null) job.setCombinerClass(combiner);
    }

    public static int commit() throws Exception {
        return job.waitForCompletion(true) ? 0 : 1;  // 提交作业
    }

    public static Job getJob() {
        return job;
    }

    public static void setJob(Job xyz) {
        JobUtil.job = xyz;
    }

    public static Configuration getConfiguration() {
        return job.getConfiguration();
    }

    public static void setConfiguration(Configuration configuration) {
        JobUtil.configuration = configuration;
    }
}