Spark构建推荐引擎之二:基于Spark Streaming 实时推荐计算
2015-02-03 20:44
731 查看
1.1 数据输入模型
1)用户数据输入数据格式:用户ID,物品ID,点击次数。
2)相似矩阵输入数据格式:
物品ID,物品ID,相似度
1.2 物品相似矩阵
采用SparkContext读取物品的相似矩阵://2 sc 读取相似矩阵
valsimi_path1 ="hdfs://192.168.180.100:9000/data/simi/simi.txt"
valsimi_rdd1 =sc.textFile(simi_path1,10)
valsimi_rdd2 =simi_rdd1.map(line => {
valfileds = line.split(",")
(fileds(0),fileds(1),fileds(2).toDouble)
})
simi_rdd2.cache()
1.3 用户实时评分计算
采用Spark Streaming实时计算用户的评分数据://3
构建Streaming对象
valssc =new StreamingContext(sc,batchDuration)
ssc.checkpoint("hdfs://192.168.180.100:9000/data/check")
//4 用户实时数据收集
//4.1 设置监听目录
valSpath1 =directory
valStream_User_Act1 =ssc.fileStream[LongWritable, Text, TextInputFormat](Spath1).map(_._2.toString)
Stream_User_Act1.checkpoint(slideDuration)
//4.2 用户操作数据——流式窗口处理
//用户ID,物品ID,点击次数
valStream_User_Act2 =Stream_User_Act1.map(line => {
valfileds = line.split(",")
((fileds(0),fileds(1)),fileds(2).toInt)
})
valStream_User_Act3 =Stream_User_Act2.reduceByKeyAndWindow(_ + _, _ -
_,windowDuration,slideDuration)
valStream_User_Act4 =Stream_User_Act3.reduceByKey(_ + _).map(f => (f._1._1,
f._1._2, f._2))
1.4 实时推荐计算
//5实时推荐计算
valRecommend_app1 =Stream_User_Act4.transform(rdd
=> Recommend1(simi_rdd2, rdd,r_number))
def Recommend1(
items_similar: RDD[(String, String, Double)],
user_prefer: RDD[(String, String, Int)],
r_number: Int): (RDD[(String, String, Double)]) = {
// 1 矩阵计算——i行与j列join
valrdd_app1_R2 =items_similar.map(f => (f._1, (f._2,
f._3))).
join(user_prefer.map(f => (f._2, (f._1, f._3))))
// 2 矩阵计算——i行与j列元素相乘
valrdd_app1_R3 =rdd_app1_R2.map(f => ((f._2._2._1,
f._2._1._1), f._2._2._2
* f._2._1._2))
// 3 矩阵计算——用户:元素累加求和
valrdd_app1_R4 =rdd_app1_R3.reduceByKey((x, y) => x + y).map(f => (f._1._1,
(f._1._2, f._2)))
// 4 矩阵计算——用户:用户对结果排序,过滤
valrdd_app1_R5 =rdd_app1_R4.groupByKey()
valrdd_app1_R6 =rdd_app1_R5.map(f => {
vali2 = f._2.toBuffer
vali2_2 =i2.sortBy(_._2)
if (i2_2.length > r_number)i2_2.remove(0,
(i2_2.length - r_number))
(f._1, i2_2.toIterable)
})
valrdd_app1_R7 =rdd_app1_R6.flatMap(f => {
valid2 = f._2
for (w <-id2)
yield (f._1,w._1,w._2)
})
rdd_app1_R7.map(f => f._1).distinct.count
rdd_app1_R7
}
1.5 结果输出
//6 结果输出valoutpath ="hdfs://192.168.180.79:9000/output/recommend"
Recommend_app1.saveAsTextFiles(outpath,"rdd")
//7 监听启动
ssc.start()
ssc.awaitTermination()
转载请注明出处:
/article/1477375.html
相关文章推荐
- 如何基于 Spark Streaming 构建实时计算平台
- 如何基于 Spark Streaming 构建实时计算平台
- 如何基于 Spark Streaming 构建实时计算平台
- 潘国庆:基于 Spark Streaming 构建实时计算平台实战解析
- 构建基于Spark的推荐引擎(python实现)
- Spark-构建基于Spark的推荐引擎
- Spark学习笔记——构建基于Spark的推荐引擎
- 基于Spark构建推荐引擎
- 基于spark-streaming实时推荐系统( 二)
- 基于spark-streaming实时推荐系统(三)
- 基于spark-streaming实时推荐系统(一)
- flume-kafka- spark streaming(pyspark) - redis 实时日志收集实时计算 + Spark 基于pyspark下的实时日志分析
- 构建基于Spark的推荐引擎(Python)
- 读书笔记:构建基于Spark的推荐引擎
- 《Spark机器学习》第4章--构建基于Spark的推荐引擎
- 基于Spark构建推荐引擎之一:基于物品的协同过滤推荐
- 基于Azure构建PredictionIO和Spark的推荐引擎服务
- Spark Streaming实时计算框架介绍
- mahout入门实例-基于 Apache Mahout 构建社会化推荐引擎-实战(参考IBM)
- 基于 Apache Mahout 构建社会化推荐引擎