
Reading Data Sources with SparkSQL

2016-07-14 15:53
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row

/**
* Created by Administrator on 2016/7/11.
*/
object SparkSQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sparkContext = new SparkContext(conf)
    val sqlContext = new SQLContext(sparkContext)

    // Read a JSON file with SparkSQL
    val jsonPath = "e:/data/text.txt"
    val dataFrame = sparkSqlReadJson(sqlContext, jsonPath)

    // Read a plain text file: first with a programmatically specified schema,
    // then with a schema inferred via reflection
    val txtPath = "e:/data/tt.txt"
    sparkSQLReadTxt(sqlContext, sparkContext, txtPath)
    sparkSQLInferSchemaReadTxt(sqlContext, sparkContext, txtPath)

    // Write the JSON data out as Parquet, then read it back
    val parquetPath = "e:/data/user.parquet"
    sparkSqlWriteParquet(sqlContext, parquetPath, jsonPath)
    sparkSQLReadParquet(sqlContext, parquetPath)
  }

  /**
   * Schema model class for the reflection-based approach.
   * @param name person's name
   * @param age person's age (kept as a String in this example)
   */
  case class Person(name: String, age: String)
  /**
   * Read a JSON file with SparkSQL. The schema is inferred from the JSON records.
   * @param sqlContext
   * @param path path to a file with one JSON object per line
   */
  def sparkSqlReadJson(sqlContext: SQLContext, path: String): DataFrame = {
    sqlContext.read.json(path)
  }
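
  // A minimal sketch of the input this assumes: "e:/data/text.txt" holds one
  // JSON object per line (contents illustrative, not from the original post):
  //   {"name": "Michael", "age": "29"}
  //   {"name": "Andy", "age": "30"}
  // read.json infers the schema from the records, so the returned DataFrame
  // exposes the columns `name` and `age` without any schema being declared.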

  /**
   * Programmatically specifying the schema: build a StructType from a string of
   * field names, bind it to an RDD[Row] with createDataFrame, then register the
   * result as a temporary table via registerTempTable so it can be queried with SQL.
   * @param sqlContext
   * @param sparkContext
   * @param path
   */
  def sparkSQLReadTxt(sqlContext: SQLContext, sparkContext: SparkContext, path: String): Unit = {
    val schemaString = "name age"
    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    val people = sparkContext.textFile(path).map(_.split(",")).map(p => Row(p(0), p(1).trim))
    val peopleDataFrame = sqlContext.createDataFrame(people, schema)
    peopleDataFrame.registerTempTable("Person")
    val results = sqlContext.sql("SELECT name, age FROM Person WHERE age >= 13")
    results.map(t => "Name: " + t(0)).collect().foreach(println)
  }
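
  // Assuming "e:/data/tt.txt" holds comma-separated lines such as (illustrative):
  //   Michael, 29
  //   Andy, 30
  //   Justin, 19
  // the query above prints "Name: Michael", "Name: Andy" and "Name: Justin".
  // `age` was declared as StringType, so `age >= 13` works only because
  // Spark SQL casts the strings for the numeric comparison.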

  /**
   * Inferring the schema using reflection: map each line onto the Person case
   * class, convert the RDD to a DataFrame with toDF(), then register a
   * temporary table via registerTempTable.
   * @param sqlContext
   * @param sparkContext
   * @param path
   */
  def sparkSQLInferSchemaReadTxt(sqlContext: SQLContext, sparkContext: SparkContext, path: String): Unit = {
    import sqlContext.implicits._
    // The original split on a project-specific Constant.SPLIT_WORD; a comma is
    // used here so the example is self-contained.
    val peoples = sparkContext.textFile(path).map(_.split(",")).map(people => Person(people(0), people(1).trim)).toDF()
    peoples.registerTempTable("people")
    val result = sqlContext.sql("select * from people")
    result.collect().foreach(println)
  }
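
  // Reflection-based inference reads the schema from the Person case class, so
  // no StructType is built by hand. Note that the case class has to be defined
  // outside the method that calls toDF() (Person sits at the object level above).
  // collect() brings the rows to the driver, where each prints as e.g. [Michael,29].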

  /**
   * Read data from a JSON file and write the selected columns to Parquet.
   * @param sqlContext
   * @param parquetPath
   * @param jsonPath
   */
  def sparkSqlWriteParquet(sqlContext: SQLContext, parquetPath: String, jsonPath: String): Unit = {
    val dataFrame = sqlContext.read.json(jsonPath)
    dataFrame.select("name", "age").write.format("parquet").save(parquetPath)
  }
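
  // save() throws if the target path already exists, so re-running the job
  // fails on the second pass. A hedged alternative using an explicit save mode
  // (SaveMode lives in org.apache.spark.sql):
  //   dataFrame.select("name", "age").write.mode(SaveMode.Overwrite).parquet(parquetPath)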

  /**
   * Read Parquet data with SparkSQL.
   * @param sqlContext
   * @param path
   */
  def sparkSQLReadParquet(sqlContext: SQLContext, path: String): Unit = {
    val parquetData = sqlContext.read.parquet(path)
    parquetData.registerTempTable("parquetData")
    val result = sqlContext.sql("select * from parquetData")
    // collect() first so the rows print on the driver rather than on executors
    result.collect().foreach(println)
  }
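
  // Parquet stores the schema alongside the data, so nothing has to be declared
  // when reading it back; parquetData.printSchema() would show the `name` and
  // `age` columns written by sparkSqlWriteParquet above.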

}
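
The code above targets the Spark 1.x API (SQLContext, registerTempTable). For comparison, a minimal sketch of the same JSON-to-Parquet round trip on the Spark 2.x API, where SparkSession replaces SQLContext and createOrReplaceTempView replaces registerTempTable (paths are the illustrative ones used above):

import org.apache.spark.sql.SparkSession

object SparkSQL2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Simple Application")
      .master("local")
      .getOrCreate()

    // Read the JSON file and register a temporary view
    val df = spark.read.json("e:/data/text.txt")
    df.createOrReplaceTempView("people")
    spark.sql("select name, age from people").show()

    // Round-trip through Parquet
    df.select("name", "age").write.parquet("e:/data/user2.parquet")
    spark.read.parquet("e:/data/user2.parquet").show()

    spark.stop()
  }
}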