Flume推送数据到SparkStreaming案例实战和内幕源码解密
2016-08-20 17:14
393 查看
本博文内容主要包含以下几个方面:
Flume on HDFS案例回顾
Flume推送数据到Spark Streaming实战
原理绘图剖析
一:Flume on HDFS 案例回顾:
文件配置在上篇博文中已经详细介绍,接下来的操作基于已经成功安装完成Flume:
拷贝conf/flume-conf.properties.template,更名为conf/flume-cong.properties,只写以下内容:
可以发现本地的NOTICE文件更名为NOTICE.COMPLETED。
同时在Web50070界面上可看到Flume把NOTICE文件传送到 HDFS的/library/flume下,说明当Flume指定的源文件夹中有新文件时,Flume会自动将此文件,导入到Flume配置时指定的HDFS文件夹中。
补充说明:
一般正常业务情况下,应该是把Flume的数据放到Kafka中,然后让不同的数据消费者去消费数据。如果要在Flume和Kafka两者间做选择的话,则要看业务中数据是否持续不断的产生。如果是这样,则应该选择Kafka。如果产生的数据时大时小,甚至有些时间没有数据,则没必要用Kafka,可以节省资源。
修改conf/flume-cong.properties文件,改导入到HDFS,变为推送到Spark Streaming。
编写Spark Streaming应用的Java程序:
代码中用到了FlumeUtils。我们剖析一下代码中用到的FlumeUtils。
以上代码中FlumeUtil的方法createStream:
实际是调用以下createStream方法:
可以看到流处理默认的存储方式是,既在内存,又在磁盘中,同时做序列化,而且用两台机器。
继续看调用的createStream方法:
实际上返回的是FlumeInputDStream对象,而且事件是Flume所定义的事件SparkFlumeEvent。所以要注意,在以上Java代码做flatMap时,FlatMapFunction的输入类型必须是SparkFlumeEvent类型。
再看看FlumeInputDStream的代码:
可以看到getReceiver返回的是用于接收数据的FlumeReceiver对象。再看FlumeReceiver:
把以上的应用Spark Streaming的Java程序运行起来。确认Flume也在运行。
我们找若干文件拷入TestDir文件夹,比如:flume下的若干文本文件。那么在Java运行的控制台,可以发现以下信息:
说明Flume推送数据到了Spark Streaming,Spark Streaming对数据及时进行了处理。
补充说明:
使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。
博文内容源自DT大数据梦工厂Spark课程。相关课程内容视频可以参考:
百度网盘链接:http://pan.baidu.com/s/1slvODe1(如果链接失效或需要后续的更多资源,请联系QQ460507491或者微信号:DT1219477246 获取上述资料)。
Flume on HDFS案例回顾
Flume推送数据到Spark Streaming实战
原理绘图剖析
一:Flume on HDFS 案例回顾:
文件配置在上篇博文中已经详细介绍,接下来的操作基于已经成功安装完成Flume:
拷贝conf/flume-conf.properties.template,更名为conf/flume-cong.properties,只写以下内容:
#agent1表示代理名称 agent1.sources=source1 agent1.sinks=sink1 agent1.channels=channel1 #配置source1 agent1.sources.source1.type=spooldir agent1.sources.source1.spoolDir=/usr/local/flume/tmp/TestDir agent1.sources.source1.channels=channel1 agent1.sources.source1.fileHeader = false agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = timestamp #配置sink1 agent1.sinks.sink1.type=hdfs agent1.sinks.sink1.hdfs.path=hdfs://master:9000/library/flume agent1.sinks.sink1.hdfs.fileType=DataStream agent1.sinks.sink1.hdfs.writeFormat=TEXT agent1.sinks.sink1.hdfs.rollInterval=1 agent1.sinks.sink1.channel=channel1 agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d #agent1.sinks.sink1.type=avro #agent1.sinks.sink1.channel=channel1 #agent1.sinks.sink1.hostname=Master #agent1.sinks.sink1.port=9999 #配置channel1 agent1.channels.channel1.type=file agent1.channels.channel1.checkpointDir=/usr/local/flume/tmp/checkpointDir agent1.channels.channel1.dataDirs=/usr/local/flume/tmp/dataDirs flume-env.sh配置: # export JAVA_HOME=/usr/lib/jvm/java-6-sun export JAVA_HOME=/usr/local/jdk/jdk1.8.0_60 # Give Flume more memory and pre-allocate, enable remote monitoring via JMX # export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote" export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote" 建立文件夹 /usr/local/flume/tmp/TestDir。 在hdfs上建立/library/flume文件夹。 flume的bin文件夹下启动Flume: ./flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console 在/usr/local/flume/tmp/TestDir下,拷入测试用的文件,比如:NOTICE flume 控制台会有一些相关信息: 16/04/22 11:03:49 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /usr/local/flume/tmp/TestDir/NOTICE to /usr/local/flume/tmp/TestDir/NOTICE.COMPLETED 16/04/22 11:03:51 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 16/04/22 11:03:51 INFO hdfs.BucketWriter: Creating hdfs://master:9000/library/flume/2016-04-22.1461294231806.tmp 16/04/22 11:03:52 INFO hdfs.BucketWriter: Closing hdfs://master:9000/library/flume/2016-04-22.1461294231806.tmp 16/04/22 11:03:52 INFO hdfs.BucketWriter: Renaming hdfs://master:9000/library/flume/2016-04-22.1461294231806.tmp to hdfs://master:9000/library/flume/2016-04-22.1461294231806
可以发现本地的NOTICE文件更名为NOTICE.COMPLETED。
同时在Web50070界面上可看到Flume把NOTICE文件传送到 HDFS的/library/flume下,说明当Flume指定的源文件夹中有新文件时,Flume会自动将此文件,导入到Flume配置时指定的HDFS文件夹中。
补充说明:
一般正常业务情况下,应该是把Flume的数据放到Kafka中,然后让不同的数据消费者去消费数据。如果要在Flume和Kafka两者间做选择的话,则要看业务中数据是否持续不断的产生。如果是这样,则应该选择Kafka。如果产生的数据时大时小,甚至有些时间没有数据,则没必要用Kafka,可以节省资源。
二, Flume推送数据到Spark Streaming实战
现在我们不把Flume的数据导入到HDFS中,而是把数据推送到Spark Streaming中。修改conf/flume-cong.properties文件,改导入到HDFS,变为推送到Spark Streaming。
#配置sink1 #agent1.sinks.sink1.type=hdfs #agent1.sinks.sink1.hdfs.path=hdfs://master:9000/library/flume #agent1.sinks.sink1.hdfs.fileType=DataStream #agent1.sinks.sink1.hdfs.writeFormat=TEXT #agent1.sinks.sink1.hdfs.rollInterval=1 #agent1.sinks.sink1.channel=channel1 #agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d agent1.sinks.sink1.type=avro agent1.sinks.sink1.channel=channel1 agent1.sinks.sink1.hostname=Master agent1.sinks.sink1.port=9999
编写Spark Streaming应用的Java程序:
public class FlumePushData2SparkStreaming { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("FlumePushDate2SparkStreaming"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30)); JavaReceiverInputDStream lines = FlumeUtils.createStream(jsc,"Master", 9999); // 注意此处输入的event类型是SparkFlumeEvent类型。 JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() { @Override public Iterable<String> call(SparkFlumeEvent event) throws Exception { String line = new String(event.event().getBody().array()); return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordsCount.print(); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
代码中用到了FlumeUtils。我们剖析一下代码中用到的FlumeUtils。
以上代码中FlumeUtil的方法createStream:
实际是调用以下createStream方法:
可以看到流处理默认的存储方式是,既在内存,又在磁盘中,同时做序列化,而且用两台机器。
继续看调用的createStream方法:
实际上返回的是FlumeInputDStream对象,而且事件是Flume所定义的事件SparkFlumeEvent。所以要注意,在以上Java代码做flatMap时,FlatMapFunction的输入类型必须是SparkFlumeEvent类型。
再看看FlumeInputDStream的代码:
可以看到getReceiver返回的是用于接收数据的FlumeReceiver对象。再看FlumeReceiver:
把以上的应用Spark Streaming的Java程序运行起来。确认Flume也在运行。
我们找若干文件拷入TestDir文件夹,比如:flume下的若干文本文件。那么在Java运行的控制台,可以发现以下信息:
说明Flume推送数据到了Spark Streaming,Spark Streaming对数据及时进行了处理。
三、 原理绘图剖析
补充说明:
使用Spark Streaming可以处理各种数据来源类型,如:数据库、HDFS,服务器log日志、网络流,其强大超越了你想象不到的场景,只是很多时候大家不会用,其真正原因是对Spark、spark streaming本身不了解。
博文内容源自DT大数据梦工厂Spark课程。相关课程内容视频可以参考:
百度网盘链接:http://pan.baidu.com/s/1slvODe1(如果链接失效或需要后续的更多资源,请联系QQ460507491或者微信号:DT1219477246 获取上述资料)。
相关文章推荐
- 大数据IMF传奇行动绝密课程第87课:Flume推送数据到Spark Streaming案例实战和内幕源码解密
- 第87课:Flume推送数据到SparkStreaming案例实战和内幕源码解密--flume安装篇
- 第87讲:Flume推送数据到SparkStreaming案例实战和内幕源码解密
- 第87课:Flume推送数据到SparkStreaming案例实战和内幕源码解密
- 第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密
- 大数据IMF传奇行动绝密课程第88课:SparkStreaming从Flume Poll数据案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第90课:SparkStreaming基于Kafka Receiver案例实战和内幕源码解密
- 第88课:SparkStreaming从Flume Poll数据案例实战和内部源码解密
- 基于HDFS的SparkStreaming案例实战和内幕源码解密
- Spark Streaming从Flume Poll数据案例实战和内幕源码解密
- 第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密 java.lang.ClassNotFoundException 踩坑解决问题详细内幕版本
- 第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 第93课:Spark Streaming updateStateByKey案例实战和内幕源码解密
- IMF传奇行动第85课:Spark Streaming第四课:基于HDFS的Spark Streaming案例实战和内幕源码解密
- Spark Streaming updateStateByKey案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第93课:SparkStreaming updateStateByKey案例实战和内置源码解密
- 第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密