PageRank Hadoop MapReduce
2016-05-14 14:25
274 查看
links.txt
链接关系（每行首列为源节点，其余为该节点的出链）：
A B C D
B A D C
C D
B C
part-r-00000 初始概率分布向量
阻尼系数 a=0.8；初始概率分布（每行 tab 分隔：节点 排名值）：
A	0.25
B	0.25
C	0.25
D	0.25
PageRankMapReduce
package org.bigdata.pagerank; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.bigdata.util.HadoopCfg; import org.bigdata.util.HadoopUtil; /** * PageRank * * @author wwhhf * */ public class PageRankMapReduce { private final static String JOB_NAME = "PageRank"; private static String LINKS = "links"; private static Map<String, Double> rand = new HashMap<String, Double>(); private static final double a = 0.8; public static void initRand(String pathin, String filename) throws IOException { List<String> lines = HadoopUtil.lslFile(pathin, filename); for (String line : lines) { String terms[] = line.toString().split("\t"); rand.put(terms[0], Double.valueOf(terms[1])); } } private static class PageRankMapper extends Mapper<Text, Text, Text, DoubleWritable> { @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { String filename = key.toString(); if (filename.startsWith(LINKS)) { String dests[] = value.toString().split(" "); double e = rand.get(dests[0]); for (int i = 0, len = dests.length; i < len; i++) { String dest = dests[i]; if (i == 0) { context.write(new Text(dest), new DoubleWritable(0.0)); } else { context.write(new Text(dest), new DoubleWritable(e / (len - 1))); } } } } } private static class PageRankReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { @Override protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { Double sum = 0.0; for (DoubleWritable value : 
values) { sum += value.get(); } double e = rand.get(key.toString()); context.write(key, new DoubleWritable(a * sum + (1 - a) * e)); } } public static void solve(String linksin, String pathin, String pathout) throws ClassNotFoundException, InterruptedException { try { Configuration cfg = HadoopCfg.getConfiguration(); Job job = Job.getInstance(cfg); job.setJobName(JOB_NAME); job.setJarByClass(PageRankMapReduce.class); job.setInputFormatClass(FileNameInputFormat.class); // mapper job.setMapperClass(PageRankMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); // reducer job.setReducerClass(PageRankReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); FileInputFormat.addInputPath(job, new Path(pathin)); FileInputFormat.addInputPath(job, new Path(linksin)); FileOutputFormat.setOutputPath(job, new Path(pathout)); job.waitForCompletion(true); } catch (IllegalStateException | IllegalArgumentException | IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws ClassNotFoundException, InterruptedException, IOException { String path = "/pagerank"; String links_pathin = "/pagerank_links"; String filename = "part-r-00000"; String tmp_pathin = path; for (int i = 1; i <= 5; i++) { initRand(tmp_pathin, filename); String tmp_pathout = path + i; System.out.println(links_pathin + " " + tmp_pathin + " " + tmp_pathout); solve(links_pathin, tmp_pathin, tmp_pathout); tmp_pathin = tmp_pathout; } } }
相关文章推荐
- luci map 中的动作
- linux系统管理---服务控制
- linux系统管理---文件管理
- Android系统篇之----Android中的run-as命令引出升降权限的安全问题(Linux中的setuid和setgid)
- Centos 系统swap虚拟内存添加与删除配置
- linux系统管理---磁盘管理
- Linux Autotools
- centos(linux)学习笔记
- getsockopt 和 setsockopt讲解
- ClassNotFoundException: org.apache.ws.commons.schema.resolver.URIResolver
- tengine 淘宝 nginx
- 关于EBS七层组织架构的一些理解
- (Ubuntu+Centos) ntp时间同步服务器搭建与使用
- linux shell编程学习--日志工具
- Linux中用户管理详解
- Win7下硬盘安装Linux双系统
- Linux文件远程拷贝命令scp的使用
- 新版本 Linux Deploy 部署 Debian 提示 E:the selected extractor cannot be found:ar 问题
- 转载: #pragma pack(push,1) & #pragma pack(pop)
- Hbase原理、基本概念、基本架构