pyspark - Spark Streaming Programming Guide
2017-10-17 19:47
References:
1、http://spark.apache.org/docs/latest/streaming-programming-guide.html
2、https://github.com/apache/spark/tree/v2.2.0
Contents (from the official guide):
- Overview
- A Quick Example
- Basic Concepts
  - Linking
  - Initializing StreamingContext
  - Discretized Streams (DStreams)
  - Input DStreams and Receivers
  - Transformations on DStreams
  - Output Operations on DStreams
  - DataFrame and SQL Operations
  - MLlib Operations
  - Caching / Persistence
  - Checkpointing
  - Accumulators, Broadcast Variables, and Checkpoints
  - Deploying Applications
  - Monitoring Applications
- Performance Tuning
  - Reducing the Batch Processing Times
  - Setting the Right Batch Interval
  - Memory Tuning
- Fault-tolerance Semantics
- Where to Go from Here
A Quick Example
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
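To try this out, the official guide first starts a Netcat server with "nc -lk 9999" in one terminal and then submits the script in another (for example "./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999"); any lines typed into the Netcat terminal are counted and printed once per second.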
Initializing StreamingContext
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
Basic Sources
streamingContext.textFileStream(dataDirectory)
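As a minimal sketch of a complete file-based stream (the directory path and batch interval below are placeholder assumptions, not from the original post), the socket word count can be reused by swapping the input source:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "FileWordCount")
ssc = StreamingContext(sc, 10)  # 10-second batches (placeholder interval)

# Monitor a directory for newly created files; the path is a placeholder
lines = ssc.textFileStream("hdfs://namenode:8040/logs/")
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()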
UpdateStateByKey Operation
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    # add the new values with the previous running count to get the new count
    return sum(newValues, runningCount)

runningCounts = pairs.updateStateByKey(updateFunction)
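Note that updateStateByKey requires a checkpoint directory to be configured on the StreamingContext, otherwise the job fails at startup. A minimal sketch (the path is a placeholder):

# updateStateByKey needs a checkpoint directory for the running state
ssc.checkpoint("hdfs://namenode:8040/checkpoint/")  # placeholder path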
Transform Operation
spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
Window Operations
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
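The two-argument form above computes each window incrementally using an inverse reduce function, which only works when checkpointing is enabled. A simpler, non-incremental sketch that recomputes each window from scratch (same window and slide parameters as above):

# Recompute the last 30 seconds of data every 10 seconds, without an inverse function
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, None, 30, 10)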
Join Operations
Stream-stream joins

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)

windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
dataset = ...  # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
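Because the function passed to transform is evaluated in every batch interval, the reference held by dataset can be changed between batches so that each window is joined against the current version of the data.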
Output Operations on DStreams
- pprint() (called print() in the Scala/Java API)
- saveAsTextFiles(prefix, [suffix])
- saveAsObjectFiles(prefix, [suffix]) (not available in the Python API)
- saveAsHadoopFiles(prefix, [suffix]) (not available in the Python API)
- foreachRDD(func)
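For example, persisting each batch of counts as text files (the prefix is a placeholder path):

# Each batch is written out as "prefix-TIME_IN_MS[.suffix]"
wordCounts.saveAsTextFiles("hdfs://namenode:8040/streaming/counts", "txt")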
Design Patterns for using foreachRDD
def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

# Incorrect: the connection object would have to be serialized and sent from the driver to the workers
dstream.foreachRDD(sendRecord)

def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

# Works, but creating a connection for every single record is very inefficient
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

# Better: one connection per partition amortizes the connection creation overhead
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

# Best: reuse connection objects across batches via a connection pool
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
DataFrame and SQL Operations
from pyspark.sql import Row, SparkSession

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program
words = ...  # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())
        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)
        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")
        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)
How to Configure Checkpointing
# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()
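The checkpoint interval of an individual DStream can also be set explicitly; the guide suggests roughly 5 to 10 sliding intervals as a starting point. A one-line sketch (runningCounts refers to the stateful stream from the updateStateByKey example above, and the interval is illustrative):

# Checkpoint the RDDs of this DStream every 30 seconds
runningCounts.checkpoint(30)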
Accumulators, Broadcast Variables, and Checkpoints
def getWordBlacklist(sparkContext):
    if ("wordBlacklist" not in globals()):
        globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])
    return globals()["wordBlacklist"]

def getDroppedWordsCounter(sparkContext):
    if ("droppedWordsCounter" not in globals()):
        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
    return globals()["droppedWordsCounter"]

def echo(time, rdd):
    # Get or register the blacklist Broadcast
    blacklist = getWordBlacklist(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use blacklist to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in blacklist.value:
            droppedWordsCounter.add(wordCount[1])
            return False
        else:
            return True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)
Level of Parallelism in Data Receiving
from pyspark.streaming.kafka import KafkaUtils

# Receive the same Kafka topic through multiple receivers and union the streams
numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range(numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
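Another option mentioned in the guide is to explicitly repartition the received data before further processing, which spreads the records across more cores; a short sketch (the partition count is illustrative):

# Redistribute the unified stream across 10 partitions before heavy processing
repartitionedStream = unifiedStream.repartition(10)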
相关文章推荐
- spark官方文档之——Spark Streaming Programming Guid spark streaming编程指南
- Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN
- Spark Streaming-1:Spark Streaming编程指南
- pyspark-Spark编程指南
- Spark Streaming编程指南
- Spark Streaming编程指南
- [置顶] Spark2.1.0文档:Spark Streaming 编程指南(下)-性能调优和容错语义
- Spark Streaming编程指南(部分)
- Spark Streaming编程指南
- Spark Streaming编程指南(三)
- Spark-Streaming编程指南
- Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN
- Spark Streaming编程指南
- Spark Streaming编程指南(四)
- Spark2.1.0文档:Spark Streaming 编程指南(上)
- Spark Streaming编程指南
- #########好####### pyspark-Spark Streaming编程指南
- Spark2.1.0文档:Spark Streaming 编程指南(下)-性能调优和容错语义
- Spark Streaming编程指南(一)
- Spark Streaming编程指南