您的位置:首页 > 其它

Spark DataFrame相关操作

2017-03-16 16:44 260 查看
转自spark dataframe操作集锦(提取前几行, 合并, 入库等)

DataFrame 的函数

Action 操作

collect()
返回值是一个数组, 返回dataframe集合所有的行

collectAsList()
返回值是一个Java类型的数组, 返回dataframe集合所有的行

count()
返回一个number类型的, 返回dataframe集合的行数

describe(cols: String*)
返回一个通过数学计算的类表值(count, mean, stddev, min, and max), 这个可以传多个参数, 中间用逗号分隔, 如果有字段为空, 那么不参与运算, 只这对数值类型的字段. 例如
df.describe("age", "height").show()


first()
返回第一行 , 类型是row类型

head()
返回第一行 , 类型是row类型

head(n:Int)
返回n行 , 类型是row 类型

show()
返回dataframe集合的值 默认是20行, 返回类型是unit

show(n:Int)
返回n行, , 返回值类型是unit

10.
table(n:Int)
返回n行 , 类型是row 类型

基本操作

cache()
同步数据的内存

columns
返回一个string类型的数组, 返回值是所有列的名字

dtypes
返回一个string类型的二维数组, 返回值是所有列的名字以及类型

explan()
打印执行计划 物理的

explain(n:Boolean)
输入值为 false 或者true , 返回值是unit 默认是false , 如果输入true 将会打印 逻辑的和物理的

isLocal
返回值是Boolean类型, 如果允许模式是local返回true 否则返回false

persist(newlevel:StorageLevel)
返回一个
dataframe.this.type
输入存储模型类型

printSchema()
打印出字段名称和类型 按照树状结构来打印

registerTempTable(tablename:String)
返回Unit , 将
df
的对象存放在一张表里面, 这个表随着对象的删除而删除了

schema
返回structType 类型, 将字段名称和类型按照结构体类型返回

toDF()
返回一个新的dataframe类型的

toDF(colnames:String*)
将参数中的几个字段返回一个新的
dataframe
类型的

unpersist()
返回
dataframe.this.type
类型, 去除模式中的数据

unpersist(blocking:Boolean)
返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD

集成查询:

agg(expers:column*)
返回
dataframe
类型 , 同数学计算求值

df.agg(max("age"), avg("salary"))


df.groupBy().agg(max("age"), avg("salary"))


agg(exprs: Map[String, String])
返回dataframe类型 , 同数学计算求值 map类型的

df.agg(Map("age" -> "max", "salary" -> "avg"))


df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))


agg(aggExpr: (String, String), aggExprs: (String, String)*)
返回dataframe类型 , 同数学计算求值

df.agg(Map("age" -> "max", "salary" -> "avg"))


df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))


apply(colName: String)
返回column类型, 捕获输入进去列的对象

as(alias: String)
返回一个新的dataframe类型, 就是原来的一个别名

col(colName: String)
返回column类型, 捕获输入进去列的对象

cube(col1: String, cols: String*)
返回一个GroupedData类型, 根据某些字段来汇总

distinct
去重 返回一个
dataframe
类型

drop(col: Column)
删除某列 返回
dataframe
类型

dropDuplicates(colNames: Array[String])
删除相同的列 返回一个dataframe

except(other: DataFrame)
返回一个dataframe, 返回在当前集合存在的在其他集合不存在的

explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B])


返回值是
dataframe
类型, 目的将一个字段进行更多行的拆分

e.g.
df.explode("name","names") {name :String=> name.split(" ")}.show();


name
字段根据空格来拆分, 拆分的字段放在
names
里面

filter(conditionExpr: String)
刷选部分数据, 返回
dataframe
类型

df.filter("age>10").show();  df.filter(df("age")>10).show();


df.where(df("age")>10).show();
都可以

groupBy(col1: String, cols: String*)
根据某写字段来汇总返回groupedate类型

df.groupBy("age").agg(Map("age" ->"count")).show();


df.groupBy("age").avg().show();
都可以

intersect(other: DataFrame)
返回一个dataframe, 在2个dataframe都存在的元素

join(right: DataFrame, joinExprs: Column, joinType: String)


第一个是关联的
dataframe
, 第二个是关联的条件, 第三个是关联的类型:
inner, outer, left_outer, right_outer, leftsemi


df.join(ds,df("name")===ds("name") and df("age")===ds("age"),"outer").show();


limit(n: Int)
返回
dataframe
类型 取n条数据出来

na: DataFrameNaFunctions
可以调用
dataframe.na.functions
的功能区做过滤
df.na.drop().show();
删除为空的行

orderBy(sortExprs: Column*)
做alise排序

select(cols:string*)
dataframe
做字段的筛选

df.select($"colA", $"colB" + 1)


selectExpr(exprs: String*)
做字段的筛选
df.selectExpr("name","name as names","upper(name)","age+1").show();


sort(sortExprs: Column*)
排序

df.sort(df("age").desc).show();
默认是asc

unionAll(other:Dataframe)
合并
df.unionAll(ds).show();


withColumnRenamed(existingName: String, newName: String)
修改列表

df.withColumnRenamed("name","names").show();


withColumn(colName: String, col: Column)
增加一列
df.withColumn("aa",df("name")).show();


将DataFrame保存成文件

  下面我来介绍如何将DataFrame保存到一个文件里面。前面我们加载csv文件用到了load函数,与之对于的用于保存文件可以使用save函数。具体操作包括以下两步:

  1、首先创建一个map对象,用于存储一些save函数需要用到的一些属性。这里我将制定保存文件的存放路径和csv的头信息。

[python] view plain copy 在CODE上查看代码片派生到我的代码片

val saveOptions = Map(“header” -> “true”, “path” -> “iteblog.csv”)

为了基于学习的态度,我们从DataFrame里面选择出studentName和email两列,并且将studentName的列名重定义为name。

[python] view plain copy 在CODE上查看代码片派生到我的代码片

val copyOfStudents = students.select(students(“studentName”).as(“name”), students(“email”))

2、下面我们调用save函数保存上面的DataFrame数据到iteblog.csv文件夹中

[python] view plain copy 在CODE上查看代码片派生到我的代码片

copyOfStudents.write.format(“com.databricks.spark.csv”).mode(SaveMode.Overwrite).options(saveOptions).save()

mode函数可以接收的参数有Overwrite、Append、Ignore和ErrorIfExists。从名字就可以很好的理解,Overwrite代表覆盖目录下之前存在的数据;Append代表给指定目录下追加数据;Ignore代表如果目录下已经有文件,那就什么都不执行;ErrorIfExists代表如果保存目录下存在文件,那么抛出相应的异常。

<
4000
link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/production/markdown_views-ea0013b516.css" />
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: