您的位置:首页 > 大数据

07大数据内存计算spark系列贴-spark SQL

2014-12-15 10:10 549 查看
(原文地址:http://blog.csdn.net/codemosi/article/category/2777045,转载麻烦带上原文地址。hadoop hive hbase mahout storm spark kafka flume,等连载中,做个爱分享的人

)
[align=left]版本 spark 1.1.0[/align]

[align=left]spark SQL 简介[/align]
[align=left] spark SQL 用来替代shark来,做基于spark内核的,面向结构化数据的的数据仓库。1.10支持 jdbc[/align]

spark SQL查询的结构化数据(RDD,parquet,json),可以作为spark 的分布式的数据集(RDD),集成API在Python中,Scala和Java程序中,这意味着spark SQL可以集成其他spark的组件,如mllib+spark SQL提供SQL+机器学习的超复杂程序。SchemaRDD 对象对各种结构化数据源提供了一致的编程API。对程序员来说。spark SQL ==SchemaRDD
的API。

[align=left] spark SQL 可以直接复用 hive 的数据仓库,和任务脚本。[/align]

[align=left]spark SQL 编程API介绍[/align]
[align=left]//1 数据源转成SchemaRDD[/align]
/**

* Registers this RDD as a temporary table using the given name. The lifetime of this temporary

* table is tied to the [[SQLContext]] that was used to create this SchemaRDD.

*

* @group schema

*/

def registerTempTable(tableName: String): Unit = {

sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)

}

[align=left]//2 SchemaRDD的sql查询方法[/align]
/**

* Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is

* used for SQL parsing can be configured with 'spark.sql.dialect'.

*

* @group userf

*/

def sql(sqlText: String //
要执行的SQL语句
): SchemaRDD = {

if (dialect == "sql") { //默认的sqlcontext 的 dialect 方言默认使用sql 来查询

new SchemaRDD(this, parseSql(sqlText))

} else { //HIVE 通过改变 方言,可以改为 hiveQL 语法,来解析 语句

sys.error(s"Unsupported SQL dialect: $dialect")

}

}

[align=left]//3 spark sql = SchemaRDD,SchemaRDD继承RDD,所以sql后的结果集,可以使用RDD的方法来解析[/align]
class SchemaRDD(

@transient val sqlContext: SQLContext,

@transient val baseLogicalPlan: LogicalPlan)

extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

[align=left]spark SQL的sql官方例子[/align]
[align=left]1 spark sql的基础使用流程[/align]

[align=left]//0 准备spark sql的上下文环境[/align]
[align=left] val sparkConf = new SparkConf().setAppName("rdc spark sql test")[/align]
val sc = new SparkContext(sparkConf)

val sqlContext = new SQLContext(sc)

import sqlContext._

[align=left]//1 准备数据[/align]
val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))

//2 转成SchemaRDD
[align=left] rdd.registerTempTable("records")[/align]

//3 执行sql查询

sql("SELECT * FROM records").collect().foreach(println)

[align=left]2 spark sql 读取hive[/align]
[align=left][/align]
//0 准备spark sql 连 hive的上下文环境
val sparkConf = new SparkConf().setAppName("HiveFromSpark")

val sc = new SparkContext(sparkConf)

val hiveContext = new HiveContext(sc)

import hiveContext._

//2 spark sql查询hive原有数据

val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

[align=left]2 spark sql 1.1.0 中,使用 jdbc来使用spark sql[/align]

[align=left]和jdbc访问mysql一样。[/align]

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: