您的位置:首页 > 大数据 > Hadoop

spark解决方案系列--------1.spark-streaming实时Join存储在HDFS大量数据的解决方案

2016-03-30 10:05 603 查看
    spark-streaming实时接收数据并处理。一个非常广泛的需求是spark-streaming实时接收的数据需要跟保存在HDFS上的大量数据进行Join。要实现这个需求保证实时性需要解决以下几个问题:

1.spark-streaming的数据接收间隔往往很小,比如只有几秒钟。HDFS上的数据往往很大,不能每个batch都从HDFS读取数据,避免频繁大量磁盘I/O。HDFS的数据也可能会改变,只是改变后数据加载周期比spark-streaming的batch时间要长。

2.Cache到内存的数据,不能在streaming的一个batch处理结束之后被回收。

3.HDFS大量数据在跟Kafka等实时接收的数据进行Join的时候不能shuffle。如果发生shuffle,由于HDFS中数据非常大,势必会影响实时性。

4.HDFS加载到内存中的数据,不能频繁Checkpoint到磁盘。

    一个Spark-streaming application可以创建多个InputDStream,但是所有的InputDStream的数据接收时间间隔必须相同,因为数据接收间隔设置在了StreamingContext上。因此为了解决上面提到的第一个问题,需要实现一个自定义InputDStream。这个InputDStream需要将从HDFS上读取的数据Cache到内存,并且将Cache到内存的数据从前一个DStream传递到下一个DStream。

    Spark-streaming在每个batch创建了RDD之后,如果DStream的StorageLevel不为None会设置DStream的RDD的StorageLevel,源码如下:

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {

val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}

rddOption.foreach { case newRDD =>
// Register the generated RDD for caching and checkpointing
if (storageLevel != StorageLevel.NONE) {//设置DStream对应RDD的存储级别
newRDD.persist(storageLevel)
logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
}
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
}
generatedRDDs.put(time, newRDD)
}
rddOption
} else {
None
}
}
}

因此设置自定义InputDStream的StorageLevel为MEMORY_ONLY或者MEMORY_ONLY_SER可以解决HDFS数据cache到内存的问题。相关源码为:

storageLevel = StorageLevel.MEMORY_ONLY


    一个Spark-streaming batch处理结束之后,会发送ClearMetadata事件来清除这个batch的数据,具体源码如下:

private[streaming] def clearMetadata(time: Time) {
val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))//清除remeberDuration时间间隔之前的RDD
logDebug("Clearing references to old RDDs: [" +
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
generatedRDDs --= oldRDDs.keys
if (unpersistData) {
logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
oldRDDs.values.foreach { rdd =>
rdd.unpersist(false)
// Explicitly remove blocks of BlockRDD
rdd match {
case b: BlockRDD[_] =>
logInfo("Removing blocks of RDD " + b + " of time " + time)
b.removeBlocks()//将RDD占用的内存块从BlockManager释放
case _ =>
}
}
}
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearMetadata(time))
}


从上面的代码可知,当一个batch处理结束之后,会将remeberDuration时间间隔之前的RDD删除,并且将这个RDD在BlockManager中占用的内存块释放。所以通过设置自定义InputDStream的remeberDuration来防止一个batch产生的RDD马上被释放。

    Spark-streaming每个batch都会产生RDD,为了避免每个Batch都从HDFS加载文件产生RDD,需要将HDFS中大文件产生的RDD在前后的DStream进行传递,因此在自定义InputDStream中定义一个成员变量,记录已经生成的RDD,并且这个成员变量只有在rememberDuration时间间隔才会发生改变。自定义InputDStream的compute方法定义为:

