
Building a Recommendation Engine with Spark, Part 2: Real-Time Recommendation with Spark Streaming

2015-02-03 20:44

1.1 Input Data Model

1) User activity input format:
user ID, item ID, click count

2) Item-similarity matrix input format:
item ID, item ID, similarity
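
For illustration, records in the two input files might look like this (the IDs and values below are made up, not from the original article):

```
u001,i101,3        user u001 clicked item i101 three times
i101,i102,0.82     items i101 and i102 have similarity 0.82
```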

1.2 Item Similarity Matrix

Read the item similarity matrix with SparkContext:


//2 Read the similarity matrix with sc
val simi_path1 = "hdfs://192.168.180.100:9000/data/simi/simi.txt"
val simi_rdd1 = sc.textFile(simi_path1, 10)
val simi_rdd2 = simi_rdd1.map(line => {
  val fields = line.split(",")
  (fields(0), fields(1), fields(2).toDouble)  // (item ID, item ID, similarity)
})
simi_rdd2.cache()

1.3 Real-Time User Rating Computation

Use Spark Streaming to compute user rating data in real time:
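
The snippets below reference several values (`sc`, `batchDuration`, `windowDuration`, `slideDuration`, `directory`, `r_number`) that the article never defines. A minimal setup sketch, in which every concrete value is an assumption:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingRecommend")
val sc = new SparkContext(conf)

// Hypothetical values -- tune to the actual workload
val batchDuration  = Seconds(10)  // how often a new micro-batch is formed
val windowDuration = Seconds(60)  // span of user activity to aggregate (multiple of batchDuration)
val slideDuration  = Seconds(20)  // how often the window is recomputed (multiple of batchDuration)
val directory = "hdfs://192.168.180.100:9000/data/useraction"  // assumed monitored input directory
val r_number  = 10                // number of recommendations kept per user
```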

//3 Build the StreamingContext
val ssc = new StreamingContext(sc, batchDuration)
ssc.checkpoint("hdfs://192.168.180.100:9000/data/check")

//4 Collect user activity in real time

//4.1 Monitor the input directory
val Spath1 = directory
val Stream_User_Act1 = ssc.fileStream[LongWritable, Text, TextInputFormat](Spath1).map(_._2.toString)
Stream_User_Act1.checkpoint(slideDuration)

//4.2 User activity -- sliding-window aggregation
// Record format: user ID, item ID, click count
val Stream_User_Act2 = Stream_User_Act1.map(line => {
  val fields = line.split(",")
  ((fields(0), fields(1)), fields(2).toInt)
})

// Maintain per-(user, item) click counts over the window incrementally:
// _ + _ folds in batches entering the window, _ - _ removes batches leaving it
val Stream_User_Act3 = Stream_User_Act2.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)

val Stream_User_Act4 = Stream_User_Act3.reduceByKey(_ + _).map(f => (f._1._1, f._1._2, f._2))
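
The inverse function passed to reduceByKeyAndWindow lets Spark update each window incrementally: rather than re-summing every batch still inside the window, it adds the batches that slid in and subtracts the ones that slid out. That arithmetic can be traced in plain Scala on hypothetical counts, no Spark required:

```scala
// Per-batch click counts for one (user, item) key, oldest first (made-up numbers)
val batches = Vector(2, 1, 4, 3, 5)

// Window covers 3 batches; slide by 1 batch
var window = batches.take(3).sum            // 2 + 1 + 4 = 7
window = window + batches(3) - batches(0)   // slide: add 3, subtract 2 -> 8
window = window + batches(4) - batches(1)   // slide: add 5, subtract 1 -> 12
```

Because subtraction undoes addition exactly, each slide costs work proportional to the slide interval instead of the whole window.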

1.4 Real-Time Recommendation Computation

//5 Compute recommendations on each batch
val Recommend_app1 = Stream_User_Act4.transform(rdd => Recommend1(simi_rdd2, rdd, r_number))

def Recommend1(
    items_similar: RDD[(String, String, Double)],
    user_prefer: RDD[(String, String, Int)],
    r_number: Int): RDD[(String, String, Double)] = {

  // 1 Matrix computation -- join similarity rows with preference entries on item ID
  val rdd_app1_R2 = items_similar.map(f => (f._1, (f._2, f._3)))
    .join(user_prefer.map(f => (f._2, (f._1, f._3))))

  // 2 Matrix computation -- multiply the paired elements: ((user, item), clicks * similarity)
  val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))

  // 3 Matrix computation -- sum the products per (user, item)
  val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y).map(f => (f._1._1, (f._1._2, f._2)))

  // 4 Per user: sort by score and keep only the top r_number items
  val rdd_app1_R5 = rdd_app1_R4.groupByKey()
  val rdd_app1_R6 = rdd_app1_R5.map(f => {
    val i2 = f._2.toBuffer
    val i2_2 = i2.sortBy(_._2)
    if (i2_2.length > r_number) i2_2.remove(0, i2_2.length - r_number)
    (f._1, i2_2.toIterable)
  })

  // Flatten back to (user ID, item ID, score) triples
  val rdd_app1_R7 = rdd_app1_R6.flatMap(f => {
    for (w <- f._2) yield (f._1, w._1, w._2)
  })

  rdd_app1_R7
}
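
The join-multiply-sum above can be traced on tiny in-memory data without Spark. In this sketch (all names and numbers are made up) the user has clicked i1 twice, and i1 is similar to i2 and i3:

```scala
val similar = Seq(("i1", "i2", 0.8), ("i1", "i3", 0.5))  // (item, item, similarity)
val prefer  = Seq(("u1", "i1", 2))                       // (user, item, clicks)

// Join on the clicked item, multiply clicks by similarity, sum per (user, item)
val scores = for {
  (si, sj, w) <- similar
  (u, pi, c)  <- prefer
  if si == pi
} yield ((u, sj), c * w)

val summed = scores.groupBy(_._1).mapValues(_.map(_._2).sum)
// summed contains (u1,i2) -> 1.6 and (u1,i3) -> 1.0
```

Each clicked item "votes" for its similar items with weight clicks × similarity; summing the votes gives the user's predicted score for each candidate item.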

1.5 Result Output

//6 Write results to HDFS
val outpath = "hdfs://192.168.180.79:9000/output/recommend"
Recommend_app1.saveAsTextFiles(outpath, "rdd")

//7 Start listening and block until termination
ssc.start()
ssc.awaitTermination()

