
PageRank with Hadoop MapReduce



links.txt

Link relationships: the first token on each line is a page, and the remaining tokens are the pages it links to (so A links to B, C, and D, while C links only to itself).

A B C D
B A D
C C
D B C

part-r-00000 — the initial probability distribution vector: every page starts with rank 0.25. initRand below reads the first and last tab-separated fields of each line (page name and rank), so the middle column is ignored.

a = 0.8 (damping factor)

A a 0.25
B a 0.25
C a 0.25
D a 0.25
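
For reference, each round applies the damped PageRank update, with a = 0.8 and the teleport term E(p) taken from the initial vector above:

$$\mathrm{PR}_{t+1}(p) = a \sum_{q \to p} \frac{\mathrm{PR}_t(q)}{\mathrm{outdeg}(q)} + (1 - a)\,E(p)$$

For example, page B is linked from A (outdegree 3) and D (outdegree 2), so the first round gives

$$\mathrm{PR}_1(B) = 0.8 \left( \frac{0.25}{3} + \frac{0.25}{2} \right) + 0.2 \times 0.25 \approx 0.2167$$

which is exactly what the mapper/reducer pair below computes.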


PageRankMapReduce

package org.bigdata.pagerank;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.bigdata.util.FileNameInputFormat; // assumed package of the custom input format (sketched below)
import org.bigdata.util.HadoopCfg;
import org.bigdata.util.HadoopUtil;

/**
 * PageRank
 *
 * @author wwhhf
 */
public class PageRankMapReduce {

    private final static String JOB_NAME = "PageRank";
    private final static String LINKS = "links";

    // Rank vector from the previous iteration, keyed by page name. Note:
    // this static map is filled in the driver JVM, so the job behaves as
    // intended only in local mode; on a real cluster the map/reduce tasks
    // run in separate JVMs and would see an empty map.
    private static Map<String, Double> rand = new HashMap<String, Double>();

    // damping factor
    private static final double a = 0.8;

    // Loads the previous iteration's rank vector from pathin/filename into rand.
    public static void initRand(String pathin, String filename)
            throws IOException {
        List<String> lines = HadoopUtil.lslFile(pathin, filename);
        for (String line : lines) {
            String terms[] = line.split("\t");
            // first field is the page name, last field is its rank; taking
            // the last field tolerates the extra middle column in the
            // sample initial file
            rand.put(terms[0], Double.valueOf(terms[terms.length - 1]));
        }
    }

    private static class PageRankMapper extends
            Mapper<Text, Text, Text, DoubleWritable> {

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // FileNameInputFormat supplies the input file name as the key,
            // so only lines from links.txt are processed here; the previous
            // ranks enter through the rand map instead.
            String filename = key.toString();
            if (filename.startsWith(LINKS)) {
                String dests[] = value.toString().split(" ");
                // current rank of the source page dests[0]
                double e = rand.get(dests[0]);
                for (int i = 0, len = dests.length; i < len; i++) {
                    String dest = dests[i];
                    if (i == 0) {
                        // emit the source itself with 0.0 so that every page
                        // reaches the reducer even without inbound links
                        context.write(new Text(dest), new DoubleWritable(0.0));
                    } else {
                        // distribute the source's rank evenly over its
                        // len - 1 outgoing links
                        context.write(new Text(dest), new DoubleWritable(e
                                / (len - 1)));
                    }
                }
            }
        }
    }

    private static class PageRankReducer extends
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            // total rank share flowing into this page
            double sum = 0.0;
            for (DoubleWritable value : values) {
                sum += value.get();
            }
            // damped update: a * inflow + (1 - a) * teleport term, where the
            // teleport term is the page's entry in the initial vector
            double e = rand.get(key.toString());
            context.write(key, new DoubleWritable(a * sum + (1 - a) * e));
        }
    }

    public static void solve(String linksin, String pathin, String pathout)
            throws ClassNotFoundException, InterruptedException {
        try {
            Configuration cfg = HadoopCfg.getConfiguration();
            Job job = Job.getInstance(cfg);
            job.setJobName(JOB_NAME);
            job.setJarByClass(PageRankMapReduce.class);
            // custom input format: key = input file name, value = line
            job.setInputFormatClass(FileNameInputFormat.class);

            // mapper
            job.setMapperClass(PageRankMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            // reducer
            job.setReducerClass(PageRankReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            // both the previous rank vector and the links file are inputs;
            // the mapper tells them apart by file name
            FileInputFormat.addInputPath(job, new Path(pathin));
            FileInputFormat.addInputPath(job, new Path(linksin));
            FileOutputFormat.setOutputPath(job, new Path(pathout));

            job.waitForCompletion(true);

        } catch (IllegalStateException | IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ClassNotFoundException,
            InterruptedException, IOException {
        String path = "/pagerank";
        String links_pathin = "/pagerank_links";
        String filename = "part-r-00000";
        String tmp_pathin = path;
        // run 5 power-iteration rounds; each round reads the previous
        // round's output directory (/pagerank, /pagerank1, /pagerank2, ...)
        for (int i = 1; i <= 5; i++) {
            initRand(tmp_pathin, filename);
            String tmp_pathout = path + i;
            System.out.println(links_pathin + " " + tmp_pathin + " "
                    + tmp_pathout);
            solve(links_pathin, tmp_pathin, tmp_pathout);
            tmp_pathin = tmp_pathout;
        }
    }
}
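
Two project-local helpers are referenced above but not shown in the post. The job sets FileNameInputFormat as the input format, and the mapper decides via key.toString().startsWith("links") which file a record came from, so the input format must emit each input file's name as the key and the raw line as the value. Here is a minimal sketch of such a class (a hypothetical reconstruction under that assumption, not the author's original code):

package org.bigdata.util;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Hypothetical sketch: emits (input file name, line) pairs so a mapper can
// tell which file a record came from.
public class FileNameInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new FileNameRecordReader();
    }

    private static class FileNameRecordReader extends RecordReader<Text, Text> {

        private final LineRecordReader reader = new LineRecordReader();
        private Text fileName;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException {
            reader.initialize(split, context);
            // every record of this split shares the same file-name key
            fileName = new Text(((FileSplit) split).getPath().getName());
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            return reader.nextKeyValue();
        }

        @Override
        public Text getCurrentKey() {
            return fileName;
        }

        @Override
        public Text getCurrentValue() {
            return reader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException {
            return reader.getProgress();
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}

Similarly, HadoopUtil.lslFile is only assumed to read every line of pathin/filename from HDFS; a minimal sketch:

package org.bigdata.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopUtil {

    // Hypothetical sketch: read all lines of pathin/filename from the
    // default file system.
    public static List<String> lslFile(String pathin, String filename)
            throws IOException {
        FileSystem fs = FileSystem.get(HadoopCfg.getConfiguration());
        List<String> lines = new ArrayList<String>();
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                fs.open(new Path(pathin, filename))));
        try {
            for (String line; (line = reader.readLine()) != null;) {
                lines.add(line);
            }
        } finally {
            reader.close();
        }
        return lines;
    }
}

To run the job, links.txt goes under /pagerank_links and the initial part-r-00000 under /pagerank in HDFS; after the five rounds in main, the final rank vector is written to /pagerank5.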