您的位置:首页 > 数据库 > SQL

12.Spark SQL:开窗函数以及top3销售额统计案例实战

2017-11-02 17:23 471 查看
Spark 1.4.x版本以后,为Spark SQL和DataFrame引入了开窗函数,比如最经典,最常用的,row_number(),可以让我们实现分组取topn的逻辑。

案例:统计每个种类的销售额排名前3的产品

java版本
package cn.spark.study.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
* 84讲,row_number()开窗函数实战
* @author leizq120310
*
*/

public class RowNumberWindowFunction {

public static void main(String[] args) {
// 创建SparkConf,集群运行
SparkConf conf = new SparkConf()
.setAppName("RowNumberWindowFunction");

// 创建JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());

// 创建销售额表,sales表
hiveContext.sql("DROP TABLE IF EXISTS sales");
hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
+ "product STRING,"
+ "category STRING, "
+ "revenue BIGINT)");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' "
+ "INTO TABLE sales");

// 开始编写我们的统计逻辑,使用row_number()开窗函数
// 先说明一下,row_number()开窗函数的作用
// 其实,就是给每个分组的数所在,按照其排序顺序,打上一个分组内的行号
// 比如说,有一个分组date=20151001, 里面有3条数据,1122,1121,1124,
// 那么对这个分组的每一行使用row_number()开窗函数以后,三行,依次会获得一个组内的行号
// 行号从1开始递增,比如1122 1, 1121 2, 1124, 3
DataFrame top3SaleDF = hiveContext.sql(""
+ "SELECT product, category,revenue "
+ "FROM ("
+ "SELECT "
+ "product, "
+ "category, "
+ "revenue, "
// row_number()开窗函数的语法说明
// 首先可以,在SELECT查询时,使用row_number()函数
// 其次,row_number()函数后面先跟上OVER关键字
// 然后括号中,是PARTITION BY,也就是说根据哪个字段进行分组
// 其次是可以用ORDER BY 进行组内排序
// 然后row_number()就可以给每个组内的行,一个组内行号
+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
+ "FROM sales "
+ ") tmp_sales "
+ "WHERE rank<=3");
// 将每组排名前3的数据,保存到一个表中
hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
top3SaleDF.saveAsTable("top3_sales");

// 关闭JavaSparkContext
sc.close();
}
}
scala版本:
package cn.spark.study.sql

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
* 84讲,row_number()开窗函数实战
* @author leizq120310
*
*/

object RowNumberWindowFunction {
def main(args:Array[String])
{
// 创建SparkConf,集群运行
val conf = new SparkConf()
.setAppName("RowNumberWindowFunction");

// 创建JavaSparkContext
val sc = new JavaSparkContext(conf);
val hiveContext = new HiveContext(sc);

// 创建销售额表,sales表
hiveContext.sql("DROP TABLE IF EXISTS sales");
hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
+ "product STRING,"
+ "category STRING, "
+ "revenue BIGINT)");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' "
+ "INTO TABLE sales");

// 开始编写我们的统计逻辑,使用row_number()开窗函数
// 先说明一下,row_number()开窗函数的作用
// 其实,就是给每个分组的数所在,按照其排序顺序,打上一个分组内的行号
// 比如说,有一个分组date=20151001, 里面有3条数据,1122,1121,1124,
// 那么对这个分组的每一行使用row_number()开窗函数以后,三行,依次会获得一个组内的行号
// 行号从1开始递增,比如1122 1, 1121 2, 1124, 3
val top3SaleDF = hiveContext.sql(""
+ "SELECT product, category,revenue "
+ "FROM ("
+ "SELECT "
+ "product, "
+ "category, "
+ "revenue, "
// row_number()开窗函数的语法说明
// 首先可以,在SELECT查询时,使用row_number()函数
// 其次,row_number()函数后面先跟上OVER关键字
// 然后括号中,是PARTITION BY,也就是说根据哪个字段进行分组
// 其次是可以用ORDER BY 进行组内排序
// 然后row_number()就可以给每个组内的行,一个组内行号
+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
+ "FROM sales "
+ ") tmp_sales "
+ "WHERE rank<=3");
// 将每组排名前3的数据,保存到一个表中
hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
top3SaleDF.saveAsTable("top3_sales");

// 关闭JavaSparkContext
sc.close();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark_sql