
Spark Streaming 1: Getting started with wordcount on Windows or Linux, listening on a port or a local directory

2017-04-15 21:03
Spark Streaming Programming Guide (the official guide for 1.6.2):

http://spark.apache.org/docs/1.6.2/streaming-programming-guide.html




Spark Streaming can ingest data from local files, HDFS, TCP ports, Flume, Kafka, and more.

Wordcount on Linux, listening on port 9999:

1. Code

When configuring the SparkContext, you need 'local[2]', because two threads are required: one to listen on the port and one to do the computation.

With a batch interval of 10 seconds, a computation runs every ten seconds.

#------------------------------word count-----------------------------------------------------------
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 10 seconds
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 10)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
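# Also save each batch as text files: saveAsTextFiles(prefix, suffix) writes one
# output directory per batch, named prefix-<batch time in ms>.suffix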
wordCounts.saveAsTextFiles("/input/", "txt")

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate


2. Feed the data source

Open a terminal and run nc -lk 9999 , then type some text; Spark Streaming will run a computation on it every 10 seconds.
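On Windows, where nc is usually not available, a small Python script can play the same role. A minimal sketch, assuming the streaming job connects to localhost:9999 and that this script is started before the job (just as with nc); it accepts one connection and pushes one line per second:

import socket
import time

# A stand-in for `nc -lk 9999`: listen on the port and push lines to whoever connects.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('localhost', 9999))
server.listen(1)

conn, addr = server.accept()  # blocks until the Spark job connects
try:
    for i in range(60):
        # one line of input per second; socketTextStream splits records on '\n'
        conn.sendall(('hello word %d\n' % i).encode())
        time.sleep(1)
finally:
    conn.close()
    server.close()

Once the job is running, each batch's pprint output appears in the Spark console as a Time: header followed by (word, count) pairs.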

Wordcount by monitoring a local directory:

1. Code

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 10 seconds
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 10)

# Monitor a local directory for newly created files
lines = ssc.textFileStream("file:///input/flume/source")

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
#wordCounts.pprint()
wordCounts.saveAsTextFiles("/input/", "txt")

ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate

2. Feed the data source
On Linux I used the cp command to add files directly to the directory. Manually copied files are not picked up, and I am not sure why; a likely cause is that textFileStream only processes files whose modification time falls inside the current batch window, and the official guide recommends moving (renaming) files into the monitored directory atomically.

Copying files in directly does not work on Windows either.

Writing the files from code does work, on both Windows and Linux (an atomic-rename variant is sketched after this script):

import time
import os

# plain filesystem path: the os module does not understand file:// URIs
path = '/input/flume/source'

# remove any files left over from an earlier run
for name in os.listdir(path):
    file = path + '/' + name
    os.remove(file)

for i in range(5):
    # write one new file every 10 seconds
    time.sleep(10)
    file = open(path + '/data' + str(i) + '.txt', 'w')
    file.write('word,count\nhello,word\n666,666')
    print(i)
    file.close()
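
If files written in place are still missed, the guide's recommendation can be followed instead: write each file somewhere else first and then atomically move it into the monitored directory. A minimal sketch, assuming a hypothetical staging directory /input/flume/tmp on the same filesystem as the monitored /input/flume/source:

import os
import time

tmp_dir = '/input/flume/tmp'     # hypothetical staging directory, assumed to exist
dst_dir = '/input/flume/source'  # the directory textFileStream is monitoring

for i in range(5):
    time.sleep(10)
    name = 'data%d.txt' % i
    tmp_path = os.path.join(tmp_dir, name)
    # write the complete file outside the monitored directory first
    with open(tmp_path, 'w') as f:
        f.write('word,count\nhello,word\n666,666')
    # rename on the same filesystem is atomic, so Spark never sees a partial file
    os.rename(tmp_path, os.path.join(dst_dir, name))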

References:
http://blog.csdn.net/jianghuxiaojin/article/details/51452593

http://spark.apache.org/docs/1.6.2/streaming-programming-guide.html