
Mahout Taste code analysis: the distributed version that runs on Hadoop

2013-03-20 20:55
1. Convert the itemid from long to int: the mapper hashes each long itemid to an int index, and the reducer keeps the smallest itemid seen for each index. The details are in the map and reduce code, which I won't repeat here. The job setup is shown below; the code before it is just parameter parsing and configuration, so I skip it.

/*
 * Job 1: itemid -> itemindex
 */
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job itemIDIndex = prepareJob(
      inputPath, itemIDIndexPath, TextInputFormat.class,
      ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
      ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
      SequenceFileOutputFormat.class);
  itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
  itemIDIndex.waitForCompletion(true);
}
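
To make Job 1 concrete, here is a minimal, self-contained sketch of what ItemIDIndexMapper and ItemIDIndexReducer effectively compute. It assumes idToIndex is a non-negative hash of the long id, in the spirit of TasteHadoopUtils.idToIndex; the class name and the sample ids are made up for illustration.

import java.util.HashMap;
import java.util.Map;

// Sketch of the itemid -> index mapping built by Job 1 (not Mahout's actual code).
public class ItemIDIndexSketch {

  // Assumption: fold a long id into a non-negative int, like TasteHadoopUtils.idToIndex does.
  static int idToIndex(long itemID) {
    return 0x7FFFFFFF & (int) (itemID ^ (itemID >>> 32));
  }

  public static void main(String[] args) {
    long[] itemIDs = {10L, 25L, 7L};
    // "Reduce" side: keep the smallest itemid seen for each index, so hash collisions are
    // resolved deterministically and the index can later be mapped back to a single itemid.
    Map<Integer, Long> indexToItemID = new HashMap<Integer, Long>();
    for (long itemID : itemIDs) {
      int index = idToIndex(itemID); // "map" side: itemid (long) -> index (int)
      Long current = indexToItemID.get(index);
      if (current == null || itemID < current) {
        indexToItemID.put(index, itemID);
      }
    }
    System.out.println(indexToItemID);
  }
}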
2. userid, itemid, prefs => userid, <itemid, prefs> => userid, vector<itemindex, prefs>

/*
 * Job 2: userid itemid pref -> userid vector<itemid pref>
 */
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job toUserVector = prepareJob(
      inputPath, userVectorPath, TextInputFormat.class,
      ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
      ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
      SequenceFileOutputFormat.class);
  toUserVector.getConfiguration().setBoolean(BOOLEAN_DATA, booleanData);
  toUserVector.getConfiguration().setInt(ToUserVectorReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
  toUserVector.waitForCompletion(true);
}


ToItemPrefsMapper extends ToEntityPrefsMapper. The base class mainly exists to distinguish the item-based and user-based approaches: in the item-based case the key is the userid. That is also why there is a transpose flag, which swaps the (userid, itemid) columns of the input when needed.

public abstract class ToEntityPrefsMapper extends
    Mapper<LongWritable,Text, VarLongWritable,VarLongWritable> {

  public static final String TRANSPOSE_USER_ITEM = "transposeUserItem";

  private static final Pattern DELIMITER = Pattern.compile("[\t,]");

  private boolean booleanData;
  private boolean transpose;
  private final boolean itemKey;

  ToEntityPrefsMapper(boolean itemKey) {
    this.itemKey = itemKey;
  }

  @Override
  protected void setup(Context context) {
    Configuration jobConf = context.getConfiguration();
    booleanData = jobConf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
    transpose = jobConf.getBoolean(TRANSPOSE_USER_ITEM, false);
  }

  @Override
  public void map(LongWritable key,
                  Text value,
                  Context context) throws IOException, InterruptedException {
    String[] tokens = ToEntityPrefsMapper.DELIMITER.split(value.toString());
    long userID = Long.parseLong(tokens[0]);
    long itemID = Long.parseLong(tokens[1]);
    /*
     * The transposition: for the user-based CF algorithm, the itemid becomes the key
     * that the later computation and recommendation steps run on.
     */
    if (itemKey ^ transpose) {
      // If using items as keys, and not transposing items and users, then users are items!
      // Or if not using items as keys (users are, as usual), but transposing items and users,
      // then users are items! Confused?
      long temp = userID;
      userID = itemID;
      itemID = temp;
    }
    if (booleanData) {
      context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
    } else {
      float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) : 1.0f;
      context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
    }
  }

}
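
The itemKey ^ transpose test is easy to misread, so here is a tiny sketch of the four combinations; the class name is made up and this is only an illustration of the swap logic quoted above.

// Sketch: which id ends up as the map output key for each (itemKey, transpose) combination.
public class KeySwapSketch {
  public static void main(String[] args) {
    boolean[] flags = {false, true};
    for (boolean itemKey : flags) {
      for (boolean transpose : flags) {
        // itemKey ^ transpose == true means userID and itemID are swapped before writing,
        // so the output key slot actually carries the original itemID.
        String keyHolds = (itemKey ^ transpose) ? "itemID" : "userID";
        System.out.println("itemKey=" + itemKey + ", transpose=" + transpose
            + " -> output key holds " + keyHolds);
      }
    }
  }
}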


ToUserVectorReducer packs all the <itemid, prefs> pairs that share the same userid into one vector. MIN_PREFERENCES_PER_USER sets the minimum number of "purchase" records a user must have before that user is considered worth keeping in the recommendation computation at all.

public final class ToUserVectorReducer extends
    Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {

  public static final String MIN_PREFERENCES_PER_USER = ToUserVectorReducer.class.getName() + ".minPreferencesPerUser";

  private int minPreferences;

  @Override
  protected void setup(Context ctx) throws IOException, InterruptedException {
    super.setup(ctx);
    minPreferences = ctx.getConfiguration().getInt(MIN_PREFERENCES_PER_USER, 1);
  }

  @Override
  protected void reduce(VarLongWritable userID,
                        Iterable<VarLongWritable> itemPrefs,
                        Context context) throws IOException, InterruptedException {
    // Create a RandomAccessSparseVector to hold all of this user's <itemindex, prefs> pairs
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
      int index = TasteHadoopUtils.idToIndex(itemPref.get());
      float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
      userVector.set(index, value);
    }

    if (userVector.getNumNondefaultElements() >= minPreferences) {
      VectorWritable vw = new VectorWritable(userVector);
      vw.setWritesLaxPrecision(true);
      context.write(userID, vw);
    }
  }

}
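
Outside Hadoop, the same sparse-vector construction can be tried directly. Below is a minimal sketch using mahout-math (assuming it is on the classpath); the sample ids and preference values are invented, and the inline hash mimics the idea of TasteHadoopUtils.idToIndex.

import java.util.Iterator;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

// Sketch: build one user's vector<itemindex, pref> roughly the way ToUserVectorReducer does.
public class UserVectorSketch {
  public static void main(String[] args) {
    long[] itemIDs = {10L, 25L, 7L};
    float[] prefs = {3.0f, 5.0f, 4.5f};

    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (int i = 0; i < itemIDs.length; i++) {
      // same hashing idea as TasteHadoopUtils.idToIndex: long itemid -> non-negative int index
      int index = 0x7FFFFFFF & (int) (itemIDs[i] ^ (itemIDs[i] >>> 32));
      userVector.set(index, prefs[i]);
    }

    System.out.println("non-default elements: " + userVector.getNumNondefaultElements());
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      System.out.println(e.index() + " -> " + e.get());
    }
  }
}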


3. Count the number of users. The code in RecommenderJob is shown below. This part is independent of the rest of the project: even if you comment this job out of the main program, everything else still runs. It is an add-on feature of Taste that simply computes how many users there are.

if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  Job countUsers = prepareJob(userVectorPath,
                              countUsersPath,
                              SequenceFileInputFormat.class,
                              CountUsersMapper.class,
                              CountUsersKeyWritable.class,
                              VarLongWritable.class,
                              CountUsersReducer.class,
                              VarIntWritable.class,
                              NullWritable.class,
                              TextOutputFormat.class);
  countUsers.setPartitionerClass(CountUsersKeyWritable.CountUsersPartitioner.class);
  countUsers.setGroupingComparatorClass(CountUsersKeyWritable.CountUsersGroupComparator.class);
  countUsers.waitForCompletion(true);
}


map: org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersMapper

public class CountUsersMapper extends
    Mapper<VarLongWritable,VectorWritable,CountUsersKeyWritable,VarLongWritable> {

  @Override
  protected void map(VarLongWritable key,
                     VectorWritable value,
                     Context context) throws IOException, InterruptedException {
    long userID = key.get();
    context.write(new CountUsersKeyWritable(userID), new VarLongWritable(userID));
  }

}


CountUsersKeyWritable is a user-defined Writable, quite similar to VarLongWritable: it just serializes a single long value. Its nested CountUsersPartitioner and CountUsersGroupComparator, set on the job above, are what make the counting work, as sketched below.

As the map code shows, the mapper turns userid, vector<itemindex, prefs> into userid, userid pairs.
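
To see how "all users end up in one reduce group" can work, here is a minimal sketch of a long-wrapping key with a partitioner that sends everything to a single reducer and a grouping comparator that treats all keys as equal. This is my own illustrative code, not Mahout's CountUsersKeyWritable, but it should capture the same trick.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative key wrapping a long user id (not Mahout's CountUsersKeyWritable).
public class UserIDKey implements WritableComparable<UserIDKey> {

  private long userID;

  public UserIDKey() {
  }

  public UserIDKey(long userID) {
    this.userID = userID;
  }

  public long get() {
    return userID;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(userID);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    userID = in.readLong();
  }

  @Override
  public int compareTo(UserIDKey other) {
    // sort comparator: orders the ids, so duplicate user ids arrive next to each other
    return userID < other.userID ? -1 : userID > other.userID ? 1 : 0;
  }

  // Partitioner: every key goes to partition 0, i.e. a single reducer sees all users.
  public static class SingleReducerPartitioner extends Partitioner<UserIDKey, Object> {
    @Override
    public int getPartition(UserIDKey key, Object value, int numPartitions) {
      return 0;
    }
  }

  // Grouping comparator: all keys compare as equal, so there is exactly one reduce() call.
  public static class AllEqualGroupComparator extends WritableComparator {
    public AllEqualGroupComparator() {
      super(UserIDKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      return 0;
    }
  }
}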

Reducer: org.apache.mahout.cf.taste.hadoop.similarity.item.CountUsersReducer

protected void reduce(CountUsersKeyWritable key,
                      Iterable<VarLongWritable> userIDs,
                      Context context) throws IOException, InterruptedException {

  long lastSeenUserID = Long.MIN_VALUE;
  int numberOfUsers = 0;

  for (VarLongWritable writable : userIDs) {
    long currentUserID = writable.get();
    if (currentUserID > lastSeenUserID) {
      lastSeenUserID = currentUserID;
      numberOfUsers++;
    }
  }
  context.write(new VarIntWritable(numberOfUsers), NullWritable.get());
}
This computes the number of users. At first I wondered why the extra comparison is needed, and whether numberOfUsers, a local variable, can really hold the total. The answer lies in the partitioner and grouping comparator configured above: every record goes to a single reducer and all keys are grouped together, so this reduce() is called exactly once over all user ids; the sort delivers the ids in ascending order, so duplicates are adjacent and the currentUserID > lastSeenUserID check counts each distinct user exactly once.

4. Compute, for each item, the users' preferences for it. The job configuration code is as follows:

Job maybePruneAndTransponse = prepareJob(userVectorPath,
                                         itemUserMatrixPath,
                                         SequenceFileInputFormat.class,
                                         MaybePruneRowsMapper.class,
                                         IntWritable.class,
                                         DistributedRowMatrix.MatrixEntryWritable.class,
                                         ToItemVectorsReducer.class,
                                         IntWritable.class,
                                         VectorWritable.class,
                                         SequenceFileOutputFormat.class);
maybePruneAndTransponse.getConfiguration().setInt(MaybePruneRowsMapper.MAX_COOCCURRENCES,
    maxCooccurrencesPerItem);
maybePruneAndTransponse.waitForCompletion(true);


map: org.apache.mahout.cf.taste.hadoop.MaybePruneRowsMapper

protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
    throws IOException, InterruptedException {
  Vector vector = vectorWritable.get();
  countSeen(vector);

  int numElementsBeforePruning = vector.getNumNondefaultElements();
  vector = maybePruneVector(vector);
  int numElementsAfterPruning = vector.getNumNondefaultElements();

  ctx.getCounter(Elements.USED).increment(numElementsAfterPruning);
  ctx.getCounter(Elements.NEGLECTED).increment(numElementsBeforePruning - numElementsAfterPruning);

  DistributedRowMatrix.MatrixEntryWritable entry = new DistributedRowMatrix.MatrixEntryWritable();
  int colIndex = TasteHadoopUtils.idToIndex(rowIndex.get());
  entry.setCol(colIndex);
  Iterator<Vector.Element> iterator = vector.iterateNonZero();
  while (iterator.hasNext()) {
    Vector.Element elem = iterator.next();
    entry.setRow(elem.index());
    entry.setVal(elem.get());
    ctx.write(new IntWritable(elem.index()), entry);
  }
}


The code looks complicated, but the basic idea is to turn userid, vector<item_index, prefs> into item_index, <user_index, prefs>. Note that the map output value is not a vector here: a DistributedRowMatrix.MatrixEntryWritable is created, but it holds only one matrix element at a time, with the row set to the item_index and the column set to the user_index. After the mapper finishes, the shuffle sorts the output so entries with the same item_index come together, and the reducer can then pack them into a RandomAccessSparseVector, just as in Job 2.
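
Pruning aside, the transpose itself is just regrouping. A minimal sketch of the same idea with plain maps (made-up data, no Hadoop):

import java.util.HashMap;
import java.util.Map;

// Sketch: transpose userIndex -> {itemIndex: pref} into itemIndex -> {userIndex: pref},
// which is what the MaybePruneRowsMapper / ToItemVectorsReducer pair achieves on Hadoop.
public class TransposeSketch {
  public static void main(String[] args) {
    Map<Integer, Map<Integer, Double>> userRows = new HashMap<Integer, Map<Integer, Double>>();
    userRows.put(1, new HashMap<Integer, Double>());
    userRows.get(1).put(100, 3.0);
    userRows.get(1).put(200, 5.0);
    userRows.put(2, new HashMap<Integer, Double>());
    userRows.get(2).put(100, 4.0);

    Map<Integer, Map<Integer, Double>> itemRows = new HashMap<Integer, Map<Integer, Double>>();
    for (Map.Entry<Integer, Map<Integer, Double>> user : userRows.entrySet()) {
      int userIndex = user.getKey();
      for (Map.Entry<Integer, Double> pref : user.getValue().entrySet()) {
        int itemIndex = pref.getKey();
        // "map" emits one (itemIndex, <userIndex, pref>) entry per element;
        // "reduce" groups them per itemIndex, which the nested map does here.
        Map<Integer, Double> itemRow = itemRows.get(itemIndex);
        if (itemRow == null) {
          itemRow = new HashMap<Integer, Double>();
          itemRows.put(itemIndex, itemRow);
        }
        itemRow.put(userIndex, pref.getValue());
      }
    }
    System.out.println(itemRows);
  }
}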

Personally, I think that if the weights later on are not computed there is no need to build these per-item preference vectors, and many similarity measures do not need weights at all, so consider carefully whether this job is necessary; when I write my own recommender I plan to drop this part. Discussion is welcome.

In the main program, this next part appears as a single RowSimilarityJob, which is itself executed as three MapReduce passes:

try {
  ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
      "-Dmapred.input.dir=" + itemUserMatrixPath,
      "-Dmapred.output.dir=" + similarityMatrixPath,
      "--numberOfColumns", String.valueOf(numberOfUsers),
      "--similarityClassname", similarityClassname,
      "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem + 1),
      "--tempDir", tempDirPath.toString() });
} catch (Exception e) {
  throw new IllegalStateException("item-item-similarity computation failed", e);
}


5. Compute the weights. I have not fully analyzed this part of the code yet: I searched for similarity.weight() and found that most of the similarity implementations do not contain it, so I am not sure what this step is really for.

Still, the code makes the data-structure transformations visible:

itemindex, vector<userindex, prefs> => userindex, WeightedOccurrence<itemindex, prefs, weight>, where WeightedOccurrence is just a simple data wrapper.

userindex, WeightedOccurrence<itemindex, prefs, weight> => userindex, WeightedOccurrenceArray, whose elements are WeightedOccurrence<itemindex, prefs, weight> entries.
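
Judging only from the fields mentioned above, WeightedOccurrence looks like a small value holder; a minimal sketch of such a wrapper (illustrative only, not Mahout's actual class, which presumably also implements Writable for serialization):

// Illustrative sketch of a WeightedOccurrence-style value holder (not Mahout's actual class).
public class WeightedOccurrenceSketch {

  static final class WeightedOccurrence {
    final int itemIndex;
    final double pref;
    final double weight;

    WeightedOccurrence(int itemIndex, double pref, double weight) {
      this.itemIndex = itemIndex;
      this.pref = pref;
      this.weight = weight;
    }

    @Override
    public String toString() {
      return "(" + itemIndex + ", " + pref + ", " + weight + ")";
    }
  }

  public static void main(String[] args) {
    // one user's occurrences, i.e. the content of a WeightedOccurrenceArray (made-up values)
    WeightedOccurrence[] occurrencesForUser = {
        new WeightedOccurrence(100, 3.0, 1.7),
        new WeightedOccurrence(200, 5.0, 2.1)
    };
    for (WeightedOccurrence occurrence : occurrencesForUser) {
      System.out.println(occurrence);
    }
  }
}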

6.