大数据IMF传奇行动绝密课程第70课:Spark SQL内置函数解密与实战
2017-03-09 23:23
381 查看
Spark SQL内置函数解密与实战
1、Spark Sql内置函数解析2、Spark Sql内置函数实战
/** * scala代码 */ package com.tom.spark.sql import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.functions._ /** * 使用Spark SQL中的内置函数对数据进行分析,与普通的Spark SQL API不同的是DataFrame内置函数操作的结果是返回一个Column对象, * 而DataFrame天生就是"A distributed collection of data organized into named columns.",这就为数据的复杂分析建立了坚实的基础 * 并提供了极大的方便性,例如在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建复杂的业务逻辑而言 * 非常可以极大地减少不必要的时间消耗(基本上就是实际模型的映射),让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常有价值的。 * Spark 1.5.x开始提供了大量的内置函数,例如agg: * def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = { * groupBy().agg(aggExpr, aggExprs : _*) * } * 还有max,mean,min,sum,avg,explode,size,sort_array,day,to_date,abs,acos,asin,atan * 总体而言,内置函数包含五大基本类型: * 1,聚合函数,例如countDistinct,sumDistinct等 * 2,集合函数,例如sort_array * 3,日期时间函数,例如hour,quarter,next_day * 4,数学函数,例如asin,atan,sqrt,tan,round等; * 5,开窗函数,例如rowNumber等 * 6,字符串函数,concat,format_number,rexexp_extract * 7,其它函数,isNaN,sha,randn,callUDF,callUDAF */ object SparkSQLAgg { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkSQLAgg") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //要使用Spark SQL的内置函数,就一定要导入SQLContext下的隐式转换 import sqlContext.implicits._ /** * 模拟电商访问的数据,实际情况会比模拟数据复杂很多,最后生成RDD */ val userData = Array( "2016-3-27,001,http://spark.apache.org/,1000", "2016-3-27,001,http://hadoop.apache.org/,1001", "2016-3-27,002,http://flink.apache.org/,1002", "2016-3-28,003,http://kafka.apache.org/,1020", "2016-3-28,004,http://spark.apache.org/,1010", "2016-3-28,002,http://hive.apache.org/,1200", "2016-3-28,001,http://parquet.apache.org/,1500", "2016-3-28,001,http://spark.apache.org/,1800" ) val userDataRDD = sc.parallelize(userData) //生成RDD分布式集合对象 /** * 根据业务需要对数据进行预处理生成DataFrame,要想把RDD转换成DataFrame,需要先把RDD中的元素类型变成Row类型, * 与此同时要提供DataFrame中的columns的元数据信息描述 */ val userDataRDDRow = userDataRDD.map(row=>{ val splited = row.split(",") Row(splited(0), splited(1).toInt, splited(2), splited(3).toInt) }) val structType = StructType(Array( StructField("time", StringType, true), StructField("id", IntegerType, true), StructField("url", StringType, true), StructField("amount", IntegerType, true) )) val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structType) /** * 使用Spark SQL提供的内置函数对DataFrame进行操作,特别注意:内置函数生成的Column对象且自动进行CG */ userDataDF.groupBy("time").agg('time, countDistinct('id)) .map(row=> Row(row(1),row(2))).collect.foreach(println) userDataDF.groupBy("time").agg('time, sum('amount)).show } }
相关文章推荐
- 大数据IMF传奇行动绝密课程第90课:SparkStreaming基于Kafka Receiver案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第71课:Spark SQL窗口函数解密与实战
- 大数据IMF传奇行动绝密课程第87课:Flume推送数据到Spark Streaming案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第93课:SparkStreaming updateStateByKey案例实战和内置源码解密
- 大数据IMF传奇行动绝密课程第72课:Spark SQL UDF和UDAF解密与实战
- 大数据IMF传奇行动绝密课程第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第61课:Spark SQL数据加载和保存内幕深度解密实战
- 大数据IMF传奇行动绝密课程第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第88课:SparkStreaming从Flume Poll数据案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第22课:RDD的依赖关系彻底解密
- 大数据IMF传奇行动绝密课程第12课:HA下的Spark集群工作原理解密
- 大数据IMF传奇行动绝密课程第29课:Master HA彻底解密
- 大数据IMF传奇行动绝密课程第14课:Spark RDD解密
- 大数据IMF传奇行动绝密课程第11课:彻底解密WordCount运行原理
- 大数据IMF传奇行动绝密课程第33课:Spark Executor内幕彻底解密
- 大数据IMF传奇行动绝密课程第59课:使用Java和Scala在IDE中实战RDD和DataFrame转换操作
- 大数据IMF传奇行动绝密课程第30课:Master的注册机制和状态管理解密
- 大数据IMF传奇行动绝密课程第25课:Spark Sort-Based Shuffle内幕彻底解密
- 大数据IMF传奇行动绝密课程第13课:Spark内核架构解密
- 大数据IMF传奇行动绝密课程第27课:Spark on Yarn彻底解密