您的位置:首页 > 编程语言 > Java开发

MapReducer-找共同好友 分类: Java hadoop 2015-06-25 22:31 71人阅读 评论(0) 收藏

2015-06-25 22:31 651 查看
package com.billstudy.mr.friends;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* 找朋友
*  共同好友
原始数据:每个人的好友列表
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
……

Output: the common friends shared by each pair of people (the sample shows a trailing comma; the code strips it)
A-B	C,E,
A-C	D,F,
A-D	E,F,
A-E	B,C,D,
A-F	B,C,D,E,O,
A-G	C,D,E,F,
A-H	C,D,E,O,
A-I	O,
A-J	B,O,
A-K	C,D,
A-L	D,E,F,
A-M	E,F,
B-C	A,
B-D	A,E,
……
* @author Bill
* @since V1.0 2015年6月24日 - 下午4:53:01
*/
public class ShareFriends {

/**
 * First-stage mapper: inverts every friend list so that all people who list
 * the same friend land in the same reduce group.
 *
 * <p>An input line such as {@code A:B,C,D,F,E,O} is emitted as:
 * <pre>
 * B	A
 * C	A
 * D	A
 * F	A
 * ....
 * </pre>
 *
 * <p>Lines that do not split into exactly two parts around {@code ":"} are
 * silently skipped.
 *
 * @author Bill
 * @since V1.0 2015-06-24 17:15:21
 */
static class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

    /** Reused output key: one friend taken from the list. */
    private final Text friendKey = new Text();

    /** Reused output value: the person who owns the list. */
    private final Text ownerValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] parts = value.toString().split(":");

        // Only "owner:friend,friend,..." lines are processed.
        if (parts.length == 2) {
            ownerValue.set(parts[0]);
            for (String friend : parts[1].split(",")) {
                friendKey.set(friend);
                context.write(friendKey, ownerValue);
            }
        }
    }
}

/**
 * First-stage reducer: joins everybody who has the same friend into one
 * {@code "-"}-separated value.
 *
 * <p>The group {@code A -> {B, C, D}} is written as:
 * <pre>
 * A	B-C-D
 * </pre>
 *
 * @author Bill
 * @since V1.0 2015-06-24 17:14:26
 */
static class Reducer1 extends Reducer<Text, Text, Text, Text> {

    /** Reused output value holding the joined names. */
    private final Text joinedValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> friends, Context context)
            throws IOException, InterruptedException {

        StringBuilder joined = new StringBuilder();
        boolean first = true;

        for (Text friend : friends) {
            // Put the separator in front of every element except the first,
            // so nothing has to be trimmed afterwards.
            if (!first) {
                joined.append("-");
            }
            joined.append(friend.toString());
            first = false;
        }

        joinedValue.set(joined.toString());
        context.write(key, joinedValue);
    }
}

/**
 * Second-stage mapper: for every shared friend, emits each sorted pair of
 * people who have that friend, so that any given pair always arrives in the
 * same reduce group.
 *
 * <p>The input line {@code A	B-C-D-E-F} (tab-separated) is emitted as:
 * <pre>
 * B-C	A
 * B-D	A
 * B-E	A
 * B-F	A
 * C-D	A
 * ....
 * </pre>
 *
 * <p>Lines that do not contain both a key and a value around the tab are
 * skipped, in the same way {@code Mapper1} skips malformed input, instead of
 * failing the task with an {@link ArrayIndexOutOfBoundsException}.
 *
 * @author Bill
 * @since V1.0 2015-06-24 17:12:44
 */
static class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {

    /** Reused output key: a sorted {@code "X-Y"} pair of people. */
    private final Text k = new Text();

    /** Reused output value: the friend that the pair has in common. */
    private final Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] fields = value.toString().split("\t");

        // A line without a tab, or with an empty value, yields fewer than two
        // fields; skip it rather than index past the end of the array.
        if (fields.length < 2) {
            return;
        }

        String sharedFriend = fields[0];
        String[] people = fields[1].split("-");

        // Sorting is required: without it the same pair could be emitted as
        // both "A-B" and "B-A" and would be split across two reduce groups.
        Arrays.sort(people);

        v.set(sharedFriend);

        // Emit every unordered pair exactly once.
        for (int i = 0; i < people.length - 1; i++) {
            for (int j = i + 1; j < people.length; j++) {
                k.set(people[i] + "-" + people[j]);
                context.write(k, v);
            }
        }
    }
}

/**
 * Second-stage reducer: joins all friends collected for one pair of people
 * into a single comma-separated value.
 *
 * <p>The records
 * <pre>
 * A-B	C
 * A-B	E
 * </pre>
 * are written as:
 * <pre>
 * A-B	C,E
 * </pre>
 *
 * @author Bill
 * @since V1.0 2015-06-24 17:11:24
 */
