Spark Streaming + Flume整合官网文档阅读及运行示例
2015-09-23 15:56
477 查看
1,基于Flume的Push模式(Flume-style Push-based Approach)
Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用.Flume可以将数据推送到改receiver.
1),需求
从集群中选择一台机器,
当Flume+Spark Streaming程序运行时,需要保证Spark的一个worker运行在同一台机器上.
Flume可以通过配置文件指定推送到该台机器的一个端口.
因为在push模式中,streaming程序在运行时,Flume需要通过配置的端口号监听该机器上的receiver,这样Flume才能进行数据推送
2)配置Flume
配置Flume agent用于发送数据到一个Avro sink,使用如下配置文件:
可以参考 Flume’s
documentation 获得更多的配置信息
3)配置Spark Streaming程序
a)连接:在SBT/Maven项目中定义,通过以下配置来配置运行环境
b)编程:在程序中import FlumeUtils,创建DStream
更多可以参考API
docs 和 example.
需要注意的是:选择机器的ip必须与集群中resource manager中某一台机器一致.这样资源分配(resource allocation)才能匹配到这台机器,并且启动receiver.
c)发布(运行)程序:将spark-streaming-flume_2.10以及相关依赖(除spark-core_2.10和spark-streaming_2.10之外)打包到应用程序的jar包中.然后使用spark-submit来启动应用程序(具体可以参考Deploying
section)
4)示例
(a)Spark程序编写
完整程序如下:
启动flume:/usr/local/flume140/jobconf# ../bin/flume-ng agent --conf ../conf/ --conf-file ./spark_avro.conf --name a1 -Dflume.root.logger=INFO,console
(d)结果,此时观察Spark程序的输出结果如下:
可以看到,每隔一定的间隔,就有一个Received 100 flume events的信息输出,表示spark streaming程序已从flume获取到数据.可以将Spark Streaming应用程序按照实际业务需求进行修改.
2,基于Custom Sink的Pull模式(Pull-based
Approach using a Custom Sink)
不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink:
Flume将数据推送到sink中,并且数据保持buffered状态
Spark Streaming使用一个可靠的Flume接收器(reliable Flume receiver )和转换器(transaction)从sink拉取数据.只要当数据被接收并且被Spark
Streaming备份后,转换器才运行成功.
这样,与第一种模式相比,保证了很好的健壮性和容错能力( fault-tolerance
guarantees ).然而,这种模式需要为Flume配置一个正常的sink.以下为配置步骤
1)基本要求
选择一台运行在一个Flume agent中的普通sink节点的机器.Flume其他的pipeline配置成向该agent发送数据.Spark集群中的机器应该可以访问到选为sink节点的那台机器
2)配置Flume
Flume中sink节点的配置
(a)Sink JARS:将如下JAR包添加到Flume中被选为普通sink节点的classpath中,修改FLUME_HOME/conf/flune_env.sh文件中的FLUME_CLASSPATH=""配置信息,将以下三个jar包路径写入,或者将以下三个jar包复制到FLUME_HOME/lib/目录下
包括三种类型的JAR包
(i) 普通的sink JAR包(下载地址 direct
link):
(ii) Scala运行相关JAR包(下载地址 direct
link).:
(iii)Lang 3JAR包(下载地址 direct
link):
(b)配置Flume conf文件
使用如下配置参数配置Flume agent用于发送数据到一个Avro sink
同样需要保证Flume upstream pipline已经配置好向运行上述sink的agent发送数据.更多Flume的配置信息可以在Flume’s
documentation 中查看.
3)编写Spark Streaming应用程序
(a)连接
在SBT/Maven项目定义中,需要引入spark-streaming-flume_2.10相关jar包.可以参考之前的配置
(b)编程
导入FLumeUtils创建一个flumeStream
可以查看示例程序FlumePollingEventCount.
注意:每个input Dstream可以接收从多个sinks中的数据
(c)发布(运行)
将spark-streaming-flume_2.10以及相关依赖(除spark-core_2.10和spark-streaming_2.10之外)打包到应用程序的jar包中.然后使用spark-submit来启动应用程序(具体可以参考Deploying
section)
4)示例
(a)Spark程序:
可以看到与之前一个相比,只是把val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)改成了val stream = FlumeUtils.createPollingStream(ssc,
host, port, StorageLevel.MEMORY_ONLY_SER_2)
(b)配置Flume
(c)运行
需要先启动flume,再启动spark程序
启动flume:/usr/local/flume140/jobconf# ../bin/flume-ng agent --conf ../conf/ --conf-file ./spark_flume_log_pull.conf --name a1 -Dflume.root.logger=INFO,console
(d)结果
比较启动顺序:
在push模式中,先启动spark application,进入等待状态,等待flume push数据,此时启动flume进行数据的传递.
在pull模式中,spark application会从配置的端口pull数据,此时若flume还未启动,spark application会提示端口连接失败.所以需要先启动flume后启动spark application
Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用.Flume可以将数据推送到改receiver.
1),需求
从集群中选择一台机器,
当Flume+Spark Streaming程序运行时,需要保证Spark的一个worker运行在同一台机器上.
Flume可以通过配置文件指定推送到该台机器的一个端口.
因为在push模式中,streaming程序在运行时,Flume需要通过配置的端口号监听该机器上的receiver,这样Flume才能进行数据推送
2)配置Flume
配置Flume agent用于发送数据到一个Avro sink,使用如下配置文件:
agent.sinks = avroSink agent.sinks.avroSink.type = avro agent.sinks.avroSink.channel = memoryChannel agent.sinks.avroSink.hostname = <选择机器的ip地址> agent.sinks.avroSink.port = <选择机器的端口号> |
documentation 获得更多的配置信息
3)配置Spark Streaming程序
a)连接:在SBT/Maven项目中定义,通过以下配置来配置运行环境
groupId = org.apache.spark artifactId = spark-streaming-flume_2.10 version = 1.5.0 |
import org.apache.spark.streaming.flume._ val flumeStream = FlumeUtils.createStream(streamingContext, [选择的机器ip], [选择的机器端口号]) |
docs 和 example.
需要注意的是:选择机器的ip必须与集群中resource manager中某一台机器一致.这样资源分配(resource allocation)才能匹配到这台机器,并且启动receiver.
c)发布(运行)程序:将spark-streaming-flume_2.10以及相关依赖(除spark-core_2.10和spark-streaming_2.10之外)打包到应用程序的jar包中.然后使用spark-submit来启动应用程序(具体可以参考Deploying
section)
4)示例
(a)Spark程序编写
完整程序如下:
object FlumeLog { def main(args: Array[String]) { StreamingExamples.setStreamingLogLevels() val host = "localhost" val port = 19999 val batchInterval = Milliseconds(2000) // Create the context and set the batch size val sparkConf = new SparkConf().setAppName("FlumeEventCount") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() ssc.start() ssc.awaitTermination() } } |
用于统计flume的event事件. (b)编写flume的配置文件
a1.channels = c1 a1.sinks = k1 a1.sources = r1 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = localhost a1.sinks.k1.port = 19999 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/file/bigdatatest/datalake/SougouQ.data a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sources.r1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 |
其中的sinks按照文档中的进行配置.sources用于从日志文件SougouQ.data中读取数据.SougouQ.data中的数据动态生成. (c)运行 首先启动Spark Streaming程序,可以看到如下输出信息:
启动flume:/usr/local/flume140/jobconf# ../bin/flume-ng agent --conf ../conf/ --conf-file ./spark_avro.conf --name a1 -Dflume.root.logger=INFO,console
(d)结果,此时观察Spark程序的输出结果如下:
可以看到,每隔一定的间隔,就有一个Received 100 flume events的信息输出,表示spark streaming程序已从flume获取到数据.可以将Spark Streaming应用程序按照实际业务需求进行修改.
2,基于Custom Sink的Pull模式(Pull-based
Approach using a Custom Sink)
不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink:
Flume将数据推送到sink中,并且数据保持buffered状态
Spark Streaming使用一个可靠的Flume接收器(reliable Flume receiver )和转换器(transaction)从sink拉取数据.只要当数据被接收并且被Spark
Streaming备份后,转换器才运行成功.
这样,与第一种模式相比,保证了很好的健壮性和容错能力( fault-tolerance
guarantees ).然而,这种模式需要为Flume配置一个正常的sink.以下为配置步骤
1)基本要求
选择一台运行在一个Flume agent中的普通sink节点的机器.Flume其他的pipeline配置成向该agent发送数据.Spark集群中的机器应该可以访问到选为sink节点的那台机器
2)配置Flume
Flume中sink节点的配置
(a)Sink JARS:将如下JAR包添加到Flume中被选为普通sink节点的classpath中,修改FLUME_HOME/conf/flune_env.sh文件中的FLUME_CLASSPATH=""配置信息,将以下三个jar包路径写入,或者将以下三个jar包复制到FLUME_HOME/lib/目录下
包括三种类型的JAR包
(i) 普通的sink JAR包(下载地址 direct
link):
groupId = org.apache.spark artifactId = spark-streaming-flume-sink_2.10 version = 1.5.0 |
link).:
groupId = org.scala-lang artifactId = scala-library version = 2.10.4 |
link):
groupId = org.apache.commons artifactId = commons-lang3 version = 3.3.2 |
使用如下配置参数配置Flume agent用于发送数据到一个Avro sink
agent.sinks = spark agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink agent.sinks.spark.hostname = <hostname of the local machine> agent.sinks.spark.port = <port to listen on for connection from Spark> agent.sinks.spark.channel = memoryChannel |
documentation 中查看.
3)编写Spark Streaming应用程序
(a)连接
在SBT/Maven项目定义中,需要引入spark-streaming-flume_2.10相关jar包.可以参考之前的配置
(b)编程
导入FLumeUtils创建一个flumeStream
import org.apache.spark.streaming.flume._ val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]) |
注意:每个input Dstream可以接收从多个sinks中的数据
(c)发布(运行)
将spark-streaming-flume_2.10以及相关依赖(除spark-core_2.10和spark-streaming_2.10之外)打包到应用程序的jar包中.然后使用spark-submit来启动应用程序(具体可以参考Deploying
section)
4)示例
(a)Spark程序:
object FlumeLogPull { def main(args: Array[String]) { StreamingExamples.setStreamingLogLevels() val host = "localhost" val port = 19999 val batchInterval = Milliseconds(2000) // Create the context and set the batch size val sparkConf = new SparkConf().setAppName("FlumeEventCount") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream val stream = FlumeUtils.createPollingStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() ssc.start() ssc.awaitTermination() } } |
host, port, StorageLevel.MEMORY_ONLY_SER_2)
(b)配置Flume
a1.channels = c1 a1.sinks = k1 a1.sources = r1 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = localhost a1.sinks.k1.port = 19999 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/file/bigdatatest/datalake/SougouQ.data a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sources.r1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 100000 a1.channels.c1.transactionCapacity = 100000 |
需要先启动flume,再启动spark程序
启动flume:/usr/local/flume140/jobconf# ../bin/flume-ng agent --conf ../conf/ --conf-file ./spark_flume_log_pull.conf --name a1 -Dflume.root.logger=INFO,console
(d)结果
比较启动顺序:
在push模式中,先启动spark application,进入等待状态,等待flume push数据,此时启动flume进行数据的传递.
在pull模式中,spark application会从配置的端口pull数据,此时若flume还未启动,spark application会提示端口连接失败.所以需要先启动flume后启动spark application
相关文章推荐
- 使用etalage插件快速做出jquery放大镜效果
- mysql-5.6.x-win32免安装版配置
- redis的持久化
- test5.21
- PAT(甲级)1003
- 初学HBase的几个问题
- hdu 1576 A/B||hdu 3049 Data Processing 乘法逆元
- Java IO流
- python的编码问题
- C4.5决策树--Java
- 剪花布条---hdu2087(kmp模板)
- Hibernate的3种继承映射策略
- SQL语句 with as 用法
- Fastdfs安装前准备组件安装
- shell脚本助你在linux下实时查看单网卡流量
- UVALive 6930 Wheels(bfs)
- Java Session
- php 限制 上传文件大小
- C#中调用OCX控件
- atitit。 hb Hibernate sql 查询使用