
An Analysis of Mahout's Item-Based Collaborative Filtering Pipeline on Hadoop

2014-11-11 18:22
Note: to analyze Mahout's item-based collaborative filtering, we must first find the driver class that triggers it: org.apache.mahout.cf.taste.hadoop.item.RecommenderJob. By reading RecommenderJob we can see how Mahout implements item-based collaborative filtering on Hadoop. In its run() method, the leading addOption calls only declare the command-line options and how they are parsed; the pipeline proper begins at the statement AtomicInteger currentPhase = new AtomicInteger(), with currentPhase initialized to 0.

Main steps of the item-based collaborative filtering implementation on Hadoop

Step 1: run the PreparePreferenceMatrixJob composite job

if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
      "--input", getInputPath().toString(),
      "--output", prepPath.toString(),
      "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
      "--booleanData", String.valueOf(booleanData),
      "--tempDir", getTempPath().toString(),
  });
  numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
}


Reading the source of shouldRunNextPhase(parsedArgs, currentPhase) shows that it tests whether currentPhase lies between startPhase and endPhase, incrementing currentPhase by one as a side effect. If the test passes, the PreparePreferenceMatrixJob composite job is run.
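
A minimal sketch of that phase gate, paraphrased from Mahout's AbstractJob (the option lookup is simplified; treat the details as an approximation rather than the exact library code):

// Paraphrased sketch of AbstractJob.shouldRunNextPhase (simplified):
static boolean shouldRunNextPhase(Map<String, List<String>> args, AtomicInteger currentPhase) {
  int phase = currentPhase.getAndIncrement();      // side effect: the counter always advances
  String start = getOption(args, "--startPhase");  // both options may be absent
  String end = getOption(args, "--endPhase");
  boolean skipped = (start != null && phase < Integer.parseInt(start))
      || (end != null && phase > Integer.parseInt(end));
  return !skipped;                                 // run only phases inside [startPhase, endPhase]
}

// Assumed helper: parsed options are stored as name -> list of values.
static String getOption(Map<String, List<String>> args, String name) {
  List<String> vals = args.get(name);
  return vals == null || vals.isEmpty() ? null : vals.get(0);
}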

Now open org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob and look at its run() method: this composite job consists of several sub-jobs.

First it runs the itemIDIndex sub-job, which maps long item IDs to int indexes.

Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
    ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,
    VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
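
ItemIDIndexMapper hashes every 64-bit item ID down to a non-negative int index, and ItemIDIndexReducer keeps one representative item ID per index so the mapping can be reversed when results are written out. A self-contained sketch of the hashing idea (the exact hash inside TasteHadoopUtils.idToIndex may differ; this is an assumption):

public final class ItemIndexSketch {
  // Fold a 64-bit item ID into a non-negative int index.
  static int idToIndex(long itemID) {
    return 0x7FFFFFFF & (int) (itemID ^ (itemID >>> 32)); // mask clears the sign bit
  }
  public static void main(String[] args) {
    System.out.println(idToIndex(9876543210L)); // a stable, non-negative index for this item ID
  }
}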
Next it runs the toUserVectors sub-job, which turns the raw preference data into user vectors.

Job toUserVectors = prepareJob(getInputPath(),
    getOutputPath(USER_VECTORS),
    TextInputFormat.class,
    ToItemPrefsMapper.class,
    VarLongWritable.class,
    booleanData ? VarLongWritable.class : EntityPrefWritable.class,
    ToUserVectorsReducer.class,
    VarLongWritable.class,
    VectorWritable.class,
    SequenceFileOutputFormat.class);
toUserVectors.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
toUserVectors.getConfiguration().setInt(ToUserVectorsReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
toUserVectors.getConfiguration().set(ToEntityPrefsMapper.RATING_SHIFT, String.valueOf(ratingShift));
succeeded = toUserVectors.waitForCompletion(true);
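
Conceptually this mapper/reducer pair groups input lines of the form userID,itemID,pref into one sparse preference vector per user. A minimal in-memory sketch of the same transformation (plain Java, not the actual Hadoop classes):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class UserVectorSketch {
  public static void main(String[] args) {
    // raw preference lines: userID,itemID,pref
    List<String> lines = Arrays.asList("1,101,5.0", "1,102,3.0", "2,101,2.0");
    // group preferences by user into one sparse vector each (the "reduce" step)
    Map<Long, Map<Long, Float>> userVectors = new HashMap<>();
    for (String line : lines) {
      String[] f = line.split(",");
      userVectors.computeIfAbsent(Long.parseLong(f[0]), u -> new HashMap<>())
                 .put(Long.parseLong(f[1]), Float.parseFloat(f[2]));
    }
    System.out.println(userVectors); // e.g. {1={101=5.0, 102=3.0}, 2={101=2.0}}
  }
}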


Finally it runs the toItemVectors sub-job, which transposes the user vectors into item vectors, i.e. the rating matrix.
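
The transpose is easy to picture: each preference (user, item, pref) held in a user vector is re-emitted keyed by item. A tiny sketch of that reshaping (again plain Java, not ToItemVectorsMapper itself):

import java.util.HashMap;
import java.util.Map;

public final class TransposeSketch {
  public static void main(String[] args) {
    // user vectors: userID -> (itemID -> pref)
    Map<Long, Map<Long, Float>> users = new HashMap<>();
    users.put(1L, Map.of(101L, 5.0f, 102L, 3.0f));
    users.put(2L, Map.of(101L, 2.0f));
    // item vectors (rating matrix rows): itemID -> (userID -> pref)
    Map<Long, Map<Long, Float>> items = new HashMap<>();
    users.forEach((u, prefs) ->
        prefs.forEach((i, p) -> items.computeIfAbsent(i, k -> new HashMap<>()).put(u, p)));
    System.out.println(items); // e.g. {101={1=5.0, 2=2.0}, 102={1=3.0}}
  }
}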

Step 2: this step runs two composite jobs. The first is RowSimilarityJob, which computes the row (item-item) similarities.

if (shouldRunNextPhase(parsedArgs, currentPhase)) {

  /* special behavior if phase 1 is skipped */
  if (numberOfUsers == -1) {
    numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
        PathType.LIST, null, getConf());
  }

  // calculate the co-occurrence matrix
  ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
      "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
      "--output", similarityMatrixPath.toString(),
      "--numberOfColumns", String.valueOf(numberOfUsers),
      "--similarityClassname", similarityClassname,
      "--maxObservationsPerRow", String.valueOf(maxPrefsInItemSimilarity),
      "--maxObservationsPerColumn", String.valueOf(maxPrefsInItemSimilarity),
      "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
      "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
      "--threshold", String.valueOf(threshold),
      "--randomSeed", String.valueOf(randomSeed),
      "--tempDir", getTempPath().toString(),
  });
Now open org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob and look at its run() method; it again launches several sub-jobs.

First it runs the countObservations sub-job, which counts the non-zero entries per column (how many items each user has rated) and writes the counts to observationsPerColumnPath; later sub-jobs use them for the down-sampling controlled by maxObservationsPerRow and maxObservationsPerColumn.

Job countObservations = prepareJob(getInputPath(), getTempPath("notUsed"), CountObservationsMapper.class,
    NullWritable.class, VectorWritable.class, SumObservationsReducer.class, NullWritable.class,
    VectorWritable.class);
countObservations.setCombinerClass(VectorSumCombiner.class);
countObservations.getConfiguration().set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
countObservations.setNumReduceTasks(1);
countObservations.waitForCompletion(true);
Next it runs the normsAndTranspose sub-job, which computes the row norms and transposes the matrix, producing the weights files.

if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
      VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
  normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
  Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
  normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
  normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
  normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
  normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
  normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
  normsAndTransposeConf.set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
  normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_ROW, String.valueOf(maxObservationsPerRow));
  normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_COLUMN, String.valueOf(maxObservationsPerColumn));
  normsAndTransposeConf.set(RANDOM_SEED, String.valueOf(randomSeed));

  boolean succeeded = normsAndTranspose.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
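
What "norm" means here depends on the configured similarity measure; for cosine similarity it is the L2 norm of each item row, which the next sub-job needs as a denominator. A sketch of that case (assuming the cosine measure; the real logic is delegated to the configured VectorSimilarityMeasure):

public final class NormSketch {
  // L2 norm of one item row, as cosine similarity needs it
  static double norm(double[] row) {
    double sumOfSquares = 0.0;
    for (double v : row) {
      sumOfSquares += v * v;
    }
    return Math.sqrt(sumOfSquares);
  }
  public static void main(String[] args) {
    System.out.println(norm(new double[]{3.0, 4.0})); // 5.0
  }
}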
Then it runs the pairwiseSimilarity sub-job, which produces the pairwise-similarity files under pairwiseSimilarityPath.

if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
      IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
  pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
  Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
  pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
  pairwiseConf.set(NORMS_PATH, normsPath.toString());
  pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
  pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
  pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
  pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
  pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
  boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
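
CooccurrencesMapper walks each transposed row (one user) and emits a partial product for every pair of items that user rated; SimilarityReducer sums these partial dot products and finalizes the measure using the precomputed norms. For cosine similarity the finalization looks like this (a sketch under that assumption):

public final class CosineSketch {
  // cosine(i, j) = dot(i, j) / (||i|| * ||j||)
  static double cosine(double dot, double normI, double normJ) {
    return dot / (normI * normJ);
  }
  public static void main(String[] args) {
    // two common raters contribute 3*2 + 4*1 = 10 to the dot product
    System.out.println(cosine(10.0, 5.0, Math.sqrt(5.0))); // ≈ 0.894
  }
}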
Finally it runs the asMatrix sub-job, which produces the item-similarity matrix files.

if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
      IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
      VectorWritable.class);
  asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
  asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
  boolean succeeded = asMatrix.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
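The pairwise job materializes each item pair only once; as I read the two classes, UnsymmetrifyMapper mirrors those entries back into full rows and MergeToTopKSimilaritiesReducer keeps only the maxSimilaritiesPerRow strongest neighbours per item. The top-K pruning itself is a standard min-heap selection:

import java.util.Arrays;
import java.util.PriorityQueue;

public final class TopKSketch {
  // keep the k largest similarity values with a min-heap
  static double[] topK(double[] sims, int k) {
    PriorityQueue<Double> heap = new PriorityQueue<>(); // smallest element on top
    for (double s : sims) {
      heap.offer(s);
      if (heap.size() > k) {
        heap.poll(); // evict the current smallest
      }
    }
    return heap.stream().mapToDouble(Double::doubleValue).toArray();
  }
  public static void main(String[] args) {
    double[] kept = topK(new double[]{0.1, 0.9, 0.4, 0.7, 0.3}, 2);
    Arrays.sort(kept);
    System.out.println(Arrays.toString(kept)); // [0.7, 0.9]
  }
}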

The second composite job of step 2 is outputSimilarityMatrix. It is conditional: it only runs when the outputPathForSimilarityMatrix option has been set.
if (hasOption("outputPathForSimilarityMatrix")) {
  Path outputPathForSimilarityMatrix = new Path(getOption("outputPathForSimilarityMatrix"));

  Job outputSimilarityMatrix = prepareJob(similarityMatrixPath, outputPathForSimilarityMatrix,
      SequenceFileInputFormat.class, ItemSimilarityJob.MostSimilarItemPairsMapper.class,
      EntityEntityWritable.class, DoubleWritable.class, ItemSimilarityJob.MostSimilarItemPairsReducer.class,
      EntityEntityWritable.class, DoubleWritable.class, TextOutputFormat.class);

  Configuration mostSimilarItemsConf = outputSimilarityMatrix.getConfiguration();
  mostSimilarItemsConf.set(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR,
      new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
  mostSimilarItemsConf.setInt(ItemSimilarityJob.MAX_SIMILARITIES_PER_ITEM, maxSimilaritiesPerItem);
  outputSimilarityMatrix.waitForCompletion(true);
}


Step 3: run the partialMultiply job, which multiplies the co-occurrence (similarity) matrix by the user vectors.

// start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job partialMultiply = new Job(getConf(), "partialMultiply");
  Configuration partialMultiplyConf = partialMultiply.getConfiguration();

  MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class,
      SimilarityMatrixRowWrapperMapper.class);
  MultipleInputs.addInputPath(partialMultiply, new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
      SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
  partialMultiply.setJarByClass(ToVectorAndPrefReducer.class);
  partialMultiply.setMapOutputKeyClass(VarIntWritable.class);
  partialMultiply.setMapOutputValueClass(VectorOrPrefWritable.class);
  partialMultiply.setReducerClass(ToVectorAndPrefReducer.class);
  partialMultiply.setOutputFormatClass(SequenceFileOutputFormat.class);
  partialMultiply.setOutputKeyClass(VarIntWritable.class);
  partialMultiply.setOutputValueClass(VectorAndPrefsWritable.class);
  partialMultiplyConf.setBoolean("mapred.compress.map.output", true);
  partialMultiplyConf.set("mapred.output.dir", partialMultiplyPath.toString());

  if (usersFile != null) {
    partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
  }
  partialMultiplyConf.setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED, maxPrefsPerUser);

  boolean succeeded = partialMultiply.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
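
The two mappers bring both inputs onto the same key, the item index: SimilarityMatrixRowWrapperMapper wraps each similarity-matrix row, UserVectorSplitterMapper splits each user vector into per-item preferences, and ToVectorAndPrefReducer pairs them up. What this sets up is the matrix-vector product r = S · p per user; a sketch of that arithmetic (plain Java, with the diagonal zeroed to match excludeSelfSimilarity):

import java.util.Arrays;

public final class MultiplySketch {
  // one user's raw recommendation scores: r = S * p
  static double[] multiply(double[][] s, double[] p) {
    double[] r = new double[s.length];
    for (int i = 0; i < s.length; i++) {
      for (int j = 0; j < p.length; j++) {
        r[i] += s[i][j] * p[j];
      }
    }
    return r;
  }
  public static void main(String[] args) {
    double[][] s = {{0.0, 0.8}, {0.8, 0.0}}; // self-similarity excluded
    double[] p = {5.0, 0.0};                 // the user rated item 0 only
    System.out.println(Arrays.toString(multiply(s, p))); // [0.0, 4.0]
  }
}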


Step 4: this step starts with another conditional job, which is skipped unless filterFile has been set.

if (filterFile != null) {
  Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
      ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
      ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
      SequenceFileOutputFormat.class);
  boolean succeeded = itemFiltering.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
Then it runs the aggregateAndRecommend job, which produces the final recommendations.

Job aggregateAndRecommend = prepareJob(
    new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
    PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
    AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
    outputFormat);
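
AggregateAndRecommendReducer turns the partial products into final scores. For non-boolean data, my reading of the reducer is that the weighted sum is normalized by the total similarity weight before the top-N items are emitted per user; a sketch of that scoring, not the reducer's exact code:

public final class AggregateSketch {
  // score(u, i) = sum_j sim(i, j) * pref(u, j) / sum_j |sim(i, j)|
  static double score(double[] sims, double[] prefs) {
    double num = 0.0;
    double den = 0.0;
    for (int j = 0; j < sims.length; j++) {
      num += sims[j] * prefs[j];
      den += Math.abs(sims[j]);
    }
    return num / den;
  }
  public static void main(String[] args) {
    System.out.println(score(new double[]{0.8, 0.4}, new double[]{5.0, 3.0})); // ≈ 4.33
  }
}

For reference, the whole pipeline is typically launched with a single command; the option names below match the addOption declarations in RecommenderJob, while the paths are placeholders:

mahout recommenditembased --input /path/to/prefs --output /path/to/output \
    --similarityClassname SIMILARITY_COSINE --numRecommendations 10 --tempDir /tmp/mahout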