您的位置:首页 > 编程语言

Spark Streaming编程指南

2015-08-25 08:57 375 查看
Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。

它可以接受来自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源,使用简单的api函数比如 map, reduce, join, window等操作,还可以直接使用内置的机器学习算法、图算法包来处理数据。



它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果。



它支持的数据流叫Dstream,直接支持Kafka、Flume的数据源。Dstream是一种连续的RDDs,下面是一个例子帮助大家理解Dstream。

A Quick Example

// 创建StreamingContext,1秒一个批次
val ssc = new StreamingContext(sparkConf, Seconds(1));

// 穿件一个DStream来连接 监听端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);

// 对每一行数据执行Split操作
val words = lines.flatMap(_.split(" "));
// 统计word的数量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);

// 输出结果
wordCount.print();

ssc.start();             // 开始
ssc.awaitTermination();  // 计算完毕退出


具体的代码可以访问这个页面:

完整代码

使用下面这句命令来启动Netcat:

$ nc -lk 9999


接着启动example

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999


在Netcat这端输入hello world,看Spark这边的

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world

...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...


编写代码

首先我们要在SBT或者Maven工程添加以下信息:

groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating


//需要使用一下数据源的,还要添加相应的依赖
Source    Artifact
Kafka     spark-streaming-kafka_2.10
Flume     spark-streaming-flume_2.10
Twitter     spark-streaming-twitter_2.10
ZeroMQ     spark-streaming-zeromq_2.10
MQTT     spark-streaming-mqtt_2.10


接着就是示例化

new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
这是之前的例子对DStream的操作。




数据源

除了sockets之外,我们还可以这样创建Dstream

streamingContext.fileStream(dataDirectory)


这里有3个要点:

(1)dataDirectory下的文件格式都是一样

(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的

(3)一旦文件进去之后就不能再改变

假设我们要创建一个Kafka的Dstream。

import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, …)


如果我们需要自定义流的receiver,可以查看https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.html

Operations

对于Dstream,我们可以进行两种操作,transformations 和 output

Transformations

Transformation                          Meaning
map(func)                        对每一个元素执行func方法
flatMap(func)                    类似map函数,但是可以map到0+个输出
filter(func)                     过滤
repartition(numPartitions)       增加分区,提高并行度
union(otherStream)               合并两个流
count()                    统计元素的个数
reduce(func)                     对RDDs里面的元素进行聚合操作,2个输入参数,1个输出参数
countByValue()                   针对类型统计,当一个Dstream的元素的类型是K的时候,调用它会返回一个新的Dstream,包含<K,Long>键值对,Long是每个K出现的频率。
reduceByKey(func, [numTasks])    对于一个(K, V)类型的Dstream,为每个key,执行func函数,默认是local是2个线程,cluster是8个线程,也可以指定numTasks
join(otherStream, [numTasks])    把(K, V)和(K, W)的Dstream连接成一个(K, (V, W))的新Dstream
cogroup(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream连接成一个(K, Seq[V], Seq[W])的新Dstream
transform(func)                  转换操作,把原来的RDD通过func转换成一个新的RDD
updateStateByKey(func)           针对key使用func来更新状态和值,可以将state该为任何值


UpdateStateByKey Operation

使用这个操作,我们是希望保存它状态的信息,然后持续的更新它,使用它有两个步骤:

(1)定义状态,这个状态可以是任意的数据类型

(2)定义状态更新函数,从前一个状态更改新的状态

下面展示一个例子:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ...  // add the new values with the previous running count to get the new count
Some(newCount)
}


参考链接
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: