Hadoop 2.6: Implementing an Item-Based Recommendation System with MapReduce
2017-10-11 13:27
I. Item-Based Recommendation Systems
1. Cosine similarity
For example, if two vectors are identical, the angle between them is 0° and cos = 1, the highest possible similarity.
If the two vectors are perpendicular, the angle is 90° and cos = 0, the lowest similarity.
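Written out, the similarity used throughout this post is the standard cosine of the angle between two item vectors A and B:

\[
\cos\theta = \frac{A \cdot B}{\lVert A\rVert\,\lVert B\rVert}
           = \frac{\sum_i A_i B_i}{\sqrt{\sum_i A_i^2}\,\sqrt{\sum_i B_i^2}}
\]

Here A_i is the score user i gave the item; since the scores are non-negative, the cosine ranges from 0 to 1.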
2. Item-based collaborative filtering
Idea: recommend to each user the items most similar to the items they liked before.
Steps (each one is a MapReduce job below):
1. Build the item-user rating matrix from the user action list.
2. Compute the cosine similarity between every pair of rating-matrix rows, giving the item-item similarity matrix.
3. Transpose the rating matrix.
4. Multiply the similarity matrix by the transposed rating matrix to get the raw recommendation lists.
5. Remove items each user has already acted on.
II. Input
Upload the useraction.txt file to the /input directory on HDFS, creating the directory first if it does not exist:
hadoop fs -mkdir /input
hadoop fs -put useraction.txt /input
A,1,1
C,3,5
B,2,3
B,5,3
B,6,5
A,2,10
C,3,10
C,4,5
C,1,5
A,1,1
A,6,5
A,4,3
Each line is one user action: the first column is the user ID, the second the item ID, and the third the score of the action.
III. Implementation
step1:
Build the user-item rating matrix from the list of user actions.
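As a small worked example of the accumulation: user A acts on item 1 twice with a score of 1 each time (the two A,1,1 lines in the input), so the rating-matrix entry for item 1 and user A becomes 1 + 1 = 2, which appears in the step-1 output as the A_2 component of row 1.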
package hadoop2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Step1 {
/***
 * input: /input/useraction.txt
 * userID,itemID,score
 * A,1,1 C,3,5 B,2,3 B,5,3 B,6,5 A,2,10 C,3,10 C,4,5 C,1,5 A,1,1 A,6,5 A,4,3
 * output:
 * (itemID, userID_score)
 * ("1","A_1")
 * ("3","C_5")
 * ("2","B_3")
 * ("5","B_3")
 * ("6","B_5")
 * ("2","A_10")
 * ("3","C_10")
 * ("4","C_5")
 * ("1","C_5")
 * ("1","A_1")
 * ("6","A_5")
 * ("4","A_3")
 *
 * The map step turns each (userID, itemID, score) record into (itemID, userID_score).
 * @author chenjie
 */
public static class Mapper1 extends Mapper<LongWritable,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//System.out.println("map,key=" + key + ",value=" + value.toString());
String values[] = value.toString().split(",");
String userID = values[0];
String itemID = values[1];
String score = values[2];
outKey.set(itemID);
outValue.set(userID + "_" + score);
context.write(outKey, outValue);
System.out.println("(\"" + itemID + "\",\"" + userID + "_" + score + "\")");
}
}
/***
 * input:
 * itemID [userID_score...]
 * ("1",["A_1","C_5","A_1"])
 * ("2",["A_10","B_3"])
 * ("3",["C_10","C_5"])
 * ("4",["A_3","C_5"])
 * ("5",["B_3"])
 * ("6",["A_5","B_5"])
 * output:
 * itemID [userID_sumScore...]
 * 1 A_2,C_5
 * 2 A_10,B_3
 * 3 C_15
 * 4 A_3,C_5
 * 5 B_3
 * 6 A_5,B_5
 *
 * The reduce step sums the scores of records that share both itemID and userID.
 * For example, in ("1",["A_1","C_5","A_1"]), user A scored item 1 twice, so 1 + 1 = 2,
 * and item 1 / user A / total 2 is stored in a map as (1,"A_2");
 * likewise item 1 / user C / total 5 as (1,"C_5").
 * All entries for item 1 are then emitted as key: 1, value: A_2,C_5;
 * item 2 as key: 2, value: A_10,B_3; and so on.
 * @author chenjie
 */
public static class Reducer1 extends Reducer<Text,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String itemID = key.toString();
StringBuilder log = new StringBuilder();
log.append("(\"" + itemID + "\",[");
Map<String,Integer> map = new HashMap<String,Integer>();
for(Text value : values)
{
log.append("\"" + value + "\",");
String userID = value.toString().split("_")[0];
String score = value.toString().split("_")[1];
if(map.get(userID) == null)
{
map.put(userID, Integer.valueOf(score));
}
else
{
Integer preScore = map.get(userID);
map.put(userID, preScore + Integer.valueOf(score));
}
}
if(log.toString().endsWith(","))
log.deleteCharAt(log.length()-1);
log.append("])");
System.out.println(log);
StringBuilder sb = new StringBuilder();
for(Map.Entry<String, Integer> entry : map.entrySet())
{
String userID = entry.getKey();
String score = String.valueOf(entry.getValue());
sb.append(userID + "_" + score + ",");
}
String line = "";
if(sb.toString().endsWith(","))
{
line = sb.substring(0, sb.length()-1);
}
outKey.set(itemID);
outValue.set(line);
context.write(outKey, outValue);
}
}
private static final String INPATH = "/input/useraction.txt"; // input path
private static final String OUTPATH = "/output/tuijian1"; // output path
private static final String HDFS = "hdfs://pc1:9000"; // HDFS namenode URI
public int run() throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS",HDFS);
String[] otherArgs = {INPATH, OUTPATH}; // HDFS input and output paths
if (otherArgs.length != 2) {
    System.err.println("Usage: step1 <in> <out>");
    System.exit(2);
}
@SuppressWarnings("deprecation")
Job job = new Job(conf, "step1"); // Job(Configuration conf, String jobName) sets the job name
job.setJarByClass(Step1.class);
job.setMapperClass(Mapper1.class); // set the Mapper class for the job
job.setReducerClass(Reducer1.class); // set the Reducer class for the job
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class); // output key type
job.setOutputValueClass(Text.class); // output value type
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // input path for the job
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path for the job
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path(OUTPATH);
if(fs.exists(outPath))
{
fs.delete(outPath, true);
}
return job.waitForCompletion(true) ? 1 : -1;
}
public static void main(String[] args)
{
try {
new Step1().run();
} catch (ClassNotFoundException | IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
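Each step has its own main(), so it can be run directly from the IDE against the cluster, or packaged into a jar and submitted with the hadoop client. The jar name below is just a placeholder for however you package the project:

hadoop jar recommend.jar hadoop2.Step1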
step2:
Compute the cosine similarity between every pair of rows of the rating matrix, producing the item-item similarity matrix.
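To check the output against the formula, take rows 1 and 2 of the step-1 matrix: item 1 = (A_2, C_5) and item 2 = (A_10, B_3) overlap only on user A, so the dot product is 2 × 10 = 20, the norms are √(2² + 5²) = √29 and √(10² + 3²) = √109, and cos = 20 / (√29 × √109) ≈ 0.36, which is exactly the 2_0.36 entry in row 1 of the output below.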
package hadoop2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
 * Compute the similarity between every pair of rows of the rating matrix,
 * producing the similarity matrix. A copy of the input is also placed in the
 * distributed cache, and every input row is evaluated against every cached row.
 *
 * input:
 * itemID [userID_sumScore...]
 * 1 A_2,C_5
 * 2 A_10,B_3
 * 3 C_15
 * 4 A_3,C_5
 * 5 B_3
 * 6 A_5,B_5
 * output:
 * itemID [itemID_cos...]
 * 1 1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
 * 2 1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
 * 3 4_0.86,3_1.00,1_0.93
 * 4 1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
 * 5 2_0.29,5_1.00,6_0.71
 * 6 1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
 * @author chenjie
 */
public class Step2 {
    /***
     * The mapper emits one (itemID, itemID_cos) pair per non-zero similarity:
     * 1 1_1.00
     * 1 2_0.36
     * 1 3_0.93
     * ...
     * 6 5_0.71
     * 6 6_1.00
     * @author chenjie
     */
    public static class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();
        private List<String> cacheList = new ArrayList<String>();
        private DecimalFormat df = new DecimalFormat("0.00");

        /***
         * Cache the file in memory, one line per string, all lines forming a list.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileReader fr = new FileReader("itemUserScore1");
            BufferedReader br = new BufferedReader(fr);
            String line = null;
            while ((line = br.readLine()) != null) {
                cacheList.add(line);
            }
            br.close();
        }

        /***
         * Example: value = "1	A_2,C_5" against the cacheList line "2	A_10,B_3".
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("map,key=" + key + ",value=" + value.toString());
            String[] rowAndline = value.toString().split("\t");
            // rowAndline : {"1","A_2,C_5"}
            String row_matrix1 = rowAndline[0]; // row number: "1"
            String[] column_value_array_matrix1 = rowAndline[1].split(","); // columns: {"A_2","C_5"}
            // |x| = sqrt(x1^2 + x2^2 + ...)
            double denominator1 = 0; // norm of vector 1
            for (String column : column_value_array_matrix1) { // each component of vector 1
                String score = column.split("_")[1];
                denominator1 += Double.valueOf(score) * Double.valueOf(score); // accumulate the squared component
            }
            denominator1 = Math.sqrt(denominator1); // square root gives the norm
            for (String line : cacheList) { // e.g. line = "2	A_10,B_3"
                String[] rowAndline2 = line.split("\t");
                // rowAndline2 : {"2","A_10,B_3"}
                String row_matrix2 = rowAndline2[0]; // "2"
                String[] column_value_array_matrix2 = rowAndline2[1].split(","); // {"A_10","B_3"}
                double denominator2 = 0; // norm of vector 2
                for (String column : column_value_array_matrix2) {
                    String score = column.split("_")[1];
                    denominator2 += Double.valueOf(score) * Double.valueOf(score);
                }
                denominator2 = Math.sqrt(denominator2);
                int numerator = 0; // accumulates the dot product
                for (String column_value_matrix1 : column_value_array_matrix1) { // each component of vector 1: A_2, C_5
                    String column_matrix1 = column_value_matrix1.split("_")[0]; // user ID
                    String value_matrix1 = column_value_matrix1.split("_")[1]; // score
                    for (String column_value_matrix2 : column_value_array_matrix2) { // each component of vector 2: A_10, B_3
                        String column_matrix2 = column_value_matrix2.split("_")[0]; // user ID
                        String value_matrix2 = column_value_matrix2.split("_")[1]; // score
                        // only components in the same column (same user) are multiplied;
                        // this is why each component carries its column label
                        if (column_matrix2.equals(column_matrix1)) {
                            numerator += Integer.valueOf(value_matrix1) * Integer.valueOf(value_matrix2); // e.g. += 2 * 10
                        }
                    }
                }
                double cos = numerator / (denominator1 * denominator2); // the cosine
                if (cos == 0)
                    continue;
                outKey.set(row_matrix1); // key: row number of the left matrix
                outValue.set(row_matrix2 + "_" + df.format(cos)); // value: other row number _ similarity
                context.write(outKey, outValue);
                System.out.println(outKey + "\t" + outValue);
            }
        }
    }

    /***
     * input:
     * ("1",["1_1.00","2_0.36","3_0.93","4_0.99","6_0.26"])
     * ("2",["1_0.36","2_1.00","4_0.49","5_0.29","6_0.88"])
     * ...
     * output:
     * 1 1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
     * 2 1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
     * ...
     * i.e. the components are joined back together, giving the final similarity matrix.
     * @author chenjie
     */
    public static class Reducer2 extends Reducer<Text, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text text : values) { // note: values can only be iterated once
                sb.append(text + ",");
            }
            String line = "";
            if (sb.toString().endsWith(",")) {
                line = sb.substring(0, sb.length() - 1); // drop the trailing comma
            }
            outKey.set(key);
            outValue.set(line);
            context.write(outKey, outValue);
        }
    }

    private static final String INPATH = "/output/tuijian1/part-r-00000"; // input path
    private static final String OUTPATH = "/output/tuijian2"; // output path
    private static final String CACHE = "/output/tuijian1/part-r-00000"; // cached copy of the input
    private static final String HDFS = "hdfs://pc1:9000"; // HDFS namenode URI

    public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);
        String[] otherArgs = {INPATH, OUTPATH}; // HDFS input and output paths
        if (otherArgs.length != 2) {
            System.err.println("Usage: step2 <in> <out>");
            System.exit(2);
        }
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "step2"); // Job(Configuration conf, String jobName) sets the job name
        job.setJarByClass(Step2.class);
        job.setMapperClass(Mapper2.class); // set the Mapper class for the job
        job.setReducerClass(Reducer2.class); // set the Reducer class for the job
        // cache the step-1 output, symlinked as "itemUserScore1"
        // (for a plain file, addCacheFile would be the more usual call)
        job.addCacheArchive(new URI(CACHE + "#itemUserScore1"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class); // output key type
        job.setOutputValueClass(Text.class); // output value type
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // input path for the job
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path for the job
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(OUTPATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        return job.waitForCompletion(true) ? 1 : -1;
    }

    public static void main(String[] args) {
        try {
            new Step2().run();
        } catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
    }
}
step3: transpose the rating matrix produced in step 1 (see my earlier post for why the transpose is needed and how it is done).
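For example, the step-1 row "1	A_2,C_5" (item 1 scored 2 by user A and 5 by user C) is emitted by the mapper as (A,"1_2") and (C,"1_5"); after the reduce, the A row of the transposed matrix therefore contains the component 1_2 and the C row the component 1_5, as in the output listed in the code below.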
package hadoop2;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * The matrix multiplication is done in two jobs:
 * 1. transpose the rating matrix
 * 2. multiply the similarity matrix by the transposed rating matrix
 * This job performs part 1, the transpose.
 *
 * input:
 * 1 A_2,C_5
 * 2 A_10,B_3
 * 3 C_15
 * 4 A_3,C_5
 * 5 B_3
 * 6 A_5,B_5
 * output:
 * A 6_5,4_3,2_10,1_2
 * B 6_5,5_3,2_3
 * C 4_5,3_15,1_5
 * @author chenjie
 */
public class Step3 {
    public static class Mapper3 extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        // For each line, take the first one as an example:
        // key : 1
        // value : "1	1_0,2_3,3_-1,4_2,5_-3"
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] rowAndline = value.toString().split("\t");
            // rowAndline : {"1","1_0,2_3,3_-1,4_2,5_-3"}
            String row = rowAndline[0]; // "1"
            String[] lines = rowAndline[1].split(",");
            // lines : {"1_0","2_3","3_-1","4_2","5_-3"}
            for (String line : lines) { // each column, e.g. "1_0"
                String column = line.split("_")[0]; // column : 1
                String valueStr = line.split("_")[1]; // valueStr : 0
                outKey.set(column); // the column becomes the row
                outValue.set(row + "_" + valueStr); // the row becomes the column
                context.write(outKey, outValue); // emits (1,"1_0")
            }
            // For {"1_0","2_3","3_-1","4_2","5_-3"} the loop emits
            // (1,"1_0") (2,"1_3") (3,"1_-1") (4,"1_2") (5,"1_-3"),
            // which is exactly the first column of the transposed matrix.
        }
    }

    /*
     * The reduce step merges all key-value pairs produced by the mappers into
     * the stored form of the transposed matrix. Values with the same key are
     * grouped, e.g. key "1" gets values {"3_0","1_0","4_-2","2_1"}.
     * Note: this is why each value carries its column label; the values do not
     * necessarily arrive in the original column order.
     */
    public static class Reducer3 extends Reducer<Text, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                sb.append(text + ",");
            }
            // sb : "3_0,1_0,4_-2,2_1," -- note the trailing comma
            String line = "";
            if (sb.toString().endsWith(",")) {
                line = sb.substring(0, sb.length() - 1); // drop the trailing comma
            }
            // line : "3_0,1_0,4_-2,2_1"
            outKey.set(key);
            outValue.set(line);
            context.write(outKey, outValue); // ("1","3_0,1_0,4_-2,2_1")
        }
    }

    private static final String INPATH = "hdfs://pc1:9000/output/tuijian1/part-r-00000"; // input path
    private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian3"; // output path
    private static final String HDFS = "hdfs://pc1:9000"; // HDFS namenode URI

    public int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);
        String[] otherArgs = {INPATH, OUTPATH}; // HDFS input and output paths
        if (otherArgs.length != 2) {
            System.err.println("Usage: step3 <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "step3");
        job.setJarByClass(Step3.class);
        job.setMapperClass(Mapper3.class); // set the Mapper class for the job
        job.setReducerClass(Reducer3.class); // set the Reducer class for the job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class); // output key type
        job.setOutputValueClass(Text.class); // output value type
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // input path for the job
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path for the job
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(OUTPATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        return job.waitForCompletion(true) ? 1 : -1;
    }

    public static void main(String[] args) {
        try {
            new Step3().run();
        } catch (ClassNotFoundException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
step4: similarity matrix × transposed rating matrix = recommendation lists (see my earlier post on matrix multiplication).
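In formula form, the job computes for every item i and user u a recommendation score as a similarity-weighted sum over all items j:

\[
r(i, u) = \sum_j \mathrm{sim}(i, j)\,\mathrm{score}(u, j)
\]

where sim is the step-2 similarity matrix and score is the step-1 rating matrix.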
package hadoop2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * The matrix multiplication, part 2: multiply the similarity matrix by the
 * transposed rating matrix.
 *
 * input (the step-2 similarity matrix):
 * 1 1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
 * 2 1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
 * 3 4_0.86,3_1.00,1_0.93
 * 4 1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
 * 5 2_0.29,5_1.00,6_0.71
 * 6 1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
 * cache (the step-3 transposed rating matrix):
 * A 6_5,4_3,2_10,1_2
 * B 6_5,5_3,2_3
 * C 4_5,3_15,1_5
 * output (the recommendation lists):
 * 1 A_9.87,B_2.38,C_23.90
 * 2 A_16.59,B_8.27,C_4.25
 * 3 C_23.95,A_4.44
 * 4 B_3.27,C_22.85,A_11.68
 * 5 A_6.45,B_7.42
 * 6 C_3.10,A_15.40,B_9.77
 * For example, map combines "1 1_1.00,2_0.36,3_0.93,4_0.99,6_0.26" with
 * "A 6_5,4_3,2_10,1_2": 1.00*2 + 0.36*10 + 0.99*3 + 0.26*5 = 9.87,
 * emitting (1,"A_9.87"); reduce then merges everything into the lists.
 * @author chenjie
 */
public class Step4 {
    public static class Mapper4 extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();
        private List<String> cacheList = new ArrayList<String>();
        private DecimalFormat df = new DecimalFormat("0.00");

        /***
         * Cache the file holding the right-hand matrix in memory, one line per string.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileReader fr = new FileReader("myfile");
            BufferedReader br = new BufferedReader(fr);
            String line = null;
            while ((line = br.readLine()) != null) {
                cacheList.add(line);
                System.out.println("----------------------cache line :" + line);
            }
            br.close();
        }

        /*
         * Both matrices are stored one row per line, each value labelled with
         * its column, e.g. a left-matrix line "1	1_1,2_2,3_-2,4_0" and a
         * (transposed) right-matrix line "3	4_-1,1_-1,3_4,2_5".
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("-------------------map,key=" + key + "value=" + value);
            String[] rowAndline = value.toString().split("\t");
            String row_matrix1 = rowAndline[0]; // row number of the left matrix
            String[] column_value_array_matrix1 = rowAndline[1].split(","); // its labelled columns
            for (String line : cacheList) { // each row of the cached right matrix
                String[] rowAndline2 = line.split("\t");
                String row_matrix2 = rowAndline2[0]; // row of the transposed matrix (column of the original)
                String[] column_value_array_matrix2 = rowAndline2[1].split(",");
                double result = 0; // accumulates the dot product
                for (String column_value_matrix1 : column_value_array_matrix1) { // each column of the left row
                    String column_matrix1 = column_value_matrix1.split("_")[0]; // column label
                    String value_matrix1 = column_value_matrix1.split("_")[1]; // value
                    for (String column_value_matrix2 : column_value_array_matrix2) { // each column of the right row
                        String column_matrix2 = column_value_matrix2.split("_")[0]; // column label
                        String value_matrix2 = column_value_matrix2.split("_")[1]; // value
                        // only values with the same column label sit at the same
                        // position; this is why each value carries its column label
                        if (column_matrix2.equals(column_matrix1)) {
                            result += Double.valueOf(value_matrix1) * Double.valueOf(value_matrix2);
                        }
                    }
                }
                if (result == 0)
                    continue;
                outKey.set(row_matrix1); // key: row number of the left matrix
                outValue.set(row_matrix2 + "_" + df.format(result)); // value: column label _ dot product
                context.write(outKey, outValue); // e.g. ("1","A_9.87")
            }
        }
    }

    /**
     * Merge the map output into the physical form of the result matrix:
     * key-value pairs with the same key, e.g. key "1" with values
     * {"2_7","3_1","2_4","4_0","5_9"}, are joined into one line.
     */
    public static class Reducer4 extends Reducer<Text, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                sb.append(text + ",");
            }
            // sb : "2_7,3_1,2_4,4_0,5_9,"
            String line = "";
            if (sb.toString().endsWith(",")) {
                line = sb.substring(0, sb.length() - 1); // drop the trailing comma
            }
            outKey.set(key);
            outValue.set(line);
            context.write(outKey, outValue); // ("1","2_7,3_1,2_4,4_0,5_9")
        }
    }

    private static final String INPATH = "hdfs://pc1:9000/output/tuijian2/part-r-00000"; // input path
    private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian4"; // output path
    private static final String CACHE = "hdfs://pc1:9000/output/tuijian3/part-r-00000"; // cached right-hand matrix
    private static final String HDFS = "hdfs://pc1:9000"; // HDFS namenode URI

    public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);
        String[] otherArgs = {INPATH, OUTPATH}; // HDFS input and output paths
        if (otherArgs.length != 2) {
            System.err.println("Usage: step4 <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "step4");
        job.setJarByClass(Step4.class);
        job.setMapperClass(Mapper4.class); // set the Mapper class for the job
        job.setReducerClass(Reducer4.class); // set the Reducer class for the job
        // cache the step-3 output, symlinked as "myfile"
        job.addCacheArchive(new URI(CACHE + "#myfile"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class); // output key type
        job.setOutputValueClass(Text.class); // output value type
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // input path for the job
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path for the job
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(OUTPATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        return job.waitForCompletion(true) ? 1 : -1;
    }

    public static void main(String[] args) {
        try {
            new Step4().run();
        } catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
    }
}
step5: prune the recommendation lists by removing items the user has already acted on.
package hadoop2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
 * Remove items the user has already acted on from the recommendation lists;
 * e.g. if user A has already bought an iphone7, drop iphone7 from A's list.
 *
 * input (the step-4 recommendation lists):
 * 1 A_9.87,B_2.38,C_23.90
 * 2 A_16.59,B_8.27,C_4.25
 * 3 C_23.95,A_4.44
 * 4 B_3.27,C_22.85,A_11.68
 * 5 A_6.45,B_7.42
 * 6 C_3.10,A_15.40,B_9.77
 * cache (the step-1 action records):
 * 1 A_2,C_5
 * 2 A_10,B_3
 * 3 C_15
 * 4 A_3,C_5
 * 5 B_3
 * 6 A_5,B_5
 * map: item 1's recommendation list is "1 A_9.87,B_2.38,C_23.90" and its action
 * record is "1 A_2,C_5"; A already has 2 points and C already has 5 points on
 * item 1, so A and C are dropped from item 1's list and only B is kept.
 * Since recommendations are ultimately made per user, the user becomes the key
 * and itemID_score the value: (B,"1_2.38").
 * reduce: merge the items recommended to the same user.
 * output:
 * A 5_6.45,3_4.44
 * B 4_3.27,1_2.38
 * C 6_3.10,2_4.25
 * @author chenjie
 */
public class Step5 {
    public static class Mapper5 extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();
        private List<String> cacheList = new ArrayList<String>();

        /***
         * Cache the action records in memory, one line per string.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileReader fr = new FileReader("itemUserScore3");
            BufferedReader br = new BufferedReader(fr);
            String line = null;
            while ((line = br.readLine()) != null) {
                cacheList.add(line);
                System.out.println("----------------------cache line :" + line);
            }
            br.close();
        }

        /**
         * Example: item 1's recommendation list "1	A_9.87,B_2.38,C_23.90"
         * against item 1's action record "1	A_2,C_5".
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("-------------------map,key=" + key + "value=" + value);
            String item_matrix1 = value.toString().split("\t")[0]; // item ID of the recommendation row: 1
            String[] user_score_array_matrix1 = value.toString().split("\t")[1].split(","); // recommendation list: A_9.87,B_2.38,C_23.90
            for (String line : cacheList) { // the action records
                String item_matrix2 = line.split("\t")[0]; // item ID of the action record: 1
                String[] user_score_array_matrix2 = line.split("\t")[1].split(","); // action record: A_2,C_5
                if (item_matrix1.equals(item_matrix2)) { // only rows for the same item are compared
                    for (String user_score : user_score_array_matrix1) { // each user in the recommendation list
                        boolean flag = false; // whether the user has acted on this item
                        String user_matrix1 = user_score.split("_")[0]; // user ID
                        String score_matrix1 = user_score.split("_")[1]; // recommendation score
                        for (String user_score2 : user_score_array_matrix2) { // each entry of the action record
                            String user_matrix2 = user_score2.split("_")[0]; // user ID
                            if (user_matrix1.equals(user_matrix2)) { // e.g. A_9.87 vs A_2: user A has acted on this item
                                flag = true;
                            }
                        }
                        if (flag == false) { // the user has not acted on this item
                            outKey.set(user_matrix1); // user ID as key
                            outValue.set(item_matrix1 + "_" + score_matrix1); // itemID_recommendation score as value
                            context.write(outKey, outValue);
                        }
                    }
                }
            }
        }
    }

    /**
     * Merge the map output: values with the same user key are grouped and
     * joined into that user's final recommendation list.
     */
    public static class Reducer5 extends Reducer<Text, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                sb.append(text + ",");
            }
            String line = "";
            if (sb.toString().endsWith(",")) {
                line = sb.substring(0, sb.length() - 1); // drop the trailing comma
            }
            outKey.set(key);
            outValue.set(line);
            context.write(outKey, outValue);
        }
    }

    private static final String INPATH = "hdfs://pc1:9000/output/tuijian4/part-r-00000"; // input path
    private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian5"; // output path
    private static final String CACHE = "hdfs://pc1:9000/output/tuijian1/part-r-00000"; // cached action records
    private static final String HDFS = "hdfs://pc1:9000"; // HDFS namenode URI

    public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);
        String[] otherArgs = {INPATH, OUTPATH}; // HDFS input and output paths
        if (otherArgs.length != 2) {
            System.err.println("Usage: step5 <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "step5");
        job.setJarByClass(Step5.class);
        job.setMapperClass(Mapper5.class); // set the Mapper class for the job
        job.setReducerClass(Reducer5.class); // set the Reducer class for the job
        // cache the step-1 output, symlinked as "itemUserScore3"
        job.addCacheArchive(new URI(CACHE + "#itemUserScore3"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class); // output key type
        job.setOutputValueClass(Text.class); // output value type
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // input path for the job
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path for the job
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(OUTPATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        return job.waitForCompletion(true) ? 1 : -1;
    }

    public static void main(String[] args) {
        try {
            new Step5().run();
        } catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
    }
}
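The final output is keyed by user: the row A 5_6.45,3_4.44, for instance, says that user A should be recommended item 5 (score 6.45) first and then item 3 (score 4.44), both of which A has not acted on yet.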
6. The driver program
Each step can be run on its own to inspect its result, or all five steps can be run in sequence:
package hadoop2;

import java.io.IOException;
import java.net.URISyntaxException;

public class JobRunner {
    public static void main(String[] args) {
        int result1 = -1;
        int result2 = -1;
        int result3 = -1;
        int result4 = -1;
        int result5 = -1;
        try {
            result1 = new Step1().run();
        } catch (Exception e) {
            result1 = -1;
        }
        if (result1 == 1) {
            System.out.println("Step1 run success");
            try {
                result2 = new Step2().run();
            } catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
                result2 = -1;
            }
        } else {
            System.out.println("Step1 run failed");
        }
        if (result2 == 1) {
            System.out.println("Step2 run success");
            try {
                result3 = new Step3().run();
            } catch (Exception e) {
                result3 = -1;
            }
        } else {
            System.out.println("Step2 run failed");
        }
        if (result3 == 1) {
            System.out.println("Step3 run success");
            try {
                result4 = new Step4().run();
            } catch (Exception e) {
                result4 = -1;
            }
        } else {
            System.out.println("Step3 run failed");
        }
        if (result4 == 1) {
            System.out.println("Step4 run success");
            try {
                result5 = new Step5().run();
            } catch (Exception e) {
                result5 = -1;
            }
        } else {
            System.out.println("Step4 run failed");
        }
        if (result5 == 1) {
            System.out.println("Step5 run success");
            System.out.println("job finished");
        } else {
            System.out.println("Step5 run failed");
        }
    }
}
7. Screenshots of each step's results