您的位置:首页 > 运维架构

基于Hadoop的MapReduce框架实现最简单PageRank模型

2016-05-16 00:00 519 查看
摘要: 本文基于前面对MapReduce框架的理解,进一步学习PageRank算法,对于Google的PageRank算法我也不说过多的了。本篇将实现最简单的PageRank模型。

1、首先大家需要对最简单的PageRank模型有基本的了解,网上一百度比比皆是,在此我就不重复别人的步骤了,既然有资源就好好利用。不了解的同学可以先自行阅读这篇博客:http://blog.jobbole.com/71431/

2、数据我们就用博客中的数据,因为考虑转移矩阵是一个很多的稀疏矩阵,我们可以用稀疏矩阵的形式表示,我们把web图中的每一个网页及其链出的网页作为一行。如图:



3、其实博客中最重要的就是一张图:



图中原理我就不进行赘述了。其实,可以简化该步骤,只使用一个MapReduce,然后不断迭代就可以搞定。简化如下图:



简化介绍:将链接文件和概率分布文件传入Mapper层,在Mapper层就开始计算每一个页面的出链页面概率值,省去将两个文件并合并的Mapper与Reducer过程,然后传入Reducer层根据

这个公式计算出每个页面的概率分布,并存入文件,下一次迭代使用的概率分布分件就是刚才输出的保存的文件,这样不断迭代,最终收敛得出结果。

4、代码实现(有很多硬编码处,只是简单实现望海涵)

/**

* 最简单PageRank模型实现

* @author ZD

*/

public class FirstRank {

private static double A = 0.25;

private static double B = 0.25;

private static double C = 0.25;

private static double D = 0.25;

private static double temp=0.0;

//阻尼系数,一般为0.85

private static double pValue = 0.8;

private static class FirstRankMapper extends Mapper<Text, Text, Text, DoubleWritable> {

@Override

protected void setup(Mapper<Text, Text, Text, DoubleWritable>.Context context)

throws IOException, InterruptedException {

super.setup(context);

Configuration cfg = HadoopCfg.getConfigration();

FileSystem fs = FileSystem.get(cfg);

Path path = new Path(cfg.get("rand"));

RemoteIterator<LocatedFileStatus> rt = fs.listFiles(path, false);

while(rt.hasNext()){

LocatedFileStatus status = rt.next();

Path filePath = status.getPath();

BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filePath)));

String line = "";

while ((line = br.readLine()) != null) {

//注意分割,本地文件为空格,而hdfs中文件为\t

String[] strs = line.trim().split("\t");

if (strs[0].equals("A")) {

A = Double.parseDouble(strs[1]);

} else if (strs[0].equals("B")) {

B = Double.parseDouble(strs[1]);

} else if (strs[0].equals("C")) {

C = Double.parseDouble(strs[1]);

} else if (strs[0].equals("D")) {

D = Double.parseDouble(strs[1]);

}

}

}

}

@Override

protected void map(Text fileName, Text value, Mapper<Text, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {

if (fileName.toString().equals("links.txt")) {

//链接文件是用空格分割的,要注意区分

String[] strs = value.toString().split(" ");

for(int i=0; i<strs.length; i++){

System.out.println(strs[i]);

}

context.write(new Text(strs[0]), new DoubleWritable(0.0));

//double temp = 0.0;

if (strs[0].equals("A")) {

temp = A;

} else if (strs[0].equals("B")) {

temp = B;

} else if (strs[0].equals("C")) {

temp = C;

} else if (strs[0].equals("D")) {

temp = D;

}

//i从1开始,因为首位不需要分概率,也不需要传入reducer

for (int i = 1; i < strs.length; i++) {

context.write(new Text(strs[i]), new DoubleWritable(temp / (strs.length - 1)));

}

}

}

}

private static class FirstRankReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

@Override

protected void reduce(Text value, Iterable<DoubleWritable> datas,

Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)

throws IOException, InterruptedException {

double total = 0.0;

for (DoubleWritable data : datas) {

total += data.get();

}

double result = pValue*total+(1-pValue)*temp;

context.write(new Text(value), new DoubleWritable(result));

}

}

public static void firstRun(int count){

try {

Configuration cfg = HadoopCfg.getConfigration();

cfg.set("rand", "/input/outrank"+count);

Job job = Job.getInstance(cfg);

job.setJobName("FirstRank");

job.setJarByClass(FirstRank.class);

//要引入

job.setInputFormatClass(FileNameInputFormat.class);

job.setMapperClass(FirstRankMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(DoubleWritable.class);

job.setReducerClass(FirstRankReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(DoubleWritable.class);

FileInputFormat.addInputPath(job, new Path("/input/rank"));

FileInputFormat.addInputPath(job, new Path("/input/outrank"+count)); //将概率分布保存到该路径下,初始化概率分布则在"/input/outrank1/"下,所以count从1开始

FileOutputFormat.setOutputPath(job, new Path("/input/outrank"+(count+1)));

job.waitForCompletion(true);

} catch (Exception e) {

e.printStackTrace();

}

}

public static void main(String[] args){

for(int i=0; i<30; i++){ //迭代至少30次

firstRun(i+1);

}

}

}

其中FileNameInputFormat重写了FileInputFormat类的createRecordReader方法,使Mapper的inputKey为文件名而不是偏移量了。

FileNameInputFormat类:

/**

* @author ZD

*/

public class FileNameInputFormat extends FileInputFormat<Text, Text> {

@Override

public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)

throws IOException, InterruptedException {

FileNameRecordReader record = new FileNameRecordReader();

record.initialize(split, context);

return record;

}

}

FileNameRecordReader类:

/**

* @author ZD

*/

public class FileNameRecordReader extends RecordReader<Text, Text> {

private String fileName;

private LineRecordReader lrr = new LineRecordReader();

@Override

public Text getCurrentKey() throws IOException, InterruptedException {

return new Text(fileName);

}

@Override

public Text getCurrentValue() throws IOException, InterruptedException {

return lrr.getCurrentValue();

}

@Override

public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {

lrr.initialize(arg0, arg1);

fileName = ((FileSplit) arg0).getPath().getName();

}

@Override

public void close() throws IOException {

lrr.close();

}

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

return lrr.nextKeyValue();

}

@Override

public float getProgress() throws IOException, InterruptedException {

return lrr.getProgress();

}

}

写在最后:若有错误,望指出。下一篇将与大家分享根据PageRank原理,处理微博人物关系 ,根据已有数据分析粉丝数最多的微博都是哪些。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: