Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用
2016-10-02 01:10
525 查看
前言
在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。
本文的目标是写一个基于kafka的scala工程,在一个spark standalone的集群环境中运行。
项目结构和文件说明
说明
这个工程包含了两个应用。一个Consumer应用:CusomerApp - 实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。
一个Producer应用:ProducerApp - 实现了向Kafka集群发消息的功能。
文件结构
KafkaSampleApp # 项目目录 |-- build.bat # build文件 |-- src |-- main |-- scala |-- ConsumerApp.scala # Consumer应用 |-- ProducerApp.scala # Producer应用
构建工程目录
可以运行:mkdir KafkaSampleApp mkdir -p /KafkaSampleApp/src/main/scala
代码
build.sbt
name := "kafka-sample-app" version := "1.0" scalaVersion := "2.11.8" scalacOptions += "-feature" scalacOptions += "-deprecation" scalacOptions += "-language:postfixOps" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.0", "org.apache.spark" %% "spark-streaming" % "2.0.0", "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0", "org.apache.kafka" %% "kafka" % "0.8.2.1" )
CusomerApp.scala
这个例子中使用了Spark自带的Stream+Kafka结合的技术,有个限制的绑定了kafka的8.x版本。我个人建议只用Kafka的技术,写一个Consomer,或者使用其自带的Consumer,来接受消息。
然后再使用Spark的技术。
这样可以跳过对kafak版本的限制。
import java.util.Properties import _root_.kafka.serializer.StringDecoder import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf object ConsumerApp { def main(args: Array[String]) { val brokers = "localhost:9092" val topics = "test-topic" // Create context with 10 second batch interval val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(10)) // Create direct kafka stream with brokers and topics val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet) // Get the lines, split them into words, count the words and print val lines = messages.map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) println("============== Start ==============") wordCounts.print println("============== End ==============") // Start the computation ssc.start() ssc.awaitTermination() } }
ProducerApp.scala
import java.util.Arrays import java.util.List import java.util.Properties import org.apache.kafka.clients.producer._ object ProducerApp { def main(args: Array[String]): Unit = { val props = new Properties() // Must-have properties props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // Optional properties props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none") props.put(ProducerConfig.SEND_BUFFER_CONFIG, (1024*100).toString) props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, (100).toString) props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, (5*60*1000L).toString) //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, (60*1000l).toString) props.put(ProducerConfig.ACKS_CONFIG, (0).toString) //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, (1500).toString) props.put(ProducerConfig.RETRIES_CONFIG, (3).toString) props.put(ProducerConfig.LINGER_MS_CONFIG, (1000).toString) props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (32 * 1024 * 1024L).toString) props.put(ProducerConfig.BATCH_SIZE_CONFIG, (200).toString) props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-app-producer") val producer = new KafkaProducer[String, String](props) // Thread hook to close produer Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { producer.close() } }) // send 10 messages var i = 0 for( i <- (1 to 10)) { val data = new ProducerRecord[String, String]("test-topic", "test-key", s"test-message $i") producer.send(data) } // Reduce package lost Thread.sleep(1000) producer.close() } }
构建工程
进入目录KafkaSampleApp。运行:sbt package
第一次运行时间会比较长。
测试应用
启动Kafka服务
# Start zookeeper server gnome-terminal -x sh -c '$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties; bash' # Wait zookeeper server is started. sleep 8s # Start kafka server gnome-terminal -x sh -c '$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties; bash' # Wait kafka server is started. sleep 5s
注:使用Ctrl+C可以中断服务。
创建一个topic
# Create a topic $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic # List topics $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
启动Spark服务
启动spark集群master server$SPARK_HOME/sbin/start-master.sh
master服务,默认会使用
7077这个端口。可以通过其日志文件查看实际的端口号。
启动spark集群slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077
启动Consumer应用
新起一个终端,来运行:$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --master spark://$(hostname):7077 --class ConsumerApp target/scala-2.11/kafka-sample-app_2.11-1.0.jar
注:如果定义的topic没有create,第一次运行会失败,再运行一次就好了。
如果出现java.lang.NoClassDefFoundError错误,
请参照Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境,
确保kafka的包在Spark中设置好了。
启动Producer应用
java -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp # or # $KAFKA_HOME/bin/kafka-run-class.sh -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp
然后:看看Consumer应用是否收到了消息。
总结
建议写一个Kafka的Consumer,然后调用Spark功能,而不是使用Spark的Stream+Kafka的编程方式。好处是可以使用最新版本的Kafka。
Kafka的包中带有一个Sample代码,可以从中学习一些编写程序的方法。
参照
Apache KafkaKafka 8.2 Quickstart
Spark Streaming + Kafka Integration Guide
相关文章推荐
- Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用
- Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用
- Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境
- Scala-IDE Eclipse(Windows)中开发Spark应用程序,在Ubuntu Spark集群上运行
- Spark入门--基于Intellij IDEA开发Spark应用并在集群上运行
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Flume ZooKeeper Storm Kafka Redis MongoDB Scala Spark 机器学习 Docker 云计算
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。
- 第94讲, 使用Scala开发集群运行的Spark 实现在线黑名单过滤程序
- eclipse编写scala应用运行在spark集群上
- 开发系列:02、使用Scala和SBT开发Spark应用
- 第95讲:使用Scala开发集群运行的Spark来实现在线热搜索词获取
- maven环境下使用java、scala混合开发spark应用
- python scala kafka 集成一个流程项目 spark
- JDK8+Scala2.11+spark-2.0.0+Intellij2017.3.4开发wordcount程序并在集群中运行
- Spark RDD编程(Python和Scala版本)----Spark中的RDD就是一个不可变的分布式对象集合,是一种具有兼容性的基于内存的集群计算抽象方法,Spark则是这个方法的抽象。 Spa
- Spark架构开发 大数据视频教程 SQL Streaming Scala Akka Hadoop
- 使用Intellij IDEA开发并提交Spark应用到远程Spark集群
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Flume ZooKeeper Storm Kafka Redis MongoDB Scala Spark 机器学习 Docker 虚拟化
- 【译】谷歌正在建立一个叫做Spark的Chrome应用开发环境
- 搭建symbian应用开发环境的一个注意点