spark-sql(不包含hive-sql)查询编程
2016-04-20 16:25
351 查看
spark-sql(不包含hive-sql)查询编程
//1:将json格式的数据转化为dataFrame
//2:RDD直接转化方法dataFrame
//当转化为表的数据个格式不能提前确定的情况,可以由下面三个步骤创建dataFrame
//通用的加载/保存功能,在最简单的形式中,默认的数据源(parquet文件非spark.sql.sources.default需要另外配置),将用于所有操作
//手动指定选项,
//可以手动指定你想使用的数据源,dataSources需要使用完全限定域名(即org.apache.spark.sql.parquet)规定,但是如果是内置的来源,你也可以使用短名称(eg:josn,parquet,jdbc)。使用此语法,任何类型的数据帧可被转换成其他类型
//装载数据
//模式合并
//将两个表合并,以及合并两表相应的模式,由于这种操作消耗较大,所以在spark1.5之后的版本默认的将这种功能进行关闭,不过你也可以向下面的例子一样手动设置为为true,或设置全局SQL选项**spark.sql.parquet.mergeSchema为true。
//1:将json格式的数据转化为dataFrame
val df =sqlContext.read.json("hdfs://localhost:9000/jacksoom/people.json")//读取json格式的数据,转化为dataFrame df.show()//显示dataFrame内容 df.printSchema()//显示表字段格式 df.select("name").show()//查找name的行,显示 df.select(df("name"), df("age") + 1).show()//查找name和age字段,并将age字段+1处理,然后显示 df.filter(df("age") > 21).show()//过滤age>21的字段,并显示 df.groupBy("age").count().show()//对age字段调用groupBy函数,返回年龄和该年龄用户个数
//2:RDD直接转化方法dataFrame
case class Person(name: String, age: Int)//创建Person类 val people = sc.textFile("hdfs://localhost:9000/jacksoom/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()//先读取文件为RDD,然后制定分隔符,转化为RDD[Person]格式,然后使用toDF()转化为dataFrame people.registerTempTable("people")//将该dataFrame注册表名为“people”的表 val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")//使用sql语言查询 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)//输出13=<age<=19的name teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)//简单的sql查询输出 teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)//sql查询
//当转化为表的数据个格式不能提前确定的情况,可以由下面三个步骤创建dataFrame
import org.apache.spark.sql.Row; import org.apache.spark.sql.types.{StructType,StructField,StringType};//依赖jar val people = sc.textFile("hdfs://localhost:9000/jacksoom/people.txt")//step1:创建RDD val schemaString = "name age"//定义字段字符串 val schema =StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))//step2:定义表格式 val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))// val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)//step3:根据表格式和行RDD,创建dataFrame peopleDataFrame.registerTempTable("people")//注册表 val results = sqlContext.sql("SELECT name FROM people")//简单使用spl查询 results.map(t => "Name: " + t(0)).collect().foreach(println)//对查询结果进行输出
//通用的加载/保存功能,在最简单的形式中,默认的数据源(parquet文件非spark.sql.sources.default需要另外配置),将用于所有操作
val df = sqlContext.read.load("hdfs://localhost:9000/jacksoom/users.parquet")//载入parquet文件,返回为dataFrame格式 df.select("name", "favorite_color").write.save("hdfs://localhost:9000/jacksoom/namesAndFavColors.parquet")//然后使用查询,并将查询结果保存为相应的parquet文件
//手动指定选项,
//可以手动指定你想使用的数据源,dataSources需要使用完全限定域名(即org.apache.spark.sql.parquet)规定,但是如果是内置的来源,你也可以使用短名称(eg:josn,parquet,jdbc)。使用此语法,任何类型的数据帧可被转换成其他类型
val df = sqlContext.read.format("json").load("hdfs://localhost:9000/jacksoom/people.json")//指定读入的为json df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")//sql查找,然后指定保存格式为parquet
//装载数据
case class Person(name: String, age: Int)//创建Person类 val people = sc.textFile("hdfs://localhost:9000/jacksoom/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.toDF().write.parquet("hdfs://localhost:9000/jacksoom/test2people.parquet")//将数据写入test2people.parquet val df = sqlContext.sql("SELECT name FROM parquet.`hdfs://localhost:9000/jacksoom/users.parquet`")//直接通过路径查询(spark1.6才可以)
//模式合并
//将两个表合并,以及合并两表相应的模式,由于这种操作消耗较大,所以在spark1.5之后的版本默认的将这种功能进行关闭,不过你也可以向下面的例子一样手动设置为为true,或设置全局SQL选项**spark.sql.parquet.mergeSchema为true。
import sqlContext.implicits._ val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") //合并这两个表,首先需要将默认关闭的‘mergeSchema’设置为true val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
相关文章推荐
- MongoDB基本命令用【转】
- mysqli 批量执行多条语句
- Oracle ALL, ANY and SOME比较
- windows下设置redis密码
- redis操作命令
- redis操作命令
- MYSQL中取当前年份的第一天和当前周,月,季度的第一天/最后一天
- MongoDB入门整理
- C#中的SQL数据库操作
- Oracle闪回设置
- MYSQL 分组合并函数
- windows下启动redis
- mysqli 获取查询结果集信息属性、方法
- RMAN异机恢复
- java中redis实现篇
- jmeter测试mysql数据库
- MySQL InnoDB/MYISAM/MERGE/BDB/HEAP的区别
- MySQL主从复制原理 ---- mysqldump
- Mysql中的 IFNULL NULLIF ISNULL 的用法
- MySQL中的UPDATE语句和DELETE语句