您的位置:首页 > 大数据 > Hadoop

spark streaming 的wordcount程序,从hdfs上读取文件中的内容并计数

2015-10-16 08:26 633 查看

spark streaming 的wordcount程序,从hdfs上读取文件中的内容并计数

自己新建一个scala文件命名为:HdfsWordCount ,包路径为com.pdl,然后内容如下:

object HdfsWordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: HdfsWordCount <hdfs dir>")
System.exit(1)
}
// Create the context with a 5 second batch size
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}


然后打包放到master上

我的主机的名为node-25

命令如下:./bin/spark-submit --master spark://node-25:7077 --class com.pdl.HdfsWordCount XXX.jar zyx

需要注意的是:首先在hdfs上新建一个文件夹叫zyx,然后在spark的HdfsWordCount程序启动后,不断的向hdfs上传文件,上传的文件类型要一致

遇到的问题是:间隔时间要大,如果文件正在上传中就开始读取,那么会出现错误。

再说一下如何如何用spark-submit运行example中的NetworkWordCount 程序:

自己新建一个scala文件命名为:NetworkWordCount ,包路径为com.pdl,然后将example中的NetworkWordCount中的内容拷贝到你新建的scala中,代码如下:

object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}


然后打包放到master上

我的主机的名为node-25

命令如下:./bin/spark-submit --master spark://node-25:7077 --class com.pdl.NetworkWordCount XXX.jar 10.107.20.25 9999

其中后两个为main函数的参数



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: