您的位置:首页 > 数据库

Spark-SQL 之DataFrame操作大全

2017-02-18 19:56 627 查看
package com.sdcet

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2017/2/18.
*/
object JdbcText {
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "E:\\winutils-hadoop-2.6.4\\hadoop-2.6.4")
val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")
val sc = new SparkContext(conf)
val sql = new SQLContext(sc)
// 设置mysql的连接串
val url = "jdbc:mysql://localhost:3306/bdde"
// 读取mysql的数据
val jdbcDF = sql.read.format("jdbc").options(
Map("url" -> url,
"user" -> "root",
"password" -> "mysql",
"dbtable" -> "mbgl_rpt_time")).load()

// 显示数据(默认的显示前20条)
val showDate: Unit = jdbcDF.show()
println(showDate)
// 指定显示的行数
val show3: Unit = jdbcDF.show(3)
println(show3)
// 实收显示前20个
jdbcDF.show(true)
jdbcDF.show(false)
//
jdbcDF.show(3, false)
// 把数据转化为Array形式
jdbcDF.collect()
// describe指定字段的信息
jdbcDF.describe("mbmc_name", "stat_date", "the_year").show()
// the_month 大于10月的
jdbcDF.where("the_month > 10").show()
// 过滤字段的操作
jdbcDF.filter("the_month > 10 or the_month < 12").show()
// describe指定字段的信息
jdbcDF.select("mbmc_name", "stat_date").show()
// 对数据进行累加
jdbcDF.select(jdbcDF("the_month"), jdbcDF("the_month") + 1).show(false)
//selectExpr 对字段进行特殊处理
jdbcDF.selectExpr("the_month", "the_month as time", "round(c4)").show(false)
// 获取指定字段
val idCol1 = jdbcDF.apply("id")
val idCol2 = jdbcDF("id")
println("idCol1:" + idCol1 + "idCol2:" + idCol2)
// 去除一些字段
jdbcDF.drop("id")
jdbcDF.drop(jdbcDF("id"))
// 获取前n行的数据
jdbcDF.limit(3).show(false)
// orderBy和sort:按指定字段排序,默认为升序,
// 示例1,按指定字段排序。加个-表示降序排序。sort和orderBy使用方法相同
jdbcDF.orderBy(-jdbcDF("c4")).show(false)
//    jdbcDF.orderBy(jdbcDF("c4").desc).show(false)
// 按照字段升续
jdbcDF.orderBy("c4").show(false)
// 根据字段进行group by操作
jdbcDF.groupBy("c1")
jdbcDF.groupBy(jdbcDF("c1"))
// 返回去重的字段
jdbcDF.distinct()
//dropDuplicates:根据指定字段去重
jdbcDF.dropDuplicates(Seq("c1"))
// 对数据聚合
jdbcDF.agg("id" -> "max", "c4" -> "sum")
//unionAll方法:对两个DataFrame进行组合
jdbcDF.unionAll(jdbcDF.limit(1))
//join 操作
//    joinDF1.join(joinDF2)
// using一个字段形式
//    joinDF1.join(joinDF2, "id")
//using多个字段形式
//    joinDF1.join(joinDF2, Seq("id", "name"))
// 指定join类型
//    joinDF1.join(joinDF2, Seq("id", "name"), "inner"))
// 使用Column类型来join
// joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"))
//在指定join字段同时指定join类型
//    joinDF1.join(joinDF2 , joinDF1("id" ) === joinDF2( "t1_id"), "inner")
// 获取指定字段统计信息
jdbcDF.stat.freqItems(Seq("c1"), 0.3).show()
//获取两个DataFrame中共有的记录
jdbcDF.intersect(jdbcDF.limit(1)).show(false)
//获取一个DataFrame中有另一个DataFrame中没有的记录
jdbcDF.except(jdbcDF.limit(1)).show(false)
// 操作字段名
jdbcDF.withColumnRenamed("id", "idx")
// withColumn:往当前DataFrame中新增一列
jdbcDF.withColumn("id2", jdbcDF("id")).show(false)
//行转列
jdbcDF.explode("c3", "c3_") { time: String => time.split(" ") }
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: