mahout Taste 代码分析, 可运行在hadoop上的分布式版本
2013-03-20 20:55
274 查看
1. 将itemid从long型装换成int型,然后找出最小的itemindex的值,具体可以看map和reduce的代码,这里不多写。下面是这个job的代码部分。前边的代码都是一些参数的配置,略过。
ToItemPrefsMapper继承自ToEntityPrefsMapper,这里主要是为了区分item-based 和 user-based 的方法,如果item-based的话,key就是userid。因此有了transpose用来转置我们输入的数据(userid, itemid).
ToUserVectorReducer,主要将相同userid的<itemid, Prefs>组成一个vector, 其中MIN_PREFERENCES_PRE_USER被设置为对于一个user来说,至少要有多少“购物”的经历,才能被认为是一个合格的研究点放在推荐系统中被研究。
3. 计算user数目,在RecommenderJob中的代码是:这部分和整个项目是独立的,即使再主程序中注销掉这个程序,程序依然可以正常执行,是整个taste中的一个附加功能,能够计算出有多少user.
map:org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersMapper
CountUsersKeyWritable 是用户自己定义的可序列化的一个类,其实与VarLongWritable相似,就是将一个Long型的变量序列化。
从代码中可见map的结果是userid, vector<itemindex, prefs> 变成 userid, userid
Reducer: org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersReducer
4. 计算item的user偏好量,job配置的代码如下:
map: org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper
代码比较复杂,基本的意思是将userid vector<item_index, prefs>的形式变成item_index <user_index, prefs>。注意这里不是vector的形式。新建了DistributedRowMatrix的可序列化矩阵,但是其实,该矩阵只有一个元素,设置了行为item_index, 列是user_index。当mapper执行完之后,会进行排序,这样能够将item_index相同的条目放在一起,从而能够在reducer中实现,与Job2相同,放在一个RandomAccessSparseVector中。
个人认为如果不计算后边的权重,是不需要计算item的偏好量的,而很多相似性的算法是不需要计算权重的,所以可以酌情考虑是否需要这个Job,打算自己写推荐系统的时候去掉这部分代码。欢迎大家讨论。
这部分在主函数中以一个任务RowSimilarityJob的形式展现出来,这个job被分配成三个mapreduce的来执行
5. 计算weight,这部分的代码我还没有分析清楚,因为我查了similarity.weight(),发现大部分相似性的代码中都没有这部分代码,所以我不是很清楚这个代码的任务。
但是经过这个代码,我们能够看到数据结构的变化。
itemindex, vector<userindex, prefs> => userindex, WeightedOccurrence<itemindex,prefs, weight>。 其中weightoccurrence就是一个简单的数据封装
userindex, WeightedOccurrence<itemindex,prefs, weight> => userindex, WeightedOccurrenceArray, 这个数组的数据项是WeightedOccurrence<itemindex, prefs, weight>
6.
/* * Job 1: itemid -> itemindex */ if (shouldRunNextPhase(parsedArgs, currentPhase)) { Job itemIDIndex = prepareJob( inputPath, itemIDIndexPath, TextInputFormat.class, ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class); itemIDIndex.setCombinerClass(ItemIDIndexReducer.class); itemIDIndex.waitForCompletion(true); }2. userid, itemid, prefs => userid, <itemid, prefs> => userid, vector<itemindex, prefs>
/* * Job 2: userid itemid pref -> userid vector<itemid pref> */ if (shouldRunNextPhase(parsedArgs, currentPhase)) { Job toUserVector = prepareJob( inputPath, userVectorPath, TextInputFormat.class, ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class, ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class); toUserVector.getConfiguration().setBoolean(BOOLEAN_DATA, booleanData); toUserVector.getConfiguration().setInt(ToUserVectorReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser); toUserVector.waitForCompletion(true); }
ToItemPrefsMapper继承自ToEntityPrefsMapper,这里主要是为了区分item-based 和 user-based 的方法,如果item-based的话,key就是userid。因此有了transpose用来转置我们输入的数据(userid, itemid).
public abstract class ToEntityPrefsMapper extends Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> { public static final String TRANSPOSE_USER_ITEM = "transposeUserItem"; private static final Pattern DELIMITER = Pattern.compile("[\t,]"); private boolean booleanData; private boolean transpose; private final boolean itemKey; ToEntityPrefsMapper(boolean itemKey) { this.itemKey = itemKey; } @Override protected void setup(Context context) { Configuration jobConf = context.getConfiguration(); booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false); transpose = jobConf.getBoolean(TRANSPOSE_USER_ITEM, false); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] tokens = ToEntityPrefsMapper.DELIMITER.split(value.toString()); long userID = Long.parseLong(tokens[0]); long itemID = Long.parseLong(tokens[1]); /* * 转置的部分,如果是user-based cf 的算法,itemid将作为我们以后计算的key值进行推荐 */ if (itemKey ^ transpose) { // If using items as keys, and not transposing items and users, then users are items! // Or if not using items as keys (users are, as usual), but transposing items and users, // then users are items! Confused? long temp = userID; userID = itemID; itemID = temp; } if (booleanData) { context.write(new VarLongWritable(userID), new VarLongWritable(itemID)); } else { float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f; context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue)); } } }
ToUserVectorReducer,主要将相同userid的<itemid, Prefs>组成一个vector, 其中MIN_PREFERENCES_PRE_USER被设置为对于一个user来说,至少要有多少“购物”的经历,才能被认为是一个合格的研究点放在推荐系统中被研究。
public final class ToUserVectorReducer extends Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> { public static final String MIN_PREFERENCES_PER_USER = ToUserVectorReducer.class.getName() + ".minPreferencesPerUser"; private int minPreferences; @Override protected void setup(Context ctx) throws IOException, InterruptedException { super.setup(ctx); minPreferences = ctx.getConfiguration().getInt(MIN_PREFERENCES_PER_USER, 1); } @Override protected void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context) throws IOException, InterruptedException { // 新建一个RandomAccessSparseVector,用来封装所有的<itemindex, prefs> Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100); for (VarLongWritable itemPref : itemPrefs) { int index = TasteHadoopUtils.idToIndex(itemPref.get()); float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f; userVector.set(index, value); } if (userVector.getNumNondefaultElements() >= minPreferences) { VectorWritable vw = new VectorWritable(userVector); vw.setWritesLaxPrecision(true); context.write(userID, vw); } } }
3. 计算user数目,在RecommenderJob中的代码是:这部分和整个项目是独立的,即使再主程序中注销掉这个程序,程序依然可以正常执行,是整个taste中的一个附加功能,能够计算出有多少user.
if (shouldRunNextPhase(parsedArgs, currentPhase)) { Job countUsers = prepareJob(userVectorPath, countUsersPath, SequenceFileInputFormat.class, CountUsersMapper.class, CountUsersKeyWritable.class, VarLongWritable.class, CountUsersReducer.class, VarIntWritable.class, NullWritable.class, TextOutputFormat.class); countUsers.setPartitionerClass(CountUsersKeyWritable.CountUsersPartitioner.class); countUsers.setGroupingComparatorClass(CountUsersKeyWritable.CountUsersGroupComparator.class); countUsers.waitForCompletion(true); }
map:org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersMapper
public class CountUsersMapper extends Mapper<VarLongWritable,VectorWritable,CountUsersKeyWritable,VarLongWritable> { @Override protected void map(VarLongWritable key, VectorWritable value, Context context) throws IOException, InterruptedException { long userID = key.get(); context.write(new CountUsersKeyWritable(userID), new VarLongWritable(userID)); } }
CountUsersKeyWritable 是用户自己定义的可序列化的一个类,其实与VarLongWritable相似,就是将一个Long型的变量序列化。
从代码中可见map的结果是userid, vector<itemindex, prefs> 变成 userid, userid
Reducer: org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersReducer
protected void reduce(CountUsersKeyWritable key, Iterable<VarLongWritable> userIDs, Context context) throws IOException, InterruptedException { long lastSeenUserID = Long.MIN_VALUE; int numberOfUsers = 0; for (VarLongWritable writable : userIDs) { long currentUserID = writable.get(); if (currentUserID > lastSeenUserID) { lastSeenUserID = currentUserID; numberOfUsers++; } } context.write(new VarIntWritable(numberOfUsers), NullWritable.get()); }计算user的数量,现在有疑问是为什么多做了一层判断。还有numberOfUsers是局部变量吧
4. 计算item的user偏好量,job配置的代码如下:
Job maybePruneAndTransponse = prepareJob(userVectorPath, itemUserMatrixPath, SequenceFileInputFormat.class, MaybePruneRowsMapper.class, IntWritable.class, DistributedRowMatrix.MatrixEntryWritable.class, ToItemVectorsReducer.class, IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class); maybePruneAndTransponse.getConfiguration().setInt(MaybePruneRowsMapper.MAX_COOCCURRENCES, maxCooccurrencesPerItem); maybePruneAndTransponse.waitForCompletion(true);
map: org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper
protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx) throws IOException, InterruptedException { Vector vector = vectorWritable.get(); countSeen(vector); int numElementsBeforePruning = vector.getNumNondefaultElements(); vector = maybePruneVector(vector); int numElementsAfterPruning = vector.getNumNondefaultElements(); ctx.getCounter(Elements.USED).increment(numElementsAfterPruning); ctx.getCounter(Elements.NEGLECTED).increment(numElementsBeforePruning - numElementsAfterPruning); DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable(); int colIndex = TasteHadoopUtils.idToIndex(rowIndex.get()); entry.setCol(colIndex); Iterator<Vector.Element> iterator = vector.iterateNonZero(); while (iterator.hasNext()) { Vector.Element elem = iterator.next(); entry.setRow(elem.index()); entry.setVal(elem.get()); ctx.write(new IntWritable(elem.index()), entry); } }
代码比较复杂,基本的意思是将userid vector<item_index, prefs>的形式变成item_index <user_index, prefs>。注意这里不是vector的形式。新建了DistributedRowMatrix的可序列化矩阵,但是其实,该矩阵只有一个元素,设置了行为item_index, 列是user_index。当mapper执行完之后,会进行排序,这样能够将item_index相同的条目放在一起,从而能够在reducer中实现,与Job2相同,放在一个RandomAccessSparseVector中。
个人认为如果不计算后边的权重,是不需要计算item的偏好量的,而很多相似性的算法是不需要计算权重的,所以可以酌情考虑是否需要这个Job,打算自己写推荐系统的时候去掉这部分代码。欢迎大家讨论。
这部分在主函数中以一个任务RowSimilarityJob的形式展现出来,这个job被分配成三个mapreduce的来执行
try { ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] { "-Dmapred.input.dir=" + itemUserMatrixPath, "-Dmapred.output.dir=" + similarityMatrixPath, "--numberOfColumns", String.valueOf(numberOfUsers), "--similarityClassname", similarityClassname, "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem + 1), "--tempDir", tempDirPath.toString() }); } catch (Exception e) { throw new IllegalStateException("item-item-similarity computation failed", e); }
5. 计算weight,这部分的代码我还没有分析清楚,因为我查了similarity.weight(),发现大部分相似性的代码中都没有这部分代码,所以我不是很清楚这个代码的任务。
但是经过这个代码,我们能够看到数据结构的变化。
itemindex, vector<userindex, prefs> => userindex, WeightedOccurrence<itemindex,prefs, weight>。 其中weightoccurrence就是一个简单的数据封装
userindex, WeightedOccurrence<itemindex,prefs, weight> => userindex, WeightedOccurrenceArray, 这个数组的数据项是WeightedOccurrence<itemindex, prefs, weight>
6.
相关文章推荐
- hadoop 0.1.0版本namenode代码分析
- hadoop平台运行python代码
- Hadoop 伪分布式安装、运行测试例子
- hadoop2.5.0版本分布式部署配置过程
- StopWatch 监控Java代码运行时间和分析性能
- hadoop学习笔记(一)——hadoop运行源代码分析
- Hadoop完全分布式环境配置及 Word Count 程序运行
- 分布式安装Hadoop2.7(适用于2.x版本)
- magento -- 1.4版本使用google analytic 流量分析代码的bug
- Hadoop-0.20.0源代码分析(10)
- Hadoop-0.20.0源代码分析(11)
- Hadoop 2.0 Yarn代码:NodeManager端代码分析_NM端各服务模块的启动
- PTAMM阅读笔记之EMGU CV的使用、 VC运行库版本冲突分析处理
- Hadoop-0.20.0源代码分析(19)
- Hadoop2.7.1伪分布式运行实例
- hadoop新版的api接口实现启动运行hadoop代码
- 家用电器用户行为分析与事件识别代码详解+修改后运行无误的代码
- hadoop-2.7.1伪分布式模式下命令行运行java文件(hadoop权威指南学习篇)
- hadoop on yarn 入门系列1-伪分布式环境搭建并运行wordcount