
Implementing a Product Recommendation Algorithm with MapReduce (User Purchase Vector × Item Co-occurrence Matrix)


A Hadoop-based product recommendation system

Recommendation result = user purchase vector × item similarity matrix


 Item similarity here: the co-occurrence count of two items (Euclidean distance or another measure could be used instead)
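
In other words, for a user u and a candidate item i the score is: score(u, i) = sum, over all items j the user bought, of cooccurrence(i, j) × purchases(u, j). The matrix-vector multiplication and summation jobs below (sections 12 and 13) compute exactly this product and sum.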

 Preliminaries

1. Project name: GRMS

2. Add the Maven dependencies in pom.xml

3. Create the packages:

com.briup.bigdata.project.grms

   |--step1

   |--step2

   |--...

   |--utils

4. Copy the four XML configuration files from the cluster into the resources directory.

5. Create the following directories under the root of the HDFS cluster:

   /grms

       |--rawdata/matrix.txt

       |--step1

       |--...

 

6. Initial data: matrix.txt

       10001      20001      1

       10001      20002      1

       10001      20005      1

       10001      20006      1

       10001      20007      1

       10002      20003      1

       10002      20004      1

       10002      20006      1

       10003      20002      1

       10003      20007      1

       10004      20001      1

       10004      20002      1

       10004      20005      1

       10004      20006      1

       10005      20001      1

       10006      20004      1

       10006      20007      1

Here, IDs starting with 1000 are user IDs, IDs starting with 2000 are product IDs, and the last column is the purchase count.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.briup.bigdata.project.grms</groupId>
<artifactId>GRMS</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.6</version>
</dependency>
</dependencies>
<build>
<finalName>grms</finalName>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>

</project>

// Adjust the versions above to match your own environment.


8. Compute the list of goods purchased by each user (step 1)

   File: UserBuyGoodsList.java

        Classes:

        UserBuyGoodsList

        UserBuyGoodsListMapper

        UserBuyGoodsListReducer

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

import java.io.IOException;
import java.util.Iterator;

public class UserBuyGoodsList extends Configured implements Tool {
static class UserBuyGoodsListMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] tuple=value.toString().split("\t");
context.write(new Text(tuple[0]),new Text(tuple[1]));

}
}
static class UserBuyGoodsListReducer extends Reducer<Text,Text,Text,Text>{
@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
Iterator<Text> iterator=values.iterator();
StringBuilder builder=new StringBuilder();
while(iterator.hasNext()){
builder.append(iterator.next().toString()+",");
}
String result=builder.substring(0,builder.length()-1);
context.write(key,new Text(result));

}
}

public int run(String[] args) throws Exception {
Configuration conf = getConf();
Path in = new Path(conf.get("in"));
Path out = new Path(conf.get("out"));
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(UserBuyGoodsList.class);
job.setMapperClass(UserBuyGoodsListMapper.class);
job.setReducerClass(UserBuyGoodsListReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat.addInputPath(job,in);
TextOutputFormat.setOutputPath(job,out);

return job.waitForCompletion(true)?0:-1;
}

public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new UserBuyGoodsList(),args));
}
}
The idea: the mapper reads the raw data line by line and splits each line on "\t", so the user ID goes into tuple[0] and the product ID into tuple[1]; the pair is then emitted to the reducer. The reducer iterates over all values with the same key and appends them to a StringBuilder separated by commas (a StringBuilder is preferable to String concatenation, since every + on a String creates a new string and wastes memory at scale). Before writing to the context, the trailing comma is trimmed off with substring, since it carries no meaning.
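
For reference, a job written this way can be submitted with the generic -D options that ToolRunner parses. Assuming the jar is built as grms.jar (the finalName from the pom) and the HDFS layout from the preparation step, the command would look roughly like:

hadoop jar grms.jar com.briup.bigdata.project.grms.UserBuyGoodsList -Din=/grms/rawdata/matrix.txt -Dout=/grms/step1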

 

The run method configures the job. Doing this by hand for every new class is tedious because the same boilerplate is repeated each time; at the end of this post a utility class is shown that centralizes the configuration and makes the per-job differences easier to see.

 

Result data:

       10001      20001,20005,20006,20007,20002

       10002      20006,20003,20004

       10003      20002,20007

       10004      20001,20002,20005,20006

       10005      20001

       10006      20004,20007

 

9. Compute item co-occurrence counts (step 2)

 File: GoodsConcurrenceList.java

 Classes: GoodsConcurrenceList

      GoodsConcurrenceListMapper

      GoodsConcurrenceListReducer

Input: the result of step 1

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsConcurrenceList extends Configured implements Tool{
private final static Text K = new Text();
private final static IntWritable V = new IntWritable(1);

static class GoodsConcurrenceListMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens=value.toString().split("\t");
String[] items =tokens[1].split(",");
for(int i=0;i<items.length;i++){
String itemA=items[i];
for(int j=0;j<items.length;j++){
String itemB=items[j];
K.set(itemA+"\t"+itemB);
context.write(K,V);
}
}
}
}
static class GoodsConcurrenceListReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int sum =0;
for(IntWritable value :values ){
sum += value.get();
}
V.set(sum);
context.write(key, V);
}
}
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf,this.getClass().getSimpleName());
Path in = new Path(conf.get("in"));
Path out = new Path(conf.get("out"));
job.setJarByClass(GoodsConcurrenceList.class);
job.setMapperClass(GoodsConcurrenceListMapper.class);
job.setReducerClass(GoodsConcurrenceListReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat.addInputPath(job,in);
TextOutputFormat.setOutputPath(job,out);

return job.waitForCompletion(true)?0:-1;
}

public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new GoodsConcurrenceList(),args));
}
}
The idea: for convenience two static fields K and V are defined (the results could also be written to the context directly). Each line of the UserBuyGoodsList output is split on "\t"; since the co-occurrence matrix does not need the user ID, tokens[1] is split again on "," into items[]. A nested for loop then pairs every itemA with every itemB, emitting the pair as the key with a count of 1. Because both loops run over the whole array, an item is also paired with itself; these self-pairs can be removed later or simply ignored during the matrix multiplication. The map output therefore looks like 20001\t20001 1, 20001\t20002 1, and so on, and the reducer only has to add up the counts for each pair with a for loop to obtain the co-occurrence counts.
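
One optional tweak not present in the code above: because the reducer only sums counts, the same class could also be registered as a combiner to cut down shuffle traffic, for example by adding job.setCombinerClass(GoodsConcurrenceListReducer.class); in the run method.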

Result data:

20001      20001      3

20001      20002      2

(remaining rows omitted for brevity)

10. Build the item co-occurrence matrix (step 3)

         File: GoodsConcurrenceMatrix.java

         Classes: GoodsConcurrenceMatrixMapper

                    GoodsConcurrenceMatrixReducer

Group the co-occurrence counts into one row per item, ready for the multiplication step.

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class GoodsConcurrenceMatrix extends Configured implements Tool {
static class GoodsConcurrenceMatrixMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
String s[] = value.toString().split("\t");
sb.append(s[1]).append(":").append(s[2]);
context.write(new Text(s[0]),new Text(sb.toString()));
}
}
static class GoodsConcurrenceMatrixReducer extends Reducer<Text,Text,Text,Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder builder = new StringBuilder();
Iterator<Text> iterator = values.iterator();
while(iterator.hasNext()) builder.append(iterator.next()).append(",");
context.write(key,new Text(builder.toString().substring(0,builder.length()-1)));
}
}
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf,this.getClass().getSimpleName());
Path in = new Path(conf.get("in"));
Path out = new Path(conf.get("out"));
job.setJarByClass(GoodsConcurrenceMatrix.class);
job.setMapperClass(GoodsConcurrenceMatrixMapper.class);
job.setReducerClass(GoodsConcurrenceMatrixReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat.addInputPath(job,in);
TextOutputFormat.setOutputPath(job,out);

return job.waitForCompletion(true)?0:-1;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run( new GoodsConcurrenceMatrix(),args));
}
}
The idea: the co-occurrence counts are already computed, so the mapper just joins the second and third fields with ":" into a StringBuilder and emits them under the item key. The reducer then joins all values with the same key with "," (note that a while loop over the iterator is needed here so that every value is appended, not just the first one).

        Result:
            20001    20001:3,20002:2,20005:2,20006:2,20007:1
            20002    20001:2,20002:3,20005:2,20006:2,20007:2
            20003    20003:1,20004:1,20006:1
            20004    20003:1,20004:2,20006:1,20007:1
            20005    20001:2,20002:2,20005:2,20006:2,20007:1
            20006    20001:2,20002:2,20003:1,20004:1,20005:2,20006:3,20007:1
            20007    20001:1,20002:2,20004:1,20005:1,20006:1,20007:3
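
Reading one row of this output: the line for 20003 says that, over all purchase lists, item 20003 appears once with itself, once with 20004 and once with 20006; this matches the single user (10002) who bought 20003 together with 20004 and 20006.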

11. Compute the user purchase vectors (step 4)

    File: UserBuyGoodsVector.java

    Classes: UserBuyGoodsVector

         UserBuyGoodsVectorMapper

         UserBuyGoodsVectorReducer

    Input: the result of step 1 (or the original raw data).

       10001  20001,20005,20006,20007,20002

       10002   20006,20003,20004

       10003   20002,20007

       10004   20001,20002,20005,20006

       10005  20001

       10006   20004,20007

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

import java.io.IOException;

public class UserBuyGoodsVector extends Configured implements Tool{
//the input is the output of UserBuyGoodsList
static class UserBuyGoodsVectorMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String s[] = value.toString().split("\t");
String vs[]=s[1].split(",");
for(String v:vs){
context.write(new Text(v),new Text(s[0]+":1"));
}
}
}
static class UserBuyGoodsVectorReducer extends Reducer<Text,Text,Text,Text>{
@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
StringBuilder sb=new StringBuilder();
for(Text value : values){
sb.append(value.toString()).append(",");
}
context.write(key,new Text(sb.substring(0,sb.length()-1)));
}
}

public int run(String[] strings) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = getConf();
Path in = new Path(conf.get("in"));
Path out = new Path(conf.get("out"));
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(UserBuyGoodsVector.class);
job.setMapperClass(UserBuyGoodsVectorMapper.class);
job.setReducerClass(UserBuyGoodsVectorReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat.addInputPath(job,in);
TextOutputFormat.setOutputPath(job,out);

return job.waitForCompletion(true)?0:-1;
}

public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new UserBuyGoodsVector(),args));
}
}
The idea: if this cannot be done in a single pass, two chained MapReduce jobs could be used instead; the code here keeps things as simple as possible.

Two string arrays hold the user ID and the product IDs: s[1] is split on commas into vs, each product ID becomes a key, and s[0] together with a count of 1 ("userid:1") becomes the value sent to the reducer. In effect, every product records each user who bought it once; the reducer then joins all values with the same key into one StringBuilder, separated by commas.
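
For example, the step 1 line "10003  20002,20007" produces the map outputs (20002, 10003:1) and (20007, 10003:1); after the reducer groups by item, these entries appear in the rows for 20002 and 20007 below.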

Result:

           20001      10001:1,10004:1,10005:1

           20002      10001:1,10003:1,10004:1

           20003      10002:1

           20004      10002:1,10006:1

           20005      10001:1,10004:1

           20006      10001:1,10002:1,10004:1

           20007      10001:1,10003:1,10006:1

12. Multiply the item co-occurrence matrix by the user purchase vectors to form intermediate recommendation results (step 5).

                  File: MultiplyGoodsMatrixAndUserVector.java

                  Classes: MultiplyGoodsMatrixAndUserVectorFirstMapper

                            MultiplyGoodsMatrixAndUserVectorSecondMapper

                            MultiplyGoodsMatrixAndUserVectorReducer

                  Note: the input comes from two files, the result of step 3 (the item co-occurrence matrix) and the result of step 4 (the user purchase vectors). A single MR job therefore uses two custom Mappers to process them separately, plus one custom Reducer to combine their intermediate output.

                          1. The two Mappers must emit the same kind of key.

                          2. The output key and value types of both Mappers must be identical.

                          3. In the job configuration, each Mapper is registered with MultipleInputs.addInputPath(job, input path, input format class, Mapper class).

                  Input: the result data of steps 3 and 4.

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MultiplyGoodsMatrixAndUserVector extends Configured implements Tool{
static class MultiplyGoodsMatrixAndUserVectorFirstMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String s[]=value.toString().split("\t");
context.write(new Text(s[0]),new Text("m"+s[1]));// co-occurrence matrix rows
}
}
static class MultiplyGoodsMatrixAndUserVectorSecondMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String s[]=value.toString().split("\t");
context.write(new Text(s[0]),new Text("v"+s[1]));// user purchase vectors
}
}

static class MultiplyGoodsMatrixAndUserVectorReducer extends Reducer<Text,Text,Text,Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String ms[]=null;
String vs[]=null;
for(Text value:values){
String str=value.toString();
if(str.charAt(0)=='m'){
ms=str.substring(1).split(",");
}
if(str.charAt(0)=='v'){
vs=str.substring(1).split(",");
}
}
for (String m : ms) {
for (String v : vs) {
String[] mss = m.split(":");
String[] vss = v.split(":");
long vv = Long.parseLong(vss[1]);
long mm = Long.parseLong(mss[1]);
context.write(new Text(vss[0]+","+mss[0]),new Text((vv*mm)+""));
}
}
}
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new MultiplyGoodsMatrixAndUserVector(),args));
}

public int run(String[] strings) throws Exception {
Configuration conf=getConf();
Path in1=new Path(conf.get("in1"));
Path in2=new Path(conf.get("in2"));
Path out=new Path(conf.get("out"));

Job job=Job.getInstance(conf,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());

MultipleInputs.addInputPath(job,in1,TextInputFormat.class,MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
MultipleInputs.addInputPath(job,in2,TextInputFormat.class,MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(MultiplyGoodsMatrixAndUserVectorReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,out);

return job.waitForCompletion(true)?0:1;
}
}
The idea: to multiply the two matrices, both Mappers pull the product ID out as the key and pass the rest of the line as the value to the reducer, tagging the values with "m" (matrix row) or "v" (purchase vector) so the reducer can tell them apart. In the reducer, values starting with 'm' are stripped of the tag and split on "," into ms, and likewise into vs for 'v'. Each element still contains a ":", and the part before it does not take part in the multiplication, so a nested for loop splits every element on ":", parses the parts after the colon as long values and multiplies them, while the parts before the colon are combined into the output key (user,item).

Because this class takes two map inputs, the job configuration in run must use MultipleInputs.addInputPath instead of a single input path.
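
As a concrete example, consider the pair (10001,20001): user 10001 appears in the purchase vectors of items 20001, 20002, 20005, 20006 and 20007, and in those five matrix rows the entry for 20001 is 3, 2, 2, 2 and 1 respectively, so the reducer emits five partial products for the key 10001,20001 (the five lines shown below); the next step sums them to 10.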

Result:

                          10001,20001   2

                          10001,20001   2

                          10001,20001   3

                          10001,20001   1

                          10001,20001   2

                          10001,20002   3

                          10001,20002   2

                          10001,20002   2

(remaining rows omitted for brevity)

13. Sum the scattered partial products produced in step 5 (step 6).

                  File: MakeSumForMultiplication.java

                            MakeSumForMultiplicationMapper

                            MakeSumForMultiplicationReducer

                  Input: the result of step 5

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MakeSumForMultiplication extends Configured implements Tool{
static class MakeSumForMultiplicationMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String s[] = value.toString().split("\t");
context.write(new Text(s[0]),new IntWritable(Integer.parseInt(s[1])));
}
}
static class MakeSumForMultiplicationReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum=0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key,new IntWritable(sum));
}
}

public int run(String[] strings) throws Exception {
Configuration conf = getConf();
Path in = new Path(conf.get("in"));
Path out = new Path(conf.get("out"));
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(MakeSumForMultiplication.class);
job.setMapperClass(MakeSumForMultiplicationMapper.class);
job.setReducerClass(MakeSumForMultiplicationReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat.addInputPath(job,in);
TextOutputFormat.setOutputPath(job,out);

return job.waitForCompletion(true)?0:-1;
}

public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new MakeSumForMultiplication(),args));
}
}
The idea: this is just a word-count style aggregation. Each line is split on "\t", the user,item pair becomes the key and the partial product becomes the value; the reducer adds up all values that share the same key.
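
For instance, the five partial products emitted for 10001,20001 in the previous step (2, 2, 3, 1, 2) are added up here into the single line 10001,20001 10.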

                  Result:

                          10001,20001   10

                          10001,20002   11

                          10001,20003   1

                          10001,20004   2

                          10001,20005   9

            (remaining rows omitted for brevity)

14. De-duplicate: remove goods the user has already bought from the recommendation results (step 7).
        File: DuplicateDataForResult.java
        Classes: DuplicateDataForResultFirstMapper
              DuplicateDataForResultSecondMapper
              DuplicateDataForResultReducer
        Input:
            1. FirstMapper processes the user purchase list data (the result of step 1).

            2. SecondMapper processes the recommendation results from MakeSumForMultiplication.

Code:

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

import java.io.IOException;
import java.util.*;

public class DuplicateDataForResult extends Configured implements Tool{

static class DuplicateDataForResultFirstMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
String[] strs=value.toString().split("[\t]");
for(String s : strs[1].split(",")){
context.write(new Text(strs[0]+","+s),new Text("r"+value.toString()));
}
}
}

static class DuplicateDataForResultSecondMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
String[] strs=value.toString().split("[\t]");
context.write(new Text(strs[0]),new Text("u"+value.toString()));
}
}

static class DuplicateDataForResultReducer extends Reducer<Text,Text,Text,Text>{
@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
Map<String,String> map=new HashMap<String, String>();
List<String> list=new ArrayList<String>();
for(Text value : values){
String val=value.toString();
if(val.charAt(0)=='r') list.add(key.toString());
if(val.charAt(0)=='u') map.put(key.toString(),val.substring(1));
}
for(String str : list){
map.remove(str);
}
for(String str : map.keySet()){
String[] strs=map.get(str).split(",");
context.write(new Text(strs[0]),new Text(strs[1]));
}
}
}

public int run(String[] args) throws Exception{
Configuration conf=getConf();
Path in1=new Path(conf.get("in1"));
Path in2=new Path(conf.get("in2"));
Path out=new Path(conf.get("out"));

Job job=Job.getInstance(conf,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());

MultipleInputs.addInputPath(job,in1,TextInputFormat.class,DuplicateDataForResultFirstMapper.class);
MultipleInputs.addInputPath(job,in2,TextInputFormat.class,DuplicateDataForResultSecondMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(DuplicateDataForResultReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,out);

job.setNumReduceTasks(1);

return job.waitForCompletion(true)?0:1;
}
}
The idea: the FirstMapper takes each user from the UserBuyGoodsList output and joins the user with every product that user bought into a "user,item" key, so the keys line up with those coming from MakeSumForMultiplication. Since two Mappers feed the reducer, the values are tagged "r" (already purchased) and "u" (recommendation) to tell them apart. In the reducer, keys seen with an 'r' value are collected into a List, and the recommendation lines are stored in a HashMap keyed by the same "user,item" string; every key present in the List corresponds to a product the user already owns, so it is removed from the map. What remains in the map is the final recommendation set. Because the next step writes into a database, the output no longer uses commas: a loop over map.keySet() splits each remaining value and writes the user ID as the key and "item\tscore" as the value.
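
As a sanity check against the data: user 10001 already bought 20001, 20002, 20005, 20006 and 20007, so those five pairs are dropped and only the recommendations for 20003 and 20004 remain, exactly as the first two result lines show.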

    Result:

            10001    20004    2
            10001    20003    1
            10002    20002    2
            10002    20007    2
            10002    20001    2
            10002    20005    2
            10003    20006    3
            10003    20005    3
            10003    20001    3
            10003    20004    1
            10004    20007    5
            10004    20004    1
            10004    20003    1
            10005    20006    2
            10005    20002    2
            10005    20005    2
            10005    20007    1
            10006    20006    2
            10006    20002    2
            10006    20005    1
            10006    20003    1
            10006    20001    1
15. Save the recommendation results to a MySQL database (step 8)
        Notes:
            a. Make sure the table exists in advance (a sample table definition is given after these notes):
                grms.results(uid varchar(20),
                gid varchar(20),
                exp int)
            b. When an MR program writes data from the HDFS cluster into a MySQL database, only the final output Key can be saved to the database.

            c. Define a custom data type for the final output Key. The custom class implements WritableComparable<the class itself>, and because it is also used as the Key written from the HDFS cluster into MySQL, it must implement the DBWritable interface as well, i.e. provide:

            readFields(ResultSet rs)
            write(PreparedStatement ps)

            A impl WC,DBW{
                private String uid;
                private String gid;
                private int exp;
            
                readFields(ResultSet rs){
                    uid=rs.getString(1);
                }

                write(PreparedStatement ps){
                    ps.setString(1,uid);
                    ps.setString(2,gid);
                    ps.setInt(1,exp);
                }
            }
            d. In the job configuration, use DBConfiguration.configureDB() to set the database connection parameters:
                parameter 1: the Configuration object of the current job, obtained from the Job object;
                parameter 2: "com.mysql.jdbc.Driver"
                parameter 3: "jdbc:mysql://ip:port/grms"
                parameters 4 and 5: the user name and the password.
            e. The output format must be DBOutputFormat.
                DBOutputFormat.setOutput() takes three parameters:
                    parameter 1: the Job object.
                    parameter 2: the database table name.
                    parameter 3: a varargs list of the column names to insert into.
                    The generated statement is: insert into <table name> values(?,?,?);
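
For reference, a minimal table definition matching note (a), assuming the database is named grms as in db.properties, might be:

                create table results(
                    uid varchar(20),
                    gid varchar(20),
                    exp int
                );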
        File: SaveRecommendResultToDB.java
        Classes: SaveRecommendResultToDBMapper<LongWritable,Text,Text,Text>
              SaveRecommendResultToDBReducer<Text,Text,custom Key,NullWritable>
        Input: the result data of step 7.
        Output: the MySQL database, table grms.results

package com.briup.bigdata.project.grms;
import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class SaveRecommendResultToDB extends Configured implements Tool{
static class SaveRecommendResultToDBMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
String[] strs=value.toString().split("\t");
context.write(new Text(strs[0]+"\t"+strs[1]),new Text(strs[2]));
}
}

static class SaveRecommendResultToDBReducer extends Reducer<Text,Text,RecommendResultDB,NullWritable>{
private RecommendResultDB rrdb=new RecommendResultDB();

@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
String[] strs=key.toString().split("\t");
rrdb.setUid(strs[0]);
rrdb.setGid(strs[1]);
rrdb.setExp(Integer.parseInt(values.iterator().next().toString()));
context.write(rrdb,NullWritable.get());
}
}

public int run(String[] args) throws Exception{
Configuration conf=getConf();
Path in=new Path(conf.get("in"));

Job job=Job.getInstance(conf,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());

job.setMapperClass(SaveRecommendResultToDBMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,in);

job.setReducerClass(SaveRecommendResultToDBReducer.class);
job.setOutputKeyClass(RecommendResultDB.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(DBOutputFormat.class);
Properties prop=new Properties();
prop.load(this.getClass().getResourceAsStream("/db.properties"));
DBConfiguration.configureDB(job.getConfiguration(),prop.getProperty("grms.driver"),prop.getProperty("grms.url"),prop.getProperty("grms.username"),prop.getProperty("grms.password"));
DBOutputFormat.setOutput(job,prop.getProperty("grms.tblname"),"uid","gid","exp");

return job.waitForCompletion(true)?0:1;
}
}


package com.briup.bigdata.project.grms;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class RecommendResultDB implements DBWritable,WritableComparable<RecommendResultDB>{
private String uid;
private String gid;
private int exp;

public RecommendResultDB(){
}

public RecommendResultDB(String uid,String gid,int exp){
this.uid=uid;
this.gid=gid;
this.exp=exp;
}

@Override
public int compareTo(RecommendResultDB o){
int uidComp=this.uid.compareTo(o.uid);
int gidComp=this.gid.compareTo(o.gid);
int indexComp=this.exp-o.exp;
return uidComp==0?(gidComp==0?indexComp:gidComp):uidComp;
}

@Override
public void write(DataOutput out) throws IOException{
out.writeUTF(uid);
out.writeUTF(gid);
out.writeInt(exp);
}

@Override
public void readFields(DataInput in) throws IOException{
uid=in.readUTF();
gid=in.readUTF();
exp=in.readInt();
}

@Override
public void write(PreparedStatement preparedStatement) throws SQLException{
preparedStatement.setString(1,uid);
preparedStatement.setString(2,gid);
preparedStatement.setInt(3,exp);
}

@Override
public void readFields(ResultSet resultSet) throws SQLException{
if(resultSet==null) return;
uid=resultSet.getString(1);
gid=resultSet.getString(2);
exp=resultSet.getInt(3);
}

@Override
public boolean equals(Object o){
if(this==o) return true;
if(!(o instanceof RecommendResultDB)) return false;
RecommendResultDB that=(RecommendResultDB)o;
return getExp()==that.getExp()&&Objects.equals(getUid(),that.getUid())&&Objects.equals(getGid(),that.getGid());
}

@Override
public int hashCode(){

return Objects.hash(getUid(),getGid(),getExp());
}

public String getUid(){
return uid;
}

public void setUid(String uid){
this.uid=uid;
}

public String getGid(){
return gid;
}

public void setGid(String gid){
this.gid=gid;
}

public int getExp(){
return exp;
}

public void setExp(int exp){
this.exp=exp;
}

@Override
public String toString(){
return "RecommendResultDB{"+"uid='"+uid+'\''+", gid='"+gid+'\''+", exp="+exp+'}';
}
}
The idea: define a RecommendResultDB key class; in the SaveRecommendResultToDB reducer, the three fields split out by the mapper are stored into the database as uid, gid and exp, in that order.

16. Build a job flow (JobControl) so that the program submits all of the jobs by itself.

                  File: GoodsRecommendationManagementSystemJobController.java

                  Class: GoodsRecommendationManagementSystemJobController

                          1. Nine classes were designed above, one of which (RecommendResultDB) is only an output key type with no input or output of its own. To configure everything in one place, create Job objects for step 1 through step 8 and configure each of them.

                          2. Create 8 ControlledJob objects and wrap each Job as a controllable job.

                          3. Add the dependency relationships between the controlled jobs.

                          4. Build a JobControl object and add the 8 controlled jobs to it one by one.

                          5. Wrap the JobControl in a Thread, start the thread, and let the jobs execute.

package com.briup.bigdata.project.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Properties;

public class GoodsRecommendationManagementSystemJobController extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
Path in1=new Path(conf.get("in1"));
Path out1=new Path(conf.get("out1"));
Path out2=new Path(conf.get("out2"));
Path out3=new Path(conf.get("out3"));
Path out4=new Path(conf.get("out4"));
Path out5=new Path(conf.get("out5"));
Path out6=new Path(conf.get("out6"));
Path out7=new Path(conf.get("out7"));
//--step1--
Job job1=Job.getInstance(conf,UserBuyGoodsList.class.getSimpleName());
job1.setJarByClass(this.getClass());

job1.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(Text.class);
job1.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job1,in1);

job1.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
job1.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job1,out1);

//--step2--
Job job2=Job.getInstance(conf,GoodsConcurrenceList.class.getSimpleName());
job2.setJarByClass(this.getClass());

job2.setMapperClass(GoodsConcurrenceList.GoodsConcurrenceListMapper.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(IntWritable.class);
job2.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job2,out1);

job2.setReducerClass(GoodsConcurrenceList.GoodsConcurrenceListReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
job2.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job2,out2);

//--step3--
Job job3=Job.getInstance(conf,GoodsConcurrenceMatrix.class.getSimpleName());
job3.setJarByClass(this.getClass());

job3.setMapperClass(GoodsConcurrenceMatrix.GoodsConcurrenceMatrixMapper.class);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(Text.class);
job3.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job3,out2);

job3.setReducerClass(GoodsConcurrenceMatrix.GoodsConcurrenceMatrixReducer.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(Text.class);
job3.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job3,out3);

//--step4--
Job job4=Job.getInstance(conf,UserBuyGoodsVector.class.getSimpleName());
job4.setJarByClass(this.getClass());

job4.setMapperClass(UserBuyGoodsVector.UserBuyGoodsVectorMapper.class);
job4.setMapOutputKeyClass(Text.class);
job4.setMapOutputValueClass(Text.class);
job4.setInputFormatClass(TextInputFormat.class);
// input: the result of step 1 (or the raw data)
TextInputFormat.addInputPath(job4,out1);

job4.setReducerClass(UserBuyGoodsVector.UserBuyGoodsVectorReducer.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(Text.class);
job4.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job4,out4);

//--step5--
Job job5=Job.getInstance(conf,MultiplyGoodsMatrixAndUserVector.class.getSimpleName());
job5.setJarByClass(this.getClass());

// inputs: the results of step 3 (matrix) and step 4 (vectors)
MultipleInputs.addInputPath(job5,out3,TextInputFormat.class,MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
MultipleInputs.addInputPath(job5,out4,TextInputFormat.class,MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
job5.setMapOutputKeyClass(Text.class);
job5.setMapOutputValueClass(Text.class);

job5.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
job5.setOutputKeyClass(Text.class);
job5.setOutputValueClass(Text.class);
job5.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job5,out5);

//--step6--
Job job6=Job.getInstance(conf,MakeSumForMultiplication.class.getSimpleName());
job6.setJarByClass(this.getClass());

job6.setMapperClass(MakeSumForMultiplication.MakeSumForMultiplicationMapper.class);
job6.setMapOutputKeyClass(Text.class);
job6.setMapOutputValueClass(IntWritable.class);
job6.setInputFormatClass(TextInputFormat.class);
// input: the result of step 5
TextInputFormat.addInputPath(job6,out5);

job6.setReducerClass(MakeSumForMultiplication.MakeSumForMultiplicationReducer.class);
job6.setOutputKeyClass(Text.class);
job6.setOutputValueClass(IntWritable.class);
job6.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job6,out6);

//--step7--
Job job7=Job.getInstance(conf,DuplicateDataForResult.class.getSimpleName());
job7.setJarByClass(this.getClass());

MultipleInputs.addInputPath(job7,out1,TextInputFormat.class,DuplicateDataForResult.DuplicateDataForResultFirstMapper.class);
MultipleInputs.addInputPath(job7,out6,TextInputFormat.class,DuplicateDataForResult.DuplicateDataForResultSecondMapper.class);
job7.setMapOutputKeyClass(Text.class);
job7.setMapOutputValueClass(Text.class);

job7.setReducerClass(DuplicateDataForResult.DuplicateDataForResultReducer.class);
job7.setOutputKeyClass(Text.class);
job7.setOutputValueClass(Text.class);
job7.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job7,out7);

job7.setNumReduceTasks(1);

//--step8--
Job job8=Job.getInstance(conf,SaveRecommendResultToDB.class.getSimpleName());
job8.setJarByClass(this.getClass());

job8.setMapperClass(SaveRecommendResultToDB.SaveRecommendResultToDBMapper.class);
job8.setMapOutputKeyClass(Text.class);
job8.setMapOutputValueClass(Text.class);
job8.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job8,out7);

job8.setReducerClass(SaveRecommendResultToDB.SaveRecommendResultToDBReducer.class);
job8.setOutputKeyClass(RecommendResultDB.class);
job8.setOutputValueClass(NullWritable.class);
job8.setOutputFormatClass(DBOutputFormat.class);
Properties prop=new Properties();
prop.load(this.getClass().getResourceAsStream("/db.properties"));
DBConfiguration.configureDB(job8.getConfiguration(),prop.getProperty("grms.driver"),prop.getProperty("grms.url"),prop.getProperty("grms.username"),prop.getProperty("grms.password"));
DBOutputFormat.setOutput(job8,prop.getProperty("grms.tblname"),"uid","gid","exp");

// JobController
ControlledJob cj1=new ControlledJob(job1.getConfiguration());
cj1.setJob(job1);

ControlledJob cj2=new ControlledJob(job2.getConfiguration());
cj2.setJob(job2);

ControlledJob cj3=new ControlledJob(job3.getConfiguration());
cj3.setJob(job3);

ControlledJob cj4=new ControlledJob(job4.getConfiguration());
cj4.setJob(job4);

ControlledJob cj5=new ControlledJob(job5.getConfiguration());
cj5.setJob(job5);

ControlledJob cj6=new ControlledJob(job6.getConfiguration());
cj6.setJob(job6);

ControlledJob cj7=new ControlledJob(job7.getConfiguration());
cj7.setJob(job7);

ControlledJob cj8=new ControlledJob(job8.getConfiguration());
cj8.setJob(job8);

// add the dependencies between the jobs
cj2.addDependingJob(cj1);
cj3.addDependingJob(cj2);
cj4.addDependingJob(cj1);
cj5.addDependingJob(cj3);
cj5.addDependingJob(cj4);
cj6.addDependingJob(cj5);
cj7.addDependingJob(cj1);
cj7.addDependingJob(cj6);
cj8.addDependingJob(cj7);
// create the JobControl object and add the ControlledJobs
JobControl jc=new JobControl(this.getClass().getSimpleName());
jc.addJob(cj1);
jc.addJob(cj2);
jc.addJob(cj3);
jc.addJob(cj4);
jc.addJob(cj5);
jc.addJob(cj6);
jc.addJob(cj7);
jc.addJob(cj8);
// run the JobControl in a thread and wait for all jobs to finish
Thread thread=new Thread(jc);
thread.start();
do{
for(ControlledJob cj : jc.getRunningJobList()){
cj.getJob().monitorAndPrintJob();
}
}while(!jc.allFinished());

return 0;
}

public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new GoodsRecommendationManagementSystemJobController(),args));
}
}
At this point we can comment out the main methods of all the classes above, build the jar, and run the GoodsRecommendationManagementSystemJobController program on the YARN cluster; pay attention to how the -Din and -Dout parameters are passed.

With that, the product recommendation algorithm is essentially complete.

If possible, write a shell script for submitting the job on the YARN cluster so you do not have to type all the -D parameters by hand each time.
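
Such a script only needs a single hadoop jar command; assuming the jar is built as grms.jar and the HDFS layout from the preparation step (adjust the paths to your environment), it could look roughly like:

hadoop jar grms.jar com.briup.bigdata.project.grms.GoodsRecommendationManagementSystemJobController -Din1=/grms/rawdata/matrix.txt -Dout1=/grms/step1 -Dout2=/grms/step2 -Dout3=/grms/step3 -Dout4=/grms/step4 -Dout5=/grms/step5 -Dout6=/grms/step6 -Dout7=/grms/step7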

Appendix: db.properties

grms.driver=com.mysql.jdbc.Driver
grms.url=jdbc:mysql://ud2:5721/grms
grms.username=root
grms.password=root
grms.tblname=results
The JobUtil class below simplifies the job configuration for all of the jobs (except those that register multiple Mappers); adapt it to your own code as needed.
package com.briup.bigdata.project.grms.utils;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class JobUtil{
private static Job job;
private static String in;
private static String out;
private static Configuration configuration;

public static void setConf(                 // define the setConf method
Configuration c,                        // the configuration object passed to the whole MapReduce job
Class cz,                               // the main class inside the jar that runs the MR job
String name,                            // the job name
String vin,                             // the input path
String vout                             // the output path
){
try{
if(c==null){
throw new Exception("The configuration must not be null.");
}
job=Job.getInstance(c,name);        // build the Job object with the configuration and job name
job.setJarByClass(cz);              // set the main class of the job's jar
in=vin;                             // store the input path in the static field
out=vout;                           // store the output path in the static field
configuration=c;
}catch(Exception e){
e.printStackTrace();
}
}

public static void setMapper(               // define the setMapper method
Class<? extends Mapper> x,              // the Mapper class to run in the job
Class<? extends Writable> y,            // the Mapper output key type
Class<? extends Writable> z,            // the Mapper output value type
Class<? extends InputFormat> o          // the input format of the job
){
try{
job.setMapperClass(x);
job.setMapOutputKeyClass(y);
job.setMapOutputValueClass(z);
job.setInputFormatClass(o);
o.getMethod("addInputPath",Job.class,Path.class).invoke(null,job,new Path(in));
}catch(Exception e){
e.printStackTrace();
}
}

public static void setReducer(              // define the setReducer method
Class<? extends Reducer> a,             // the Reducer class to run in the job
Class<? extends Writable> b,            // the Reducer output key type
Class<? extends Writable> c,            // the Reducer output value type
Class<? extends OutputFormat> d,        // the output format of the job
int rnum                                // the number of Reducers
){
try{
job.setReducerClass(a);
job.setOutputKeyClass(b);
job.setOutputValueClass(c);
job.setOutputFormatClass(d);
d.getMethod("setOutputPath",Job.class,Path.class).invoke(null,job,new Path(out));
job.setNumReduceTasks(rnum);
}catch(Exception e){
e.printStackTrace();
}
}

public static void setTotalSort(float a,int b,int c) throws InterruptedException, IOException, ClassNotFoundException, URISyntaxException{
job.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.writePartitionFile(job,new RandomSampler(a,b,c));
job.addCacheFile(new URI(TotalOrderPartitioner.getPartitionFile(getConfiguration())));
}

public static void setSecondarySort(Class<? extends WritableComparator> g,Class<? extends WritableComparator> s,Class<? extends Partitioner> p) throws ClassNotFoundException{
job.setPartitionerClass(p);
job.setGroupingComparatorClass(g);
job.setSortComparatorClass(s);
}

public static void setCombiner(boolean flag,Class<? extends Reducer> combiner){
if(flag&&combiner!=null) job.setCombinerClass(combiner);
}

public static int commit() throws Exception{
return job.waitForCompletion(true)?0:1;         // submit the job and wait for completion
}

public static Job getJob(){
return job;
}

public static void setJob(Job xyz){
JobUtil.job=xyz;
}

public static Configuration getConfiguration(){
return job.getConfiguration();
}

public static void setConfiguration(Configuration configuration){
JobUtil.configuration=configuration;
}
}

