spark源码阅读一-spark-mongodb代码分析
2017-07-31 15:47
483 查看
源码的github地址https://github.com/mongodb/mongo-spark,是mongodb发布的spark connection接口库,可以方便的使用spark读写mongodb数据
1.rdd写入mongodb
两种方式将生成的rdd写入mongodb,事例代码:
val sc = getSparkContext(args)
import com.mongodb.spark._
import org.bson.Document
val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents)
import com.mongodb.spark.config._
val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)
调用函数如下
MongoSpark.save(rdd)
MongoSpark.save(rdd, writeConfig))
看MongoSpark.save的定义:def save[D: ClassTag](rdd: RDD[D]): Unit = save(rdd, WriteConfig(rdd.sparkContext)),
实际最终都调用到了MongoSpark.save(rdd, writeConfig)),来看这个函数实现:
具体就是mongoConnector.withCollectionDo对rdd每个partition每条记录写入mongodb。
2.从mongodb读出数据到rdd
读取的方式也是两种,事例代码
val rdd = MongoSpark.load(sc)
println(rdd.count)
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)
跟写入mongdb save函数类似,读取函数最终调用MongoSpark.load(sc, readConfig)
最终返回mongo RDD,它是继承于spark RDD,有了RDD就可以进行各种map reduce处理了。那么现在就有个疑问了,mongo db里面存储的数据记录非常多,而每个RDD都是有多个partition,是如何拆分这些partition呢?涉及到read config和partition配置,说明文档在https://github.com/mongodb/mongo-spark/blob/master/doc/2-configuring.md。
举个例子说明下,ReadConfig可以配置partitioner,默认是MongoDefaultPartitioner。看看MongoDefaultPartitioner的参数,它封装了MongoSamplePartitioner。其他的partitioner可以参考上述说明文档。
具体的代码实现MongoSamplePartitioner.scala和PartitionerHelper.scala。
从MongoSamplePartitioner.scala开始,先取出所有数据,计算每个分区记录条数
假设每个记录1k,那么默认就是每个分区64k个文档对象;还要算上每个分区取样个数,默认是10个取样,假设db有128k记录,那么按照算法取样数numberOfSamples就是20个。最终就是分成2个分区,每个分区10条文档记录。
def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count - 1
val rightHandBoundaries = samples.zipWithIndex.collect {
case (field, i) if collectSplit(i) => field.get(partitionKey)
}
最后就到关键点了,创建分区,进入PartitionerHelper.scala
val partitions = PartitionerHelper.createPartitions(partitionKey, rightHandBoundaries, PartitionerHelper.locations(connector), addMinMax)
根据前面划分好的mongo db索引区间生成MongoPartition。我们可以看到partition只是保存了每条记录的key和db server ip,是在真正计算的时候才读取出来。
其他的分区方式代码就是类似了,其实使用多的应该是MongoShardedPartitioner和MongoSplitVectorPartitioner,这里就不再说明,参考代码。
1.rdd写入mongodb
两种方式将生成的rdd写入mongodb,事例代码:
val sc = getSparkContext(args)
import com.mongodb.spark._
import org.bson.Document
val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents)
import com.mongodb.spark.config._
val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)
调用函数如下
MongoSpark.save(rdd)
MongoSpark.save(rdd, writeConfig))
看MongoSpark.save的定义:def save[D: ClassTag](rdd: RDD[D]): Unit = save(rdd, WriteConfig(rdd.sparkContext)),
实际最终都调用到了MongoSpark.save(rdd, writeConfig)),来看这个函数实现:
def save[D: ClassTag](rdd: RDD[D], writeConfig: WriteConfig): Unit = { val mongoConnector = MongoConnector(writeConfig.asOptions) rdd.foreachPartition(iter => if (iter.nonEmpty) { mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] => iter.grouped(writeConfig.maxBatchSize).foreach(batch => collection.insertMany(batch.toList.asJava)) }) }) } |
2.从mongodb读出数据到rdd
读取的方式也是两种,事例代码
val rdd = MongoSpark.load(sc)
println(rdd.count)
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)
跟写入mongdb save函数类似,读取函数最终调用MongoSpark.load(sc, readConfig)
def load[D: ClassTag](sc: SparkContext, readConfig: ReadConfig)(implicit e: D DefaultsTo Document): MongoRDD[D] = builder().sparkContext(sc).readConfig(readConfig).build().toRDD[D]() def builder(): Builder = new Builder Builder代码 def sparkContext(sparkContext: SparkContext): Builder = { this.sparkSession = Option(SparkSession.builder().config(sparkContext.getConf).getOrCreate()) this } def readConfig(readConfig: ReadConfig): Builder = { this.readConfig = Option(readConfig) this } def build(): MongoSpark = { require(sparkSession.isDefined, "The SparkSession must be set, either explicitly or via the SparkContext”) val session = sparkSession.get val readConf = readConfig.isDefined match { case true => ReadConfig(options, readConfig) case false => ReadConfig(session.sparkContext.getConf, options) } val mongoConnector = connector.getOrElse(MongoConnector(readConf)) val bsonDocumentPipeline = pipeline.map(x => x.toBsonDocument(classOf[Document], mongoConnector.codecRegistry)) new MongoSpark(session, mongoConnector, readConf, bsonDocumentPipeline) } 根据readconfig配置mongo connector和pipeline,最终还是调用MongoSpark.toRDD代码 def toRDD[D: ClassTag]()(implicit e: D DefaultsTo Document): MongoRDD[D] = rdd[D] private def rdd[D: ClassTag]()(implicit e: D DefaultsTo Document): MongoRDD[D] = new MongoRDD[D](sparkSession, sparkSession.sparkContext.broadcast(connector), readConfig, pipeline) |
举个例子说明下,ReadConfig可以配置partitioner,默认是MongoDefaultPartitioner。看看MongoDefaultPartitioner的参数,它封装了MongoSamplePartitioner。其他的partitioner可以参考上述说明文档。
Property name | Description | Default value |
partitionKey | The field to partition the collection by. The field should be indexed and contain unique values.用哪个字段来进行分区 | _id |
partitionSizeMB | The size (in MB) for each partition.每个partition大小 | 64 |
samplesPerPartition | The number of sample documents to take for each partition.每个paritition里抽取的文档记录个数 |
从MongoSamplePartitioner.scala开始,先取出所有数据,计算每个分区记录条数
val avgObjSizeInBytes = results.get("avgObjSize", new BsonInt64(0)).asNumber().longValue() val numDocumentsPerPartition: Int = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt |
def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count - 1
val rightHandBoundaries = samples.zipWithIndex.collect {
case (field, i) if collectSplit(i) => field.get(partitionKey)
}
最后就到关键点了,创建分区,进入PartitionerHelper.scala
val partitions = PartitionerHelper.createPartitions(partitionKey, rightHandBoundaries, PartitionerHelper.locations(connector), addMinMax)
根据前面划分好的mongo db索引区间生成MongoPartition。我们可以看到partition只是保存了每条记录的key和db server ip,是在真正计算的时候才读取出来。
其他的分区方式代码就是类似了,其实使用多的应该是MongoShardedPartitioner和MongoSplitVectorPartitioner,这里就不再说明,参考代码。
相关文章推荐
- spark源码阅读一-spark读写文件代码分析
- spark源码阅读一-spark读写hbase代码分析
- Spark修炼之道(高级篇)——Spark源码阅读:第十二节 Spark SQL 处理流程分析
- Spark源码阅读笔记:Standalone模式集群核心角色代码浅析
- python源码分析阅读理解chapter01~05 (纯阅读书籍,代码未看)
- spark源码阅读3-Task运行期之函数调用关系分析
- Spark BlockManager的通信及内存占用分析(源码阅读九)
- spark2.1源码分析1:Win10下IDEA源码阅读环境的搭建
- spark2.1源码分析1:Win10下IDEA源码阅读环境的搭建
- MongoDB源码阅读之Shard源码分析--CongfigServer启动
- Spark修炼之道(高级篇)——Spark源码阅读:第十二节 Spark SQL 处理流程分析
- spark源码分析(2)-源码阅读环境准备
- Spark2.2源码之Task任务提交源码分析
- Spark-2.0 搭建源码阅读环境
- Mongodb源码分析--更新记录
- 第44讲:Scala中View Bounds代码实战及其在Spark中的应用源码解析
- Mongodb源码分析--删除记录
- Scala 深入浅出实战经典 第48讲:Scala类型约束代码实战及其在Spark中的应用源码解析
- 第四课 Scala模式匹配、类型系统彻底精通与Spark源码阅读
- Spark修炼之道(高级篇)——Spark源码阅读:第九节 Task执行成功时的结果处理