您的位置:首页 > 运维架构

Hadoop 2.6 使用MapReduce实现基于物品的推荐系统

2017-10-11 13:27 716 查看
一、基于物品的推荐系统

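1、余弦相似度

对于两个 n 维向量 $A=(a_1,\dots,a_n)$ 和 $B=(b_1,\dots,b_n)$,余弦相似度定义为两向量夹角的余弦值:

$$\cos\theta = \frac{A \cdot B}{\lVert A\rVert\,\lVert B\rVert} = \frac{\sum_{i=1}^{n} a_i b_i}{\sqrt{\sum_{i=1}^{n} a_i^2}\,\sqrt{\sum_{i=1}^{n} b_i^2}}$$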





例如,如果两个向量完全相同,则其夹角为0度,cos = 1

如果两个向量相互垂直,则其夹角为90度,cos=0,此时相似度最低

2、基于物品的协同过滤推荐算法

思想:给用户推荐那些和他们之前喜欢的商品相似的商品

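步骤:

1. 根据用户行为列表建立“用户-物品”评分矩阵(step1)
2. 根据评分矩阵计算物品与物品之间的余弦相似度,得到相似度矩阵(step2)
3. 相似度矩阵 × 转置后的评分矩阵 = 推荐列表(step3、step4)
4. 从推荐列表中去掉用户已经操作过的物品(step5)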









二、输入

将useraction.txt文件上传到Hadoop HDFS /input目录下

hadoop fs -put useraction.txt /input

(如果没有该目录则创建之)

hadoop fs -mkdir /input

A,1,1
C,3,5
B,2,3
B,5,3
B,6,5
A,2,10
C,3,10
C,4,5
C,1,5
A,1,1
A,6,5
A,4,3


每一行代表一次用户行为,其中第一列为用户ID,第二列为商品ID,第三列为事件分值

三、代码实现

step1:

根据用户的行为列表计算用户-商品的评分矩阵

package hadoop2;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Step 1 of the item-based recommender: turns the raw user-action log into the
 * item/user score matrix.
 *
 * <p>Input ({@code /input/useraction.txt}), one action per line:
 * {@code userID,itemID,score}, e.g. {@code A,1,1}.
 *
 * <p>Output ({@code /output/tuijian1}), one line per item:
 * {@code itemID<TAB>userID_sumScore,userID_sumScore,...}, e.g.
 * <pre>
 * 1	A_2,C_5
 * 2	A_10,B_3
 * 3	C_15
 * 4	A_3,C_5
 * 5	B_3
 * 6	A_5,B_5
 * </pre>
 *
 * @author chenjie
 */
public class Step1 {

    /**
     * Re-keys every action by item: the line {@code userID,itemID,score}
     * becomes the pair {@code (itemID, "userID_score")}.
     *
     * <p>Example: {@code A,1,1} is emitted as {@code ("1", "A_1")} and
     * {@code C,3,5} as {@code ("3", "C_5")}.
     */
    public static class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Emits {@code (itemID, userID_score)} for one input line.
         *
         * @param key     byte offset of the line (unused)
         * @param value   one line of the action log
         * @param context sink for the emitted pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 3) {
                // Blank or truncated line (for instance a trailing newline in the
                // uploaded file): skip it instead of failing the task with an
                // ArrayIndexOutOfBoundsException.
                return;
            }
            // Trim so that "A, 1, 1" does not later break the integer parse in the reducer.
            String userID = fields[0].trim();
            String itemID = fields[1].trim();
            String score = fields[2].trim();

            outKey.set(itemID);
            outValue.set(userID + "_" + score);
            context.write(outKey, outValue);
            System.out.println("(\"" + itemID + "\",\"" + userID + "_" + score + "\")");
        }
    }

    /**
     * Sums, per item, the scores of each user.
     *
     * <p>Input: {@code (itemID, [userID_score, ...])}, e.g.
     * {@code ("1", ["A_1", "C_5", "A_1"])}.
     *
     * <p>Output: {@code itemID<TAB>userID_sumScore,...}; for the example above
     * user A scored item 1 twice (1 + 1 = 2), giving {@code 1<TAB>A_2,C_5}.
     */
    public static class Reducer1 extends Reducer<Text, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Accumulates the scores of one item per user and writes a single row.
         *
         * @param key     the item ID
         * @param values  all {@code userID_score} pairs recorded for the item
         * @param context sink for the aggregated row
         * @throws NumberFormatException if a score is not an integer
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String itemID = key.toString();
            StringBuilder log = new StringBuilder("(\"").append(itemID).append("\",[");
            Map<String, Integer> totals = new HashMap<String, Integer>();

            boolean firstValue = true;
            for (Text value : values) {
                String pair = value.toString();
                if (!firstValue) {
                    log.append(',');
                }
                firstValue = false;
                log.append('"').append(pair).append('"');

                // Split once; element 0 is the user, element 1 the score.
                String[] userAndScore = pair.split("_");
                String userID = userAndScore[0];
                int score = Integer.parseInt(userAndScore[1]);
                Integer previous = totals.get(userID);
                totals.put(userID, previous == null ? score : previous + score);
            }
            log.append("])");
            System.out.println(log);

            if (totals.isEmpty()) {
                // Nothing to report for this item; an empty row would only break
                // the parsers of the following steps.
                return;
            }

            StringBuilder row = new StringBuilder();
            boolean firstEntry = true;
            for (Map.Entry<String, Integer> entry : totals.entrySet()) {
                if (!firstEntry) {
                    row.append(',');
                }
                firstEntry = false;
                row.append(entry.getKey()).append('_').append(entry.getValue());
            }

            outKey.set(itemID);
            outValue.set(row.toString());
            context.write(outKey, outValue);
        }
    }

    /** Input file: the raw user-action log. */
    private static final String INPATH = "/input/useraction.txt";
    /** Output directory of this step; it is deleted before the job starts. */
    private static final String OUTPATH = "/output/tuijian1";
    /** Default file system against which the relative paths above are resolved. */
    private static final String HDFS = "hdfs://pc1:9000";

    /**
     * Configures and runs the step-1 job, replacing any previous output.
     *
     * @return {@code 1} if the job succeeded, {@code -1} otherwise
     */
    public int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "step1");
        job.setJarByClass(Step1.class);
        job.setMapperClass(Mapper1.class);
        job.setReducerClass(Reducer1.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outPath = new Path(OUTPATH);
        FileInputFormat.addInputPath(job, new Path(INPATH));
        FileOutputFormat.setOutputPath(job, outPath);

        // The job refuses to start when the output directory already exists.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        return job.waitForCompletion(true) ? 1 : -1;
    }

    /**
     * Runs the step and exits with a non-zero status when the job fails, so
     * that a script chaining the steps can stop on error.
     */
    public static void main(String[] args) {
        boolean succeeded = false;
        try {
            succeeded = new Step1().run() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ClassNotFoundException | IOException e) {
            e.printStackTrace();
        }
        if (!succeeded) {
            System.exit(1);
        }
    }

}


step2:

计算每两行的相似度,最终形成一个相似度矩阵

package hadoop2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Step 2 of the item-based recommender: computes the cosine similarity of
 * every pair of item rows and assembles the item/item similarity matrix.
 *
 * <p>Input (output of step 1), {@code itemID<TAB>userID_sumScore,...}:
 * <pre>
 * 1	A_2,C_5
 * 2	A_10,B_3
 * 3	C_15
 * 4	A_3,C_5
 * 5	B_3
 * 6	A_5,B_5
 * </pre>
 * The same file is also shipped to every mapper through the distributed
 * cache, so each input row can be compared with every cached row.
 *
 * <p>Output, {@code itemID<TAB>itemID_cos,...}:
 * <pre>
 * 1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
 * 2	1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
 * 3	4_0.86,3_1.00,1_0.93
 * 4	1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
 * 5	2_0.29,5_1.00,6_0.71
 * 6	1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
 * </pre>
 *
 * @author chenjie
 */
public class Step2 {

    /**
     * Compares one input row with every cached row and emits
     * {@code (leftItemID, "rightItemID_cos")} for each pair whose similarity
     * is not zero.
     *
     * <p>Example: the input row {@code 1<TAB>A_2,C_5} yields
     * {@code ("1","1_1.00")}, {@code ("1","2_0.36")}, {@code ("1","3_0.93")},
     * {@code ("1","4_0.99")} and {@code ("1","6_0.26")}; item 5 shares no user
     * with item 1 and is therefore not emitted.
     */
    public static class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {
        /** Symlink name given to the cached file in {@link Step2#run()}. */
        private static final String CACHE_LINK = "itemUserScore1";

        private final Text outKey = new Text();
        private final Text outValue = new Text();
        /** Cached rows, parsed once in {@link #setup} instead of once per input record. */
        private final List<ItemVector> cachedVectors = new ArrayList<ItemVector>();

        // Fixed symbols: with the default locale a comma decimal separator
        // ("0,36") would corrupt the comma-separated output format.
        private final DecimalFormat df = new DecimalFormat("0.00",
                java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

        /** One parsed row {@code itemID<TAB>userID_score,...} with its Euclidean norm. */
        private static final class ItemVector {
            final String itemId;
            final String[] userIds;
            final double[] scores;
            /** {@code sqrt(s1^2 + s2^2 + ...)} over all scores of the row. */
            final double norm;

            private ItemVector(String itemId, String[] userIds, double[] scores) {
                this.itemId = itemId;
                this.userIds = userIds;
                this.scores = scores;
                double sumOfSquares = 0;
                for (double score : scores) {
                    sumOfSquares += score * score;
                }
                this.norm = Math.sqrt(sumOfSquares);
            }

            /**
             * Parses one row; returns {@code null} for a line without a tab-separated
             * value part (for instance a blank line).
             */
            static ItemVector parse(String line) {
                String[] rowAndColumns = line.split("\t");
                if (rowAndColumns.length < 2) {
                    return null;
                }
                String[] columns = rowAndColumns[1].split(",");
                String[] userIds = new String[columns.length];
                double[] scores = new double[columns.length];
                for (int i = 0; i < columns.length; i++) {
                    String[] userAndScore = columns[i].split("_");
                    userIds[i] = userAndScore[0];
                    scores[i] = Double.parseDouble(userAndScore[1]);
                }
                return new ItemVector(rowAndColumns[0], userIds, scores);
            }

            /**
             * Dot product over the components that carry the same user ID in both
             * rows. The explicit IDs are what identify a component, because the
             * order of the columns in a row is not guaranteed.
             */
            double dot(ItemVector other) {
                double sum = 0;
                for (int i = 0; i < userIds.length; i++) {
                    for (int j = 0; j < other.userIds.length; j++) {
                        if (other.userIds[j].equals(userIds[i])) {
                            sum += scores[i] * other.scores[j];
                        }
                    }
                }
                return sum;
            }
        }

        /**
         * Loads the cached copy of the step-1 output into memory, one parsed
         * vector per line.
         */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // try-with-resources closes the reader even when reading fails; the
            // job output is UTF-8, so decode it as such rather than with the
            // platform default charset.
            try (BufferedReader reader = new BufferedReader(new java.io.InputStreamReader(
                    new java.io.FileInputStream(CACHE_LINK),
                    java.nio.charset.StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    ItemVector vector = ItemVector.parse(line);
                    if (vector != null) {
                        cachedVectors.add(vector);
                    }
                }
            }
        }

        /**
         * Emits the cosine similarity between the input row and every cached row.
         *
         * <p>Example: for the input {@code 1<TAB>A_2,C_5} and the cached row
         * {@code 2<TAB>A_10,B_3} only user A is shared, so
         * {@code cos = (2*10) / (sqrt(2^2+5^2) * sqrt(10^2+3^2)) = 0.36}.
         *
         * @param key     byte offset of the line (unused apart from logging)
         * @param value   one row of the item/user score matrix
         * @param context sink for the emitted pairs
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("map,key=" + key + ",value=" + value.toString());
            ItemVector left = ItemVector.parse(value.toString());
            if (left == null) {
                return;
            }

            for (ItemVector right : cachedVectors) {
                // Accumulated as double: no int overflow, and decimal scores are accepted.
                double dot = left.dot(right);
                if (dot == 0) {
                    // No shared user (or an all-zero row): similarity is zero. Skipping
                    // here also avoids 0/0 = NaN for zero-norm vectors.
                    continue;
                }
                double cos = dot / (left.norm * right.norm);
                outKey.set(left.itemId);
                outValue.set(right.itemId + "_" + df.format(cos));
                context.write(outKey, outValue);
                System.out.println(outKey + "\t" + outValue);
            }
        }
    }

    /**
     * Joins all similarities of one item into a single matrix row.
     *
     * <p>Input: {@code ("1", ["1_1.00","2_0.36","3_0.93","4_0.99","6_0.26"])}.
     * Output: {@code 1<TAB>1_1.00,2_0.36,3_0.93,4_0.99,6_0.26}.
     */
    public static class Reducer2 extends Reducer<Text, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Concatenates the values with commas, in the order they are delivered.
         *
         * @param key     the item ID of the row
         * @param values  the {@code itemID_cos} entries of the row (iterable only once)
         * @param context sink for the assembled row
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder row = new StringBuilder();
            boolean first = true;
            for (Text text : values) {
                if (!first) {
                    row.append(',');
                }
                first = false;
                row.append(text);
            }
            outKey.set(key);
            outValue.set(row.toString());
            context.write(outKey, outValue);
        }
    }

    /** Input: the item/user score matrix written by step 1. */
    private static final String INPATH = "/output/tuijian1/part-r-00000";
    /** Output directory of this step; it is deleted before the job starts. */
    private static final String OUTPATH = "/output/tuijian2";
    /** File shipped to the mappers; the same file as the input. */
    private static final String CACHE = "/output/tuijian1/part-r-00000";
    /** Default file system against which the relative paths above are resolved. */
    private static final String HDFS = "hdfs://pc1:9000";

    /**
     * Configures and runs the step-2 job, replacing any previous output.
     *
     * @return {@code 1} if the job succeeded, {@code -1} otherwise
     */
    public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "step2");
        job.setJarByClass(Step2.class);
        job.setMapperClass(Mapper2.class);
        job.setReducerClass(Reducer2.class);

        // The cached resource is a plain text file, not an archive to unpack, so
        // register it as a cache file; the URI fragment is the symlink name that
        // Mapper2.setup opens.
        job.addCacheFile(new URI(CACHE + "#itemUserScore1"));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outPath = new Path(OUTPATH);
        FileInputFormat.addInputPath(job, new Path(INPATH));
        FileOutputFormat.setOutputPath(job, outPath);

        // The job refuses to start when the output directory already exists.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        return job.waitForCompletion(true) ? 1 : -1;
    }

    /**
     * Runs the step and exits with a non-zero status when the job fails, so
     * that a script chaining the steps can stop on error.
     */
    public static void main(String[] args) {
        boolean succeeded = false;
        try {
            succeeded = new Step2().run() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ClassNotFoundException | IOException | URISyntaxException e) {
            e.printStackTrace();
        }
        if (!succeeded) {
            System.exit(1);
        }
    }
}


step3:转置推荐列表(为什么要转置以及如何转置的细节请看我之前的博文)

package hadoop2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Step 3 of the item-based recommender: transposes the item/user score matrix
 * so that it can be used as the right-hand operand of the matrix product in
 * step 4 (similarity matrix x transposed score matrix).
 *
 * <p>Input (output of step 1), {@code itemID<TAB>userID_score,...}:
 * <pre>
 * 1	A_2,C_5
 * 2	A_10,B_3
 * 3	C_15
 * 4	A_3,C_5
 * 5	B_3
 * 6	A_5,B_5
 * </pre>
 *
 * <p>Output, {@code userID<TAB>itemID_score,...}:
 * <pre>
 * A	6_5,4_3,2_10,1_2
 * B	6_5,5_3,2_3
 * C	4_5,3_15,1_5
 * </pre>
 *
 * @author chenjie
 */
public class Step3 {

    /**
     * Swaps row and column of every cell: the cell {@code column_value} found
     * in row {@code row} is emitted as {@code (column, "row_value")}.
     *
     * <p>Example: the row {@code 1<TAB>A_2,C_5} produces {@code ("A","1_2")}
     * and {@code ("C","1_5")}.
     */
    public static class Mapper3 extends Mapper<LongWritable, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Emits one transposed cell per column of the input row.
         *
         * @param key     byte offset of the line (unused)
         * @param value   one matrix row, {@code row<TAB>column_value,...}
         * @param context sink for the emitted cells
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] rowAndColumns = value.toString().split("\t");
            if (rowAndColumns.length < 2) {
                // Blank line or row without cells: nothing to transpose. Indexing
                // element 1 here would fail the task.
                return;
            }
            String row = rowAndColumns[0];
            for (String cell : rowAndColumns[1].split(",")) {
                String[] columnAndValue = cell.split("_");
                // The former column becomes the key (new row) ...
                outKey.set(columnAndValue[0]);
                // ... and the former row becomes the column label of the value.
                outValue.set(row + "_" + columnAndValue[1]);
                context.write(outKey, outValue);
            }
        }
    }

    /**
     * Collects all cells that share a key into one row of the transposed
     * matrix.
     *
     * <p>Example: {@code ("A", ["6_5","4_3","2_10","1_2"])} is written as
     * {@code A<TAB>6_5,4_3,2_10,1_2}. The values arrive in no particular
     * order, which is why every cell carries its own column label.
     */
    public static class Reducer3 extends Reducer<Text, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Concatenates the values with commas, in the order they are delivered.
         *
         * @param key     the row ID of the transposed matrix
         * @param values  the {@code column_value} cells of that row
         * @param context sink for the assembled row
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder row = new StringBuilder();
            boolean first = true;
            for (Text text : values) {
                if (!first) {
                    row.append(',');
                }
                first = false;
                row.append(text);
            }
            outKey.set(key);
            outValue.set(row.toString());
            context.write(outKey, outValue);
        }
    }

    /** Input: the item/user score matrix written by step 1. */
    private static final String INPATH = "hdfs://pc1:9000/output/tuijian1/part-r-00000";
    /** Output directory of this step; it is deleted before the job starts. */
    private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian3";
    /** Default file system of the job. */
    private static final String HDFS = "hdfs://pc1:9000";

    /**
     * Configures and runs the step-3 job, replacing any previous output.
     *
     * @return {@code 1} if the job succeeded, {@code -1} otherwise
     */
    public int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "step3");
        job.setJarByClass(Step3.class);
        job.setMapperClass(Mapper3.class);
        job.setReducerClass(Reducer3.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outPath = new Path(OUTPATH);
        FileInputFormat.addInputPath(job, new Path(INPATH));
        FileOutputFormat.setOutputPath(job, outPath);

        // The job refuses to start when the output directory already exists.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        return job.waitForCompletion(true) ? 1 : -1;
    }

    /**
     * Runs the step and exits with a non-zero status when the job fails, so
     * that a script chaining the steps can stop on error.
     */
    public static void main(String[] args) {
        boolean succeeded = false;
        try {
            succeeded = new Step3().run() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ClassNotFoundException | IOException e) {
            e.printStackTrace();
        }
        if (!succeeded) {
            System.exit(1);
        }
    }

}


step4:相似度矩阵 × 转置评分矩阵=推荐列表(矩阵乘法请看之前博文)

package hadoop2;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Step 4 of the item-based recommender: multiplies the similarity matrix by
 * the transposed score matrix to obtain the recommendation list.
 *
 * <p>Input (output of step 2), {@code itemID<TAB>itemID_cos,...}:
 * <pre>
 * 1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
 * 2	1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
 * 3	4_0.86,3_1.00,1_0.93
 * 4	1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
 * 5	2_0.29,5_1.00,6_0.71
 * 6	1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
 * </pre>
 *
 * <p>Cache (output of step 3), {@code userID<TAB>itemID_score,...}:
 * <pre>
 * A	6_5,4_3,2_10,1_2
 * B	6_5,5_3,2_3
 * C	4_5,3_15,1_5
 * </pre>
 *
 * <p>Output, {@code itemID<TAB>userID_recommendation,...}:
 * <pre>
 * 1	A_9.87,B_2.38,C_23.90
 * 2	A_16.59,B_8.27,C_4.25
 * 3	C_23.95,A_4.44
 * 4	B_3.27,C_22.85,A_11.68
 * 5	A_6.45,B_7.42
 * 6	C_3.10,A_15.40,B_9.77
 * </pre>
 *
 * <p>Example: row 1 of the input times row A of the cache is
 * {@code 1.00*2 + 0.36*10 + 0.99*3 + 0.26*5 = 9.87}, so the mapper emits
 * {@code (1, A_9.87)}; the reducer then joins all entries of an item into one
 * line.
 *
 * @author chenjie
 */
public class Step4 {

    /**
     * Multiplies one row of the left matrix with every cached row of the
     * (already transposed) right matrix and emits
     * {@code (leftRowID, "rightRowID_product")} for every non-zero product.
     */
    public static class Mapper4 extends Mapper<LongWritable, Text, Text, Text> {
        /** Symlink name given to the cached file in {@link Step4#run()}. */
        private static final String CACHE_LINK = "myfile";

        private final Text outKey = new Text();
        private final Text outValue = new Text();
        /** Rows of the right matrix, parsed once in {@link #setup}. */
        private final List<SparseRow> cachedRows = new ArrayList<SparseRow>();

        // Fixed symbols: with the default locale a comma decimal separator
        // ("9,87") would corrupt the comma-separated output format.
        private final DecimalFormat df = new DecimalFormat("0.00",
                java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

        /** One parsed matrix row {@code rowID<TAB>columnID_value,...}. */
        private static final class SparseRow {
            final String rowId;
            final String[] columnIds;
            final double[] values;

            private SparseRow(String rowId, String[] columnIds, double[] values) {
                this.rowId = rowId;
                this.columnIds = columnIds;
                this.values = values;
            }

            /**
             * Parses one row; returns {@code null} for a line without a tab-separated
             * value part (for instance a blank line).
             */
            static SparseRow parse(String line) {
                String[] rowAndColumns = line.split("\t");
                if (rowAndColumns.length < 2) {
                    return null;
                }
                String[] cells = rowAndColumns[1].split(",");
                String[] columnIds = new String[cells.length];
                double[] values = new double[cells.length];
                for (int i = 0; i < cells.length; i++) {
                    String[] columnAndValue = cells[i].split("_");
                    columnIds[i] = columnAndValue[0];
                    values[i] = Double.parseDouble(columnAndValue[1]);
                }
                return new SparseRow(rowAndColumns[0], columnIds, values);
            }

            /**
             * Sum of the products of the cells that carry the same column ID in both
             * rows. The explicit IDs are what identify a component, because the
             * order of the cells in a row is not guaranteed.
             */
            double dot(SparseRow other) {
                double sum = 0;
                for (int i = 0; i < columnIds.length; i++) {
                    for (int j = 0; j < other.columnIds.length; j++) {
                        if (other.columnIds[j].equals(columnIds[i])) {
                            sum += values[i] * other.values[j];
                        }
                    }
                }
                return sum;
            }
        }

        /**
         * Loads the cached right-hand matrix into memory, one parsed row per line.
         */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // try-with-resources closes the reader even when reading fails; the
            // job output is UTF-8, so decode it as such rather than with the
            // platform default charset.
            try (BufferedReader reader = new BufferedReader(new java.io.InputStreamReader(
                    new java.io.FileInputStream(CACHE_LINK),
                    java.nio.charset.StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println("----------------------cache line :" + line);
                    SparseRow row = SparseRow.parse(line);
                    if (row != null) {
                        cachedRows.add(row);
                    }
                }
            }
        }

        /**
         * Emits the product of the input row with every cached row.
         *
         * @param key     byte offset of the line (unused apart from logging)
         * @param value   one row of the left matrix, {@code rowID<TAB>columnID_value,...}
         * @param context sink for the emitted products
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("-------------------map,key=" + key + "value=" + value);
            SparseRow left = SparseRow.parse(value.toString());
            if (left == null) {
                return;
            }

            for (SparseRow right : cachedRows) {
                double result = left.dot(right);
                if (result == 0) {
                    // Zero entries are not stored in the sparse output.
                    continue;
                }
                // Key: row of the left matrix. Value: row of the transposed right
                // matrix (a column of the real right matrix) and the cell value.
                outKey.set(left.rowId);
                outValue.set(right.rowId + "_" + df.format(result));
                context.write(outKey, outValue);
            }
        }
    }

    /**
     * Joins all products of one left-matrix row into one row of the result
     * matrix, e.g. {@code ("1", ["A_9.87","B_2.38","C_23.90"])} becomes
     * {@code 1<TAB>A_9.87,B_2.38,C_23.90}.
     */
    public static class Reducer4 extends Reducer<Text, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Concatenates the values with commas, in the order they are delivered.
         *
         * @param key     the row ID of the result matrix
         * @param values  the {@code columnID_value} cells of that row
         * @param context sink for the assembled row
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder row = new StringBuilder();
            boolean first = true;
            for (Text text : values) {
                if (!first) {
                    row.append(',');
                }
                first = false;
                row.append(text);
            }
            outKey.set(key);
            outValue.set(row.toString());
            context.write(outKey, outValue);
        }
    }

    /** Input: the similarity matrix written by step 2. */
    private static final String INPATH = "hdfs://pc1:9000/output/tuijian2/part-r-00000";
    /** Output directory of this step; it is deleted before the job starts. */
    private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian4";
    /** File shipped to the mappers: the transposed score matrix written by step 3. */
    private static final String CACHE = "hdfs://pc1:9000/output/tuijian3/part-r-00000";
    /** Default file system of the job. */
    private static final String HDFS = "hdfs://pc1:9000";

    /**
     * Configures and runs the step-4 job, replacing any previous output.
     *
     * @return {@code 1} if the job succeeded, {@code -1} otherwise
     */
    public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS);

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "step4");
        job.setJarByClass(Step4.class);
        job.setMapperClass(Mapper4.class);
        job.setReducerClass(Reducer4.class);

        // The cached resource is a plain text file, not an archive to unpack, so
        // register it as a cache file; the URI fragment is the symlink name that
        // Mapper4.setup opens.
        job.addCacheFile(new URI(CACHE + "#myfile"));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outPath = new Path(OUTPATH);
        FileInputFormat.addInputPath(job, new Path(INPATH));
        FileOutputFormat.setOutputPath(job, outPath);

        // The job refuses to start when the output directory already exists.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        return job.waitForCompletion(true) ? 1 : -1;
    }

    /**
     * Runs the step and exits with a non-zero status when the job fails, so
     * that a script chaining the steps can stop on error.
     */
    public static void main(String[] args) {
        boolean succeeded = false;
        try {
            succeeded = new Step4().run() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ClassNotFoundException | IOException | URISyntaxException e) {
            e.printStackTrace();
        }
        if (!succeeded) {
            System.exit(1);
        }
    }
}


step5:推荐列表去重(从推荐列表中去掉用户已经操作过的商品)

package hadoop2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
* 去掉推荐列表中,用户已经操作过的商品,例如用户A已经购买过iphone7,则将iphone7从推荐列表中删除
* input: the recommendation list produced by step 4 (itemID -> userID_recommendation), not the similarity matrix
*	1	A_9.87,B_2.38,C_23.90
2	A_16.59,B_8.27,C_4.25
3	C_23.95,A_4.44
4	B_3.27,C_22.85,A_11.68
5	A_6.45,B_7.42
6	C_3.10,A_15.40,B_9.77
* cache:操作记录
*  1	A_2,C_5
2	A_10,B_3
3	C_15
4	A_3,C_5
5	B_3
6	A_5,B_5

map:
例如
1商品的推荐列表:1		A_9.87,B_2.38,C_23.90
1商品的操作记录:1		A_2,C_5
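For item 1, user A already has a score of 2 and user C already has a score of 5,
so A and C must be removed from item 1's recommendation list,
leaving only B.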
而最终是要根据用户来推荐商品,于是将用户作为key,物品和推荐度作为value返回
(B,1_2.38)

reduce:
将同一用户推荐的商品合并输出

output:
A	5_6.45,3_4.44
B	4_3.27,1_2.38
C	6_3.10,2_4.25
* @author chenjie
*
*/
public class Step5 {

public static class Mapper5  extends Mapper<LongWritable,Text,Text,Text>
{
private Text outKey = new Text();
private Text outValue = new Text();
private List<String> cacheList = new ArrayList<String>();

private DecimalFormat df = new DecimalFormat("0.00");

/***
* 	将保存右侧矩阵的文件缓存到内存中,每一行为一个字符串,是所有行构成list
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
FileReader fr = new FileReader("itemUserScore3");
BufferedReader br = new BufferedReader(fr);
String line = null;
while((line = br.readLine()) != null)
{
cacheList.add(line);
System.out.println("----------------------cache line :" + line);
}
fr.close();
br.close();
}

/**
* 以
* 	1商品的推荐列表:1		A_9.87,B_2.38,C_23.90
1商品的操作记录:1		A_2,C_5
为例
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
System.out.println("-------------------map,key=" + key + "value=" + value);
String item_matrix1 = value.toString().split("\t")[0];
//推荐列表商品号 1
String[] user_score_array_matrix1 = value.toString().split("\t")[1].split(",");
//推荐列表 A_9.87,B_2.38,C_23.90

for(String line : cacheList)//商品的操作记录列表
{
String item_matrix2 = line.toString().split("\t")[0];
//操作记录商品号 1

String[] user_score_array_matrix2 = line.toString().split("\t")[1].split(",");
//操作记录  A_2,C_5

if(item_matrix1.equals(item_matrix2))//如果推荐列表商品号==操作记录商品号,证明是同一商品,才能操作
{
for(String user_score : user_score_array_matrix1)//对于推荐列表中每一个用户  A_9.87,B_2.38,C_23.90
{
boolean flag = false;//默认操作过标志位
String user_matrix1 = user_score.split("_")[0];
//用户ID
String score_matrix1 = user_score.split("_")[1];
//推荐度

for(String user_score2 : user_score_array_matrix2)//对于操作记录中的每一条记录  A_2,C_5
{
String user_matrix2 = user_score2.split("_")[0];
//用户ID
if(user_matrix1.equals(user_matrix2))//如果两个ID相等 如A_9.87 和A_2 则证明用户A操作过该商品
{
flag = true;
}
}
if(flag == false)//如果用户A没有操作过该物品
{
outKey.set(user_matrix1);//将用户ID作为Key
outValue.set(item_matrix1 + "_" +score_matrix1 );//将商品ID_推荐度作为value
context.write(outKey, outValue);//写入结果集
}
}
}
}
}
}

/**
 * Reduce side of step 5: collects every value emitted for one key and writes
 * them back as a single comma-separated line.
 *
 * With Mapper5 as the mapper the key is a user ID and each value is an
 * "itemID_score" pair, e.g.
 *   key    : "B"
 *   values : {"1_2.38", "4_0.51"}
 *   output : ("B", "1_2.38,4_0.51")
 */
public static class Reducer5 extends Reducer<Text,Text,Text,Text>
{
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Join with the separator placed before every element except the first,
        // so no trailing comma has to be trimmed afterwards.
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for(Text value : values)
        {
            if(!first)
            {
                joined.append(",");
            }
            joined.append(value.toString());
            first = false;
        }

        outKey.set(key);
        outValue.set(joined.toString());
        context.write(outKey, outValue);
    }
}

// Input path handed to the job in run(); presumably the output of step 4 - verify against Step4.
private static final String INPATH = "hdfs://pc1:9000/output/tuijian4/part-r-00000";
// Output directory of this step; run() deletes it first when it already exists.
private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian5";

// File registered with the job in run() under the link name "itemUserScore3", which Mapper5.setup() reads.
private static final String CACHE = "hdfs://pc1:9000/output/tuijian1/part-r-00000";
// Value assigned to fs.defaultFS in run().
private static final String HDFS = "hdfs://pc1:9000";

/**
 * Configures and runs the step 5 job: Mapper5 filters the recommendation
 * lists read from INPATH against the cached action records, Reducer5 joins
 * the surviving recommendations per user, and the result is written to OUTPATH.
 *
 * @return 1 if the job completed successfully, -1 otherwise
 * @throws IOException            if HDFS access or job submission fails
 * @throws ClassNotFoundException propagated from job.waitForCompletion
 * @throws InterruptedException   if waiting for the job is interrupted
 * @throws URISyntaxException     if the cache URI cannot be parsed
 */
public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", HDFS);

    // Job.getInstance replaces the deprecated Job constructor.
    Job job = Job.getInstance(conf, "step5");
    job.setJarByClass(Step5.class);
    job.setMapperClass(Mapper5.class);
    job.setReducerClass(Reducer5.class);

    // The cached resource is a plain text file, so it is registered as a cache
    // file (not an archive); the URI fragment is the local link name that
    // Mapper5.setup() opens.
    job.addCacheFile(new URI(CACHE + "#itemUserScore3"));

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(INPATH));
    FileOutputFormat.setOutputPath(job, new Path(OUTPATH));

    // Remove the output of a previous run so this step can be re-run.
    FileSystem fs = FileSystem.get(conf);
    Path outPath = new Path(OUTPATH);
    if(fs.exists(outPath))
    {
        fs.delete(outPath, true);
    }
    return job.waitForCompletion(true) ? 1 : -1;
}

/**
 * Runs step 5 on its own. The process exits with status 1 when the job
 * reports failure or an exception is thrown, so that calling scripts can
 * detect the failure.
 */
public static void main(String[] args)
{
    int status;
    try {
        status = new Step5().run();
    } catch (InterruptedException e) {
        // Restore the interrupt flag before reporting the failure.
        Thread.currentThread().interrupt();
        e.printStackTrace();
        status = -1;
    } catch (ClassNotFoundException | IOException | URISyntaxException e) {
        e.printStackTrace();
        status = -1;
    }
    if(status != 1)
    {
        System.exit(1);
    }
}

}


6、调度程序

每个步骤的程序都可以单独运行并查看结果,也可以通过下面的调度程序按顺序串行执行:

package hadoop2;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * Driver that runs Step1 .. Step5 one after another.
 *
 * A step is started only when the previous one returned 1. The chain stops at
 * the first step that fails, so steps that never ran are not reported as
 * failed. Exceptions thrown by a step are printed instead of being dropped,
 * and the process exits with status 1 when the chain did not finish.
 */
public class JobRunner {

    public static void main(String[] args) {
        if (!runAll()) {
            System.exit(1);
        }
    }

    /** Runs the five steps in order; returns true only when all of them succeeded. */
    private static boolean runAll() {
        int result;

        try {
            result = new Step1().run();
        } catch (Exception e) {
            result = failure(e);
        }
        if (!report("Step1", result)) {
            return false;
        }

        try {
            result = new Step2().run();
        } catch (Exception e) {
            result = failure(e);
        }
        if (!report("Step2", result)) {
            return false;
        }

        try {
            result = new Step3().run();
        } catch (Exception e) {
            result = failure(e);
        }
        if (!report("Step3", result)) {
            return false;
        }

        try {
            result = new Step4().run();
        } catch (Exception e) {
            result = failure(e);
        }
        if (!report("Step4", result)) {
            return false;
        }

        try {
            result = new Step5().run();
        } catch (Exception e) {
            result = failure(e);
        }
        if (!report("Step5", result)) {
            return false;
        }

        System.out.println("job finished ");
        return true;
    }

    /** Prints the exception thrown by a step and returns the failure code -1. */
    private static int failure(Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to whoever inspects the thread later.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        return -1;
    }

    /** Prints the outcome line for a step; returns true when the result code is 1. */
    private static boolean report(String step, int result) {
        boolean succeeded = result == 1;
        System.out.println(step + (succeeded ? " run success" : " run failed"));
        return succeeded;
    }

}


7、各步的结果截图

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息