override def compute(validTime: Time): Option[RDD[(K, V)]] = {
// Find new files
if (rddGenerateTime == 0 || Duration(validTime.milliseconds -  rddGenerateTime) >= durationToRemember){//从HDFS读取数据生成的RDD的周期为durationToRemember
val newFiles = findNewFiles(validTime.milliseconds)//这个方法会加载目录里面的所有文件
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
val rdds = Some(filesToRDD(newFiles))
rddGenerateTime = validTime.milliseconds//记录从HDFS读取数据的时间
val metadata = Map(
"files" -> newFiles.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
val inputInfo = StreamInputInfo(id, 0, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
rememberRDDs = rdds//记录从HDFS读取数据生成的RDD
rdds
}else {
/*
这样在DStream.getOrCompute方法里面会把这个RDD重复的放入generateRDDs这个HashMap里面.不过没有关系,因为generateRDDs里面的RDD
在超过rememberDuration之后会将RDD从generateRDDs里面删除,并且将RDD所占用的内存块也删除。在实际执行的时候不会发生重复删除内存块,因为删除内存块之前会先检查内存块是否存在
第一次删除之后,
以后再删除的时候发现内存块已经不存在了,会直接返回,不会重复删除
*/
//useRememberRDDTime = validTime.milliseconds
rememberRDDs

}

}

在上面的方法中,从HDFS读取数据生成的RDD的周期为durationToRemember,并且从HDFS读取数据生成的RDD保存在了自定义InputDStream的remeberRDDs成员。如果时间还没有达到durationToRemember则将remeberRDDs中保存的RDD作为这个batch产生的HadoopRDD,这样达到了RDD在前后DStream的传递,避免了频繁的读取HDFS数据。另外这个自定义DStream是仿照FileInputDStream类写的,它借鉴了FileInputDStream.compute方法对目录文件的监控,在一个[b]durationToRemember周期加载一次监控目录的所有文件产生RDD。[/b]

    最终的实现效果如下图所示:



在上图中,spark-streaming DStream的产生batch是2秒,但是NewHadoopRDD 1天才生成一次,并且Cache到了内存。在1天的remeberDuration时间间隔内不同的MyInputDSteam使用相同的NewHadoopRDD.

为了避免进行join的时候发生shuffle,将Kafka DStream每个batch的数据broadcast,进行join。测试代码如下:

kafkaMessages.transformWith[(String, String), (String, String, String)](hadoopMessages, (kafkaRDD:RDD[(String, String)], fileRDD:RDD[(String, String)]) =>{
val sqlContext = SQLContextSingleton.getInstance(kafkaRDD.sparkContext)
import sqlContext.implicits._
val kafkaDf = kafkaRDD.map{case (ip, str) =>KafkaObj(ip, str)}.toDF
//kafkaDf.registerTempTable("kafkatable")
val fileDf = fileRDD.map{case (ip, str) =>FileObj(ip, str)}.toDF
fileDf.join(broadcast(kafkaDf), "ip").rdd.map(row =>{
(row.get(0).toString, row.get(1).toString, row.get(2).toString)
})
})


自定义InputDStream的filesToRDD 方法将所有文件转化成RDD,源码如下:

private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map { file =>
val rdd = serializableConfOpt.map(_.value) match {
case Some(config) => {
context.sparkContext.newAPIHadoopFile(
file,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
config)
.map { case (k, v) =>
val newKey = new LongWritable(k.asInstanceOf[LongWritable].get).asInstanceOf[K]
val newText = new Text(v.asInstanceOf[Text].toString).asInstanceOf[V]
(newKey, newText)//因为NewHadoopRDD每个元素都占用了同一块内存,所以必须复制每个元素的k、v才能将这个RDD cache到内存

}
}
case None => {
context.sparkContext.newAPIHadoopFile[K, V, F](file)
.map{case (k, v) =>
val newKey =new LongWritable(k.asInstanceOf[LongWritable].get).asInstanceOf[K]
val newText = new Text(v.asInstanceOf[Text].toString).asInstanceOf[V]
(newKey, newText)<span style="font-family: Arial, Helvetica, sans-serif;">//因为NewHadoopRDD每个元素都占用了同一块内存,所以必须复制每个元素的k、v才能将这个RDD cache到内存</span>

}
}
}
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
}
rdd
}
new UnionRDD(context.sparkContext, fileRDDs)
}

因为NewHadoopRDD每个元素都占用了同一块内存,所以必须复制每个元素的k、v才能将这个RDD cache到内存

从上面的方法知道,filesToRDD返回的RDD并不是根RDD(根据NewHadoopRDD创建了UnionRDD),这个RDD也是自定义InputDStream的compute方法返回的RDD。

spark-streaming的DSteam.getOrCompute方法将compute返回的RDD checkpoint到HDFS,源码如下:

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {

val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}

rddOption.foreach { case newRDD =>
// Register the generated RDD for caching and checkpointing
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
}
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()//将DStream的compute方法产生的RDD checkpoint到HDFS
logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
}
generatedRDDs.put(time, newRDD)
}
rddOption
} else {
None
}
}
}


如果设置了checkpoint,则将DStream的compute方法产生的RDD checkpoint到HDFS。

这个UnionRDD cache到了内存,但是如果发生checkpoint,会把这个UnionRDD写到磁盘,这样也会引起大量的磁盘I/O,为了解决这个问题,这个解决方案不能设置checkpoint。如果需要checkpoint,需要将KafkaDirectDStream每个Kafka分区的偏移量同步到zookeeper,每次程序重启的时候从zookeeper获取KafkaDirectDStream上一次读取到的位置。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: