Spark Streaming 10: Integrating Spark Streaming with Kafka (Part 1): the Receiver-based Approach
2018-09-05 09:21
Copyright notice: this is the author's original post; do not repost without permission. https://blog.csdn.net/lihaogn/article/details/82414337
1 Add the Maven dependency
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
2 KafKaReceiverWC.scala
package com.lihaogn.sparkKafka

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming & Kafka: receiver-based integration
 */
object KafKaReceiverWC {

  def main(args: Array[String]): Unit = {

    if (args.length != 4) {
      System.err.println("usage: KafKaReceiverWC <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1) // exit on bad arguments instead of falling through
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // one entry per topic: Map[topic -> number of receiver threads]
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // Spark Streaming connects to Kafka through the receiver API
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // each record is a (key, value) pair; count the words in the values
    messages.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
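The DStream pipeline above is an ordinary word count; its transformation chain can be exercised with plain Scala collections, without a cluster, to see what each step does. A minimal sketch (the sample messages are mine, not from the article) where `groupBy` plus a sum plays the role of `reduceByKey`:

```scala
// Plain-Scala sketch of the DStream transformation chain above.
// Each Kafka record arrives as a (key, value) pair; we count words in the values.
object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val messages = Seq(("k1", "hello spark"), ("k2", "hello kafka"))

    val counts = messages
      .map(_._2)             // keep only the message value
      .flatMap(_.split(" ")) // split each value into words
      .map((_, 1))           // pair each word with a count of 1
      .groupBy(_._1)         // collections analogue of reduceByKey
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

    println(counts.toList.sortBy(_._1)) // List((hello,2), (kafka,1), (spark,1))
  }
}
```

On a real DStream, `reduceByKey(_ + _)` performs the same grouping and summation per batch, but distributed across partitions.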
3 Build the project
mvn clean package -DskipTests
4 Start ZooKeeper
zkServer.sh start
5 Start Kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
6 Create the topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-streaming-topic
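Before starting the producer, it is worth confirming that the topic was actually registered. A sketch using the same Kafka CLI tools (assumes the broker and ZooKeeper started above are still running, so it cannot run standalone):

```shell
# list topics registered in ZooKeeper; "kafka-streaming-topic" should appear
kafka-topics.sh --list --zookeeper localhost:2181

# optional: inspect partition and replica assignment for the new topic
kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka-streaming-topic
```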
7 Start a console producer to generate messages
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-streaming-topic
8 Submit the Spark application
spark-submit \
  --class com.lihaogn.sparkKafka.KafKaReceiverWC \
  --master local[2] \
  --name KafKaReceiverWC \
  --jars /Users/Mac/software/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
  /Users/Mac/my-lib/Kafka-train-1.0.jar \
  localhost:2181 test kafka-streaming-topic 1
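One caveat of the receiver-based approach: data buffered in the receiver can be lost if the driver fails before a batch is processed. Spark's write-ahead log mitigates this. A hedged sketch of the extra flag (not part of the article's submission command; it also requires calling `ssc.checkpoint(...)` with a durable directory in the application code):

```shell
spark-submit \
  --class com.lihaogn.sparkKafka.KafKaReceiverWC \
  --master local[2] \
  --conf spark.streaming.receiver.writeAheadLog.enable=true \
  --jars /Users/Mac/software/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
  /Users/Mac/my-lib/Kafka-train-1.0.jar \
  localhost:2181 test kafka-streaming-topic 1
```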
9 Test
Type a few lines of text into the producer console; every 5 seconds the spark-submit console prints the word counts for that batch as (word,count) pairs.