您的位置:首页 > 数据库 > Mongodb

Spark - 利用 Spark SQL + MongoDB 对PandaTV主播进行等级分类

2017-04-20 14:34 232 查看

Spark SQL

使用Spark SQL时,最主要的两个组件就是DataFrame和SQLContext。

1. DataFrame

DataFrame是一个分布式的,按照命名列的形式组织的数据集合。与关系型数据库中的数据库表类似。通过调用将DataFrame的内容作为行RDD(RDD of Rows)返回的rdd方法,可以将DataFrame转换成RDD。

可以通过如下数据源创建DataFrame:
已有的RDD
结构化数据文件
JSON数据集
Hive表
外部数据库

2. SQL Context

Spark SQL提供SQLContext封装Spark中的所有关系型功能。

此外,Spark SQL中的HiveContext可以提供SQLContext所提供功能的超集。可以在用HiveQL解析器编写查询语句以及从Hive表中读取数据时使用。在Spark程序中使用HiveContext无需既有的Hive环境

Spark + Mongo的应用场景

对那些已经使用MongoDB的用户,如果你希望在你的MongoDB驱动的应用上提供个性化功能,比如说像Yahoo一样为你找感兴趣的新闻,能够在你的MongoDB数据上利用到Spark强大的机器学习或者流处理,你就可以考虑在MongoDB集群上部署Spark来实现这些功能。
如果你已经使用Hadoop而且数据已经在HDFS里面,你可以考虑使用Spark来实现更加实时更加快速的分析型需求,并且如果你的分析结果有数据量大、格式多变以及这些结果数据要及时提供给前台APP使用的需求,那么MongoDB可以用来作为你分析结果的一个存储方案。
原文:MongoDB + Spark: 完整的大数据解决方案

RDD、DataFrame和DataSet的区别

RDD和DataFrame



RDD和DataSet



DataFrame和DataSet

Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:



原文:RDD、DataFrame和DataSet的区别

练习:根据熊猫TV房间订阅人数对主播进行分类

原始数据为熊猫tv英雄联盟类别直播间信息,共[7257条],部分如下
/* 1 */
{
"_id" : ObjectId("58e77b94a9231916402a4d8b"),
"r_id" : "511386",
"r_name" : "柯柯:6K把德莱文电一王者局",
"r_classification" : {
"cname" : "英雄联盟",
"ename" : "lol"
},
"u_name" : "SteinsGate1",
"u_avatar_url" : "http://i7.pdim.gs/1a1399db3dd5dde23d673204c34d3e89.jpeg",
"time" : "2017-04-07 19:44:20",
"u_follower" : 4896
}

/* 2 */
{
"_id" : ObjectId("58e77b94a9231916402a4d8c"),
"r_id" : "419546",
"r_name" : "大根:国服第一龙王!",
"r_classification" : {
"cname" : "英雄联盟",
"ename" : "lol"
},
"u_name" : "龙大大大大大根",
"u_avatar_url" : "http://i6.pdim.gs/4c0a0e43d7956e9dd8aa8c07df6acb7b.png",
"time" : "2017-04-07 19:44:20",
"u_follower" : 91682
}

/* 3 */
{
"_id" : ObjectId("58e77b94a9231916402a4d8d"),
"r_id" : "7000",
"r_name" : "LPL春季赛Snake vs RNG",
"r_classification" : {
"cname" : "英雄联盟",
"ename" : "lol"
},
"u_name" : "LPL熊猫官方直播",
"u_avatar_url" : "http://i5.pdim.gs/4558e050a485b215de8474e3ab64d04e.png",
"time" : "2017-04-07 19:44:20",
"u_follower" : 846436
}

从mongodb中读取原始数据之后,根据"u_follower"对主播进行分类,类别如下
100 <-> 1000 
1000 <- >1w
1w <-> 10w
10w <-> 100w
100w <-> 1000w

实现如下
import com.mongodb.spark.MongoSpark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
* Created by peerslee on 17-4-20.
*/
object MongoSQL {
def main(args: Array[String]): Unit = {
// 1. 创建SparkContext -> spark 入口
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("MongoSQL")
.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/lol.panda")
.set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/lol")
val sc = new SparkContext(conf)
// 2. 创建SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
// 3. 使用MongoSpark创建Dataframe(其他方式创建见文档)
val df = MongoSpark.load(sqlContext)
//    df.show()
// 4. 使用filter根据"u_follower"对直播间进行分类
var border = 100
/*
[rdd => DataFrame]
1. 推断Schema
原理: Spark SQL能够将含Row对象的RDD转换成DataFrame,并推断数据类型
*/
val schemaStr = "id,follower,rank"
val schema = StructType(schemaStr.split(",")
.map(column => StructField(column, StringType, true)))
for(i <- 1 to 5) {
//      val rowRdd = df.filter(df("u_follower") > border && df("u_follower") < border*10).show()
/*
2. 指定Schema
2.1 从原来的RDD创建一个元祖或列表的RDD
2.2 用StructType 创建一个和步骤一中创建的RDD中元组或列表的结构相匹配的Schema
2.3 通过SQLContext提供的createDataFrame方法将schema 应用到RDD上
*/
val rowRdd = df.filter(df("u_follower") > border && df("u_follower") < border*10)
.map(room => Row(room(2), room(6).toString, i.toString))
.sortBy(room => room(1).toString.toLong, false)
val rankDF = sqlContext.createDataFrame(rowRdd, schema)
MongoSpark.save(rankDF.write.option("collection", "panda_rank").mode("append"))
border *= 10
}
}
}


rank 库共[1771]条数据,部分如下

/* 1 */
{
"_id" : ObjectId("58f84fbfa830984cc36ef9e0"),
"id" : "6666",
"follower" : "3998068",
"rank" : "5"
}
/* 1 */
{
"_id" : ObjectId("58f84fbea830984cc36ef9b8"),
"id" : "16666",
"follower" : "974142",
"rank" : "4"
}
/* 1 */
{
"_id" : ObjectId("58f84fbea830984cc36ef915"),
"id" : "246868",
"follower" : "99866",
"rank" : "3"
}
/* 1 */
{
"_id" : ObjectId("58f84fbea830984cc36ef558"),
"id" : "357838",
"follower" : "9980",
"rank" : "2"
}

/* 1 */
{
"_id" : ObjectId("58f84fbda830984cc36ef2fb"),
"id" : "633473",
"follower" : "999",
"rank" : "1"
}


等级5的所有主播



参考文档:

MONGODB SPARK CONNECTOR 1.1 Spark SQL

DataFrame
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: