您的位置:首页 > 数据库

SparkSQL2.0扩展外部数据源原理

2017-04-27 10:02 901 查看
spark2.0中,提供了两种扩展外部数据源的接口,

第一种外部数据源为文件,第二种外部数据源为系统

spark内部调用外部数据源包的类是下面,包括解析BaseRelation,提取schema等

package org.apache.spark.sql.execution.datasources

文件接口

扩展外部文件数据源

需要实现的接口所在文件fileSourceInterfaces.scala和interfaces.scala

接口FileFormat

inferSchema

def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType]


files是连接的文件,返回SparkSQL的一张表结构。

用xxx代表文件后缀,不同用法对应不同的参数:


通配符指定路径

```
read.xxx("hdfs:///data/*/*.xxx")
```
这时files中包括所有匹配到的文件


指定文件夹

```
read.xxx("hdfs:///data/")
```
这时files中会包括/data文件夹下的所有文件


指定文件

```
read.xxx("hdfs:///data/a.xxx")
```
这时files中就只有一个a.xxx文件


这里用来提取schema的文件,也会在buildReader方法中获取到

isSplitable

def isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {
false
}


直接返回true就行了,即可被hdfs按块读取。默认是false

buildReader

def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
// TODO: Remove this default implementation when the other formats have been ported
// Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
}


该方法返回一个方法,返回的方法参数为PartitionedFile,即一个文件块,包括起始和终止。处理这个分片,将数据构造成Internal的Iterator返回。主要实现hasnext和next方法,将数据按表结构一行一行交给sparkSQL

参数中的一些注意的地方:

requiredSchema

spark.read.XXX(path).createOrReplaceTempView(“table_name”)

spark.sql(select * from table_name).show()

这时requiredSchema是表中所有列

spark.read.XXX(path).createOrReplaceTempView(“table_name”)

spark.sql(select * from table_name where c1 = 1).count()

这时requiredSchema是where后出现的列

spark.read.XXX(path).createOrReplaceTempView(“table_name”)

spark.sql(select * from table_name).count()

这时requiredSchema是空,其实spark最后再计数时也没有拿数据,只是调用了count值遍的hasNext和next。可以考虑不真正取数据,只构造count个空的InternalRow,这样会快很多。

partitionSchema

当目录结构如下时:其中baseFoder和a=1,a=2都是文件夹名

baseFolder/a=1/file1,baseFolder/a=2/file2

将baseFolder路径当做path传入时,partitionSchema中就是a,IntegerType类型。在表中会自动添加一列a,值是1或者2。

buildWriter

写文件

接口DataSourceRegister

shortName

为数据源起名,暂时不知道具体在哪里起作用

inferSchema和buildReader之间参数传递:在DefaultSource类里,尽量不要在类中定义成员和方法,如果在inferSchema中初始化,在buildReader中使用会出现类序列化的问题,可以将要传的变量放在DefaultSource的伴生对象里。

object DefaultSource {
val logger: Logger = LoggerFactory.getLogger(getClass)
val path = "path"
val keys = new ArrayBuffer[String]()
}


扩展外部系统数据源

需要实现的接口所在文件interfaces.scala

接口RelationProvider

createRelation

def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation


创建一个自己实现的连接

抽象类BaseRelation

schema

返回表结构

接口PrunedFilteredScan

buildScan

def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]


其中第一个参数中的包括select语句后的列,也包括filters出现的列。filter是where后面的语法树,seq对象间的关系是and。

这个方法返回一个RDD,RDD中有getPartitions方法,可以重写此方法进行分区读取数据源


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  SparkSQL