综合案例 第80课:Spark SQL网站搜索综合案例实战 以京东找出搜索平台上用户每天搜索排名5名的产品,The hottest!
2016-04-19 19:25
831 查看
* * *王家林老师授课http://weibo.com/ilovepains */ 每天晚上20:00YY频道现场授课频道68917580
/**
*@author DT大数据梦工厂
* 新浪微博:http://weibo.com/ilovepains/
*Created by hp on 2016/4/1.
* 项目:以京东找出搜索平台上用户每天搜索排名5名的产品,Thehottest!
* 元数据:Date、UserID、Item、City、Device;
* 总体思路:混合使用了Spark SQL和Spark Core的内容
* 第一步:原始的ETL,过滤数据后产生目标,在实际企业中可能过滤条件非常复杂(进行广播),使用RDD的filter等进行操作;
* 第二步:对过滤后的目标数据进行指定条件的查询,查询条件有可能非常复杂(进行广播),使用RDD的filter算子;
* 第三步:由于商品是分为种类的,我们在得出最终结果的之前,首先会基于商品进行UV(当然你也可以对用户访问商品的PV进行分析);
* 此时我们要对商品进行UV计算的话,必须构建K-V的RDD,例如过程成为
(date#Item,userID)以方便进行groupByKey操作
* 在调用了groupByKey之后对user进行去重,并计算出每一天每一种商品的UV,最终计算出来的结果的数据类型(date#Item,UV)
* 第四步:使用开窗函数row_number统计出每日商品UV前5名的内容,
* row_number() OVER (PARTITION BY date ORDER BY UV DESC) rank
* 此时会产生以date、item、uv为Row的DataFrame
* 第五步:DataFrame转成RDD,根据日期进行分组并分析出每天排名前5为的热搜索Item;
* 第六步:进行Key-Value交换,然后进行调用sortByKey进行点击热度排名;
* 第七步:再次进行Key-Value交换,得出目标数据为(date#Item,UV)的格式;
* 第八步:通过RDD直接操作Mysql等把结果放入生产系统中的DB中,在通过Java
EE等Server技术可视化结果以供市场
* 营销人员、仓库调度系统、快递系统、管理决策人员使用数据创造价值;
* 当然也可以放在Hive中,Java EE等技术通过JDBC等连接访问Hive;
* 当然也可以就放在Spark SQL中,通过Thrift技术供Jave EE使用等;
* 当时,如果是像双十一等时候,一般首选放在Redis中,这样可以实现类似秒杀系统的响应速度!
*/
运行结果!
综合作业 第80课Spark SQL网站搜索综合案例实战
以京东为例找出搜索平台上用户每天搜索排名5名的产品,The hottest!
用户登录京东网站,在搜索栏搜索的时候,将用户每天搜索排名前5名的商品列出来。
一:生成模拟京东用户搜索的测试数据。
SparkSQLUserlogsHottest.log测试数据文件包括时间、用户id、商品、地点、设备信息 10000条数据
二:根据测试数据实现搜索平台上用户每天搜索排名5名的产品。
创建JavaSparkContext及HiveContext
sc.textFile读入测试数据文件,生成JavaRDD<String>类型的数据集合line0。
sc.broadcast定义广播变量,用于测试数据的查询及过滤。
lines0.filter使用匿名接口函数Function的call方法过滤出包含广播变量的数据,生成JavaRDD<String> 类型的数据集合lines。
测试验证点:
测试数据广播变量的过滤是否成功?打印出lines的数据验证。
lines.mapToPair 使用匿名接口函数PairFunction的call方法对lines中的数据按"\t"进行分割,分割以后将(date#Item#userID)三个字段合并为Key, 将 Value值设置为1,形成 K,V键值对。返回的结果pairs类型为JavaPairRDD<String, Integer>。相当于hadoop mapreduce中的map,将某天某用户点击某商品的次数计数为1。
测试验证点:
测试(date#Item#userID)三个字段合并为Key成功了吗?k,v是否符合预期?
pairs.reduceByKey使用匿名接口函数Function2的call方法对pairs中数据进行reduce汇总,将pairs具有相同key值的数据,累加统计数值,返回的结果reduceedPairs 的类型为JavaPairRDD<String, Integer>。相当于hadoop mapreduce中的reduce,将某天某用户点击某商品的所有点击次数累加汇总。
测试验证点:
测试(date#Item#userID) k,v的reduce统计是否成功?
将reduceedRow中的每行数据拆分,reduceedRow的key值分割为三个字段:时间、用户id、商品,将reduceedRow的商品累计点击数值value记为count,然后将这四个字段时间、用户id、商品、count再拼接成json格式放到peopleInformations列表里面,打印输出。peopleInformations类型为List<String>,为json格式。
sc.parallelize(peopleInformations)将scala空间的变量peopleInformations转换为Spark RDD空间的变量peopleInformationsRDD,类型为JavaRDD<String>
sqlContext.read().json(peopleInformationsRDD)通过内容为JSON的RDD来构造peopleInformationsDF,类型为DataFrame
DataFrame注册成为临时表
peopleInformationsDF.registerTempTable("peopleInformations")
窗口函数:使用子查询的方式完成目标数据的提取,在目标数据内幕使用窗口函数row_number来进行分组排序, PARTITION BY :指定窗口函数分组的Key, ORDER BY:分组后进行排序;
String sqlText = "SELECT UserID,Item, count "
+ "FROM ("
+ "SELECT "
+ "UserID,Item, count,"
+ "row_number() OVER (PARTITION BY UserID ORDER BY count DESC) rank"
+" FROM peopleInformations "
+ ") sub_peopleInformations "
+ "WHERE rank <= 3 " ;
Sql查询语句:
使用窗口函数,查询peopleInformations表,按照用户分组,将同一用户的每天的点击的商品数排名,形成一张子表sub_peopleInformations。
然后从排名后的子表sub_peopleInformations中,查询出前三名的用户点击的商品和计数。
测试验证点:
验证窗口函数sql语句执行是否成功?
sqlContext.sql(sqlText)执行sql查询,execellentNameAgeDF.show()显示sql查询结果。
将execellentNameAgeDF的结果保存为json文件格式
execellentNameAgeDF.write().format("json").save();
三:源代码SparkSQLUserlogsHottestDataManually.java和SparkSQLUserlogsHottest.java
/**
*@author DT大数据梦工厂
* 新浪微博:http://weibo.com/ilovepains/
*Created by hp on 2016/4/1.
* 项目:以京东找出搜索平台上用户每天搜索排名5名的产品,Thehottest!
* 元数据:Date、UserID、Item、City、Device;
* 总体思路:混合使用了Spark SQL和Spark Core的内容
* 第一步:原始的ETL,过滤数据后产生目标,在实际企业中可能过滤条件非常复杂(进行广播),使用RDD的filter等进行操作;
* 第二步:对过滤后的目标数据进行指定条件的查询,查询条件有可能非常复杂(进行广播),使用RDD的filter算子;
* 第三步:由于商品是分为种类的,我们在得出最终结果的之前,首先会基于商品进行UV(当然你也可以对用户访问商品的PV进行分析);
* 此时我们要对商品进行UV计算的话,必须构建K-V的RDD,例如过程成为
(date#Item,userID)以方便进行groupByKey操作
* 在调用了groupByKey之后对user进行去重,并计算出每一天每一种商品的UV,最终计算出来的结果的数据类型(date#Item,UV)
* 第四步:使用开窗函数row_number统计出每日商品UV前5名的内容,
* row_number() OVER (PARTITION BY date ORDER BY UV DESC) rank
* 此时会产生以date、item、uv为Row的DataFrame
* 第五步:DataFrame转成RDD,根据日期进行分组并分析出每天排名前5为的热搜索Item;
* 第六步:进行Key-Value交换,然后进行调用sortByKey进行点击热度排名;
* 第七步:再次进行Key-Value交换,得出目标数据为(date#Item,UV)的格式;
* 第八步:通过RDD直接操作Mysql等把结果放入生产系统中的DB中,在通过Java
EE等Server技术可视化结果以供市场
* 营销人员、仓库调度系统、快递系统、管理决策人员使用数据创造价值;
* 当然也可以放在Hive中,Java EE等技术通过JDBC等连接访问Hive;
* 当然也可以就放在Spark SQL中,通过Thrift技术供Jave EE使用等;
* 当时,如果是像双十一等时候,一般首选放在Redis中,这样可以实现类似秒杀系统的响应速度!
*/
运行结果!
综合作业 第80课Spark SQL网站搜索综合案例实战
以京东为例找出搜索平台上用户每天搜索排名5名的产品,The hottest!
用户登录京东网站,在搜索栏搜索的时候,将用户每天搜索排名前5名的商品列出来。
一:生成模拟京东用户搜索的测试数据。
SparkSQLUserlogsHottest.log测试数据文件包括时间、用户id、商品、地点、设备信息 10000条数据
二:根据测试数据实现搜索平台上用户每天搜索排名5名的产品。
创建JavaSparkContext及HiveContext
sc.textFile读入测试数据文件,生成JavaRDD<String>类型的数据集合line0。
sc.broadcast定义广播变量,用于测试数据的查询及过滤。
lines0.filter使用匿名接口函数Function的call方法过滤出包含广播变量的数据,生成JavaRDD<String> 类型的数据集合lines。
测试验证点:
测试数据广播变量的过滤是否成功?打印出lines的数据验证。
lines.mapToPair 使用匿名接口函数PairFunction的call方法对lines中的数据按"\t"进行分割,分割以后将(date#Item#userID)三个字段合并为Key, 将 Value值设置为1,形成 K,V键值对。返回的结果pairs类型为JavaPairRDD<String, Integer>。相当于hadoop mapreduce中的map,将某天某用户点击某商品的次数计数为1。
测试验证点:
测试(date#Item#userID)三个字段合并为Key成功了吗?k,v是否符合预期?
pairs.reduceByKey使用匿名接口函数Function2的call方法对pairs中数据进行reduce汇总,将pairs具有相同key值的数据,累加统计数值,返回的结果reduceedPairs 的类型为JavaPairRDD<String, Integer>。相当于hadoop mapreduce中的reduce,将某天某用户点击某商品的所有点击次数累加汇总。
测试验证点:
测试(date#Item#userID) k,v的reduce统计是否成功?
将reduceedRow中的每行数据拆分,reduceedRow的key值分割为三个字段:时间、用户id、商品,将reduceedRow的商品累计点击数值value记为count,然后将这四个字段时间、用户id、商品、count再拼接成json格式放到peopleInformations列表里面,打印输出。peopleInformations类型为List<String>,为json格式。
sc.parallelize(peopleInformations)将scala空间的变量peopleInformations转换为Spark RDD空间的变量peopleInformationsRDD,类型为JavaRDD<String>
sqlContext.read().json(peopleInformationsRDD)通过内容为JSON的RDD来构造peopleInformationsDF,类型为DataFrame
DataFrame注册成为临时表
peopleInformationsDF.registerTempTable("peopleInformations")
窗口函数:使用子查询的方式完成目标数据的提取,在目标数据内幕使用窗口函数row_number来进行分组排序, PARTITION BY :指定窗口函数分组的Key, ORDER BY:分组后进行排序;
String sqlText = "SELECT UserID,Item, count "
+ "FROM ("
+ "SELECT "
+ "UserID,Item, count,"
+ "row_number() OVER (PARTITION BY UserID ORDER BY count DESC) rank"
+" FROM peopleInformations "
+ ") sub_peopleInformations "
+ "WHERE rank <= 3 " ;
Sql查询语句:
使用窗口函数,查询peopleInformations表,按照用户分组,将同一用户的每天的点击的商品数排名,形成一张子表sub_peopleInformations。
然后从排名后的子表sub_peopleInformations中,查询出前三名的用户点击的商品和计数。
测试验证点:
验证窗口函数sql语句执行是否成功?
sqlContext.sql(sqlText)执行sql查询,execellentNameAgeDF.show()显示sql查询结果。
将execellentNameAgeDF的结果保存为json文件格式
execellentNameAgeDF.write().format("json").save();
三:源代码SparkSQLUserlogsHottestDataManually.java和SparkSQLUserlogsHottest.java
相关文章推荐
- 架构师之路:文章记录已分享
- 学习Arm指令的网站汇总
- 50个c/c++源代码网站
- ISO-OSI的七层协议经典架构
- 转:在线流程图制做网站
- Android官方MVP架构示例项目解析
- 某电商网站,研发升级和优化,一点建议
- 某电商网站,研发升级和优化,一点建议
- 某电商网站,研发升级和优化,一点建议
- NodeJS学习笔记(一)——搭建开发框架Express,实现Web网站登录验证
- 天猫浏览型应用的CDN静态化架构演变
- 推荐一篇讲arm架构gcc内联汇编的文章
- 【网虫】做网站必知的两个命令 活用ping与tracert
- 群晖服务器如何搭建网站
- IIS配置网站
- 关于大型网站技术演进的思考(九)--网站静态化处理--总述(1)
- iisexpress局域网内调试网站
- 一个练习打字的网站
- iOS应用架构谈(三):View层的组织和调用方案(下)
- iOS应用架构谈(二):View层的组织和调用方案(上)