
Kafka Study Notes: Integrating Flume, Kafka, and Spark Streaming

2016-06-22 17:45
Environment for Flume, Kafka, and Spark Streaming:
apache-flume-1.6.0-bin.tar.gz
kafka_2.11-0.10.0.0.tgz
spark-1.6.1-bin-hadoop2.6.tgz

scala 2.11

1. Start ZooKeeper (here, the ZooKeeper instance bundled with Kafka)
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

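2. Start Kafka, then start a console consumer listening on the chosen topic
$ nohup bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning
Testing shows that the topic can be created in advance, but this is optional: with Kafka's default broker setting auto.create.topics.enable=true, the topic is created automatically the first time Flume writes to it. To create and list it by hand:
$ bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper master:2181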

3. Edit the configuration file flume-conf.properties
fk.sources = f1
fk.channels = c1
fk.sinks = k1

fk.sources.f1.type = spooldir
fk.sources.f1.spoolDir = /home/hadoop/software/data/
fk.sources.f1.fileHeader = true
fk.sources.f1.deletePolicy = immediate
# batchSize must not exceed the channel's transactionCapacity (100 for c1)
fk.sources.f1.batchSize = 100
fk.sources.f1.channels = c1
fk.sources.f1.deserializer.maxLineLength = 1048576

fk.channels.c1.type = memory
fk.channels.c1.capacity = 1000
fk.channels.c1.transactionCapacity = 100

fk.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Same topic that the console consumer (step 2) and KafkaWordCount (step 5) read
fk.sinks.k1.topic = test
fk.sinks.k1.brokerList = master:9092
fk.sinks.k1.requiredAcks = 1
fk.sinks.k1.batchSize = 20
fk.sinks.k1.channel = c1
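
One thing worth checking in a configuration like this is that a source's batchSize does not exceed the transactionCapacity of its channel, because a memory channel rejects a put that is larger than one transaction. The following is a minimal sketch of such a check, not part of Flume itself: the function names get_prop and check_batch_size are hypothetical, and the parsing assumes plain "key = value" lines like the ones above.

```shell
#!/bin/sh
# Hypothetical helper: print the value of one "key = value" property.
get_prop() {
    # $1 = properties file, $2 = exact property key
    awk -F'=' -v key="$2" '
        { k = $1; gsub(/[ \t]/, "", k) }
        k == key { v = $2; gsub(/[ \t\r]/, "", v); print v; exit }
    ' "$1"
}

# Hypothetical check: a source batch must fit into one channel transaction.
check_batch_size() {
    # $1 = properties file, $2 = agent, $3 = source, $4 = channel
    batch=$(get_prop "$1" "$2.sources.$3.batchSize")
    txn=$(get_prop "$1" "$2.channels.$4.transactionCapacity")
    if [ -z "$batch" ] || [ -z "$txn" ]; then
        echo "missing batchSize or transactionCapacity" >&2
        return 2
    fi
    if [ "$batch" -le "$txn" ]; then
        echo "ok: batchSize=$batch <= transactionCapacity=$txn"
    else
        echo "too large: batchSize=$batch > transactionCapacity=$txn"
        return 1
    fi
}
```

For the agent defined above, it could be called as: check_batch_size conf/flume-conf.properties fk f1 c1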


4. Start Flume
$ bin/flume-ng agent -n fk -c conf -f conf/flume-conf.properties -Dflume.root.logger=error,console

5. Run KafkaWordCount, the official example that ships with Spark Streaming

package org.apache.spark.examples.streaming

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    // 2-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Checkpointing is required by the inverse-function form of reduceByKeyAndWindow
    ssc.checkpoint("checkpoint")

    // Map each topic name to the number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based stream of (key, message) pairs; keep only the message
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // Word counts over a 10-minute window that slides every 2 seconds
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}


$ bin/run-example org.apache.spark.examples.streaming.KafkaWordCount master:2181 test-consumer-group test 1

$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCount --master spark://master:7077 /home/hadoop/software/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar master:2181 test-consumer-group test 1
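
To sanity-check the counts that the job prints, it can help to compute the expected totals for an input file locally. The sketch below only approximates the job's logic: it splits on spaces as the example does, but it skips empty tokens and ignores windowing entirely. The function name local_word_count is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: print "word count" pairs for a local text file.
local_word_count() {
    # $1 = input file
    # Put every space-separated token on its own line, count the
    # non-empty ones, and sort the result by word for stable output.
    tr -s ' ' '\n' < "$1" |
        awk 'NF { n[$0]++ } END { for (w in n) print w, n[w] }' |
        LC_ALL=C sort
}
```

Running it on the file that is about to be dropped into the spooling directory gives a list to compare against the (word, count) pairs printed by KafkaWordCount within the first window.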

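6. Copy a file into the spooling directory (/home/hadoop/software/data/); the resulting output appears in the console windows from step 2 and step 5.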
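
The spooling directory source expects a file to be complete and unchanged once it appears in the directory, and it expects file names not to be reused. A common precaution is therefore to stage the file elsewhere and move it in with a single rename. The following is a minimal sketch under the assumption that the staging directory is on the same filesystem as the spooling directory, so that mv is a plain rename; the function name spool_file and any staging path are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: stage a file, then move it into the spool directory.
spool_file() {
    # $1 = source file, $2 = spool directory, $3 = staging directory
    src=$1
    spool=$2
    staging=$3
    name=$(basename "$src")

    # Refuse to reuse a file name that is still present in the spool directory.
    if [ -e "$spool/$name" ]; then
        echo "refusing to overwrite $spool/$name" >&2
        return 1
    fi

    mkdir -p "$staging" || return 1
    # Copy the whole file into the staging area first...
    cp "$src" "$staging/$name" || return 1
    # ...then move it into the spool directory in one step.
    mv "$staging/$name" "$spool/$name"
}
```

For example: spool_file ./words.txt /home/hadoop/software/data /home/hadoop/software/staging (the input file and staging path are placeholders).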