您的位置:首页 > 其它

Scala 应用:基于用户(user-based)协同过滤的推荐系统

2015-09-29 23:03 357 查看
以下是一个纯 Scala 实现的、基于用户协同过滤的推荐系统,可供参考。它能处理一些小数据集,也可以改造成多线程版本。实践表明,对于百 MB 级别的数据,在相同配置的机器上,多线程版本与分布式版本的运算时间并没有很大差别。若要改造成多线程,需要研究 Scala 源码中的线程池对象(与 Java 的很类似)。

SimilarityMetrics.scala:

package com.glad.ml

object SimilarityMetrics {
  /**
   * Pearson correlation between two users' rating vectors.
   *
   * Each user's mean is taken over the items that user actually rated. The correlation itself is
   * computed over the union of both users' items, where an item a user did not rate counts as a
   * rating of 0.0 for that user.
   *
   * @param u ratings of the first user, keyed by item id
   * @param v ratings of the second user, keyed by item id
   * @return the correlation coefficient, or 0.0 when it is undefined (either map is empty, or
   *         either rating vector has zero variance over the item union)
   */
  def pearsonCorrelation(u: Map[Int, Double], v: Map[Int, Double]): Double = {
    if (u.isEmpty || v.isEmpty) 0.0
    else {
      val xMean = u.values.sum / u.size
      val yMean = v.values.sum / v.size

      // Walk the key union as a Seq: mapping a Set into (dx, dy) pairs would merge distinct
      // items that happen to have identical deviations and undercount them in the sums.
      val keys = (u.keySet ++ v.keySet).toSeq
      val (cov, sx, sy) = keys.foldLeft((0.0, 0.0, 0.0)) { case ((c, sxx, syy), key) =>
        val dx = u.getOrElse(key, 0.0) - xMean
        val dy = v.getOrElse(key, 0.0) - yMean
        (c + dx * dy, sxx + dx * dx, syy + dy * dy)
      }

      val denominator = Math.sqrt(sx) * Math.sqrt(sy)
      // Zero variance would give 0/0 = NaN. NaN compares greater than every other Double under
      // the default ordering, so callers ranking by similarity would pick such a user first.
      if (denominator == 0.0) 0.0 else cov / denominator
    }
  }
}


UserNeighborhoodRecommender.scala:

package com.glad.ml

import scala.io.Source
import java.io.File
import scala.collection.immutable.TreeMap

object UserNeighborhoodRecommender {

  /**
   * User-based collaborative-filtering recommendations for a single user.
   *
   * Steps:
   *  1. score every other user w in `userIds` with `similarityFunction(preferences(u), preferences(w))`;
   *  2. keep the `topN` highest-scoring users as the neighbourhood (users whose similarity is NaN
   *     are skipped, since NaN cannot be ranked meaningfully);
   *  3. for every item rated by some neighbour but not by `u`, average `similarity * rating`
   *     over the neighbours who rated it;
   *  4. return those items sorted by that average, highest first.
   *
   * The average divides by the number of contributing neighbours, not by the sum of their
   * similarities.
   *
   * @param preferences        ratings keyed by user id, then by item id
   * @param itemIds            all known item ids (not used by this implementation)
   * @param userIds            candidate users; `u` itself is excluded from its own neighbourhood
   * @param similarityFunction similarity between two users' rating maps (higher = more similar)
   * @param u                  the user to recommend for
   * @param topN               neighbourhood size; defaults to 2, the previously hard-coded value
   * @return (itemId, score) pairs, highest score first
   * @throws NoSuchElementException if `u` or a user in `userIds` is not a key of `preferences`
   */
  def recommender(
      preferences: Map[Long, Map[Int, Double]],
      itemIds: Set[Int],
      userIds: Set[Long],
      similarityFunction: ((Map[Int, Double], Map[Int, Double]) => Double),
      u: Long,
      topN: Int = 2): List[Tuple2[Int, Double]] = {
    val target = preferences(u)

    // Neighbourhood as (user, similarity), most similar first. The similarity is kept so it is
    // not recomputed for every (item, neighbour) pair below.
    val neighbours: List[(Long, Double)] = userIds.toList
      .filterNot(_ == u)
      .map(w => (w, similarityFunction(target, preferences(w))))
      .filterNot(_._2.isNaN)
      .sortBy(_._2)
      .reverse
      .take(topN)

    // Items rated by at least one neighbour but not by u, each listed once.
    val candidateItems = neighbours.flatMap { case (w, _) =>
      preferences(w).keys.filterNot(target.contains)
    }.distinct

    val recommendedItems = candidateItems.map { item =>
      val weighted = neighbours.collect {
        case (v, sim) if preferences(v).contains(item) => sim * preferences(v)(item)
      }
      (item, weighted.sum / weighted.size)
    }

    recommendedItems.sortBy(_._2).reverse
  }

}


