您的位置:首页 > 其它

如何实现一个spark数据源

2016-04-07 00:13 190 查看

如何实现一个spark数据源(基于spark 1.6.1)

简介

spark data source API是一套接口,让开发者可以通过实现这些接口,从而将存储在磁盘(或者hdfs whatever)上的各种格式的数据转化为DataFrame;或者将DataFrame写为各种格式的数据保存起来。

这些接口定义在:

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

一般来讲,所有的数据源都要实现以下三个接口:

BaseRelation 代表了一个抽象的数据源。该数据源由一行行有着已知schema的数据组成(关系表)。

TableScan 用于扫描整张表,将数据返回成RDD[Row]。

RelationProvider 顾名思义,根据用户提供的参数返回一个数据源(BaseRelation)。

当然,spark也提供了其他接口来丰富你的数据源的功能, 这些都可以在上述的文件中找到相应的描述。比如PrunedFilteredScan接口,spark将需要的行和列传入,那么你的数据源就可以根据这些信息过滤掉不需要的行和列。如果某些表达式无法处理,那么还可以把不能处理的表达式返回给spark,数据返回后让spark进行处理。再比如CatalystScan接口,实现了这个接口后,spark会把内部的expression传给数据源,让数据源来处理这些表达式。如果数据源本身有索引或者其他meta信息,那么可以得到相应的性能提升。

自定义数据源的使用

可以通过DataFrame来使用

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("full.package.name.to.your.dataSource")
.option("key1", "value1")
.option("key2", "value2")
.load("/path/to/your/data/file")


也可以通过SPARK SQL来使用

create table your_table_name
using full.package.name.to.your.datasource
option (path "/path/to/your/data/file",...)


当spark执行这条sql时,会寻找 full.package.name.to.your.datas

ource.DefaultSource这个类。这个类应该是一个RelationProvider。spark会实例化一个DefaultSource对象,并调用createRelation方法,返回一个BaseRelation。BaseRelation包含了生成这张表所需要的所有信息。之后spark会把这些信息保存到一个catalog(或者是hive的metastore,或者spark自己的metastore,依赖于你的配置)里面,等待下次查询使用。

查询时执行:

select * from your_table_name where field1 = 'a'


spark 会从从catalog里面找到上述的relation,形成一个LogicalPlan。再经过一系列复杂转换,最终调用buildScan,生成RDD[Row]。完成后将RDD[Row]返回给spark进行下一步的计算,最终返回结果。

自定义数据源的实现

一般情况下BaseRelation和TableScan会放在一个类中实现,然后再额外实现一个RelationProvider,通过createRelation方法返回一个BaseRelation。

代码示例

import org.apache.spark.sql.sources._
import org.apache.spark.sql.SQLContext
class DefaultSource extends RelationProvider {
// maybe some initiation code

// option 后面的key-value对会传入parameters
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]):BaseRelation = {
// maybe some checking code
val path = parameters("path")
val param1 = parameters("key1")
val param2 = parameters("key2")
YourDataSoureRelation(
sqlContext,
path,
param1,
param2)
}
}

case class YourDataSourceRelation(
sqlContext: SQLContext,
path: String,
param1: String,
param2: String) extends BaseRelation with TableScan {
override def buildScan: RDD[Row] = {
// your code to create the RDD
}

}


参考资料:

https://github.com/databricks/spark-csv

https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html

Developer Meet-up: The Spark SQL Optimizer and External Data Sources API, https://www.youtube.com/watch?v=GQSNJAzxOr8
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark