Spark Streaming 1: getting started with a wordcount that monitors a port or a local directory on Windows or Linux
2017-04-15 21:03
Official Spark Streaming Programming Guide (1.6.2):
http://spark.apache.org/docs/1.6.2/streaming-programming-guide.html
Spark Streaming can consume data from local files, HDFS, TCP ports, Flume, Kafka, and more.
Wordcount by monitoring port 9999 on Linux:
1. The code. When configuring the SparkContext, use 'local[2]': two threads are needed, one to receive data from the port and one to do the computation. A batch is computed every ten seconds.
#------------------------------word count-----------------------------
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 10 seconds
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 10)
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
wordCounts.saveAsTextFiles("/input/", "txt")
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
2. Feed it data
Open a terminal and run nc -lk 9999, then type some text; Spark Streaming will run a computation on it every 10 seconds.
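If nc is not available (for example on Windows), a few lines of plain Python can play the same role. This is a sketch of my own, not part of the original article; the port and the lines of text are examples:

```python
import socket
import time

def serve_lines(lines, host="localhost", port=9999, interval=1.0):
    # Minimal stand-in for `nc -lk 9999`: accept one client
    # (the Spark receiver) and push one line of text per interval.
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    conn, _ = srv.accept()
    try:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
            time.sleep(interval)
    finally:
        conn.close()
        srv.close()
```

Run serve_lines(["hello world", "hello spark"]) in one terminal before starting the Spark job, so the receiver has something to connect to.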
Wordcount by monitoring a local directory:
1. The code
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 10 seconds
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 10)
# Local path to monitor
lines = ssc.textFileStream("file:///input/flume/source")
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
#wordCounts.pprint()
wordCounts.saveAsTextFiles("/input/", "txt")
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
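To see what the flatMap / map / reduceByKey chain computes for a single batch, here is a plain-Python sketch of the same pipeline (illustrative only; the function name is my own):

```python
from collections import Counter

def batch_word_count(lines):
    # flatMap: split every line into individual words
    words = [w for line in lines for w in line.split(" ")]
    # map + reduceByKey: emit (word, 1) pairs and sum them per key
    counts = Counter()
    for w in words:
        counts[w] += 1
    return dict(counts)
```

For example, batch_word_count(["hello world", "hello spark"]) yields {'hello': 2, 'world': 1, 'spark': 1}, which is what one batch of the DStream pipeline produces for the same two lines.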
2. Feed it data
Adding files to the directory by hand (cp on Linux, copy-paste on Windows) is often not picked up. The likely reason, per the official guide, is that files must be created in the monitored directory atomically, by moving or renaming them into it; a plain copy is not atomic, so the stream may see a half-written file or skip it. Writing the files from code works on both Windows and Linux:
import time
import os

# Note: os.listdir/open need a plain filesystem path;
# the file:/// URI form is only for textFileStream
path = '/input/flume/source'

# Clear out any old files first
for name in os.listdir(path):
    os.remove(path + '/' + name)

for i in range(5):
    # Write one new file every 10 seconds
    time.sleep(10)
    f = open(path + '/data' + str(i) + '.txt', 'w')
    f.write('word,count\n'
            'hello,word\n'
            '666,666')
    print(i)
    f.close()
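Per the official guide, the most reliable pattern is to write each file somewhere else and then atomically move or rename it into the monitored directory. A sketch of that pattern under my own naming (both directories must be on the same filesystem so the rename is a single atomic operation):

```python
import os

def publish_atomically(staging_dir, monitored_dir, name, text):
    # Write the file completely in a staging directory first...
    src = os.path.join(staging_dir, name)
    with open(src, "w") as f:
        f.write(text)
    # ...then rename it into the monitored directory in one atomic
    # step, so textFileStream never sees a half-written file.
    os.rename(src, os.path.join(monitored_dir, name))
```

With this helper, the loop above would write each dataN.txt into the staging directory and then publish it into the directory that textFileStream watches.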
References:
http://blog.csdn.net/jianghuxiaojin/article/details/51452593
http://spark.apache.org/docs/1.6.2/streaming-programming-guide.html