SparkSQL读取数据源
2016-07-14 15:53
295 查看
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import com.bonree.sdk.behavior.common.Constant

/**
 * Demonstrates the Spark SQL 1.x data-source APIs:
 *  - reading JSON into a DataFrame,
 *  - building a DataFrame from a text file with a programmatically specified schema,
 *  - building a DataFrame from a text file via reflection-based schema inference,
 *  - writing to and reading back from Parquet.
 *
 * Created by Administrator on 2016/7/11.
 */
object SparkSQL {

  def main(args: Array[String]): Unit = {
    // Local-mode demo context; paths below are Windows-local sample files.
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)

    // Read a JSON file into a DataFrame.
    val jsonPath = "e:/data/text.txt"
    sparkSqlReadJson(sqlContext, jsonPath)

    // Read a plain text file, both with an explicit schema and with reflection.
    val txtPath = "e:/data/tt.txt"
    sparkSQLReadTxt(sqlContext, sparkContext, txtPath)
    sparkSQLInferrSchemaReadTxt(sqlContext, sparkContext, txtPath)

    // Round-trip through Parquet.
    val parquetPath = "e:/data/user.parquet"
    sparkSqlWriteParquet(sqlContext, parquetPath, jsonPath)
    sparkSQLReadParquet(sqlContext, parquetPath)
  }

  /**
   * Schema model class used for reflection-based schema inference.
   *
   * NOTE(review): `age` is a String to match the raw text data, so any SQL
   * comparison on it is lexicographic, not numeric — confirm this is intended.
   *
   * @param name person name
   * @param age  person age (kept as String to match the source file)
   */
  case class Person(name: String, age: String)

  /**
   * Reads a JSON file into a DataFrame.
   *
   * @param sqlContext the SQLContext to read with
   * @param path       path to the JSON file (one JSON object per line)
   * @return the resulting DataFrame
   */
  def sparkSqlReadJson(sqlContext: SQLContext, path: String): DataFrame =
    sqlContext.read.json(path)

  /**
   * Programmatically Specifying the Schema: build a StructType from a schema
   * string at runtime, attach it to an RDD[Row] via createDataFrame, then
   * register a temp table and query it with SQL.
   *
   * @param sqlContext   the SQLContext used for DataFrame creation and SQL
   * @param sparkContext the SparkContext used to read the raw text file
   * @param path         path to a comma-separated "name,age" text file
   */
  def sparkSQLReadTxt(sqlContext: SQLContext, sparkContext: SparkContext, path: String): Unit = {
    val schemaString = "name age"
    // Every field is a nullable StringType column.
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))
    val people = sparkContext.textFile(path).map(_.split(",")).map(p => Row(p(0), p(1).trim))
    val peopleDataFrame = sqlContext.createDataFrame(people, schema)
    peopleDataFrame.registerTempTable("Person")
    // NOTE(review): `age` is a StringType column, so `age >= 13` is a string
    // comparison — e.g. "9" >= "13" is true lexicographically. Verify intent.
    val results = sqlContext.sql("SELECT name, age FROM Person WHERE age >= 13")
    results.map(t => "Name: " + t(0)).collect().foreach(println)
  }

  /**
   * Inferring the Schema Using Reflection: split each line with the project
   * delimiter, map to the Person case class, convert to a DataFrame with
   * toDF(), register a temp table, and query it.
   *
   * @param sqlContext   the SQLContext (its implicits provide toDF)
   * @param sparkContext the SparkContext used to read the raw text file
   * @param path         path to the delimited text file
   */
  def sparkSQLInferrSchemaReadTxt(sqlContext: SQLContext, sparkContext: SparkContext, path: String): Unit = {
    import sqlContext.implicits._ // required for RDD[Person].toDF()
    val peoples = sparkContext.textFile(path)
      .map(_.split(Constant.SPLIT_WORD))
      .map(people => Person(people(0), people(1)))
      .toDF()
    peoples.registerTempTable("people")
    val result = sqlContext.sql("select * from people")
    result.collect().foreach(println)
  }

  /**
   * Reads JSON data and writes the selected columns out as Parquet.
   *
   * @param sqlContext  the SQLContext to read/write with
   * @param parquetPath destination path for the Parquet output
   * @param jsonPath    source JSON file (must contain "name" and "age" fields)
   */
  def sparkSqlWriteParquet(sqlContext: SQLContext, parquetPath: String, jsonPath: String): Unit = {
    val dataFrame = sqlContext.read.json(jsonPath)
    dataFrame.select("name", "age").write.format("parquet").save(parquetPath)
  }

  /**
   * Reads Parquet data, registers it as a temp table, and prints every row.
   *
   * @param sqlContext the SQLContext to read with
   * @param path       path to the Parquet data
   */
  def sparkSQLReadParquet(sqlContext: SQLContext, path: String): Unit = {
    val parquetData = sqlContext.read.parquet(path)
    parquetData.registerTempTable("parquetData")
    val result = sqlContext.sql("select * from parquetData")
    // collect() first so printing happens on the driver, consistent with the
    // other read methods (a bare foreach would print on the executors).
    result.collect().foreach(println)
  }
}
相关文章推荐
- Oracle Essbase入门系列(一)
- SQL连接查询
- 配置 SQL Server 2005 远程连接(服务器端)
- oracle 用户权限解释
- python实现redis客户端单例+hbase客户端单例
- Codis作者黄东旭细说分布式Redis架构设计和踩过的那些坑们
- MongoDB 连接查询 $lookup
- 数据压缩:自动评估
- redis 写入的时候报错
- mysql explain profile用法
- windows 7下 oracle 11.2安装说明
- mysql+mybatis 分页
- logstash mysql slow
- PostgreSQL相关的一点小工作
- 【MySQL 00】MySQL数据表
- SQL Server游标的使用
- 【MySQL 01】查询--总结
- Mysql5.0存储过程
- redis 入mysql库
- Mysql中如何对按年月日查询时间字段