您的位置:首页 > 数据库

Spark-SQL之DataFrame操作大全

2016-10-12 23:31 716 查看
  Spark SQL中的DataFrame类似于一张关系型数据表。在关系型数据库中对单表或进行的查询操作,在DataFrame中都可以通过调用其API接口来实现。可以参考,Scala提供的DataFrame API

  本文中的代码基于Spark-1.6.2的文档实现。

一、DataFrame对象的生成

  Spark-SQL可以以其他RDD对象、parquet文件、json文件、hive表,以及通过JDBC连接到其他关系型数据库作为数据源来生成DataFrame对象。本文将以MySQL数据库为数据源,生成DataFrame对象后进行相关的DataFame之上的操作。

  文中生成DataFrame的代码如下:

object DataFrameOperations {
def main (args: Array[String ]) {
val sparkConf = new SparkConf().setAppName( "Spark SQL DataFrame Operations").setMaster( "local[2]" )
val sparkContext = new SparkContext(sparkConf)

val sqlContext = new SQLContext(sparkContext)
val url = "jdbc:mysql://m000:3306/test"

val jdbcDF = sqlContext.read.format( "jdbc" ).options(
Map( "url" -> url,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_test" )).load()

val joinDF1 = sqlContext.read.format( "jdbc" ).options(
Map("url" -> url ,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_join1" )).load()

val joinDF2 = sqlContext.read.format( "jdbc" ).options(
Map ( "url" -> url ,
"user" -> "root",
"password" -> "root",
"dbtable" -> "spark_sql_join2" )).load()

... ...
}
}


  后续代码都在上面
... ...
处。

二、DataFrame对象上Action操作

1、
show
:展示数据

  以表格的形式在输出中展示
jdbcDF
中的数据,类似于
select * from spark_sql_test
的功能。

  
show
方法有四种调用方式,分别为,

(1)
show


  只显示前20条记录。

  示例:

jdbcDF.show


  结果:

  


(2)
show(numRows: Int)


  显示
numRows


  示例:

jdbcDF.show(3)


  结果:

  


(3)
show(truncate: Boolean)


  是否最多只显示20个字符,默认为
true


  示例:

jdbcDF.show(true)
jdbcDF.show(false)


  结果:

  


(4)
show(numRows: Int, truncate: Boolean)


  综合前面的显示记录条数,以及对过长字符串的显示格式。

  示例:

jdbcDF.show(3, false)


  结果:

  


2、
collect
:获取所有数据到数组

  不同于前面的
show
方法,这里的
collect
方法会将
jdbcDF
中的所有数据都获取到,并返回一个
Array
对象。

jdbcDF.collect()


  结果如下,结果数组包含了
jdbcDF
的每一条记录,每一条记录由一个
GenericRowWithSchema
对象来表示,可以存储字段名及字段值。

  


3、
collectAsList
:获取所有数据到List

  功能和
collect
类似,只不过将返回结构变成了
List
对象,使用方法如下

jdbcDF.collectAsList()


  结果如下,

  


4、
describe(cols: String*)
:获取指定字段的统计信息

  这个方法可以动态的传入一个或多个
String
类型的字段名,结果仍然为
DataFrame
对象,用于统计数值类型字段的统计值,比如
count, mean, stddev, min, max
等。

  使用方法如下,其中
c1
字段为字符类型,
c2
字段为整型,
c4
字段为浮点型

jdbcDF .describe("c1" , "c2", "c4" ).show()


  结果如下,

  


5、
first, head, take, takeAsList
:获取若干行记录

  这里列出的四个方法比较类似,其中

  (1)
first
获取第一行记录

  (2)
head
获取第一行记录,
head(n: Int)
获取前n行记录

  (3)
take(n: Int)
获取前n行数据

  (4)
takeAsList(n: Int)
获取前n行数据,并以
List
的形式展现

  以
Row
或者
Array[Row]
的形式返回一行或多行数据。
first
head
功能相同。

  
take
takeAsList
方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生
OutOfMemoryError


  使用和结果略。

二、DataFrame对象上的条件查询和join等操作

  以下返回为DataFrame类型的方法,可以连续调用。

1、where条件相关

(1)
where(conditionExpr: String)
:SQL语言中where关键字后的条件


  传入筛选条件表达式,可以用
and
or
。得到DataFrame类型的返回结果,

  示例:

jdbcDF .where("id = 1 or c1 = 'b'" ).show()


  结果,

  


(2)
filter
:根据字段进行筛选


  传入筛选条件表达式,得到DataFrame类型的返回结果。和
where
使用条件相同

  示例:

jdbcDF .filter("id = 1 or c1 = 'b'" ).show()


  结果,

  


2、查询指定字段

(1)
select
:获取指定字段值


  根据传入的
String
类型字段名,获取指定字段的值,以DataFrame类型返回

  示例:

jdbcDF.select( "id" , "c3" ).show( false)


  结果:

  


  还有一个重载的
select
方法,不是传入
String
类型参数,而是传入
Column
类型参数。可以实现
select id, id+1 from test
这种逻辑。

jdbcDF.select(jdbcDF( "id" ), jdbcDF( "id") + 1 ).show( false)


  结果:

  


  能得到
Column
类型的方法是
apply
以及
col
方法,一般用
apply
方法更简便。

(2)
selectExpr
:可以对指定字段进行特殊处理


  可以直接对指定字段调用UDF函数,或者指定别名等。传入
String
类型参数,得到DataFrame对象。

  示例,查询
id
字段,
c3
字段取别名
time
c4
字段四舍五入:

jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show(false)


  结果,

  


(3)
col
:获取指定字段


  只能获取一个字段,返回对象为Column类型。

  val idCol = jdbcDF.col(“id”)果略。

(4)
apply
:获取指定字段


  只能获取一个字段,返回对象为Column类型

  示例:

val idCol1 = jdbcDF.apply("id")
val idCol2 = jdbcDF("id")


  结果略。

(5)
drop
:去除指定字段,保留其他字段


  返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。

  示例:

jdbcDF.drop("id")
jdbcDF.drop(jdbcDF("id"))


  结果:

  


3、limit

  
limit
方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和
take
head
不同的是,
limit
方法不是Action操作。

jdbcDF.limit(3).show( false)


  结果,

  


4、order by

(1)
orderBy
sort
:按指定字段排序,默认为升序

  示例1,按指定字段排序。加个
-
表示降序排序。
sort
orderBy
使用方法相同

jdbcDF.orderBy(- jdbcDF("c4")).show(false)
// 或者
jdbcDF.orderBy(jdbcDF("c4").desc).show(false)


  结果,

  


  示例2,按字段字符串升序排序

jdbcDF.orderBy("c4").show(false)


  结果,

  


(2)
sortWithinPartitions


  和上面的
sort
方法功能类似,区别在于
sortWithinPartitions
方法返回的是按Partition排好序的DataFrame对象。

5、group by

(1)
groupBy
:根据字段进行
group by
操作

  
groupBy
方法有两种调用方式,可以传入
String
类型的字段名,也可传入
Column
类型的对象。

  使用方法如下,

jdbcDF .groupBy("c1" )
jdbcDF.groupBy( jdbcDF( "c1"))


(2)
cube
rollup
:group by的扩展


  功能类似于
SQL
中的
group by cube/rollup
,略。

(3)GroupedData对象

  该方法得到的是
GroupedData
类型对象,在
GroupedData
的API中提供了
group by
之后的操作,比如,

max(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段

min(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段

mean(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段

sum(colNames: String*)
方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段

count()
方法,获取分组中的元素个数

  运行结果示例:

  
count


  


  
max


  


  这里面比较复杂的是以下两个方法,

agg
,该方法和下面介绍的类似,可以用于对指定字段进行聚合操作。

pivot


6、distinct

(1)
distinct
:返回一个不包含重复记录的DataFrame


  返回当前DataFrame中不重复的Row记录。该方法和接下来的
dropDuplicates()
方法不传入指定字段时的结果相同。

  示例:

jdbcDF.distinct()


  结果,

  


(2)
dropDuplicates
:根据指定字段去重


  根据指定字段去重。类似于
select distinct a, b
操作

  示例:

jdbcDF.dropDuplicates(Seq("c1"))


  结果:

  


7、聚合

  聚合操作调用的是
agg
方法,该方法有多种调用方式。一般与
groupBy
方法配合使用。

  以下示例其中最简单直观的一种用法,对
id
字段求最大值,对
c4
字段求和。

jdbcDF.agg("id" -> "max", "c4" -> "sum")


  结果:

  


8、union

  
unionAll
方法:对两个DataFrame进行组合

  类似于
SQL
中的
UNION ALL
操作。

  示例:

jdbcDF.unionALL(jdbcDF.limit(1))


  结果:

  


9、join

  重点来了。在
SQL
语言中用得很多的就是
join
操作,DataFrame中同样也提供了
join
的功能。

  接下来隆重介绍
join
方法。在DataFrame中提供了六个重载的
join
方法。

(1)、笛卡尔积

joinDF1.join(joinDF2)


(2)、
using
一个字段形式


  下面这种join类似于
a join b using column1
的形式,需要两个DataFrame中有相同的一个列名,

joinDF1.join(joinDF2, "id")


  
joinDF1
joinDF2
根据字段
id
进行
join
操作,结果如下,
using
字段只显示一次。

  


(3)、
using
多个字段形式


  除了上面这种
using
一个字段的情况外,还可以
using
多个字段,如下

joinDF1.join(joinDF2, Seq("id", "name"))


(4)、指定
join
类型


  两个DataFrame的
join
操作有
inner, outer, left_outer, right_outer, leftsemi
类型。在上面的
using
多个字段的join情况下,可以写第三个
String
类型参数,指定
join
的类型,如下所示

joinDF1.join(joinDF2, Seq("id", "name"), "inner")


(5)、使用
Column
类型来
join


  如果不用
using
模式,灵活指定
join
字段的话,可以使用如下形式

joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))


  结果如下,

  


(6)、在指定
join
字段同时指定
join
类型


  如下所示

joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"), "inner")


10、获取指定字段统计信息

  
stat
方法可以用于计算指定字段或指定字段之间的统计信息,比如方差,协方差等。这个方法返回一个
DataFramesStatFunctions
类型对象。

  下面代码演示根据
c4
字段,统计该字段值出现频率在
30%
以上的内容。在
jdbcDF
中字段
c1
的内容为
"a, b, a, c, d, b"
。其中
a
b
出现的频率为
2 / 6
,大于
0.3


jdbcDF.stat.freqItems(Seq ("c1") , 0.3).show()


  结果如下:

  


11、获取两个DataFrame中共有的记录

  
intersect
方法可以计算出两个DataFrame中相同的记录,

jdbcDF.intersect(jdbcDF.limit(1)).show(false)


  结果如下:

  


12、获取一个DataFrame中有另一个DataFrame中没有的记录

  示例:

jdbcDF.except(jdbcDF.limit(1)).show(false)


  结果如下,

  


13、操作字段名

(1)
withColumnRenamed
:重命名DataFrame中的指定字段名


  如果指定的字段名不存在,不进行任何操作。下面示例中将
jdbcDF
中的
id
字段重命名为
idx


jdbcDF.withColumnRenamed( "id" , "idx" )


  结果如下:

  


(2)
withColumn
:往当前DataFrame中新增一列


  
whtiColumn(colName: String , col: Column)
方法根据指定
colName
往DataFrame中新增一列,如果
colName
已存在,则会覆盖当前列。

  以下代码往
jdbcDF
中新增一个名为
id2
的列,

jdbcDF.withColumn("id2", jdbcDF("id")).show( false)


  结果如下,

  


14、行转列

  有时候需要根据某个字段内容进行分割,然后生成多行,这时可以使用
explode
方法

  下面代码中,根据
c3
字段中的空格将字段内容进行分割,分割的内容存储在新的字段
c3_
中,如下所示

jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}


  结果如下,

  


15、其他操作

  API中还有
na, randomSplit, repartition, alias, as
方法,待后续补充。

三、DataFrame对象上的结构类操作

四、DataFrame对象上的输出操作

五、DataFrame对象上的RDD操作

六、DataFrame对象上的未归类操作

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: