Spark Streaming 3: Transformations
Reference: Spark 1.6.2 Streaming Programming Guide, http://spark.apache.org/docs/1.6.2/streaming-programming-guide.html
Transformations on DStreams

Like RDDs, DStreams support many transformations. The commonly used ones are listed in the table below.
Transformations | Meaning |
---|---|
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
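To make several of these operations concrete, here is a minimal sketch (the app name and input directory are hypothetical) that chains flatMap, filter, map, and reduceByKey into a per-batch word count over a file-based DStream:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "transformDemo")
ssc = StreamingContext(sc, 10)  # 10-second batch interval

lines = ssc.textFileStream("file:///input/demo")       # hypothetical directory
words = lines.flatMap(lambda line: line.split(','))    # flatMap: line -> words
long_words = words.filter(lambda w: len(w) > 3)        # filter: keep words longer than 3 chars
counts = long_words.map(lambda w: (w, 1)) \
                   .reduceByKey(lambda a, b: a + b)    # map + reduceByKey: per-batch counts
counts.pprint()                                        # print a sample of each batch's result

ssc.start()
ssc.awaitTermination()
```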
transform(func)

Lets you apply arbitrary RDD operations to each RDD inside the DStream, as sketched below.
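A minimal transform(func) sketch, with hypothetical names and paths: each batch is handed to the function as an ordinary RDD, so RDD-only operations such as subtractByKey against a static RDD become usable on a DStream:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "transformDemo")
ssc = StreamingContext(sc, 10)

# Static blacklist RDD (hypothetical), reused against every batch.
blacklist = sc.parallelize([("spam", None), ("ads", None)])

pairs = ssc.textFileStream("file:///input/demo") \
           .flatMap(lambda line: line.split(',')) \
           .map(lambda w: (w, 1))

# transform() applies an RDD-to-RDD function to each batch RDD.
cleaned = pairs.transform(lambda rdd: rdd.subtractByKey(blacklist))
cleaned.pprint()

ssc.start()
ssc.awaitTermination()
```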
updateStateByKey(func)

Returns a new DStream in which each key's state from previous batches is updated with the given func, so Spark Streaming results can be accumulated across batches.
Example: cross-batch word counting in wordcount
```python
# encoding=utf8
"""SimpleApp"""
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import sys
reload(sys)                        # Python 2: allow resetting the default encoding
sys.setdefaultencoding('utf-8')

# test updateStateByKey: merge this batch's counts (newValues) into the
# running total from previous batches (states is None on the first batch)
def updateFunc(newValues, states):
    return sum(newValues) + (states or 0)

sc = SparkContext("local[2]", "streamApp")
ssc = StreamingContext(sc, 30)     # 30-second batch interval

# updateStateByKey requires a checkpoint directory to persist state
ssc.checkpoint('file:///input/checkpoint')

# per-batch word count over files landing in the monitored directory
lines = ssc.textFileStream("file:///input/flume") \
           .flatMap(lambda line: line.split(',')) \
           .map(lambda x: (x, 1)) \
           .reduceByKey(lambda x, y: x + y)

# fold each batch's counts into the cross-batch state
output = lines.updateStateByKey(updateFunc)
output.pprint()

ssc.start()
ssc.awaitTermination()
```
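A run note: updateStateByKey requires checkpointing to be enabled, which is why ssc.checkpoint(...) is set before the stream is defined. As new comma-separated files land in file:///input/flume, pprint() should show each word's running total across all batches rather than only the current batch's count.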