static class Reducer2 extends Reducer<Text, Text, Text, Text> {

    /** Reused output value holding the joined common friends. */
    private final Text joinedValue = new Text();

    @Override
    protected void reduce(Text pair, Iterable<Text> friends, Context context)
            throws IOException, InterruptedException {

        StringBuilder joined = new StringBuilder();
        boolean first = true;

        for (Text friend : friends) {
            // Separator goes before every element but the first, so there is
            // no trailing comma to remove.
            if (!first) {
                joined.append(",");
            }
            joined.append(friend.toString());
            first = false;
        }

        joinedValue.set(joined.toString());
        context.write(pair, joinedValue);
    }

}

/**
 * Configures the two chained jobs, runs them through a {@link JobControl}
 * and waits until both have finished.
 *
 * <p>Job 1 reads {@code args[0]} and writes {@code args[1]}; job 2 reads
 * {@code args[1]} and writes {@code args[2]}. Both output directories are
 * deleted first if they already exist. The process exits with status 1 when
 * the argument count is wrong or when any job fails.
 *
 * @param args {@code <job1-inpath> <job1-outPath> <job2-outPath>}
 * @throws Exception if job setup, file-system access or waiting fails
 */
public static void main(String[] args) throws Exception {

    if (args.length != 3) {
        System.err.println("Usage:<job1-inpath> <job1-outPath> <job2-outPath>");
        System.exit(1);
    }

    Configuration conf = new Configuration();

    Path job1InputPath = new Path(args[0]);
    Path job1OutputPath = new Path(args[1]);
    Path job2OutputPath = new Path(args[2]);

    // Remove stale output; the jobs refuse to write into existing directories.
    deleteIfExists(job1OutputPath, conf);
    deleteIfExists(job2OutputPath, conf);

    // job1
    Job job1 = Job.getInstance(conf);
    job1.setJarByClass(ShareFriends.class);
    job1.setMapperClass(Mapper1.class);
    job1.setReducerClass(Reducer1.class);

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job1, job1InputPath);
    FileOutputFormat.setOutputPath(job1, job1OutputPath);

    // job2 consumes the output of job1
    Job job2 = Job.getInstance(conf);
    job2.setJarByClass(ShareFriends.class);
    job2.setMapperClass(Mapper2.class);
    job2.setReducerClass(Reducer2.class);

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job2, job1OutputPath);
    FileOutputFormat.setOutputPath(job2, job2OutputPath);

    // Declare the dependency so job2 only starts after job1.
    ControlledJob controlledJob1 = new ControlledJob(conf);
    ControlledJob controlledJob2 = new ControlledJob(conf);

    controlledJob1.setJob(job1);
    controlledJob2.setJob(job2);
    controlledJob2.addDependingJob(controlledJob1);

    JobControl jobControl = new JobControl("share-friends");
    jobControl.addJob(controlledJob1);
    jobControl.addJob(controlledJob2);

    // JobControl is a Runnable; drive it from its own thread and poll.
    Thread shareFriendExecuteThread = new Thread(jobControl);
    shareFriendExecuteThread.start();

    try {
        while (!jobControl.allFinished()) {
            TimeUnit.SECONDS.sleep(1);
        }
    } finally {
        // Always stop the controller, even if the wait is interrupted;
        // otherwise its non-daemon thread keeps the JVM alive.
        jobControl.stop();
    }

    // Report failure through the exit status instead of ending silently.
    if (!jobControl.getFailedJobList().isEmpty()) {
        System.err.println("Failed jobs: " + jobControl.getFailedJobList());
        System.exit(1);
    }

    // Developer convenience: pop up the two output folders (Windows only).
    openFolderOnWindows(job1OutputPath);
    openFolderOnWindows(job2OutputPath);
}

/** Recursively deletes {@code path} on the file system that owns it, if it exists. */
private static void deleteIfExists(Path path, Configuration conf) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    if (fs.exists(path)) {
        fs.delete(path, true);
    }
}

/**
 * Best-effort attempt to open {@code outputPath} with the Windows shell;
 * does nothing on other operating systems and never fails the run.
 */
private static void openFolderOnWindows(Path outputPath) {
    if (!System.getProperty("os.name", "").startsWith("Windows")) {
        return;
    }

    String folder = outputPath.toUri().getPath();
    // The original code assumed a leading slash before the drive letter
    // ("/C:/out"); strip it only in that case so relative paths stay intact.
    if (folder.length() > 2 && folder.charAt(0) == '/' && folder.charAt(2) == ':') {
        folder = folder.substring(1);
    }

    try {
        // Command string kept identical to the original invocation.
        Runtime.getRuntime().exec("cmd.exe /c start " + folder);
    } catch (IOException e) {
        System.err.println("Could not open output folder " + folder + ": " + e.getMessage());
    }
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: