您的位置:首页 > Web前端

【Hadoop】mahout推荐hadoop偏好矩阵-PreparePreferenceMatrixJob

2013-06-13 21:34 316 查看
mahout推荐包括两部分,一部分是单机版的推荐,主要是以org.apache.mahout.cf.taste.*包下面;另一种则是hadoop版本的推荐主要是以org.apache.mahout.cf.taste.hadoop.*包下面。下面我们针对hadoop版本进行分析。在org.apache.mahout.cf.taste.hadoop.item包下面的RecommenderJob开始。
该job主要分为几个步骤,它首先执行第一个job是PreparePreferenceMatrixJob,该job主要负责构造一个用户的偏好或者叫浏览矩阵;第二个job则是生成协同矩阵,点击查看第二个job计算协同矩阵。最后的输出形式为(itemId, VectorWritable<userId,
pref>)即itemId-> {userId1:score1, userId2:score2}
其中该PreparePreferenceMatrixJob又分为三个步骤完成:

第一个步骤转化itemIDIndex,通过一个mapper与一个reducer完成。

这步主要是将itemId转成一个int。之所以要做,主要是因为它其后面要使用的很多数据结构都是基于int的。如Vector之类还有VectorWritable..
其中mapper与reduce各个输出对应关系为int对应了转化后的long的值。
mapper outkey:VarIntWritable, outvalue:VarLongWritable
reduce outkey:VarIntWritable, outvalue:VarLongWritable
可以看出其实mapper与reducer的输出都是一样子的,但是不知为何reducer还要重复做一遍,直接什么都不做输出就可以了。贴出源码大家感受一下:
long minimumItemID = Long.MAX_VALUE;
for (VarLongWritable varLongWritable : possibleItemIDs) {
long itemID = varLongWritable.get();
if (itemID < minimumItemID) {
minimumItemID = itemID;
}
}
if (minimumItemID != Long.MAX_VALUE) {
context.write(index, new VarLongWritable(minimumItemID));
}


第二个步骤合并。这一步也是通过一个mapper与一个reducer完成。
ToItemPrefsMapper,这个类什么都没做,就是调用了其父类ToEntityPrefsMapper。该mapper的输入还是为原始浏览评分记录,和上一步的最后结果输出没有关系。所以下面的ItemId还是为原始的long类型,而没有采取步骤1中已经转化成int的类型。另外这里面还有一部转置,莫名其妙的一步转置,不做评论。这一步的mapper工作很简单,就是根据booleanData判断是否忽略评分列,对于没有显式的评分的如购买数据。为true的时候则忽略评分列,直接输出long类型的itemId;否则会输出一个EntityPrefWritable对象,该对象封装了一个long类型的id与对应的double类型评分。所以该maper会有两种类型的输出: 

context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));

ToUserVectorsReducer 以long类型的用户id为key,输出成向量形式 (userId, VectorWritable<itemId, pref>)。这里无论上一步的mapper是否输出带有评分列,都会生成pref这一参数。如果上一步没有,则默认为1。注意这里对ItemId又做了和步骤1相同的工作,调用接口把是ItemId转成int的。到校为止我还没看到步骤1到底有什么作用,我能再说一遍真心不知道为何这样设计了吗。具体代码感受一下:
for (VarLongWritable itemPref : itemPrefs) {
int index = TasteHadoopUtils.idToIndex(itemPref.get());
float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
userVector.set(index, value);
}

if (userVector.getNumNondefaultElements() >= minPreferences) {
VectorWritable vw = new VectorWritable(userVector);
vw.setWritesLaxPrecision(true);
context.getCounter(Counters.USERS).increment(1);
context.write(userID, vw);
}


第三个步骤建立浏览矩阵。通过ToItemVectorsMapper与ToItemVectorsReducer完成。

ToItemVectorsMapper的输入是步骤2最后结果的输出,即它的输入文件形式为(userId, VectorWritable<itemId, pref>)。这一步的主要操作是把userId转为为int类型和并把输出由(userId, VectorWritable<itemId, pref>)改为(itemId, VectorWritable<userId, pref>)。即把由userId为key的向量,改为了由itemId为key的向量。itemId与userId都为int类型。其中额外的操作是对要采样的记录个数进行相应的处理。贴出代码感受一下:
int column = TasteHadoopUtils.idToIndex(rowIndex.get());
VectorWritable itemVector = new VectorWritable(new RandomAccessSparseVector(Integer.MAX_VALUE, 1));
itemVector.setWritesLaxPrecision(true);

Iterator<Vector.Element> iterator = userRatings.iterateNonZero();
while (iterator.hasNext()) {
Vector.Element elem = iterator.next();
itemVector.get().setQuick(column, elem.get());
ctx.write(new IntWritable(elem.index()), itemVector);
}
ToItemVectorsReducer把上一步mapper输出结果进行合并,把相同itemId下的浏览记录合并到一个向量中。其调用了一个merge方法,代码感受一下:
Vector accumulator = vectors.next().get();
while (vectors.hasNext()) {
VectorWritable v = vectors.next();
if (v != null) {
Iterator<Vector.Element> nonZeroElements = v.get().iterateNonZero();
while (nonZeroElements.hasNext()) {
Vector.Element nonZeroElement = nonZeroElements.next();
accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
}
}
}
return new VectorWritable(accumulator);


在此整个预备工作结束。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息