Spark Streaming 实时监控一个HDFS的文件夹,当新的文件进来(名字不能重复),将对新文件进行处理。
2017-07-11 14:07
323 查看
import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Spark Streaming 实时监控一个HDFS的文件夹,当新的文件进来(名字不能重复),将对新文件进行处理。 * Created by csw on 2017/7/4. */ object HDFSDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) val config = new SparkConf().setAppName("Spark shell") val ssc = new StreamingContext(config, Seconds(10)) val lines = ssc.textFileStream("hdfs://master:9000/csw/tmp2/test/") val words: DStream[String] = lines.flatMap(_.split(" ")) val wordCounts: DStream[(String, Int)] = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
//下满是获取Linux本地的文件
val lines = ssc.textFileStream("file:///csw/tmp/test2")
相关文章推荐
- Python3基础 os.path.basename 对一个路径字符串进行处理 返回 文件的名字
- 一个在DOS下处理中文和长名字目录和文件的程序(原创)
- windows下读取一个文件夹下面所有文件的名字
- renamePNG.sh用来进行文件的批处理,替换所有文件的统一的名字
- 主机Window不能访问该虚拟机Linux Samba文件服务提供了一个文件夹
- 写一个脚本,对一个文件夹内指定格式的文件做重命名处理,要求:重命名为数字编号从1开始,当有新增文件的时候,自动重名并跟随末尾编号格式,中间删减的时候,末尾名称自动改名补回空缺序号
- 如果在一个B/S结构的系统中需要传递变量值,但是又不能使用Session、Cookie、Application,您有几种方法进行处理?
- 封装一个对磁盘,文件夹,文件进行遍历的类
- 使用Thumbnails对一个文件夹下的所有图片进行压缩处理
- C#”文件中的类都不能进行设计,因此未能为该文件显示设计器“处理办法
- hdfs中将文件夹下所有.bz2文件进行解压并且合并,然后传到本地
- 递归遍历一个文件夹,对文件进行操作,使用lstat时的悲剧
- 批处理,用一个exe处理一个文件夹下的所有txt文件
- Spark Streaming、HDFS结合Spark JDBC External DataSouces处理案例
- 一个对磁盘,文件夹,文件进行遍历的类
- 编程产生一个int数组,长度为30,并向其中随机插入1-30,并且不能重复输出数组。实现一个冒泡排序算法对其进行排序,输出排序结果
- 复制一个文件夹包括里面的文件,并以指定名字命名
- VS IDE Release或Debug进行运行时,在Debug或Release文件夹中的文件或者库文件不能直接找到,需要加上Debug或Release目录
- java对txt文本文件的复制粘贴(对文件内容进行了简单的修改)以行为一个处理单位
- 一个简单的利用ini配置文件以及用户输入的交互进行文件夹内容拷贝的批处理文件脚本的实现