spark streaming 的wordcount程序,从hdfs上读取文件中的内容并计数
2015-10-16 08:26
633 查看
spark streaming 的wordcount程序,从hdfs上读取文件中的内容并计数
自己新建一个scala文件命名为:HdfsWordCount ,包路径为com.pdl,然后内容如下:object HdfsWordCount { def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: HdfsWordCount <hdfs dir>") System.exit(1) } // Create the context with a 5 second batch size val sparkConf = new SparkConf().setAppName("HdfsWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) val lines = ssc.textFileStream(args(0)) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
然后打包放到master上
我的主机的名为node-25
命令如下:./bin/spark-submit --master spark://node-25:7077 --class com.pdl.HdfsWordCount XXX.jar zyx
需要注意的是:首先在hdfs上新建一个文件夹叫zyx,然后在spark的HdfsWordCount程序启动后,不断的向hdfs上传文件,上传的文件类型要一致
遇到的问题是:间隔时间要大,如果文件正在上传中就开始读取,那么会出现错误。
再说一下如何如何用spark-submit运行example中的NetworkWordCount 程序:
自己新建一个scala文件命名为:NetworkWordCount ,包路径为com.pdl,然后将example中的NetworkWordCount中的内容拷贝到你新建的scala中,代码如下:object NetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: NetworkWordCount <hostname> <port>") System.exit(1) } // Create the context with a 1 second batch size val sparkConf = new SparkConf().setAppName("NetworkWordCount") val ssc = new StreamingContext(sparkConf, Seconds(1)) val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
然后打包放到master上
我的主机的名为node-25
命令如下:./bin/spark-submit --master spark://node-25:7077 --class com.pdl.NetworkWordCount XXX.jar 10.107.20.25 9999
其中后两个为main函数的参数
相关文章推荐
- HDFS客户端的权限错误:Permission denied
- Hadoop之HBase学习笔记
- hadoop-2.7.1 datanode
- HDFS 与 GFS 的设计差异
- 后端分布式系列:分布式存储-HDFS 与 GFS 的设计差异
- 后端分布式系列:分布式存储-HDFS 与 GFS 的设计差异
- 后端分布式系列:分布式存储-HDFS 与 GFS 的设计差异
- CDH版本 HDFS NFS Gateway 无法启动、挂载失败问题
- HDFS balancer实践
- 自动化挂载HDFS文件系统到本地目录
- Spark On Yarn(HDFS HA)详细配置过程
- Hadoop “Unable to load native-hadoop library for y
- flume 采集数据到hdfs
- Sqoop--关系型数据库跟hdfs数据传输工具
- HDFS简介
- Hadoop学习笔记-随手记
- 【转】HDFS 安全模式
- HDFS文件的合并
- HDFD 四个配置文件(core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml )的简单介绍
- flume+kafka+hdfs详解