如何实现一个spark数据源
2016-04-07 00:13
190 查看
如何实现一个spark数据源(基于spark 1.6.1)
简介
spark data source API是一套接口,让开发者可以通过实现这些接口,从而将存储在磁盘(或者hdfs whatever)上的各种格式的数据转化为DataFrame;或者将DataFrame写为各种格式的数据保存起来。这些接口定义在:
sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
一般来讲,所有的数据源都要实现以下三个接口:
BaseRelation 代表了一个抽象的数据源。该数据源由一行行有着已知schema的数据组成(关系表)。
TableScan 用于扫描整张表,将数据返回成RDD[Row]。
RelationProvider 顾名思义,根据用户提供的参数返回一个数据源(BaseRelation)。
当然,spark也提供了其他接口来丰富你的数据源的功能, 这些都可以在上述的文件中找到相应的描述。比如PrunedFilteredScan接口,spark将需要的行和列传入,那么你的数据源就可以根据这些信息过滤掉不需要的行和列。如果某些表达式无法处理,那么还可以把不能处理的表达式返回给spark,数据返回后让spark进行处理。再比如CatalystScan接口,实现了这个接口后,spark会把内部的expression传给数据源,让数据源来处理这些表达式。如果数据源本身有索引或者其他meta信息,那么可以得到相应的性能提升。
自定义数据源的使用
可以通过DataFrame来使用import org.apache.spark.sql.SQLContext val sqlContext = new SQLContext(sc) val df = sqlContext.read .format("full.package.name.to.your.dataSource") .option("key1", "value1") .option("key2", "value2") .load("/path/to/your/data/file")
也可以通过SPARK SQL来使用
create table your_table_name using full.package.name.to.your.datasource option (path "/path/to/your/data/file",...)
当spark执行这条sql时,会寻找 full.package.name.to.your.datas
ource.DefaultSource这个类。这个类应该是一个RelationProvider。spark会实例化一个DefaultSource对象,并调用createRelation方法,返回一个BaseRelation。BaseRelation包含了生成这张表所需要的所有信息。之后spark会把这些信息保存到一个catalog(或者是hive的metastore,或者spark自己的metastore,依赖于你的配置)里面,等待下次查询使用。
查询时执行:
select * from your_table_name where field1 = 'a'
spark 会从从catalog里面找到上述的relation,形成一个LogicalPlan。再经过一系列复杂转换,最终调用buildScan,生成RDD[Row]。完成后将RDD[Row]返回给spark进行下一步的计算,最终返回结果。
自定义数据源的实现
一般情况下BaseRelation和TableScan会放在一个类中实现,然后再额外实现一个RelationProvider,通过createRelation方法返回一个BaseRelation。代码示例
import org.apache.spark.sql.sources._ import org.apache.spark.sql.SQLContext class DefaultSource extends RelationProvider { // maybe some initiation code // option 后面的key-value对会传入parameters override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]):BaseRelation = { // maybe some checking code val path = parameters("path") val param1 = parameters("key1") val param2 = parameters("key2") YourDataSoureRelation( sqlContext, path, param1, param2) } } case class YourDataSourceRelation( sqlContext: SQLContext, path: String, param1: String, param2: String) extends BaseRelation with TableScan { override def buildScan: RDD[Row] = { // your code to create the RDD } }
参考资料:
https://github.com/databricks/spark-csvhttps://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html
Developer Meet-up: The Spark SQL Optimizer and External Data Sources API, https://www.youtube.com/watch?v=GQSNJAzxOr8
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- Spark初探
- Spark Streaming初探
- Spark本地开发环境搭建
- 搭建hadoop/spark集群环境
- Spark HA部署方案
- Spark HA原理架构图
- spark内存概述
- Spark Shuffle之Hash Shuffle
- Spark Shuffle之Sort Shuffle
- Spark Shuffle之Tungsten Sort Shuffle
- 编译Spark 1.5.2