您的位置:首页 > 大数据

大数据IMF传奇行动绝密课程第70课:Spark SQL内置函数解密与实战

2017-03-09 23:23 381 查看

Spark SQL内置函数解密与实战

1、Spark Sql内置函数解析

2、Spark Sql内置函数实战

/**
* scala代码
*/
package com.tom.spark.sql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._

/**
* 使用Spark SQL中的内置函数对数据进行分析,与普通的Spark SQL API不同的是DataFrame内置函数操作的结果是返回一个Column对象,
* 而DataFrame天生就是"A distributed collection of data organized into named columns.",这就为数据的复杂分析建立了坚实的基础
* 并提供了极大的方便性,例如在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建复杂的业务逻辑而言
* 非常可以极大地减少不必要的时间消耗(基本上就是实际模型的映射),让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常有价值的。
* Spark 1.5.x开始提供了大量的内置函数,例如agg:
* def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
*   groupBy().agg(aggExpr, aggExprs : _*)
* }
* 还有max,mean,min,sum,avg,explode,size,sort_array,day,to_date,abs,acos,asin,atan
* 总体而言,内置函数包含五大基本类型:
* 1,聚合函数,例如countDistinct,sumDistinct等
* 2,集合函数,例如sort_array
* 3,日期时间函数,例如hour,quarter,next_day
* 4,数学函数,例如asin,atan,sqrt,tan,round等;
* 5,开窗函数,例如rowNumber等
* 6,字符串函数,concat,format_number,rexexp_extract
* 7,其它函数,isNaN,sha,randn,callUDF,callUDAF
*/
object SparkSQLAgg {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkSQLAgg")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//要使用Spark SQL的内置函数,就一定要导入SQLContext下的隐式转换
import sqlContext.implicits._

/**
* 模拟电商访问的数据,实际情况会比模拟数据复杂很多,最后生成RDD
*/
val userData = Array(
"2016-3-27,001,http://spark.apache.org/,1000",
"2016-3-27,001,http://hadoop.apache.org/,1001",
"2016-3-27,002,http://flink.apache.org/,1002",
"2016-3-28,003,http://kafka.apache.org/,1020",
"2016-3-28,004,http://spark.apache.org/,1010",
"2016-3-28,002,http://hive.apache.org/,1200",
"2016-3-28,001,http://parquet.apache.org/,1500",
"2016-3-28,001,http://spark.apache.org/,1800"
)
val userDataRDD = sc.parallelize(userData)  //生成RDD分布式集合对象

/**
* 根据业务需要对数据进行预处理生成DataFrame,要想把RDD转换成DataFrame,需要先把RDD中的元素类型变成Row类型,
* 与此同时要提供DataFrame中的columns的元数据信息描述
*/
val userDataRDDRow = userDataRDD.map(row=>{
val splited = row.split(",")
Row(splited(0), splited(1).toInt, splited(2), splited(3).toInt)
})
val structType = StructType(Array(
StructField("time", StringType, true),
StructField("id", IntegerType, true),
StructField("url", StringType, true),
StructField("amount", IntegerType, true)

))

val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structType)

/**
* 使用Spark SQL提供的内置函数对DataFrame进行操作,特别注意:内置函数生成的Column对象且自动进行CG
*/
userDataDF.groupBy("time").agg('time, countDistinct('id))
.map(row=> Row(row(1),row(2))).collect.foreach(println)

userDataDF.groupBy("time").agg('time, sum('amount)).show

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  scala
相关文章推荐