基于Hadoop的MapReduce框架实现最简单PageRank模型
2016-05-16 00:00
519 查看
摘要: 本文基于前面对MapReduce框架的理解,进一步学习PageRank算法,对于Google的PageRank算法我也不说过多的了。本篇将实现最简单的PageRank模型。
1、首先大家需要对最简单的PageRank模型有基本的了解,网上一百度比比皆是,在此我就不重复别人的步骤了,既然有资源就好好利用。不了解的同学可以先自行阅读这篇博客:http://blog.jobbole.com/71431/
2、数据我们就用博客中的数据,因为考虑转移矩阵是一个很多的稀疏矩阵,我们可以用稀疏矩阵的形式表示,我们把web图中的每一个网页及其链出的网页作为一行。如图:
3、其实博客中最重要的就是一张图:
图中原理我就不进行赘述了。其实,可以简化该步骤,只使用一个MapReduce,然后不断迭代就可以搞定。简化如下图:
简化介绍:将链接文件和概率分布文件传入Mapper层,在Mapper层就开始计算每一个页面的出链页面概率值,省去将两个文件并合并的Mapper与Reducer过程,然后传入Reducer层根据
这个公式计算出每个页面的概率分布,并存入文件,下一次迭代使用的概率分布分件就是刚才输出的保存的文件,这样不断迭代,最终收敛得出结果。
4、代码实现(有很多硬编码处,只是简单实现望海涵)
/**
* 最简单PageRank模型实现
* @author ZD
*/
public class FirstRank {
private static double A = 0.25;
private static double B = 0.25;
private static double C = 0.25;
private static double D = 0.25;
private static double temp=0.0;
//阻尼系数,一般为0.85
private static double pValue = 0.8;
private static class FirstRankMapper extends Mapper<Text, Text, Text, DoubleWritable> {
@Override
protected void setup(Mapper<Text, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
Configuration cfg = HadoopCfg.getConfigration();
FileSystem fs = FileSystem.get(cfg);
Path path = new Path(cfg.get("rand"));
RemoteIterator<LocatedFileStatus> rt = fs.listFiles(path, false);
while(rt.hasNext()){
LocatedFileStatus status = rt.next();
Path filePath = status.getPath();
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filePath)));
String line = "";
while ((line = br.readLine()) != null) {
//注意分割,本地文件为空格,而hdfs中文件为\t
String[] strs = line.trim().split("\t");
if (strs[0].equals("A")) {
A = Double.parseDouble(strs[1]);
} else if (strs[0].equals("B")) {
B = Double.parseDouble(strs[1]);
} else if (strs[0].equals("C")) {
C = Double.parseDouble(strs[1]);
} else if (strs[0].equals("D")) {
D = Double.parseDouble(strs[1]);
}
}
}
}
@Override
protected void map(Text fileName, Text value, Mapper<Text, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
if (fileName.toString().equals("links.txt")) {
//链接文件是用空格分割的,要注意区分
String[] strs = value.toString().split(" ");
for(int i=0; i<strs.length; i++){
System.out.println(strs[i]);
}
context.write(new Text(strs[0]), new DoubleWritable(0.0));
//double temp = 0.0;
if (strs[0].equals("A")) {
temp = A;
} else if (strs[0].equals("B")) {
temp = B;
} else if (strs[0].equals("C")) {
temp = C;
} else if (strs[0].equals("D")) {
temp = D;
}
//i从1开始,因为首位不需要分概率,也不需要传入reducer
for (int i = 1; i < strs.length; i++) {
context.write(new Text(strs[i]), new DoubleWritable(temp / (strs.length - 1)));
}
}
}
}
private static class FirstRankReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
@Override
protected void reduce(Text value, Iterable<DoubleWritable> datas,
Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
double total = 0.0;
for (DoubleWritable data : datas) {
total += data.get();
}
double result = pValue*total+(1-pValue)*temp;
context.write(new Text(value), new DoubleWritable(result));
}
}
public static void firstRun(int count){
try {
Configuration cfg = HadoopCfg.getConfigration();
cfg.set("rand", "/input/outrank"+count);
Job job = Job.getInstance(cfg);
job.setJobName("FirstRank");
job.setJarByClass(FirstRank.class);
//要引入
job.setInputFormatClass(FileNameInputFormat.class);
job.setMapperClass(FirstRankMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setReducerClass(FirstRankReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path("/input/rank"));
FileInputFormat.addInputPath(job, new Path("/input/outrank"+count)); //将概率分布保存到该路径下,初始化概率分布则在"/input/outrank1/"下,所以count从1开始
FileOutputFormat.setOutputPath(job, new Path("/input/outrank"+(count+1)));
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args){
for(int i=0; i<30; i++){ //迭代至少30次
firstRun(i+1);
}
}
}
其中FileNameInputFormat重写了FileInputFormat类的createRecordReader方法,使Mapper的inputKey为文件名而不是偏移量了。
FileNameInputFormat类:
/**
* @author ZD
*/
public class FileNameInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
FileNameRecordReader record = new FileNameRecordReader();
record.initialize(split, context);
return record;
}
}
FileNameRecordReader类:
/**
* @author ZD
*/
public class FileNameRecordReader extends RecordReader<Text, Text> {
private String fileName;
private LineRecordReader lrr = new LineRecordReader();
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return new Text(fileName);
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return lrr.getCurrentValue();
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
lrr.initialize(arg0, arg1);
fileName = ((FileSplit) arg0).getPath().getName();
}
@Override
public void close() throws IOException {
lrr.close();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return lrr.nextKeyValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return lrr.getProgress();
}
}
写在最后:若有错误,望指出。下一篇将与大家分享根据PageRank原理,处理微博人物关系 ,根据已有数据分析粉丝数最多的微博都是哪些。
1、首先大家需要对最简单的PageRank模型有基本的了解,网上一百度比比皆是,在此我就不重复别人的步骤了,既然有资源就好好利用。不了解的同学可以先自行阅读这篇博客:http://blog.jobbole.com/71431/
2、数据我们就用博客中的数据,因为考虑转移矩阵是一个很多的稀疏矩阵,我们可以用稀疏矩阵的形式表示,我们把web图中的每一个网页及其链出的网页作为一行。如图:
3、其实博客中最重要的就是一张图:
图中原理我就不进行赘述了。其实,可以简化该步骤,只使用一个MapReduce,然后不断迭代就可以搞定。简化如下图:
简化介绍:将链接文件和概率分布文件传入Mapper层,在Mapper层就开始计算每一个页面的出链页面概率值,省去将两个文件并合并的Mapper与Reducer过程,然后传入Reducer层根据
这个公式计算出每个页面的概率分布,并存入文件,下一次迭代使用的概率分布分件就是刚才输出的保存的文件,这样不断迭代,最终收敛得出结果。
4、代码实现(有很多硬编码处,只是简单实现望海涵)
/**
* 最简单PageRank模型实现
* @author ZD
*/
public class FirstRank {
private static double A = 0.25;
private static double B = 0.25;
private static double C = 0.25;
private static double D = 0.25;
private static double temp=0.0;
//阻尼系数,一般为0.85
private static double pValue = 0.8;
private static class FirstRankMapper extends Mapper<Text, Text, Text, DoubleWritable> {
@Override
protected void setup(Mapper<Text, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
Configuration cfg = HadoopCfg.getConfigration();
FileSystem fs = FileSystem.get(cfg);
Path path = new Path(cfg.get("rand"));
RemoteIterator<LocatedFileStatus> rt = fs.listFiles(path, false);
while(rt.hasNext()){
LocatedFileStatus status = rt.next();
Path filePath = status.getPath();
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filePath)));
String line = "";
while ((line = br.readLine()) != null) {
//注意分割,本地文件为空格,而hdfs中文件为\t
String[] strs = line.trim().split("\t");
if (strs[0].equals("A")) {
A = Double.parseDouble(strs[1]);
} else if (strs[0].equals("B")) {
B = Double.parseDouble(strs[1]);
} else if (strs[0].equals("C")) {
C = Double.parseDouble(strs[1]);
} else if (strs[0].equals("D")) {
D = Double.parseDouble(strs[1]);
}
}
}
}
@Override
protected void map(Text fileName, Text value, Mapper<Text, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
if (fileName.toString().equals("links.txt")) {
//链接文件是用空格分割的,要注意区分
String[] strs = value.toString().split(" ");
for(int i=0; i<strs.length; i++){
System.out.println(strs[i]);
}
context.write(new Text(strs[0]), new DoubleWritable(0.0));
//double temp = 0.0;
if (strs[0].equals("A")) {
temp = A;
} else if (strs[0].equals("B")) {
temp = B;
} else if (strs[0].equals("C")) {
temp = C;
} else if (strs[0].equals("D")) {
temp = D;
}
//i从1开始,因为首位不需要分概率,也不需要传入reducer
for (int i = 1; i < strs.length; i++) {
context.write(new Text(strs[i]), new DoubleWritable(temp / (strs.length - 1)));
}
}
}
}
private static class FirstRankReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
@Override
protected void reduce(Text value, Iterable<DoubleWritable> datas,
Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
double total = 0.0;
for (DoubleWritable data : datas) {
total += data.get();
}
double result = pValue*total+(1-pValue)*temp;
context.write(new Text(value), new DoubleWritable(result));
}
}
public static void firstRun(int count){
try {
Configuration cfg = HadoopCfg.getConfigration();
cfg.set("rand", "/input/outrank"+count);
Job job = Job.getInstance(cfg);
job.setJobName("FirstRank");
job.setJarByClass(FirstRank.class);
//要引入
job.setInputFormatClass(FileNameInputFormat.class);
job.setMapperClass(FirstRankMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setReducerClass(FirstRankReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path("/input/rank"));
FileInputFormat.addInputPath(job, new Path("/input/outrank"+count)); //将概率分布保存到该路径下,初始化概率分布则在"/input/outrank1/"下,所以count从1开始
FileOutputFormat.setOutputPath(job, new Path("/input/outrank"+(count+1)));
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args){
for(int i=0; i<30; i++){ //迭代至少30次
firstRun(i+1);
}
}
}
其中FileNameInputFormat重写了FileInputFormat类的createRecordReader方法,使Mapper的inputKey为文件名而不是偏移量了。
FileNameInputFormat类:
/**
* @author ZD
*/
public class FileNameInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
FileNameRecordReader record = new FileNameRecordReader();
record.initialize(split, context);
return record;
}
}
FileNameRecordReader类:
/**
* @author ZD
*/
public class FileNameRecordReader extends RecordReader<Text, Text> {
private String fileName;
private LineRecordReader lrr = new LineRecordReader();
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return new Text(fileName);
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return lrr.getCurrentValue();
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
lrr.initialize(arg0, arg1);
fileName = ((FileSplit) arg0).getPath().getName();
}
@Override
public void close() throws IOException {
lrr.close();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return lrr.nextKeyValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return lrr.getProgress();
}
}
写在最后:若有错误,望指出。下一篇将与大家分享根据PageRank原理,处理微博人物关系 ,根据已有数据分析粉丝数最多的微博都是哪些。
相关文章推荐
- 慕课linux学习笔记(七)常用命令(4)
- 【小平工作日志】Hadoop产品简介
- 一台电脑同时运行多个tomcat配置方法 (转)
- Hadoop三种安装模式
- 如何让Windows恋上Linux bash
- Centos下快速搭建svn版本库
- Pause/Resume Instance 操作详解 - 每天5分钟玩转 OpenStack(34)
- apache (0xc000007b) 错误
- Mac下Tomcat不能正常访问的问题
- zabbix 添加监控Linux Disk I/O 模板
- win7下硬盘安装fedora,linux的学习之路
- 嵌入式linux教程
- Linux下堆漏洞的利用机制
- Linux内核提权查询
- tomcat6.0配置
- MySQL5.6主从复制的配置(CentOS-6.6+MySQL-5.6)(二)
- PHP7 + Apache2.4 + MySQL 5.7 + Windows7
- MySQL5.6源码编译安装(CentOS-6.6+MySQL-5.6)(一)
- linux ar命令
- [svc][op]LVS+keepalived