您的位置:首页 > 其它

spark mllib als推荐引擎学习

2016-06-14 10:43 246 查看
推荐算法可以分为:UserCF(基于用户的协同过滤)、ItemCF(基于商品的协同过滤)、ModelCF(基于模型的协同过滤),具体介绍见:http://blog.csdn.net/ygrx/article/details/15501679

spark中的协同过滤算法用的是ALS算法,叫做矩阵分解,其实就是补充二维矩阵中缺失的打分,具体见:http://www.dataguru.cn/article-7049-1.html


Spark ALS推荐实例

我们接下去通过代码来描述下利用spark的mllib如何进行商户或者用户的推荐,假设我们的数据如下:

3563904759267333 4731362622801081

3563904759484437 8360756622809611

3563904759746579 1021362622803781

字段1是用户id,第二个字段是商户id,第三个就是打分

[java] view
plain copy

val conf = new SparkConf()  

  .setAppName("Mid Recommend")  

  .set("spark.executor.extraClassPath", "/opt/cloudera/parcels/CDH/lib/hbase/lib/*")  

val sc = new SparkContext(conf)  

这段代码描述了构建SparkContext,同时加入了hbase需要用的jar包,因为我们的数据需要存入hbase

[java] view
plain copy

val trans = sc.textFile("/xxx/recommend/card_consume_info_sh/card_consume_info_sh")  

  

var count = 0  

val cardMapping = trans.map { line =>  

  val field = line.split("\t")  

  field(0)  

}.distinct(1).map { line =>  

  count += 1  

  (count.toString, line)  

}  

  

count = 0  

val midMapping = trans.map { line =>  

  val field = line.split("\t")  

  field(1)  

}.distinct(1).map { line =>  

  count += 1  

  (line, count.toString)  

}  

  

saveToHbase(midMapping.map(convert), "mid_mapping")  

saveToHbase(cardMapping.map(convert), "card_mapping")  

  

cardMapping.map { line =>  

  line._1 + "\t" + line._2  

}.saveAsTextFile("/<span style="font-family: Arial, Helvetica, sans-serif;">xxx</span><span style="font-family: Arial, Helvetica, sans-serif;">/recommend/mapping/card_mapping")</span>  

midMapping.map { line =>  

  line._1 + "\t" + line._2  

}.saveAsTextFile("/xxx/recommend/mapping/mid_mapping")  

[java] view
plain copy

val trans = sc.textFile("/xxx/recommend/card_consume_info_sh/card_consume_info_sh")  

val cardMapping = sc.textFile("/xxx/recommend/mapping/card_mapping").map { line =>  

  val field = line.split("\t")  

  (field(1), field(0))  

}  

val midMapping = sc.textFile("/xxx/recommend/mapping/mid_mapping").map { line =>  

  val field = line.split("\t")  

  (field(0), field(1))  

}  

  

// 进行join  

val rs = trans.map { line =>  

  val field = line.split("\t")  

  (field(0), field(1) + "_" + field(2))  

}.join(cardMapping).map { line =>  

  val field = line._2._1.split("_")  

  (field(0), line._1 + "_" + line._2._2 + "_" + field(1))  

}.join(midMapping).map { line =>  

  val field = line._2._1.split("_")  

  field(1) + "\t" + line._2._2 + "\t" + field(2)  

}  

  

rs.saveAsTextFile("/xxx/recommend/card_consume_info_sh/card_consume_info_sh_mapping")  

由于mllib的Rating类的user和product只能是int,而我们的user和product是String类型,所以需要做一个转换,上述代码将原始数据进行了int和String的mapping,最后我们的数据格式为:241893846761,第一个表示用户id,第二个表示商品id,第三个字段表示打分

[java] view
plain copy

val trans = sc.textFile("/xxx/recommend/card_consume_info_sh/card_consume_info_sh_mapping")  

  

val ratings = trans.map { line =>  

  val Array(cardMapping, midMapping, rating) = line.split("\t")  

  Rating(cardMapping.toInt, midMapping.toInt, rating.toDouble)  

}.persist  

  

// 使用ALS训练数据建立推荐模型  

val rank = 8  

val numIterations = 10  

val model = ALS.train(ratings, rank, numIterations, 0.01)  

  

// 从rating中获取user以及product数据集  

val usersProducts = ratings.map {  

  case Rating(user, product, rate) => (user, product)  

}  

  

// 使用推荐模型预对用户和商品进行评分,得到预测评分的数据集  

val predictions = model.predict(usersProducts).map {  

  case Rating(user, product, rate) => ((user, product), rate)  

}  

  

// 真实数据和预测数据进行合并  

val ratesAndPredicts = ratings.map {  

  case Rating(user, product, rate) => ((user, product), rate)  

}.join(predictions)  

  

val MSE = ratesAndPredicts.map { case ((user, product), (r1, r2)) =>  

  val err = (r1 - r2)  

  err * err  

}.mean()  

这段代码我们通过ALS进行模型的训练,主要有3个参数,具体见http://blog.javachen.com/2015/06/01/how-to-implement-collaborative-filtering-using-spark-als.html

这段代码我们只试了3个参数中的一个情况,然后计算MSE值,这个值越小,说明该model越接近正确值,具体模型的评估同样可以见上个博客http://blog.javachen.com/2015/06/01/how-to-implement-collaborative-filtering-using-spark-als.html

其实我们这里可以用3个参数多维度的数据进行循环训练模型,计算出MSE值最小的那个model就可以了~

[java] view
plain copy

// 为每个商户进行推荐  

val products = ratings.map(_.product).distinct(10).collect()  

  

val rawData = new mutable.HashMap[String, String]()  

products.foreach(  

  product => {  

    // 依次为商品推荐用户  

    val rs = model.recommendUsers(product, 10)  

    var value = ""  

    var key = 0  

    // 拼接推荐结果  

    rs.foreach(r => {  

      key = r.product  

      value = value + r.user + ":" + r.rating + ","  

    })  

    rawData.put(key.toString, value.substring(0, value.length - 1))  

  }  

)  

// 存进hbase  

val rs = sc.parallelize(rawData.toSeq).map(convert)  

saveToHbase(rs, "mid_recommend")  

这段代码是为每个商户进行离线推荐,不建议数据量很大,要不然撑爆内存,高版本提供了离线全部推荐的api,只是我的版本是CDH5.4.4,Spark还是1.3.0的版本,所以不支持这个api

[java] view
plain copy

def saveModel(sc: SparkContext, model: MatrixFactorizationModel): Unit = {  

  model.save(sc, "/xxx/recommend/model")  

}  

  

def loadModel(sc: SparkContext): MatrixFactorizationModel = {  

  return MatrixFactorizationModel.load(sc, "/xxx/recommend/model")  

}  

如果我们需要实时推荐,那么就需要将训练好的模型保存下来,然后需要用的时候再load下,然后用recommendUsers进行推荐。

实时推荐引擎的思考

上段最后提到了实时推荐引擎,有如下的思路,仅供参考:
推荐最重要的就是从初始数据中用算法进行打分,这个根据业务的不同设计的方法不同,然后根据打好分的数据进行模型的训练,训练完了保存下来,需要进行推荐的时候再load过来进行实时推荐。
主要流程:原始数据经过ETL处理生成结构化数据然后用算法进行打分,这块耗时比较长,每天可以凌晨进行ETL抽取;然后根据打分好的数据进行模型的训练,这段时间在ETL之后,需要时间也不短;最后保存模型;加载模型进行实时推荐
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: