
A MapReduce Example: PageRank

PageRank Fundamentals

Wikipedia: https://en.wikipedia.org/wiki/PageRank

An Introduction to the PageRank Algorithm: http://blog.codinglabs.org/articles/intro-to-pagerank.html

The PageRank Algorithm: http://blog.csdn.net/hguisu/article/details/7996185

PageRank Overview and a Map-Reduce Implementation: http://blog.sae.sina.com.cn/archives/4576
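
For orientation, the iteration implemented below applies the damped PageRank update, written here in the same notation the Reducer section defines later (this summary is an addition to the original post):

V′ = (1 − Q)(M·V) + E·(Q/N)

M is the transition matrix built from PageLinks, V the current rank vector, Q = 0.15 the teleportation probability, N the total page count, and E a vector of ones.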

HDFS Directory Layout

input/PageLinks

PageLinks stores, for each page, the list of pages it links to.



Sample contents:

A,B,C,D
B,A,C
C,D
D,A,B
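
As a concrete illustration (added for this writeup), these four lines define the column-stochastic transition matrix M used below, with rows and columns ordered A, B, C, D; column j holds 1/out-degree(j) for every page that j links to:

      A     B     C     D
A  [  0    1/2    0    1/2 ]
B  [ 1/3    0     0    1/2 ]
C  [ 1/3   1/2    0     0  ]
D  [ 1/3    0     1     0  ]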


input/Pages

Pages stores the full list of pages.

Sample contents:

A
B
C
D


output/PageCount

PageCount stores the total number of pages.

output/PageMatrix

PageMatrix stores the sparse transition-matrix data for all pages.



output/PageRank1

Holds the newly computed PageRank values; PageRank1 and PageRank2 alternate as input and output across iterations.

output/PageRank2

Same as PageRank1: on each iteration one directory serves as input and the other receives the new output.

PageMatrix: compute the probability transition matrix M from the PageLinks data

Mapper input: LongWritable, Text; output: Text, Text

input: (0, (A,B,C,D)) => output: (A,B), (A,C), (A,D)

PageMatrixMapper.java

package org.feather.mapreduce.PageRank;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PageMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line looks like "A,B,C,D": the first field is the
        // source page, the rest are the pages it links to.
        String lineValue = value.toString();
        String[] values = lineValue.split(",");

        // A line with no outgoing links contributes nothing to the matrix.
        if (values.length < 2) {
            return;
        }

        int index = 0;
        for (String val : values) {
            if (index == 0) {
                outputKey.set(val);
            } else {
                // Emit one (source, target) pair per outgoing link.
                outputValue.set(val);
                context.write(outputKey, outputValue);
            }
            index++;
        }
    }
}


Reducer input: Text, Text; output: Text, PageRankWritable

input: (A, (B,C,D)) => output: (B, (M,1/3)), (C, (M,1/3)), (D, (M,1/3))

PageMatrixReducer.java

package org.feather.mapreduce.PageRank;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageMatrixReducer extends
        Reducer<Text, Text, Text, PageRankWritable> {
    private Text outputKey = new Text();
    private PageRankWritable outputValue = new PageRankWritable();
    private ArrayList<String> outputKeys = new ArrayList<String>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int length = 0;

        // Buffer the targets first, so the out-degree is known before emitting.
        outputKeys.clear();
        for (Text value : values) {
            outputKeys.add(value.toString());
            length++;
        }

        if (length == 0) {
            return;
        }

        // Each linked page receives probability 1/out-degree from this source.
        for (String outKey : outputKeys) {
            outputKey.set(outKey);
            outputValue.set("M", 1D / length);
            context.write(outputKey, outputValue);
        }
    }
}


PageRankWritable field definitions

tag: String;

value: Double;

PageRankWritable.java

package org.feather.mapreduce.PageRank;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PageRankWritable implements WritableComparable<PageRankWritable> {
    private String tag;
    private Double value;

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public Double getValue() {
        return value;
    }

    public void setValue(Double value) {
        this.value = value;
    }

    public void set(String tag, Double value) {
        this.setTag(tag);
        this.setValue(value);
    }

    public PageRankWritable() {
    }

    public PageRankWritable(String tag, Double value) {
        this.set(tag, value);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.tag);
        out.writeDouble(this.value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.tag = in.readUTF();
        this.value = in.readDouble();
    }

    @Override
    public int compareTo(PageRankWritable o) {
        int comp = this.getTag().compareTo(o.getTag());

        if (comp != 0) {
            return comp;
        }

        return this.getValue().compareTo(o.getValue());
    }

    @Override
    public String toString() {
        return tag + "," + value.toString();
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((tag == null) ? 0 : tag.hashCode());
        result = prime * result + ((value == null) ? 0 : value.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PageRankWritable other = (PageRankWritable) obj;
        if (tag == null) {
            if (other.tag != null)
                return false;
        } else if (!tag.equals(other.tag))
            return false;
        if (value == null) {
            if (other.value != null)
                return false;
        } else if (!value.equals(other.value))
            return false;
        return true;
    }
}


PageMatrix.java

package org.feather.mapreduce.PageRank;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PageMatrix extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int status = ToolRunner.run(configuration, new PageMatrix(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = this.getConf();

        Job job = Job.getInstance(configuration, this.getClass()
                .getSimpleName());
        job.setJarByClass(this.getClass());

        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        job.setMapperClass(PageMatrixMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(PageMatrixReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PageRankWritable.class);

        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }
}


Run in Hadoop

$ bin/hdfs dfs -rm -r output/PageMatrix
$ bin/yarn jar jars/PageRank.jar org.feather.mapreduce.PageRank.PageMatrix input/PageLinks output/PageMatrix
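
With the sample PageLinks above, output/PageMatrix/part-r-00000 should contain lines of the form page TAB M,probability; roughly the following (reducer keys are processed in sorted order, so A's three targets come first, but ordering within a group is not guaranteed):

B	M,0.3333333333333333
C	M,0.3333333333333333
D	M,0.3333333333333333
A	M,0.5
C	M,0.5
D	M,1.0
A	M,0.5
B	M,0.5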


PageCount: compute the total number of pages from the Pages data

Mapper input: LongWritable, Text; output: Text, LongWritable

input: (0, A) => output: (PC, 1)

input: (1, B) => output: (PC, 1)

PageCountMapper.java

package org.feather.mapreduce.PageRank;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PageCountMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {
    private Text outputKey = new Text();
    private LongWritable outputValue = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        outputKey.set("PC");
        outputValue.set(1);
        context.write(outputKey, outputValue);
    }
}


A Combiner should be set here; it greatly reduces disk and network I/O.

The Combiner and the Reducer are the same class.

Reducer input: Text, LongWritable; output: Text, LongWritable

input: (PC, 1) (PC, 1) => output: (PC, 2)

PageCountReducer.java

package org.feather.mapreduce.PageRank;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageCountReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {
    private Text outputKey = new Text();
    private LongWritable outputValue = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        Long count = 0L;
        for (LongWritable value : values) {
            count += value.get();
        }
        outputKey.set(key);
        outputValue.set(count);
        context.write(outputKey, outputValue);
    }
}


PageCount.java

package org.feather.mapreduce.PageRank;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PageCount extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Path inputPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputPath);

        job.setMapperClass(PageCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setCombinerClass(PageCountReducer.class);

        job.setReducerClass(PageCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isSuccessful = job.waitForCompletion(true);
        return isSuccessful ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new PageCount(), args);
        System.exit(status);
    }
}


Run in Hadoop

$ bin/hdfs dfs -rm -r output/PageCount
$ bin/yarn jar jars/PageRank.jar org.feather.mapreduce.PageRank.PageCount input/Pages output/PageCount
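
With the four-page sample above, output/PageCount/part-r-00000 should contain the single line:

PC	4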


Compute the initial PageRank values V from Pages and PageCount

Mapper input: LongWritable, Text; output: Text, PageRankWritable

The PageCount total is loaded into memory via the distributed cache.

input: (0, A) => output: (A, 1/PageCount)

input: (1, B) => output: (B, 1/PageCount)

InitPageRankMapper.java

package org.feather.mapreduce.PageRank;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class InitPageRankMapper extends
        Mapper<LongWritable, Text, Text, PageRankWritable> {
    private HashMap<String, Long> pageCount = new HashMap<String, Long>();
    private Text outputKey = new Text();
    private PageRankWritable outputValue = new PageRankWritable();

    @SuppressWarnings("deprecation")
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // Load the PageCount result ("PC\t<total>") from the distributed cache.
        Path[] uris = context.getLocalCacheFiles();

        for (Path uri : uris) {
            if (uri.toString().endsWith("part-r-00000")) {
                BufferedReader br = new BufferedReader(
                        new FileReader(uri.toString()));
                try {
                    String pageCountLine = null;
                    while (null != (pageCountLine = br.readLine())) {
                        String[] counts = pageCountLine.split("\t");
                        if (counts.length == 2) {
                            pageCount.put(counts[0], Long.valueOf(counts[1]));
                        }
                    }
                } finally {
                    // The original never closed the reader; release it here.
                    br.close();
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Long count = pageCount.get("PC");

        if (count == null) {
            return;
        }

        // Every page starts with the same rank: 1 / total page count.
        outputKey.set(value);
        outputValue.set("V", 1D / count);
        context.write(outputKey, outputValue);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        pageCount.clear();
        pageCount = null;
    }
}


There is no Reducer class: this is a map-only job (the driver calls setNumReduceTasks(0)).

InitPageRank.java

package org.feather.mapreduce.PageRank;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InitPageRank extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.addCacheFile(new Path(args[0]).toUri());

        job.setNumReduceTasks(0);

        Path inputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, inputPath);

        job.setMapperClass(InitPageRankMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PageRankWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PageRankWritable.class);

        Path outputPath = new Path(args[2]);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isSuccessful = job.waitForCompletion(true);
        return isSuccessful ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new InitPageRank(), args);
        System.exit(status);
    }
}


Run in Hadoop

$ bin/hdfs dfs -rm -r output/PageRank1
$ bin/yarn jar jars/PageRank.jar org.feather.mapreduce.PageRank.InitPageRank output/PageCount/part-r-00000 input/Pages output/PageRank1
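
With PageCount = 4, every page is initialized to 1/4, so output/PageRank1/part-m-00000 (map-only output) should look like:

A	V,0.25
B	V,0.25
C	V,0.25
D	V,0.25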


Compute new PageRank values from PageMatrix and the previous PageRank values (run iteratively)

Mapper input: LongWritable, Text; output: Text, PageRankWritable

input: (0, (A, M, 1/3)) => output: (A, (M, 1/3))

input: (1, (A, M, 1/2)) => output: (A, (M, 1/2))

input: (2, (A, V, 1/PageCount)) => output: (A, (V, 1/PageCount))

PageRankMapper.java

package org.feather.mapreduce.PageRank;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PageRankMapper extends
        Mapper<LongWritable, Text, Text, PageRankWritable> {
    private Text outputKey = new Text();
    private PageRankWritable outputValue = new PageRankWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input lines come from both PageMatrix ("B\tM,0.333...") and the
        // previous PageRank output ("A\tV,0.25"): page, tab, then tag,value.
        String valueLine = value.toString();

        String[] values = valueLine.split("\t");

        if (values.length != 2) {
            return;
        }

        String[] outputValues = values[1].split(",");

        if (outputValues.length != 2) {
            return;
        }

        outputKey.set(values[0]);
        outputValue.set(outputValues[0], Double.valueOf(outputValues[1]));
        context.write(outputKey, outputValue);
    }
}


Set a Combiner to pre-aggregate the Mapper output.

PageRankCombiner.java

package org.feather.mapreduce.PageRank;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageRankCombiner extends
        Reducer<Text, PageRankWritable, Text, PageRankWritable> {
    private Text outputKey = new Text();
    private PageRankWritable outputValue = new PageRankWritable();

    @Override
    protected void reduce(Text key, Iterable<PageRankWritable> values,
            Context context) throws IOException, InterruptedException {
        // Pre-aggregate per page: sum the M contributions and the V values
        // separately, so the reducer sees one record per tag. Summing is
        // associative, so applying the combiner zero or more times is safe.
        Double M = 0D;
        Double V = 0D;
        for (PageRankWritable value : values) {
            if (value.getTag().equals("M")) {
                M += value.getValue();
            }

            if (value.getTag().equals("V")) {
                V += value.getValue();
            }
        }

        outputKey.set(key.toString());
        outputValue.set("M", M);
        context.write(outputKey, outputValue);

        outputValue.set("V", V);
        context.write(outputKey, outputValue);
    }
}


Reducer input: Text, PageRankWritable; output: Text, PageRankWritable

input: (A, [(M, 1/3), (M, 1/2), (V, 1/N)]) => output: (A, (V, (1 - Q) x (1/3 + 1/2) x V + E x (Q/N)))

N = the total page count from PageCount (loaded into memory)

M = the transition probabilities derived from PageLinks

Q = 0.15 (the teleportation probability, i.e. the chance of a random jump)

E = 1 (an N-dimensional vector of ones)

V = the PageRank value from the previous iteration

The update rule is:

V′ = (1 − Q)(M·V) + E·(Q/N)
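
To make the arithmetic concrete (a worked example added here, using the sample data from the top of the post): for page A, N = 4 and the incoming M contributions are 1/2 from B and 1/2 from D. With the initial V = 1/4, the Reducer below computes V′(A) = (1 − 0.15) x (1/2 + 1/2) x 1/4 + 1 x (0.15/4) = 0.2125 + 0.0375 = 0.25.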

PageRankReducer.java

package org.feather.mapreduce.PageRank;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

@SuppressWarnings("deprecation")
public class PageRankReducer extends
        Reducer<Text, PageRankWritable, Text, PageRankWritable> {
    private HashMap<String, Long> pageCount = new HashMap<String, Long>();
    private Text outputKey = new Text();
    private PageRankWritable outputValue = new PageRankWritable();

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // Load the PageCount result from the distributed cache, exactly as
        // in InitPageRankMapper.
        Path[] uris = context.getLocalCacheFiles();

        for (Path uri : uris) {
            if (uri.toString().endsWith("part-r-00000")) {
                BufferedReader br = new BufferedReader(
                        new FileReader(uri.toString()));
                try {
                    String pageCountLine = null;
                    while (null != (pageCountLine = br.readLine())) {
                        String[] counts = pageCountLine.split("\t");
                        if (counts.length == 2) {
                            pageCount.put(counts[0], Long.valueOf(counts[1]));
                        }
                    }
                } finally {
                    br.close();
                }
            }
        }
    }

    @Override
    protected void reduce(Text key, Iterable<PageRankWritable> values,
            Context context) throws IOException, InterruptedException {
        // Check for the page total before dereferencing it; the original
        // called doubleValue() first, which would throw a
        // NullPointerException if the cache file were missing.
        Long total = pageCount.get("PC");
        if (total == null || total == 0L) {
            return;
        }

        Double N = total.doubleValue();
        Double Q = 0.15D;
        Double E = 1D;
        Double M = 0D;
        Double V = 0D;

        for (PageRankWritable value : values) {
            if (value.getTag().equals("M")) {
                M += value.getValue();
            }

            if (value.getTag().equals("V")) {
                V += value.getValue();
            }
        }

        Double value = (1D - Q) * M * V + E * (Q / N);

        outputKey.set(key.toString());
        outputValue.set("V", value);

        context.write(outputKey, outputValue);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        pageCount.clear();
        pageCount = null;
    }
}


PageRank.java

package org.feather.mapreduce.PageRank;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PageRank extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.addCacheFile(new Path(args[0]).toUri());

        Path inputPath1 = new Path(args[1]);
        FileInputFormat.addInputPath(job, inputPath1);

        Path inputPath2 = new Path(args[2]);
        FileInputFormat.addInputPath(job, inputPath2);

        job.setMapperClass(PageRankMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PageRankWritable.class);

        job.setCombinerClass(PageRankCombiner.class);

        job.setReducerClass(PageRankReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PageRankWritable.class);

        Path outputPath = new Path(args[3]);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isSuccessful = job.waitForCompletion(true);
        return isSuccessful ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new PageRank(), args);
        System.exit(status);
    }
}


Run in Hadoop (the two commands alternate PageRank1 and PageRank2 as input and output)

$ bin/hdfs dfs -rm -r output/PageRank2
$ bin/yarn jar jars/PageRank.jar org.feather.mapreduce.PageRank.PageRank hdfs://hadoop01.malone.com:8020/user/hadoop/output/PageCount/part-r-00000 output/PageMatrix/part-r-00000 output/PageRank1/part-m-00000 output/PageRank2


$ bin/hdfs dfs -rm -r output/PageRank1
$ bin/yarn jar jars/PageRank.jar org.feather.mapreduce.PageRank.PageRank hdfs://hadoop01.malone.com:8020/user/hadoop/output/PageCount/part-r-00000 output/PageMatrix/part-r-00000 output/PageRank2/part* output/PageRank1
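
Manually alternating the two output directories gets tedious. As a convenience, here is a minimal iteration-driver sketch (not part of the original project; the class name, the fixed iteration count, and the relative HDFS paths are assumptions) that loops the PageRank job the same way the two commands above do:

package org.feather.mapreduce.PageRank;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;

public class PageRankIterator {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        String pageCount = "output/PageCount/part-r-00000"; // assumed relative HDFS paths
        String matrix = "output/PageMatrix/part-r-00000";
        String in = "output/PageRank1/part-m-00000"; // initial ranks from the map-only job
        String out = "output/PageRank2";
        int iterations = 10; // assumed fixed round count

        for (int i = 0; i < iterations; i++) {
            // Same effect as: bin/hdfs dfs -rm -r <out>
            fs.delete(new Path(out), true);

            int status = ToolRunner.run(conf, new PageRank(),
                    new String[] { pageCount, matrix, in, out });
            if (status != 0) {
                System.exit(status);
            }

            // The ranks just written become the next round's input,
            // and the two directories swap roles.
            in = out + "/part-r-00000";
            out = out.equals("output/PageRank2") ? "output/PageRank1"
                    : "output/PageRank2";
        }
    }
}

A production version would compare successive rank vectors and stop once the change falls below a threshold, rather than running a fixed number of rounds.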


Source code download

https://git.oschina.net/elbertmalone/PageRank.git