您的位置:首页 > 运维架构

Spark Streaming 实时监控一个HDFS的文件夹,当新的文件进来(名字不能重复),将对新文件进行处理。

2017-07-11 14:07 323 查看
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Spark Streaming 实时监控一个HDFS的文件夹,当新的文件进来(名字不能重复),将对新文件进行处理。
* Created by csw on 2017/7/4.
*/
object HDFSDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val config = new SparkConf().setAppName("Spark shell")
val ssc = new StreamingContext(config, Seconds(10))
val lines = ssc.textFileStream("hdfs://master:9000/csw/tmp2/test/")
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordCounts: DStream[(String, Int)] = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}


//下满是获取Linux本地的文件

val lines = ssc.textFileStream("file:///csw/tmp/test2")
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