Kafka Study Notes - Integrating Flume, Kafka, and Spark Streaming
2016-06-22 17:45
Flume / Kafka / Spark Streaming environment:
apache-flume-1.6.0-bin.tar.gz
kafka_2.11-0.10.0.0.tgz
spark-1.6.1-bin-hadoop2.6.tgz
Scala 2.11
1. Start ZooKeeper (here, the ZooKeeper bundled with Kafka)
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
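Before starting the broker it can be worth confirming that the standalone ZooKeeper is actually reachable. A minimal connectivity probe, not part of the original walkthrough, assuming the ZooKeeper client jar from Kafka's libs/ directory is on the classpath and the quorum address master:2181 used throughout this post:

import scala.collection.JavaConverters._
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object ZkProbe {
  def main(args: Array[String]): Unit = {
    // 30s session timeout; the no-op watcher just discards session events
    val zk = new ZooKeeper("master:2181", 30000, new Watcher {
      override def process(event: WatchedEvent): Unit = ()
    })
    // Fails with ConnectionLoss if nothing is listening on master:2181;
    // once the broker has registered itself, /brokers will also appear here
    zk.getChildren("/", false).asScala.foreach(println)
    zk.close()
  }
}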
2. Start the Kafka broker and open a console consumer listening on the target topic
$ nohup bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning
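With the broker up, this setup can be smoke-tested end to end before Flume enters the picture. A minimal sketch, not from the original post, using the new-producer client that ships with kafka_2.11-0.10.0.0:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SmokeTestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "master:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Block until the broker acks; the line should appear in the console-consumer window above
    producer.send(new ProducerRecord[String, String]("test", "hello kafka")).get()
    producer.close()
  }
}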
Testing shows the topic can either be created ahead of time or left alone: Flume will create it on first write, because the broker setting auto.create.topics.enable defaults to true.
$ bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper master:2181
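The same create/exists operations are also available programmatically through the broker's Scala admin API. A sketch against the AdminUtils class in kafka_2.11-0.10.0.0 (later Kafka versions replace this with AdminClient):

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils

object CreateTestTopic {
  def main(args: Array[String]): Unit = {
    // Same ZooKeeper quorum the broker uses; last flag = ZK security disabled
    val zkUtils = ZkUtils("master:2181", 30000, 30000, false)
    try {
      if (!AdminUtils.topicExists(zkUtils, "test")) {
        // topic, partitions = 1, replication factor = 1, no per-topic overrides
        AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties())
      }
    } finally {
      zkUtils.close()
    }
  }
}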
3. Edit the configuration file flume-conf.properties. The agent reads a spooling directory and forwards events to Kafka through a memory channel:

fk.sources = f1
fk.channels = c1
fk.sinks = k1

# Spooling-directory source: watches the data directory for new, complete files
fk.sources.f1.type = spooldir
fk.sources.f1.spoolDir = /home/hadoop/software/data/
fk.sources.f1.fileHeader = true
fk.sources.f1.deletePolicy = immediate
# Events per transaction; must not exceed the channel's transactionCapacity
fk.sources.f1.batchSize = 100
fk.sources.f1.channels = c1
fk.sources.f1.deserializer.maxLineLength = 1048576

# In-memory channel: fast, but buffered events are lost if the agent dies
fk.channels.c1.type = memory
fk.channels.c1.capacity = 1000
fk.channels.c1.transactionCapacity = 100

# Kafka sink: must publish to the same topic the consumer (step 2)
# and KafkaWordCount (step 5) are reading, i.e. test
fk.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
fk.sinks.k1.topic = test
fk.sinks.k1.brokerList = master:9092
fk.sinks.k1.requiredAcks = 1
fk.sinks.k1.batchSize = 20
fk.sinks.k1.channel = c1
4. Start the Flume agent
$ bin/flume-ng agent -n fk -c conf -f conf/flume-conf.properties -Dflume.root.logger=error,console
5. Run KafkaWordCount, the official example that ships with Spark Streaming
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // One receiver thread per topic; _._2 keeps the message body, dropping the key
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // Incremental 10-minute sliding window, recomputed every 2 seconds
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
$ bin/run-example org.apache.spark.examples.streaming.KafkaWordCount master:2181 test-consumer-group test 1
or
$ bin/spark-submit --class org.apache.spark.examples.streaming.KafkaWordCount --master spark://master:7077 \
    /home/hadoop/software/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar \
    master:2181 test-consumer-group test 1
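For reference, Spark 1.6 also offers a receiver-less "direct" Kafka integration that reads from the brokers rather than ZooKeeper and avoids the receiver/WAL machinery. A minimal word-count sketch against the same topic (the broker list is an assumption carried over from the Flume sink config above):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Offsets are tracked by the stream itself, straight from the broker
    val kafkaParams = Map("metadata.broker.list" -> "master:9092")
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("test"))
      .map(_._2)  // drop the key, keep the message body

    lines.flatMap(_.split(" "))
      .map((_, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}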
6. Copy files into the spool directory (/home/hadoop/software/data/); the ingested lines then show up in the consumer window from step 2, and the word counts in the KafkaWordCount window from step 5.
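This last step can be scripted too. A small sketch that feeds one file into the spoolDir from step 3; the write-then-rename is deliberate, since Flume's spooling source expects files to be complete and immutable once they appear in the watched directory:

import java.io.{File, PrintWriter}

object FeedSpoolDir {
  def main(args: Array[String]): Unit = {
    // Write outside the watched directory first...
    val staged = new File("/home/hadoop/software/words.txt")
    val out = new PrintWriter(staged)
    try out.println("hello kafka hello flume hello spark")
    finally out.close()
    // ...then move it in atomically so Flume never sees a half-written file
    staged.renameTo(new File("/home/hadoop/software/data/words.txt"))
  }
}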