SparkSQL2.0扩展外部数据源原理
2017-04-27 10:02
901 查看
spark2.0中,提供了两种扩展外部数据源的接口,
第一种外部数据源为文件,第二种外部数据源为系统
spark内部调用外部数据源包的类是下面,包括解析BaseRelation,提取schema等
package org.apache.spark.sql.execution.datasources
文件接口
files是连接的文件,返回SparkSQL的一张表结构。
这里用来提取schema的文件,也会在buildReader方法中获取到
isSplitable
直接返回true就行了,即可被hdfs按块读取。默认是false
buildReader
该方法返回一个方法,返回的方法参数为PartitionedFile,即一个文件块,包括起始和终止。处理这个分片,将数据构造成Internal的Iterator返回。主要实现hasnext和next方法,将数据按表结构一行一行交给sparkSQL
参数中的一些注意的地方:
spark.sql(select * from table_name).show()
这时requiredSchema是表中所有列
spark.read.XXX(path).createOrReplaceTempView(“table_name”)
spark.sql(select * from table_name where c1 = 1).count()
这时requiredSchema是where后出现的列
spark.read.XXX(path).createOrReplaceTempView(“table_name”)
spark.sql(select * from table_name).count()
这时requiredSchema是空,其实spark最后再计数时也没有拿数据,只是调用了count值遍的hasNext和next。可以考虑不真正取数据,只构造count个空的InternalRow,这样会快很多。
baseFolder/a=1/file1,baseFolder/a=2/file2
将baseFolder路径当做path传入时,partitionSchema中就是a,IntegerType类型。在表中会自动添加一列a,值是1或者2。
buildWriter
写文件
为数据源起名,暂时不知道具体在哪里起作用
inferSchema和buildReader之间参数传递:在DefaultSource类里,尽量不要在类中定义成员和方法,如果在inferSchema中初始化,在buildReader中使用会出现类序列化的问题,可以将要传的变量放在DefaultSource的伴生对象里。
创建一个自己实现的连接
返回表结构
其中第一个参数中的包括select语句后的列,也包括filters出现的列。filter是where后面的语法树,seq对象间的关系是and。
这个方法返回一个RDD,RDD中有getPartitions方法,可以重写此方法进行分区读取数据源
第一种外部数据源为文件,第二种外部数据源为系统
spark内部调用外部数据源包的类是下面,包括解析BaseRelation,提取schema等
package org.apache.spark.sql.execution.datasources
文件接口
扩展外部文件数据源
需要实现的接口所在文件fileSourceInterfaces.scala和interfaces.scala接口FileFormat
inferSchemadef inferSchema( sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType]
files是连接的文件,返回SparkSQL的一张表结构。
用xxx代表文件后缀,不同用法对应不同的参数:
通配符指定路径
``` read.xxx("hdfs:///data/*/*.xxx") ``` 这时files中包括所有匹配到的文件
指定文件夹
``` read.xxx("hdfs:///data/") ``` 这时files中会包括/data文件夹下的所有文件
指定文件
``` read.xxx("hdfs:///data/a.xxx") ``` 这时files中就只有一个a.xxx文件
这里用来提取schema的文件,也会在buildReader方法中获取到
isSplitable
def isSplitable( sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = { false }
直接返回true就行了,即可被hdfs按块读取。默认是false
buildReader
def buildReader( sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { // TODO: Remove this default implementation when the other formats have been ported // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats. throw new UnsupportedOperationException(s"buildReader is not supported for $this") }
该方法返回一个方法,返回的方法参数为PartitionedFile,即一个文件块,包括起始和终止。处理这个分片,将数据构造成Internal的Iterator返回。主要实现hasnext和next方法,将数据按表结构一行一行交给sparkSQL
参数中的一些注意的地方:
requiredSchema
spark.read.XXX(path).createOrReplaceTempView(“table_name”)spark.sql(select * from table_name).show()
这时requiredSchema是表中所有列
spark.read.XXX(path).createOrReplaceTempView(“table_name”)
spark.sql(select * from table_name where c1 = 1).count()
这时requiredSchema是where后出现的列
spark.read.XXX(path).createOrReplaceTempView(“table_name”)
spark.sql(select * from table_name).count()
这时requiredSchema是空,其实spark最后再计数时也没有拿数据,只是调用了count值遍的hasNext和next。可以考虑不真正取数据,只构造count个空的InternalRow,这样会快很多。
partitionSchema
当目录结构如下时:其中baseFoder和a=1,a=2都是文件夹名baseFolder/a=1/file1,baseFolder/a=2/file2
将baseFolder路径当做path传入时,partitionSchema中就是a,IntegerType类型。在表中会自动添加一列a,值是1或者2。
buildWriter
写文件
接口DataSourceRegister
shortName为数据源起名,暂时不知道具体在哪里起作用
inferSchema和buildReader之间参数传递:在DefaultSource类里,尽量不要在类中定义成员和方法,如果在inferSchema中初始化,在buildReader中使用会出现类序列化的问题,可以将要传的变量放在DefaultSource的伴生对象里。
object DefaultSource { val logger: Logger = LoggerFactory.getLogger(getClass) val path = "path" val keys = new ArrayBuffer[String]() }
扩展外部系统数据源
需要实现的接口所在文件interfaces.scala接口RelationProvider
createRelationdef createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
创建一个自己实现的连接
抽象类BaseRelation
schema返回表结构
接口PrunedFilteredScan
buildScandef buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
其中第一个参数中的包括select语句后的列,也包括filters出现的列。filter是where后面的语法树,seq对象间的关系是and。
这个方法返回一个RDD,RDD中有getPartitions方法,可以重写此方法进行分区读取数据源
相关文章推荐
- 通过自定义SparkSQL外部数据源实现SparkSQL读取HBase
- Spark SQL之External DataSource外部数据源
- SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)
- 9. Spark SQL:JDBC数据源实战
- 9. Spark SQL:JDBC数据源实战
- [2.6]Spark SQL 操作各种数据源笔记
- 基于spark2.0整合spark-sql + mysql + parquet + HDFS
- 初识Spark2.0之Spark SQL
- Spark2.0-RDD分区原理分析
- spark sql读取mysql数据源并且将结果写入mysql
- Spark利用hive与MySQL外部数据源做join
- 大数据IMF传奇行动绝密课程第69课:Spark SQL通过Hive数据源实战
- SparkSQL编程指南之Java篇二-数据源(上)
- spark2.0原理分析--RDD Lineage(逻辑执行计划)
- sparksql各种数据源
- sparksql读取hive数据源配置
- 2.Spark SQL:数据源之通用的load和save操作
- 2.Spark SQL:数据源之通用的load和save操作
- 2.Spark SQL:数据源之通用的load和save操作
- 4.Spark SQL:数据源Parquet之使用编程方式加载数据