
Spark Study Notes 7: Integrating Spark Streaming with Flume

2016-05-09 00:31


Tags: Spark

I. Start Flume

The flume-conf.properties file:

agent002.sources = sources002
agent002.channels = channels002
agent002.sinks = sinks002

## define sources
agent002.sources.sources002.type = exec
agent002.sources.sources002.command = tail -F /opt/app/apache-flume-1.5.0-bin/monitor/log.input

## define channels
agent002.channels.channels002.type = memory
agent002.channels.channels002.capacity = 10000
agent002.channels.channels002.transactionCapacity = 10000
agent002.channels.channels002.byteCapacityBufferPercentage = 20
agent002.channels.channels002.byteCapacity = 800000

## define sinks
agent002.sinks.sinks002.type = avro
agent002.sinks.sinks002.hostname = study.com.cn
agent002.sinks.sinks002.port = 9999

## bind the source and the sink to the channel
agent002.sources.sources002.channels = channels002
agent002.sinks.sinks002.channel = channels002


bin/flume-ng agent --conf conf --name agent002 --conf-file conf/flume-conf.properties -Dflume.root.logger=INFO,console


II. Developing and Running the Spark Streaming Application

1. Add the dependency to pom.xml

groupId = org.apache.spark
artifactId = spark-streaming-flume_2.10
version = 1.3.0
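
In pom.xml, these coordinates would typically be written as a dependency element. A minimal fragment, assuming it is placed inside the project's existing <dependencies> section:

```xml
<!-- Spark Streaming connector for Flume, built for Scala 2.10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.10</artifactId>
  <version>1.3.0</version>
</dependency>
```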


2. Prepare the required jar files (the three jars passed to --jars in step 3)



3. Start spark-shell in local mode (adding the corresponding jar files)

bin/spark-shell \
--jars /opt/app/spark-1.3.0-bin-2.5.0/externaljars/spark-streaming-flume_2.10-1.3.0.jar,/opt/app/spark-1.3.0-bin-2.5.0/externaljars/flume-avro-source-1.5.0.jar,/opt/app/spark-1.3.0-bin-2.5.0/externaljars/flume-ng-sdk-1.5.0.jar \
--master local[2]


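4. flume001.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._

// Reuse the SparkContext (sc) provided by spark-shell; batches are 5 seconds long
val ssc = new StreamingContext(sc, Seconds(5))

// Receive the events pushed by the Flume avro sink (same host and port as in flume-conf.properties)
val stream = FlumeUtils.createStream(ssc, "study.com.cn", 9999)

// Optional: print the number of events in each batch instead
// val eventsCount = stream.count.map(cnt => "Received " + cnt + " flume events.")
// eventsCount.print()

// Decode each event body to a string, split it on spaces, and count the words per batch
val wordCountStream = stream.map(x => new String(x.event.getBody.array())).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

wordCountStream.print()

ssc.start()
ssc.awaitTermination()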
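
The script decodes each event body with new String(x.event.getBody.array()), which uses the platform default charset and reads the whole backing array of the buffer. A possible stricter variant is sketched below. It assumes the log lines are UTF-8 encoded, and the names FlumeBodyDecoder and decodeBody are hypothetical helpers, not part of Spark or Flume:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper (not a Spark or Flume API): decodes an event body
// without relying on the platform default charset.
object FlumeBodyDecoder {
  def decodeBody(body: ByteBuffer): String = {
    // Work on a duplicate so the caller's position and limit stay untouched
    val view = body.duplicate()
    // Copy only the readable bytes between position and limit
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    // Assumption: the log lines are UTF-8 encoded
    new String(bytes, StandardCharsets.UTF_8)
  }
}
```

Under these assumptions, the map step in the script could become stream.map(x => FlumeBodyDecoder.decodeBody(x.event.getBody)).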


5. Run the application

scala> :load /opt/app/spark-1.3.0-bin-2.5.0/test/flume001.scala


6. Test

echo "hadoop hive spark" >>log.input
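
To see what the word-count step should produce for a line like the one appended above, the same split-and-count logic can be tried on a plain Scala collection, without Spark or Flume. This is only a sketch of the per-batch computation: WordCountSketch and countWords are hypothetical names, and the filter for empty tokens is an addition that the streaming script does not have.

```scala
// Hypothetical stand-alone sketch of the per-batch word count,
// using ordinary collections instead of a DStream.
object WordCountSketch {
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // same split as in flume001.scala
      .filter(_.nonEmpty)      // addition: drop empty tokens from repeated spaces
      .groupBy(identity)       // collect the occurrences of each word
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit =
    countWords(Seq("hadoop hive spark")).foreach(println)
}
```

In the streaming job, reduceByKey(_ + _) plays the role of the groupBy and size steps here, applied to the events of each 5-second batch.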