An analysis of how Mahout implements item-based collaborative filtering on Hadoop
2014-11-11 18:22
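Note: to analyze Mahout's item-based collaborative filtering, we first have to find the class that triggers it, org.apache.mahout.cf.taste.hadoop.item.RecommenderJob. Reading the RecommenderJob code shows how Mahout implements item-based collaborative filtering on Hadoop. In RecommenderJob's run() method, the addOption calls at the beginning declare the job's parameters and how they are set; the item-based collaborative filtering itself starts at the statement AtomicInteger currentPhase = new AtomicInteger(). currentPhase starts at 0.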
Main steps of the item-based collaborative filtering implementation on Hadoop
Step 1: run the PreparePreferenceMatrixJob top-level job

```java
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
      "--input", getInputPath().toString(),
      "--output", prepPath.toString(),
      "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
      "--booleanData", String.valueOf(booleanData),
      "--tempDir", getTempPath().toString(),
  });

  // read back the number of users that PreparePreferenceMatrixJob wrote under prepPath
  numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
}
```
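Reading the source of shouldRunNextPhase(parsedArgs, currentPhase) shows that it checks whether the current value of currentPhase lies between startPhase and endPhase (inclusive), and increments currentPhase by 1 whether or not the phase runs. If the phase is within that range, PreparePreferenceMatrixJob is run.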
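A minimal sketch of that check in plain Java, written for illustration rather than copied from Mahout: the parsed argument map is replaced by two nullable Integer bounds (null meaning "no bound"), and the class name PhaseGateSketch is made up. The detail worth noticing is that the comparison uses the phase number from before the increment, so successive calls walk through phases 0, 1, 2, ... in order.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the phase check (hypothetical, not Mahout's source):
// the parsed argument map is replaced by two nullable bounds.
class PhaseGateSketch {

  static boolean shouldRunNextPhase(Integer startPhase, Integer endPhase, AtomicInteger currentPhase) {
    // Take the current phase number and advance the counter for the next call.
    int phase = currentPhase.getAndIncrement();
    // A phase is skipped if it lies before startPhase or after endPhase (inclusive bounds).
    boolean skipped = (startPhase != null && phase < startPhase)
        || (endPhase != null && phase > endPhase);
    return !skipped;
  }
}
```

With startPhase = 1 and endPhase = 2, four successive calls return false, true, true, false, so the first phase is skipped while the counter still advances past it.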
Next, open org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob and look at its run() method: this job is made up of several sub-jobs.
First, the itemIDIndex sub-job maps the (long) item IDs to int indices and records which item ID each index stands for.
```java
Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX),
    TextInputFormat.class,
    ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
    ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
    SequenceFileOutputFormat.class);
// the reducer is reused as the combiner
itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
boolean succeeded = itemIDIndex.waitForCompletion(true);
if (!succeeded) {
  return -1;
}
```

Next, the toUserVectors sub-job organizes the raw data into user vectors:

```java
Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS),
    TextInputFormat.class,
    ToItemPrefsMapper.class, VarLongWritable.class,
    booleanData ? VarLongWritable.class : EntityPrefWritable.class,
    ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class,
    SequenceFileOutputFormat.class);
toUserVectors.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
toUserVectors.getConfiguration().setInt(ToUserVectorsReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
toUserVectors.getConfiguration().set(ToEntityPrefsMapper.RATING_SHIFT, String.valueOf(ratingShift));
succeeded = toUserVectors.waitForCompletion(true);
```
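The itemIDIndex and toUserVectors sub-jobs can be imitated on in-memory data to see what they produce. This is a sketch under stated assumptions, not Mahout code: input lines are assumed to look like userID,itemID,preference (a missing preference is treated as 1.0), and idToIndex is a stand-in hash that folds a 64-bit ID into a non-negative int; Mahout's own mapping function may differ. Because several long IDs can land on the same int index, the sketch keeps the smallest ID per index (an assumed collision rule). Taking a minimum is associative and commutative, which is the kind of operation that lets a reducer double as the combiner, as ItemIDIndexReducer does above. PrepareSketch and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// In-memory imitation of itemIDIndex + toUserVectors (hypothetical, not Mahout code).
// Assumptions: input lines are "userID,itemID[,pref]"; idToIndex is a stand-in hash.
class PrepareSketch {

  // Fold a 64-bit item ID into a non-negative int index (stand-in hash).
  static int idToIndex(long itemID) {
    return (int) (itemID ^ (itemID >>> 32)) & 0x7FFFFFFF;
  }

  // itemIDIndex: index -> item ID; when IDs collide on an index, keep the smallest one.
  static Map<Integer, Long> buildItemIndex(String[] lines) {
    Map<Integer, Long> index = new HashMap<>();
    for (String line : lines) {
      long itemID = Long.parseLong(line.split(",")[1].trim());
      index.merge(idToIndex(itemID), itemID, Long::min);
    }
    return index;
  }

  // toUserVectors: userID -> sparse vector {itemIndex -> preference}.
  static Map<Long, Map<Integer, Float>> buildUserVectors(String[] lines) {
    Map<Long, Map<Integer, Float>> users = new TreeMap<>();
    for (String line : lines) {
      String[] fields = line.split(",");
      long userID = Long.parseLong(fields[0].trim());
      int itemIndex = idToIndex(Long.parseLong(fields[1].trim()));
      // A missing preference (boolean data) is treated as 1.0 in this sketch.
      float pref = fields.length > 2 ? Float.parseFloat(fields[2].trim()) : 1.0f;
      users.computeIfAbsent(userID, k -> new TreeMap<>()).put(itemIndex, pref);
    }
    return users;
  }
}
```

For example, item IDs 0 and 4294967297 (2^32 + 1) both fold to index 0 under this hash, and the index entry keeps 0.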
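Then the toItemVectors sub-job turns the user vectors into item vectors, i.e., the rating matrix (RATING_MATRIX, which step 2 reads as its input).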
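The toItemVectors code is not reproduced in this article, but going from a per-user view to a per-item rating matrix is a transpose: an entry (user u, item i, preference p) moves from row u to row i. A minimal in-memory sketch, assuming users are already identified by int column indices; ToItemVectorsSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of toItemVectors: transpose user vectors into item vectors.
// Assumption: users are already identified by int column indices.
class ToItemVectorsSketch {

  // userVectors: userIndex -> {itemIndex -> pref}; returns itemIndex -> {userIndex -> pref}.
  static Map<Integer, Map<Integer, Float>> transpose(Map<Integer, Map<Integer, Float>> userVectors) {
    Map<Integer, Map<Integer, Float>> itemVectors = new TreeMap<>();
    for (Map.Entry<Integer, Map<Integer, Float>> user : userVectors.entrySet()) {
      for (Map.Entry<Integer, Float> pref : user.getValue().entrySet()) {
        // Move the entry from row "user" to row "item".
        itemVectors.computeIfAbsent(pref.getKey(), k -> new TreeMap<>())
            .put(user.getKey(), pref.getValue());
      }
    }
    return itemVectors;
  }
}
```

In MapReduce terms the inner loop roughly corresponds to a mapper emitting (itemIndex, {userIndex: pref}) and a reducer merging the partial vectors of each item.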
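Step 2: this step consists of two top-level jobs. The first is RowSimilarityJob, which computes the similarity between the rows (items) of the rating matrix.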
```java
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  /* special behavior if phase 1 is skipped */
  if (numberOfUsers == -1) {
    numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
        PathType.LIST, null, getConf());
  }

  //calculate the co-occurrence matrix
  ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
      "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
      "--output", similarityMatrixPath.toString(),
      "--numberOfColumns", String.valueOf(numberOfUsers),
      "--similarityClassname", similarityClassname,
      "--maxObservationsPerRow", String.valueOf(maxPrefsInItemSimilarity),
      "--maxObservationsPerColumn", String.valueOf(maxPrefsInItemSimilarity),
      "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
      "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
      "--threshold", String.valueOf(threshold),
      "--randomSeed", String.valueOf(randomSeed),
      "--tempDir", getTempPath().toString(),
  });
  // ... (the original snippet is cut off here)
}
```

Now open org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob and look at its run() method: it runs several sub-jobs of its own.
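Before going through those sub-jobs one by one, it helps to see the core idea they implement together. The sketch below computes cosine similarity between item rows on in-memory data: it first computes each row's norm and transposes the matrix into per-user columns (roughly the role of normsAndTranspose), then lets every user column add a partial dot product to each pair of items that user rated (roughly the role of pairwiseSimilarity). Cosine is an assumption made for the sketch; the real job uses whatever measure --similarityClassname names, and CooccurrenceCosineSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration of RowSimilarityJob's core idea, using cosine
// similarity as the measure (an assumption; the real job uses --similarityClassname).
class CooccurrenceCosineSketch {

  // itemVectors: itemIndex -> {userIndex -> pref}. Returns i -> {j -> cosine(i, j)} for i < j.
  static Map<Integer, Map<Integer, Double>> pairwiseCosine(Map<Integer, Map<Integer, Double>> itemVectors) {
    Map<Integer, Double> norms = new HashMap<>();
    Map<Integer, Map<Integer, Double>> userColumns = new HashMap<>();
    // Row norms plus a transpose into per-user columns (cf. normsAndTranspose).
    for (Map.Entry<Integer, Map<Integer, Double>> row : itemVectors.entrySet()) {
      double sumOfSquares = 0.0;
      for (Map.Entry<Integer, Double> e : row.getValue().entrySet()) {
        sumOfSquares += e.getValue() * e.getValue();
        userColumns.computeIfAbsent(e.getKey(), k -> new HashMap<>()).put(row.getKey(), e.getValue());
      }
      norms.put(row.getKey(), Math.sqrt(sumOfSquares));
    }
    // Each user column adds a partial dot product to every item pair it co-rates (cf. pairwiseSimilarity).
    Map<Integer, Map<Integer, Double>> sims = new TreeMap<>();
    for (Map<Integer, Double> column : userColumns.values()) {
      for (Map.Entry<Integer, Double> a : column.entrySet()) {
        for (Map.Entry<Integer, Double> b : column.entrySet()) {
          if (a.getKey() < b.getKey()) { // upper triangle only, so self-similarity is never produced
            sims.computeIfAbsent(a.getKey(), k -> new TreeMap<>())
                .merge(b.getKey(), a.getValue() * b.getValue(), Double::sum);
          }
        }
      }
    }
    // Divide the summed dot products by the row norms to obtain cosine similarities.
    for (Map.Entry<Integer, Map<Integer, Double>> row : sims.entrySet()) {
      for (Map.Entry<Integer, Double> cell : row.getValue().entrySet()) {
        cell.setValue(cell.getValue() / (norms.get(row.getKey()) * norms.get(cell.getKey())));
      }
    }
    return sims;
  }
}
```

The point of the transpose is that only items sharing at least one user ever meet in the inner loop, so pairs that never co-occur cost nothing.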
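First, the countObservations sub-job counts the observations (non-zero entries) in each column; as the configuration below shows, its result goes to OBSERVATIONS_PER_COLUMN_PATH.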
```java
Job countObservations = prepareJob(getInputPath(), getTempPath("notUsed"),
    CountObservationsMapper.class, NullWritable.class, VectorWritable.class,
    SumObservationsReducer.class, NullWritable.class, VectorWritable.class);
countObservations.setCombinerClass(VectorSumCombiner.class);
countObservations.getConfiguration().set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
// a single reducer, so all per-column counts are summed in one place
countObservations.setNumReduceTasks(1);
countObservations.waitForCompletion(true);
```

Next, the normsAndTranspose sub-job produces the weights file (weightsPath):
```java
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job normsAndTranspose = prepareJob(getInputPath(), weightsPath,
      VectorNormMapper.class, IntWritable.class, VectorWritable.class,
      MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
  normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
  Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
  normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
  normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
  normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
  normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
  normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
  normsAndTransposeConf.set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
  normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_ROW, String.valueOf(maxObservationsPerRow));
  normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_COLUMN, String.valueOf(maxObservationsPerColumn));
  normsAndTransposeConf.set(RANDOM_SEED, String.valueOf(randomSeed));

  boolean succeeded = normsAndTranspose.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
```

Then the pairwiseSimilarity sub-job produces the pairwise similarity file (pairwiseSimilarityPath):
```java
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath,
      CooccurrencesMapper.class, IntWritable.class, VectorWritable.class,
      SimilarityReducer.class, IntWritable.class, VectorWritable.class);
  pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
  Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
  pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
  pairwiseConf.set(NORMS_PATH, normsPath.toString());
  pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
  pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
  pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
  pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
  pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);

  boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
```

Finally, the asMatrix sub-job produces the item similarity matrix, written to the job's output path (similarityMatrixPath as seen from RecommenderJob):
```java
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(),
      UnsymmetrifyMapper.class, IntWritable.class, VectorWritable.class,
      MergeToTopKSimilaritiesReducer.class, IntWritable.class, VectorWritable.class);
  asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
  asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
  boolean succeeded = asMatrix.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
```
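Assuming the pairwise step yields only one entry per item pair (i < j, with self-similarity excluded as --excludeSelfSimilarity requests), asMatrix can be pictured as mirroring each entry to both items and then trimming every row to its maxSimilaritiesPerRow largest values. The following in-memory version is hypothetical: it is inferred from the class names UnsymmetrifyMapper and MergeToTopKSimilaritiesReducer and the MAX_SIMILARITIES_PER_ROW setting, not taken from their source, and AsMatrixSketch is a made-up name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory picture of asMatrix, inferred from class names (not Mahout's source).
class AsMatrixSketch {

  // upper: i -> {j -> similarity} with one entry per pair (i < j).
  // Returns each item's maxPerRow most similar items.
  static Map<Integer, Map<Integer, Double>> asMatrix(Map<Integer, Map<Integer, Double>> upper, int maxPerRow) {
    // Mirror each (i, j) entry to (j, i) so both items see the pair.
    Map<Integer, Map<Integer, Double>> full = new TreeMap<>();
    for (Map.Entry<Integer, Map<Integer, Double>> row : upper.entrySet()) {
      for (Map.Entry<Integer, Double> cell : row.getValue().entrySet()) {
        full.computeIfAbsent(row.getKey(), k -> new HashMap<>()).put(cell.getKey(), cell.getValue());
        full.computeIfAbsent(cell.getKey(), k -> new HashMap<>()).put(row.getKey(), cell.getValue());
      }
    }
    // Keep only the largest maxPerRow similarities in each row.
    Map<Integer, Map<Integer, Double>> topK = new TreeMap<>();
    for (Map.Entry<Integer, Map<Integer, Double>> row : full.entrySet()) {
      List<Map.Entry<Integer, Double>> cells = new ArrayList<>(row.getValue().entrySet());
      cells.sort(Map.Entry.<Integer, Double>comparingByValue().reversed());
      Map<Integer, Double> kept = new TreeMap<>();
      for (Map.Entry<Integer, Double> cell : cells.subList(0, Math.min(maxPerRow, cells.size()))) {
        kept.put(cell.getKey(), cell.getValue());
      }
      topK.put(row.getKey(), kept);
    }
    return topK;
  }
}
```

After trimming, the matrix is generally no longer symmetric: item 3 may keep item 2 as its best neighbour while item 2 keeps item 1 instead.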
The second top-level job of step 2, outputSimilarityMatrix, is conditional: it only runs when the outputPathForSimilarityMatrix option is set.
```java
if (hasOption("outputPathForSimilarityMatrix")) {
  Path outputPathForSimilarityMatrix = new Path(getOption("outputPathForSimilarityMatrix"));

  Job outputSimilarityMatrix = prepareJob(similarityMatrixPath, outputPathForSimilarityMatrix,
      SequenceFileInputFormat.class,
      ItemSimilarityJob.MostSimilarItemPairsMapper.class, EntityEntityWritable.class, DoubleWritable.class,
      ItemSimilarityJob.MostSimilarItemPairsReducer.class, EntityEntityWritable.class, DoubleWritable.class,
      TextOutputFormat.class);

  Configuration mostSimilarItemsConf = outputSimilarityMatrix.getConfiguration();
  // pass the itemIDIndex output so int indices can be translated back into item IDs
  mostSimilarItemsConf.set(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR,
      new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
  mostSimilarItemsConf.setInt(ItemSimilarityJob.MAX_SIMILARITIES_PER_ITEM, maxSimilaritiesPerItem);
  outputSimilarityMatrix.waitForCompletion(true);
}
```
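What this optional job writes can be pictured as follows: each row of the index-based similarity matrix is translated back into item IDs via the itemIDIndex output, and each item pair becomes one text record with its similarity. This is a hypothetical sketch; in particular, putting the smaller ID first and keeping only the first value seen for a pair are assumptions of the sketch, not behaviour confirmed from ItemSimilarityJob's source. SimilarityPairsSketch is a made-up name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical picture of what outputSimilarityMatrix writes (not ItemSimilarityJob's source).
class SimilarityPairsSketch {

  // simMatrix: itemIndex -> {otherItemIndex -> similarity}; indexToItemID comes from itemIDIndex.
  // Returns one "itemA<TAB>itemB" key per unordered pair, with its similarity.
  static Map<String, Double> toItemPairs(Map<Integer, Map<Integer, Double>> simMatrix,
                                         Map<Integer, Long> indexToItemID) {
    Map<String, Double> pairs = new TreeMap<>();
    for (Map.Entry<Integer, Map<Integer, Double>> row : simMatrix.entrySet()) {
      long a = indexToItemID.get(row.getKey());
      for (Map.Entry<Integer, Double> cell : row.getValue().entrySet()) {
        long b = indexToItemID.get(cell.getKey());
        // Assumed convention: smaller item ID first, so (a, b) and (b, a) collapse into one record.
        String key = Math.min(a, b) + "\t" + Math.max(a, b);
        pairs.putIfAbsent(key, cell.getValue());
      }
    }
    return pairs;
  }
}
```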
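Step 3: run the partialMultiply job, which starts the multiplication of the co-occurrence (similarity) matrix by the user vectors. It reads two inputs, the similarity matrix and the user vectors, and groups both by item index.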
```java
//start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job partialMultiply = new Job(getConf(), "partialMultiply");
  Configuration partialMultiplyConf = partialMultiply.getConfiguration();

  // two inputs, both keyed by item index: rows of the similarity matrix and the user vectors
  MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class,
      SimilarityMatrixRowWrapperMapper.class);
  MultipleInputs.addInputPath(partialMultiply, new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
      SequenceFileInputFormat.class, UserVectorSplitterMapper.class);

  partialMultiply.setJarByClass(ToVectorAndPrefReducer.class);
  partialMultiply.setMapOutputKeyClass(VarIntWritable.class);
  partialMultiply.setMapOutputValueClass(VectorOrPrefWritable.class);
  partialMultiply.setReducerClass(ToVectorAndPrefReducer.class);
  partialMultiply.setOutputFormatClass(SequenceFileOutputFormat.class);
  partialMultiply.setOutputKeyClass(VarIntWritable.class);
  partialMultiply.setOutputValueClass(VectorAndPrefsWritable.class);
  partialMultiplyConf.setBoolean("mapred.compress.map.output", true);
  partialMultiplyConf.set("mapred.output.dir", partialMultiplyPath.toString());

  if (usersFile != null) {
    partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
  }
  partialMultiplyConf.setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED, maxPrefsPerUser);

  boolean succeeded = partialMultiply.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
```
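Judging from its mappers, reducer and output classes, partialMultiply is essentially a reduce-side join keyed by item index: it brings together an item's row of the similarity matrix and every user preference for that item, and the products themselves are presumably formed later, when aggregateAndRecommend reads the data with PartialMultiplyMapper. The in-memory sketch below mimics that join. The class and field names are hypothetical (VectorAndPrefs only echoes VectorAndPrefsWritable), and dropping preferences for items that have no similarity row is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory version of the partialMultiply join (not Mahout's source).
class PartialMultiplySketch {

  // Joined record for one item index: its similarity row plus all (userID, pref) pairs for it.
  static final class VectorAndPrefs {
    final Map<Integer, Double> similarities;
    final List<Long> userIDs = new ArrayList<>();
    final List<Float> prefs = new ArrayList<>();

    VectorAndPrefs(Map<Integer, Double> similarities) {
      this.similarities = similarities;
    }
  }

  static Map<Integer, VectorAndPrefs> join(Map<Integer, Map<Integer, Double>> simMatrix,
                                           Map<Long, Map<Integer, Float>> userVectors) {
    Map<Integer, VectorAndPrefs> joined = new TreeMap<>();
    // Similarity side: one record per item index (cf. SimilarityMatrixRowWrapperMapper).
    for (Map.Entry<Integer, Map<Integer, Double>> row : simMatrix.entrySet()) {
      joined.put(row.getKey(), new VectorAndPrefs(row.getValue()));
    }
    // User side: split each user vector into (itemIndex, userID, pref) (cf. UserVectorSplitterMapper).
    for (Map.Entry<Long, Map<Integer, Float>> user : userVectors.entrySet()) {
      for (Map.Entry<Integer, Float> pref : user.getValue().entrySet()) {
        VectorAndPrefs target = joined.get(pref.getKey());
        // Assumption: preferences for items without a similarity row are dropped.
        if (target != null) {
          target.userIDs.add(user.getKey());
          target.prefs.add(pref.getValue());
        }
      }
    }
    return joined;
  }
}
```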
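Step 4: first, a conditional job runs; it is skipped when no filterFile is set. When filterFile is set, the itemFiltering job converts it into VectorAndPrefsWritable records under explicitFilterPath.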
```java
if (filterFile != null) {
  Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
      ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
      ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
      SequenceFileOutputFormat.class);
  boolean succeeded = itemFiltering.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
```

Next, the aggregateAndRecommend job computes the final recommendations:
```java
// mappers emit (userID, preference + similarity column) pairs; the reducer turns them into recommended items
Job aggregateAndRecommend = prepareJob(
    new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
    PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
    AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
    outputFormat);
```
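The final aggregation can be sketched for a single user as a weighted sum: each item i the user rated contributes pref(i) times its similarity column, and a candidate item j is scored as sum_i pref(i) * sim(i, j) / sum_i |sim(i, j)|. This is the standard item-based estimate and only an approximation of what AggregateAndRecommendReducer does; skipping items the user already rated, and the class name AggregateSketch, are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-user version of the final aggregation step (not Mahout's source).
class AggregateSketch {

  // prefs: itemIndex -> this user's preference; simColumns: itemIndex -> {otherItemIndex -> similarity}.
  // Returns up to n unrated item indices ordered by estimated preference, best first.
  static List<Integer> recommend(Map<Integer, Float> prefs,
                                 Map<Integer, Map<Integer, Double>> simColumns, int n) {
    Map<Integer, Double> numerators = new HashMap<>();
    Map<Integer, Double> denominators = new HashMap<>();
    // Every rated item i contributes pref(i) * sim(i, j) to each candidate j.
    for (Map.Entry<Integer, Float> pref : prefs.entrySet()) {
      Map<Integer, Double> column = simColumns.get(pref.getKey());
      if (column == null) {
        continue;
      }
      for (Map.Entry<Integer, Double> sim : column.entrySet()) {
        numerators.merge(sim.getKey(), pref.getValue() * sim.getValue(), Double::sum);
        denominators.merge(sim.getKey(), Math.abs(sim.getValue()), Double::sum);
      }
    }
    // Weighted average per candidate, skipping items the user already rated (assumption).
    Map<Integer, Double> scores = new HashMap<>();
    for (Map.Entry<Integer, Double> num : numerators.entrySet()) {
      int item = num.getKey();
      double denominator = denominators.get(item);
      if (!prefs.containsKey(item) && denominator > 0.0) {
        scores.put(item, num.getValue() / denominator);
      }
    }
    List<Integer> ranked = new ArrayList<>(scores.keySet());
    ranked.sort((a, b) -> Double.compare(scores.get(b), scores.get(a)));
    return new ArrayList<>(ranked.subList(0, Math.min(n, ranked.size())));
  }
}
```

For a user who rated item 1 with 5 and item 2 with 1, with sim(1, 3) = 0.9, sim(2, 3) = 0.1, sim(1, 4) = 0.1 and sim(2, 4) = 0.9, item 3 scores about 4.6 and item 4 about 1.4, so item 3 is recommended first.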
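Related articles
- An illustrated guide to installing a Hadoop cluster on CentOS virtual machines and implementing a Bayesian classification example with Mahout (2)
- An illustrated guide to installing a Hadoop cluster on CentOS virtual machines and implementing a Bayesian classification example with Mahout (7)
- An illustrated guide to installing a Hadoop cluster on CentOS virtual machines and implementing a Bayesian classification example with Mahout (3)
- Implementing a Mahout recommender system on Hadoop
- An illustrated guide to installing a Hadoop cluster on CentOS virtual machines and implementing a Bayesian classification example with Mahout (5)
- An in-depth look at Mahout's Hadoop-based collaborative filtering workflow
- Code analysis of the Hadoop-based collaborative filtering recommendation engine in Apache Mahout's Taste
- An illustrated guide to installing a Hadoop cluster on CentOS virtual machines and implementing a Bayesian classification example with Mahout (6)
- MapReduce implementation and workflow analysis: Hadoop 2.6.0
- Implementing Hadoop-based ETL with shell programming on Linux (workflow part)
- An illustrated guide to installing a Hadoop cluster on CentOS virtual machines and implementing a Bayesian classification example with Mahout (4)
- An illustrated guide to installing a Hadoop cluster on CentOS virtual machines and implementing a Bayesian classification example with Mahout (1)
- Mahout demo: essentially an implementation of distributed algorithms on Hadoop, e.g. merging data across nodes, sorting data, efficient network communication, recomputation after a node failure, and distributed data storage
- Model-based FPGA design and implementation: basic workflow (1), getting-started example (2): a stopwatch
- Compiling the source code of Mahout, a Hadoop-based data mining project
- Model-based FPGA design and implementation: basic workflow (1): overview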