
Using Flume as a Spark Streaming Data Source: A Hands-On Case Study

2016-05-02 07:56
In this installment:

1. Hands-on: Spark Streaming polling data from Flume

2. Spark Streaming polling from Flume: the source code

FlumeConnection: the entity representing a distributed connection to Flume

I. Hands-on

1. In this model Spark Streaming actively pulls the data from Flume, so the first step is to set up the Flume configuration file, as sketched below.
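For reference, a minimal flume-conf.properties for the pull model might look like the following sketch. The source/channel/sink names are my own assumptions; the hostname (Master), port (9999), and spool directory match the later steps:

# Hypothetical agent wiring (names assumed).
agent1.sources = spoolSource
agent1.channels = memChannel
agent1.sinks = sparkSink

# Spooling-directory source watching the test directory used in step 6.
agent1.sources.spoolSource.type = spooldir
agent1.sources.spoolSource.spoolDir = /usr/local/flume/apache-flume-1.6.0-bin/tmp/TestDir
agent1.sources.spoolSource.channels = memChannel

agent1.channels.memChannel.type = memory
agent1.channels.memChannel.capacity = 10000
agent1.channels.memChannel.transactionCapacity = 1000

# The Spark sink that Spark Streaming polls (requires the three jars from step 3).
agent1.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sparkSink.hostname = Master
agent1.sinks.sparkSink.port = 9999
agent1.sinks.sparkSink.channel = memChannel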

2. Write the driver program, SparkStreamingPullFlume.java:


package com.dt.spark.SparkApps.SparkStreaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import scala.Tuple2;

public class SparkStreamingPullFlume {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("FlumePushDate2SparkStreaming");
        // 30-second batch interval.
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));

        // Poll the Spark sink that Flume exposes on Master:9999.
        JavaReceiverInputDStream<SparkFlumeEvent> lines =
                FlumeUtils.createPollingStream(jsc, "Master", 9999);

        // Split the body of each Flume event into words.
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(SparkFlumeEvent event) throws Exception {
                String line = new String(event.event().getBody().array());
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word within the batch.
        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordsCount.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}

3. Copy three jars into Flume's lib directory:

commons-lang3-3.3.2.jar, scala-library-2.10.4.jar, spark-streaming-flume-sink_2.10-1.6.1.jar

 

4. Run SparkStreamingPullFlume.java from Eclipse.



5. Start the Flume agent with flume-ng:

./flume-ng agent -n agent1 -c conf -f /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console

6. Copy hello.txt into /usr/local/flume/apache-flume-1.6.0-bin/tmp/TestDir/. Its contents:

Hello Spark

Hello Hadoop

Hello Kafka

Hello HDFS
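Once the file is picked up, the 30-second batch printed by wordsCount.print() should look roughly like this (the timestamp is illustrative, and the ordering of the pairs may differ):

-------------------------------------------
Time: 1462147230000 ms
-------------------------------------------
(Hello,4)
(Spark,1)
(Hadoop,1)
(Kafka,1)
(HDFS,1)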

II. Source code analysis

1. Creating the stream: createPollingStream
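Paraphrasing the Spark 1.6 source (these excerpts reference Spark-internal types, so they are sketches rather than standalone code), FlumeUtils.createPollingStream essentially just constructs a FlumePollingInputDStream over the given sink addresses:

// Paraphrased from org.apache.spark.streaming.flume.FlumeUtils (Spark 1.6).
def createPollingStream(
    ssc: StreamingContext,
    addresses: Seq[InetSocketAddress],
    storageLevel: StorageLevel,
    maxBatchSize: Int,
    parallelism: Int
  ): ReceiverInputDStream[SparkFlumeEvent] = {
  new FlumePollingInputDStream[SparkFlumeEvent](
    ssc, addresses, maxBatchSize, parallelism, storageLevel)
}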





This class extends ReceiverInputDStream and overrides the getReceiver method, which hands back a FlumePollingReceiver, as sketched below.
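Roughly (again paraphrased from the Spark 1.6 source):

private[streaming] class FlumePollingInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    val addresses: Seq[InetSocketAddress],
    val maxBatchSize: Int,
    val parallelism: Int,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {

  // The only interesting override: hand Spark a polling receiver.
  override def getReceiver(): Receiver[SparkFlumeEvent] = {
    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
  }
}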



The receiver uses lazy vals and factory methods to create its thread pools and the NIO client socket, as shown below.
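Inside FlumePollingReceiver (paraphrased from Spark 1.6), everything is declared lazy, so the pools and the Netty NIO socket factory only come into existence when the receiver actually starts:

lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
  new ThreadFactoryBuilder().setDaemon(true)
    .setNameFormat("Flume Receiver Channel Thread - %d").build())

// Netty NIO client socket factory, shared by all connections.
lazy val channelFactory =
  new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)

// One pulling thread per degree of parallelism.
lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
  new ThreadFactoryBuilder().setDaemon(true)
    .setNameFormat("Flume Receiver Thread - %d").build())

// The message queue of connections that the worker threads draw from.
private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()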





The worker threads pull data from the Flume polling sink; in essence, each worker obtains its work from a message queue of connections, as the sketch below shows.
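Concretely, onStart wires this up (paraphrased from Spark 1.6):

override def onStart(): Unit = {
  // Open one Netty transceiver per configured sink address, wrap it together
  // with its Avro callback client in a FlumeConnection, and enqueue it.
  addresses.foreach { host =>
    val transceiver = new NettyTransceiver(host, channelFactory)
    val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
    connections.add(new FlumeConnection(transceiver, client))
  }
  // Submit `parallelism` fetcher tasks that do the actual pulling.
  for (i <- 0 until parallelism) {
    receiverExecutor.submit(new FlumeBatchFetcher(this))
  }
}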





Now look at the poll method behind receiver.getConnections.poll() inside the run() method.
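The loop looks roughly like this (paraphrased from FlumeBatchFetcher in Spark 1.6, with the error handling trimmed):

def run(): Unit = {
  while (!receiver.isStopped()) {
    val connection = receiver.getConnections.poll()  // dequeue a connection
    val client = connection.client
    try {
      getBatch(client) match {
        case Some(eventBatch) =>
          val seq = eventBatch.getSequenceNumber
          // Store the events into Spark; ack on success, nack on failure.
          if (store(toSparkFlumeEvents(eventBatch.getEvents))) sendAck(client, seq)
          else sendNack(true, client, seq)
        case None => // the sink had no data this round
      }
    } finally {
      receiver.getConnections.add(connection)  // enqueue it back for reuse
    }
  }
}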



So poll dequeues from the message queue. Stepping into that dequeue, something interesting appears: there is a matching enqueue that puts items back into the queue, so the next question is where enqueue is called.
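getConnections is just a java.util.concurrent.LinkedBlockingQueue. Here is a tiny self-contained model (my own illustration, not Spark code) of that dequeue/enqueue round trip:

import java.util.concurrent.LinkedBlockingQueue

object ConnectionQueueDemo {
  // Stand-in for the real (transceiver, client) pair.
  case class FlumeConnection(id: Int)

  def main(args: Array[String]): Unit = {
    val connections = new LinkedBlockingQueue[FlumeConnection]()
    (1 to 3).foreach(i => connections.add(FlumeConnection(i)))  // onStart enqueues

    val workers = (1 to 3).map { w =>
      new Thread(new Runnable {
        def run(): Unit = {
          val conn = connections.poll()  // dequeue: a worker takes a connection
          println(s"worker $w pulled a batch over connection ${conn.id}")
          connections.add(conn)          // enqueue: return it for the next round
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
  }
}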



Following the enqueue call leads back into FlumePollingInputDStream.scala, where FlumeConnection, the distributed-connection entity mentioned at the top of this post, is defined.
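The entity that lives in that queue is tiny (paraphrased from FlumePollingInputDStream.scala in Spark 1.6):

// A distributed Flume connection: the Netty transceiver plus the Avro
// callback client created over it.
private[streaming] case class FlumeConnection(
    transceiver: NettyTransceiver,
    client: SparkFlumeProtocol.Callback)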





In particular, both failure branches of the fetcher (the interrupted case and the generic exception case) end by sending a negative acknowledgement:

case exception: Exception =>
  logWarning("Error while receiving data from Flume", exception)
  sendNack(batchReceived, client, seq)
sendNack and sendAck are, to my mind, a bit like the TCP three-way handshake: a mutual agreement built into the transfer protocol about whether the data actually arrived (both helpers are sketched below). Looking back at the poll interface with that in mind, it is simply how a worker fetches the next message to process from the message queue.
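For reference, the two helpers look roughly like this (paraphrased from FlumeBatchFetcher in Spark 1.6); they forward the batch's sequence number back to the sink over the Avro callback protocol, so the sink knows whether to commit or roll back its channel transaction:

private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
  client.ack(seq)  // the sink commits the transaction for this sequence number
}

private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
    seq: CharSequence): Unit = {
  if (batchReceived) {
    client.nack(seq)  // only meaningful if a batch was actually handed out
  }
}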



poll: the receiver listens on the data-connection port; data it obtains is placed into the receiving thread's main message queue and then dispatched to the worker threads' message queues, where the data is extracted.
