
Spark Streaming (5): Spark Streaming + Kafka Integration in Practice


For Kafka deployment, see: http://blog.csdn.net/jiangpeng59/article/details/53241693

This article follows the official Spark integration guide, working through it hands-on: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html

Test environment: Spark 1.6.0 + Kafka 0.9.0 + ZooKeeper 3.4.3

1. Add the spark-streaming-kafka_2.10 dependency with Maven

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
</dependencies>
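
If you build with sbt instead of Maven, the equivalent dependency would look like this (a sketch using the same artifact and version):

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0"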

2. Use a test program to send data to the Kafka brokers

import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaWordCountProducer {
  def main(args: Array[String]) {
    val topic = "test"
    val brokers = "slave01:9092,slave02:9092,slave03:9092"
    val messagesPerSec = 1  // how many messages to send per second
    val wordsPerMessage = 4 // how many words each message contains

    // Kafka producer connection properties
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while (true) {
      (1 to messagesPerSec).foreach { messageNum =>
        val str = (1 to wordsPerMessage).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
        println(message)
      }
      Thread.sleep(1000)
    }
  }
}
A quick note on the code above: it sends one message per second to the Kafka brokers, with messages in the following format:

ProducerRecord(topic=test, partition=null, key=null, value=4 6 1 1
ProducerRecord(topic=test, partition=null, key=null, value=7 1 5 0
The value field is the part we actually need.

3. Start a Kafka consumer process

Before running the code above, start a Kafka process to consume the messages sent by the producer.
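
If the test topic has not been created yet, it may need to be created first (a sketch; the partition and replication settings here are assumptions, adjust them for your cluster):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

With the topic in place, start the console consumer: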

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Once running, it keeps listening for data published to the topic named test and prints every message it receives:

[root@slave02 kafka_2.10-0.9.0.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
8 5 6 0
5 4 0 8
9 0 9 0
5 1 7 0
7 4 4 3
2 7 6 8


4. The Spark Streaming consumer
The official integration guide describes two ways for Spark Streaming to pull data from Kafka: a receiver-based approach, and a receiver-less direct approach using createDirectStream. The guide lists several advantages of the direct approach, and Spark 2.0 uses it by default.
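
For contrast, the receiver-based approach connects through ZooKeeper rather than the brokers and looks roughly like this (a sketch; the ZooKeeper address, group id, and per-topic thread count are assumptions):

import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based approach: a long-running receiver reads from Kafka via
// ZooKeeper and stores the data in Spark executors before processing.
val receiverStream = KafkaUtils.createStream(ssc, "slave01:2181", "wordcount-group", Map("test" -> 1))

The rest of this article uses the direct approach: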

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *    topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}


The code above performs a simple word count on the words (digits) received from Kafka.
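
One advantage of the direct approach is that the Kafka offsets consumed in each batch are available to the application. A minimal sketch following the pattern in the official guide (not part of the example above):

import org.apache.spark.streaming.kafka.HasOffsetRanges

// Inspect the Kafka offset range consumed by each batch's RDD
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} offsets=[${o.fromOffset}, ${o.untilOffset})")
  }
}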

Package the project into a jar in IDEA, bundling the spark-streaming-kafka_2.10 jars into the output jar. The file gets larger, but it saves a lot of trouble with missing classes on the cluster.
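
If you build with Maven instead of IDEA, such a fat jar can be produced with the shade plugin (a sketch; the plugin version here is an assumption):

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>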



Submit the jar to the Spark cluster (run steps 2 and 3 first):

spark-submit --master=spark://master:7077 \
--class DirectKafkaWordCount \
./Spark1.6.0.jar \
slave01:9092,slave02:9092,slave03:9092 test
You should see output like the following:

16/11/22 18:46:56 INFO scheduler.DAGScheduler: Job 301 finished: print at DirectKafkaWordCount.scala:65, took 0.015078 s
-------------------------------------------
Time: 1479811616000 ms
-------------------------------------------
(8,1)
(9,1)
(3,1)
(1,1)
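
As an alternative to bundling the Kafka jars into the application jar, spark-submit can also fetch them at submit time (assuming the cluster can reach a Maven repository):

spark-submit --master spark://master:7077 \
--packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
--class DirectKafkaWordCount \
./Spark1.6.0.jar \
slave01:9092,slave02:9092,slave03:9092 test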