您的位置:首页 > 大数据

《Spark商业案例与性能调优实战100课》第3课:商业案例之通过RDD分析大数据电影点评系统各种类型的最喜爱电影TopN及性能优化技巧

2017-01-06 21:49 1146 查看
《Spark商业案例与性能调优实战100课》第3课:商业案例之通过RDD分析大数据电影点评系统各种类型的最喜爱电影TopN及性能优化技巧

源代码

package com.dt.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark RDD walkthrough over MovieLens-style "::"-delimited data files.
 *
 * File layouts, as documented by the original author:
 *  - users.dat      UserID::Gender::Age::OccupationID::Zip-code
 *  - ratings.dat    UserID::MovieID::Rating::Timestamp
 *  - occupation.dat OccupationID::OccupationName (e.g. 6 -> "doctor/health care")
 *  - movies.dat     MovieID::Title::Genres (not read by this program)
 *
 * Reports printed to stdout, in order:
 *  1. every user joined with the occupation name,
 *  2. the users who rated the target movie,
 *  3. the top-N movies by average rating,
 *  4. the top-N movies by number of ratings,
 *  5. a sample of ratings joined with the rater's gender,
 *  6. the top-N movies by average rating among male, then female, raters.
 */
object Movie_Users_Analyzer {

  // Local import: the helper signature below needs the RDD type.
  import org.apache.spark.rdd.RDD

  private val DefaultMasterUrl = "local[4]"
  private val DefaultDataPath = "data/movielens/medium/"
  private val FieldSeparator = "::"
  private val TargetMovieId = "1193"
  private val TopN = 10
  private val SampleSize = 10
  private val Male = "M"
  private val Female = "F"

  /**
   * Program entry point.
   *
   * @param args optional positional arguments: `args(0)` is the Spark master URL
   *             (default `local[4]`), `args(1)` is the data directory
   *             (default `data/movielens/medium/`).
   */
  def main(args: Array[String]): Unit = {
    // Each argument is resolved independently. The previous `if / else if` chain
    // could never reach the data-path branch, so args(1) was silently ignored.
    val masterUrl = args.headOption.getOrElse(DefaultMasterUrl)
    val dataPath = asDirectory(if (args.length > 1) args(1) else DefaultDataPath)

    val sc = new SparkContext(
      new SparkConf().setMaster(masterUrl).setAppName("Movie_Users_Analyzer"))
    try {
      analyze(sc, dataPath)
    } finally {
      // Always release the context, even when one of the jobs fails.
      sc.stop()
    }
  }

  /** Ensures a non-empty path ends with "/" so file names can be appended directly. */
  private def asDirectory(path: String): String =
    if (path.isEmpty || path.endsWith("/")) path else path + "/"

  /** Runs every report against the data files found under `dataPath`. */
  private def analyze(sc: SparkContext, dataPath: String): Unit = {
    val usersRDD = sc.textFile(dataPath + "users.dat")
    val occupationsRDD = sc.textFile(dataPath + "occupation.dat")
    val ratingsRDD = sc.textFile(dataPath + "ratings.dat")

    // (occupationId, (userId, gender, age)), keyed for the join with occupation names.
    val usersBasic = usersRDD.map(_.split(FieldSeparator)).map { user =>
      (user(3), (user(0), user(1), user(2)))
    }
    val occupations = occupationsRDD.map(_.split(FieldSeparator)).map(job => (job(0), job(1)))

    // (occupationId, ((userId, gender, age), occupationName)); reused below, hence cached.
    val userInformation = usersBasic.join(occupations).cache()
    userInformation.collect().foreach(println)

    // Users who rated the target movie: (userId, (movieId, ((userId, gender, age), occupationName))).
    val targetMovieRatings = ratingsRDD
      .map(_.split(FieldSeparator))
      .map(fields => (fields(0), fields(1)))
      .filter(_._2 == TargetMovieId)
    val usersById = userInformation.map { case (_, info) => (info._1._1, info) }
    targetMovieRatings.join(usersById).collect().foreach(println)

    // (userId, movieId, rating); shared by all rating reports, hence cached.
    val ratings = ratingsRDD
      .map(_.split(FieldSeparator))
      .map(fields => (fields(0), fields(1), fields(2)))
      .cache()

    // Top movies by average rating, printed as (average, movieId).
    topRatedMovies(ratings, TopN).foreach(println)

    // Most-watched movies: count ratings per MovieID (field 2 of ratings.dat).
    // The previous version keyed on the UserID, which ranked users instead of movies.
    ratings
      .map { case (_, movieId, _) => (movieId, 1) }
      .reduceByKey(_ + _)
      .map(_.swap)
      .sortByKey(ascending = false)
      .take(TopN)
      .foreach(println)

    // (userId, ((userId, movieId, rating), gender))
    val userGenders = usersRDD.map(_.split(FieldSeparator)).map(user => (user(0), user(1)))
    val genderRatings = ratings.map(r => (r._1, r)).join(userGenders).cache()
    genderRatings.take(SampleSize).foreach(println)

    val maleRatings = genderRatings.filter(_._2._2 == Male).map(_._2._1)
    val femaleRatings = genderRatings.filter(_._2._2 == Female).map(_._2._1)

    // Per-gender top movies, printed as (movieId, average).
    topRatedMovies(maleRatings, TopN).map(_.swap).foreach(println)
    topRatedMovies(femaleRatings, TopN).map(_.swap).foreach(println)
  }

  /**
   * Computes the `n` movies with the highest average rating.
   *
   * @param ratings (userId, movieId, rating) triples; `rating` must parse as an Int
   * @param n       maximum number of movies to return
   * @return (averageRating, movieId) pairs in descending order of average
   */
  private def topRatedMovies(ratings: RDD[(String, String, String)], n: Int): Array[(Double, String)] =
    ratings
      .map { case (_, movieId, rating) => (movieId, (rating.toInt, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)) // (sum of scores, number of ratings)
      .map { case (movieId, (scoreSum, count)) => (scoreSum.toDouble / count, movieId) }
      .sortByKey(ascending = false)
      .take(n)
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