Monitoring an HDFS Input Stream with Spark Streaming

2017-07-11 19:02
This post uses Spark Streaming to monitor, in real time, the data written to an HDFS file system.

Below is the code with annotations, followed by the source-code comments of the special methods it uses:

package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SourceHdfs {

  /**
   * HDFS directory where the checkpoint data is stored.
   */
  val checkpointDirectory = "hdfs://master:9000/sparkStreaming/Checkpoint_Data"

  def main(args: Array[String]): Unit = {

    /**
     * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
     */
    val context = StreamingContext.getOrCreate(checkpointDirectory, createContext _)

    /**
     * Use an HDFS directory as the input DStream source.
     * A file stream needs no receiver; the directory is checked for new files once per batch interval.
     */
    val lines = context.textFileStream("hdfs://master:9000/quality/clipper_erp/2017-07-11")

    // Split each line into words and count how often each word occurs in the batch.
    val wordCount = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    wordCount.print()

    context.start()
    context.awaitTermination()

  }

  /**
   * Creates the StreamingContext; passed to getOrCreate as the creating function.
   */
  def createContext(): StreamingContext = {

    val conf = new SparkConf()
      .setAppName("HDFSInputData")
      .setMaster("spark://master:7077")

    // 10-second batch interval
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDirectory)
    ssc
  }

}


The complete code is available at https://github.com/DragonTong/Streaming/blob/master/src/main/scala/streaming/SourceHdfs.scala

1. checkpoint: persists RDD state and provides fault tolerance

/**
 * Set the context to periodically checkpoint the DStream operations for driver
 * fault-tolerance.
 * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
 *                  Note that this must be a fault-tolerant file system like HDFS.
 */
def checkpoint(directory: String) {
  if (directory != null) {
    val path = new Path(directory)
    val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
    fs.mkdirs(path)
    val fullPath = fs.getFileStatus(path).getPath().toString
    sc.setCheckpointDir(fullPath)
    checkpointDir = fullPath
  } else {
    checkpointDir = null
  }
}


2. getOrCreate: creates the StreamingContext

/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
* will be created by called the provided `creatingFunc`.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc   Function to create a new StreamingContext
* @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
*                       file system
* @param createOnError  Optional, whether to create a new StreamingContext if there is an
*                       error in reading checkpoint data. By default, an exception will be
*                       thrown on error.
*/
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
  val checkpointOption = CheckpointReader.read(
    checkpointPath, new SparkConf(), hadoopConf, createOnError)
  checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
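
The last line of getOrCreate is where the behaviour is decided: when checkpoint data is found, the context is rebuilt from it and creatingFunc is never called. The following is a minimal plain-Scala sketch of that control flow only. Context, the checkpoints map, and this simplified getOrCreate signature are hypothetical stand-ins for illustration, not Spark APIs.

```scala
object GetOrCreateSketch {

  // Hypothetical stand-in for StreamingContext; it only records where it came from.
  final case class Context(origin: String)

  // Simplified model of getOrCreate: the Map plays the role of the checkpoint
  // directory, and Map.get plays the role of CheckpointReader.read.
  def getOrCreate(
      checkpoints: Map[String, String],
      checkpointPath: String,
      creatingFunc: () => Context): Context =
    checkpoints.get(checkpointPath)
      .map(data => Context(s"restored from $data"))
      .getOrElse(creatingFunc()) // evaluated only when no checkpoint was found

  def main(args: Array[String]): Unit = {
    var created = 0
    val create = () => { created += 1; Context("newly created") }

    // First run: no checkpoint data yet, so the creating function is called.
    val first = getOrCreate(Map.empty, "/ckpt", create)
    // Second run: checkpoint data exists, so the creating function is skipped.
    val second = getOrCreate(Map("/ckpt" -> "saved graph"), "/ckpt", create)

    println(first)   // Context(newly created)
    println(second)  // Context(restored from saved graph)
    println(created) // 1
  }
}
```

Because the creating function is skipped on recovery, anything that must exist on both a fresh start and a restart has to be part of what the checkpoint captures.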


Compile the program and run it on the cluster:

spark-submit --class streaming.SourceHdfs --master spark://master:7077 spark.streaming.pro-1.0-SNAPSHOT.jar

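Note: the second run fails with an error unless the contents of the Checkpoint_Data directory are deleted first. I will fix this bug once I understand checkpointing more deeply.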

Error output:

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@321ca237 has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:312)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:102)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at streaming.SourceHdfs$.main(SourceHdfs.scala:21)
at streaming.SourceHdfs.main(SourceHdfs.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/07/11 19:19:27 INFO spark.SparkContext: Invoking stop() from shutdown hook
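
A likely cause of this error, going by the pattern recommended in the Spark Streaming programming guide: on the second run getOrCreate restores the context from the checkpoint and does not call createContext, while main still defines new DStream transformations on the restored context. Those new DStreams are not part of the recovered graph and so were never initialized. The sketch below moves the whole stream definition into createContext so that it is captured by the checkpoint. The object name SourceHdfsFixed and the inputDirectory value are introduced here for illustration, and the fix has not been verified against the original cluster.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical variant of SourceHdfs; the name is an assumption for illustration.
object SourceHdfsFixed {

  val checkpointDirectory = "hdfs://master:9000/sparkStreaming/Checkpoint_Data"
  val inputDirectory = "hdfs://master:9000/quality/clipper_erp/2017-07-11"

  // Builds the context AND the full DStream graph. On a restart the graph is
  // restored from the checkpoint, so this function is not called again.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("HDFSInputData")
      .setMaster("spark://master:7077")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDirectory)

    // All sources, transformations and output operations are defined here.
    val lines = ssc.textFileStream(inputDirectory)
    val wordCount = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCount.print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // main only obtains the context and starts it; it defines no DStreams.
    val context = StreamingContext.getOrCreate(checkpointDirectory, createContext _)
    context.start()
    context.awaitTermination()
  }
}
```

With this layout main behaves the same on a fresh start and on recovery. Checkpoint data written by an older build of the job may still need to be cleared once, since a checkpoint stores the serialized DStream graph of the code that produced it.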