Spark Streaming: Monitoring an HDFS Input Stream
2017-07-11 19:02
Spark Streaming can monitor data written to the HDFS file system in real time.
Below is the program with annotations, followed by source-code comments on the key methods it uses:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SourceHdfs {

  /**
   * HDFS checkpoint directory address
   */
  val checkpointDirectory = "hdfs://master:9000/sparkStreaming/Checkpoint_Data"

  def main(args: Array[String]): Unit = {
    /**
     * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
     */
    val context = StreamingContext.getOrCreate(checkpointDirectory, createContext _)

    /**
     * Configure the input DStream source with an HDFS address.
     * No Receiver is used: the Spark Streaming application monitors the directory
     * batch by batch on a timer.
     */
    val DStream = context.textFileStream("hdfs://master:9000/quality/clipper_erp/2017-07-11")
    val wordCount = DStream.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCount.print()

    context.start()
    context.awaitTermination()
  }

  /**
   * Create a StreamingContext; passed to the getOrCreate method as the creating function.
   */
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("HDFSInputData")
      .setMaster("spark://master:7077")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDirectory)
    ssc
  }
}
The complete code is available at: https://github.com/DragonTong/Streaming/blob/master/src/main/scala/streaming/SourceHdfs.scala
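One detail worth noting about textFileStream: it only picks up files that appear in the monitored directory after the context has started, and each file must appear atomically, so the usual pattern is to write the file fully into a staging directory and then rename it into the monitored directory. A minimal sketch of that pattern using the Hadoop FileSystem API (the staging directory and file names here are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DropFileIntoStream {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://master:9000")
    val fs = FileSystem.get(conf)

    // Hypothetical staging location; write the file completely here first
    val staging = new Path("/quality/clipper_erp/_staging/part-0001.txt")
    // The directory monitored by textFileStream in the program above
    val target = new Path("/quality/clipper_erp/2017-07-11/part-0001.txt")

    // A rename within one HDFS namespace is atomic, so the file becomes
    // visible to the next monitoring batch all at once
    fs.rename(staging, target)
  }
}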
1. checkpoint: keeping RDD state for fault tolerance
/**
 * Set the context to periodically checkpoint the DStream operations for driver
 * fault-tolerance.
 * @param directory HDFS-compatible directory where the checkpoint data will be
 *                  reliably stored. Note that this must be a fault-tolerant file
 *                  system like HDFS.
 */
def checkpoint(directory: String) {
  if (directory != null) {
    val path = new Path(directory)
    val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
    fs.mkdirs(path)
    val fullPath = fs.getFileStatus(path).getPath().toString
    sc.setCheckpointDir(fullPath)
    checkpointDir = fullPath
  } else {
    checkpointDir = null
  }
}
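To see what checkpoint actually writes, one can list the directory after a few batches have run; the driver periodically writes files named like checkpoint-&lt;batch time&gt;, which are what getOrCreate reads back on restart. A small sketch, assuming the same HDFS address as above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object InspectCheckpoint {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://master:9000")
    val fs = FileSystem.get(conf)

    // Print the names of the checkpoint files written by the driver
    fs.listStatus(new Path("/sparkStreaming/Checkpoint_Data"))
      .foreach(status => println(status.getPath.getName))
  }
}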
2. getOrCreate: creating the StreamingContext
/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
 * will be created by calling the provided `creatingFunc`.
 *
 * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
 * @param creatingFunc   Function to create a new StreamingContext
 * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
 *                       file system
 * @param createOnError  Optional, whether to create a new StreamingContext if there is an
 *                       error in reading checkpoint data. By default, an exception will be
 *                       thrown on error.
 */
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
  val checkpointOption = CheckpointReader.read(
    checkpointPath, new SparkConf(), hadoopConf, createOnError)
  checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
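Note the createOnError parameter: by default an unreadable checkpoint makes getOrCreate throw, but passing true makes it fall back to creatingFunc instead. (It would not help with the restart error shown below, since that checkpoint is read successfully.) A sketch of the call:

// Fall back to a fresh StreamingContext when the checkpoint data
// itself cannot be read, instead of throwing an exception
val context = StreamingContext.getOrCreate(
  checkpointPath = checkpointDirectory,
  creatingFunc = createContext _,
  createOnError = true)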
Compile the program and run it on the cluster:
spark-submit --class streaming.SourceHdfs --master spark://master:7077 spark.streaming.pro-1.0-SNAPSHOT.jar
Note: the second run fails with an error unless the contents of the Checkpoint_Data directory are deleted first. I will come back to this bug after gaining a deeper understanding of checkpointing.
The error output:
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@321ca237 has not been initialized
	at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:312)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:102)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
	at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
	at streaming.SourceHdfs$.main(SourceHdfs.scala:21)
	at streaming.SourceHdfs.main(SourceHdfs.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/07/11 19:19:27 INFO spark.SparkContext: Invoking stop() from shutdown hook
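Given the getOrCreate source above, the likely cause of this error is visible in the program itself: when getOrCreate recovers a context from checkpoint data, the entire DStream graph is restored from the checkpoint, and the Spark Streaming programming guide requires that all DStream setup happen inside the creating function. In the code above, textFileStream and the word-count transformations run in main after recovery, so the restored ShuffledDStream has no matching definition and fails at start. A sketch of the restructured program (same names and addresses as above, not verified against this cluster):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SourceHdfs {
  val checkpointDirectory = "hdfs://master:9000/sparkStreaming/Checkpoint_Data"

  def main(args: Array[String]): Unit = {
    // getOrCreate replays the checkpointed DStream graph; only start and wait here
    val context = StreamingContext.getOrCreate(checkpointDirectory, createContext _)
    context.start()
    context.awaitTermination()
  }

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("HDFSInputData")
      .setMaster("spark://master:7077")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDirectory)

    // All DStream definitions live inside the creating function, so the
    // recovered context can match them against the checkpointed graph
    val lines = ssc.textFileStream("hdfs://master:9000/quality/clipper_erp/2017-07-11")
    val wordCount = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCount.print()

    ssc
  }
}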