
pyspark - Spark Streaming Programming Guide

References:

1. http://spark.apache.org/docs/latest/streaming-programming-guide.html

2. https://github.com/apache/spark/tree/v2.2.0

Spark Streaming Programming Guide

Overview
A Quick Example
Basic Concepts
Linking
Initializing StreamingContext
Discretized Streams (DStreams)
Input DStreams and Receivers
Transformations on DStreams
Output Operations on DStreams
DataFrame and SQL Operations
MLlib Operations
Caching / Persistence
Checkpointing
Accumulators, Broadcast Variables, and Checkpoints
Deploying Applications
Monitoring Applications

Performance Tuning
Reducing the Batch Processing Times
Setting the Right Batch Interval
Memory Tuning

Fault-tolerance Semantics
Where to Go from Here

A Quick Example

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
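
To try this out, first start a simple data server with Netcat (nc -lk 9999) in one terminal, then submit the script above with spark-submit in another terminal; every line typed into the Netcat terminal is split, counted, and printed to the console once per second.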


Initializing a StreamingContext

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
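
A few points worth remembering: once a context has been started, no new streaming computations can be added to it, and once stopped it cannot be restarted. By default stop() also stops the underlying SparkContext; to keep the SparkContext alive for reuse, something like the following minimal sketch can be used:

ssc.stop(stopSparkContext=False, stopGraceFully=False)  # stop only the StreamingContext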


Basic Sources

streamingContext.textFileStream(dataDirectory)
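
textFileStream monitors the given directory and processes any files newly created inside it (files should appear atomically, e.g. by being moved into the directory). For local experiments, a queue of RDDs can also serve as an input stream; a minimal sketch, where the path and values are only illustrative:

# Monitor a directory for newly created files (path is just an example)
lines = ssc.textFileStream("hdfs://namenode:8020/data/in")

# For testing, feed a queue of RDDs into the stream
rddQueue = [ssc.sparkContext.parallelize(range(10)) for _ in range(3)]
testStream = ssc.queueStream(rddQueue)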


The UpdateStateByKey Operation

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

runningCounts = pairs.updateStateByKey(updateFunction)
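
Note that using updateStateByKey requires a checkpoint directory to be configured, because the running state has to be saved periodically; for example (the directory name is only a placeholder):

ssc.checkpoint("checkpoint")  # any Hadoop-compatible path will do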


The Transform Operation

spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))


Window Operations

# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
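
The two trailing arguments are the window length (30 seconds) and the slide interval (10 seconds); both must be multiples of the batch interval. The variant above, which also takes an inverse reduce function, is more efficient because each window is computed incrementally from the previous one, but it requires checkpointing to be enabled. A simpler sketch without the inverse function, assuming the invFunc argument may be passed as None as in recent PySpark versions:

# Recomputes the whole window on every slide instead of updating it incrementally
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, None, 30, 10)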


Join Operations

Stream-stream joins
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)

windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)


Stream-dataset joins

dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
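
Because the function passed to transform is re-evaluated on the driver in every batch interval, the dataset being joined against can be changed between batches; a sketch under that assumption:

# The reference to `dataset` is read again on every batch, so reassigning it on
# the driver (for example from a periodic reload) takes effect from the next batch on.
def joinWithDataset(rdd):
    return rdd.join(dataset)

joinedStream = windowedStream.transform(joinWithDataset)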


Output Operations on DStreams

pprint()  (called print() in the Scala/Java API)
saveAsTextFiles(prefix, [suffix])  (see the example below)
saveAsObjectFiles(prefix, [suffix])  (not available in the Python API)
saveAsHadoopFiles(prefix, [suffix])  (not available in the Python API)
foreachRDD(func)
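
For example, the counts from the quick example could be written out once per batch with saveAsTextFiles; each batch produces a directory named prefix-TIME_IN_MS[.suffix] (the path below is only a placeholder):

# One output directory per batch interval
wordCounts.saveAsTextFiles("hdfs://namenode:8020/output/wordcounts", "txt")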


Design Patterns for Using foreachRDD

# Incorrect: the connection object is created at the driver and would have to be
# serialized and sent to the workers, which typically fails with serialization errors.
def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)

# Works, but creates and tears down a connection for every single record,
# which incurs unnecessarily high overhead.
def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))

# Better: create one connection per partition and reuse it for all records in that partition.
def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

# Best: reuse connections across batches via a static, lazily initialized connection pool.
def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
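
createNewConnection and ConnectionPool above are placeholders from the guide. A minimal sketch of what such a lazily initialized, per-executor pool could look like (SomeClient stands in for whatever client library is actually used):

import queue

class ConnectionPool(object):
    _pool = None  # created lazily, once per executor process

    @classmethod
    def _init(cls, size=4):
        cls._pool = queue.Queue()
        for _ in range(size):
            cls._pool.put(SomeClient())  # assumption: SomeClient() opens a connection

    @classmethod
    def getConnection(cls):
        if cls._pool is None:
            cls._init()
        return cls._pool.get()

    @classmethod
    def returnConnection(cls, connection):
        cls._pool.put(connection)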


DataFrame and SQL Operations

from pyspark.sql import Row, SparkSession

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)


How to Configure Checkpointing

# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()
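
The interval at which a DStream is checkpointed can also be tuned with the DStream's checkpoint method; roughly 5-10 sliding intervals of the DStream is a good starting point. For example, for the lines DStream created above:

lines.checkpoint(10)  # checkpoint this DStream's RDDs every 10 seconds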


Accumulators, Broadcast Variables, and Checkpoints

def getWordBlacklist(sparkContext):
    if ("wordBlacklist" not in globals()):
        globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])
    return globals()["wordBlacklist"]

def getDroppedWordsCounter(sparkContext):
    if ("droppedWordsCounter" not in globals()):
        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
    return globals()["droppedWordsCounter"]

def echo(time, rdd):
    # Get or register the blacklist Broadcast
    blacklist = getWordBlacklist(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use blacklist to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in blacklist.value:
            droppedWordsCounter.add(wordCount[1])
            return False
        else:
            return True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
    print(counts)

wordCounts.foreachRDD(echo)


Level of Parallelism in Data Receiving

numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range(numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
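
If a single input stream is sufficient, the received data can still be spread across more cores by repartitioning it before further processing, for example:

# Redistribute the received records over 10 partitions before processing
repartitionedStream = unifiedStream.repartition(10)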