UserNeighborhoodRecommenderMain.scala:

package com.glad.ml

import scala.io.Source
import java.io.File
import scala.collection.immutable.TreeMap

object UserNeighborhoodRecommenderMain {

  /**
   * Entry point: loads ratings from a CSV file and prints user-based recommendations for every
   * user found in it.
   *
   * Each input line is expected to be `userId,itemId,rating`. Lines without exactly three fields,
   * or whose fields do not parse as Long/Int/Double (for example a header row), are skipped.
   *
   * @param args optional; args(0) is the input file path (defaults to "train.csv")
   */
  def main(args: Array[String]): Unit = {
    val inputFile = if (args.length > 0) args(0) else "train.csv"
    val modelFile = new File(inputFile)
    if (!modelFile.isFile) {
      println("Please, specify name of file, or put file 'train.csv' into current directory!")
      System.exit(1)
    }

    val preferences = loadPreferences(modelFile)

    val userIds = preferences.keySet
    // foldLeft rather than reduce, so an input with no valid rows yields an empty set instead of
    // throwing UnsupportedOperationException.
    val itemIds = preferences.values.foldLeft(scala.collection.immutable.TreeSet.empty[Int]) {
      (acc, ratings) => acc ++ ratings.keySet
    }
    println(userIds)
    println(itemIds)
    preferences foreach println

    val recList = userIds.toList.map { userId =>
      val rs = UserNeighborhoodRecommender.recommender(preferences, itemIds, userIds,
        SimilarityMetrics.pearsonCorrelation, userId)
      println("Recommendations" + userId + ": ")
      rs foreach println
      (userId, rs)
    }

    // TODO: evaluate recommendation quality by comparing recList against test data
  }

  /** Reads `userId,itemId,rating` rows into per-user rating maps, always closing the file. */
  private def loadPreferences(file: File): TreeMap[Long, TreeMap[Int, Double]] = {
    val src = Source.fromFile(file)
    try {
      src.getLines()
        .map(_.split(","))
        .filter(_.length == 3)
        .flatMap { e =>
          // Skip non-numeric rows (e.g. a "user,item,rating" header), mirroring how rows with
          // the wrong field count are skipped, instead of aborting with NumberFormatException.
          scala.util.Try((e(0).trim.toLong, e(1).trim.toInt, e(2).trim.toDouble)).toOption
        }
        .foldLeft(TreeMap.empty[Long, TreeMap[Int, Double]]) {
          case (m, (userId, itemId, preference)) =>
            val values = m.getOrElse(userId, TreeMap.empty[Int, Double])
            m + (userId -> (values + (itemId -> preference)))
        }
    } finally {
      src.close()
    }
  }
}


如果看过我的另一篇文章《Spark RDD 转换成其他数据类型》,就会知道这两篇是有关联的。当初我很天真地以为,只要是 Scala 代码,放到 Spark 上运行就会自动分布式执行,然而并不是这样:想要在 Spark 上进行分布式运算,还是得老老实实地使用 RDD。而用 RDD 编程时,本文这种思路是行不通的……
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  scala 推荐系统