
Consuming Kafka Data with Spark Streaming


1 Environment
CDH 5.8.2
JDK 8
Scala 2.10.6
Scala IDE for Eclipse on Linux
Spark 2.x (I don't remember the exact version)

ZooKeeper
hadoop1:2181,
hadoop2:2181,
hadoop3:2181

Kafka broker list
hadoop2:9095,
hadoop3:9095,
hadoop4:9095
Note: I use two sets of IPs here and configured a Kafka listener for external access, which is why the brokers run on port 9095 instead of the default; I will describe that configuration in a separate post.
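
For reference, a minimal sketch of the broker-side settings involved (in server.properties; the hostname below is a placeholder, and on older Kafka versions you may need advertised.host.name/advertised.port instead of advertised.listeners):

# server.properties (sketch; hostname and port are placeholders)
# bind the broker on the custom port on all interfaces
listeners=PLAINTEXT://0.0.0.0:9095
# the address external clients should use to reach this broker
advertised.listeners=PLAINTEXT://hadoop2:9095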

2 Kafka
(1) Create a Kafka topic
kafka-topics --zookeeper hadoop3:2181,hadoop2:2181,hadoop1:2181 \
--create --topic mykafka01 --replication-factor 3 --partitions 3

(2) List and describe Kafka topics
kafka-topics --zookeeper hadoop3:2181,hadoop2:2181,hadoop1:2181 --list
kafka-topics --zookeeper hadoop3:2181,hadoop2:2181,hadoop1:2181 --describe --topic mykafka01

(3) Kafka console producer
kafka-console-producer --topic mykafka01 --broker-list hadoop2:9095,hadoop3:9095,hadoop4:9095

(4) Kafka console consumer
kafka-console-consumer --zookeeper hadoop3:2181,hadoop2:2181,hadoop1:2181 --topic mykafka01 --from-beginning

(5) Kafka producer code
import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import scala.util.Random
import org.json.JSONObject

object KafkaProducerTest {

  // candidate user IDs and operating systems for the mock events
  private val users = Array(
    "lin", "jack",
    "cai", "han",
    "zhao", "wang",
    "li", "zhou",
    "chu", "xu")
  private val os = Array("IOS", "Android", "YunOS")

  private val random = new Random()

  // shared cursor into the two arrays above
  private var offset = 0

  // advance the shared offset by a random step and pick an OS
  def getOS(): String = {
    offset = offset + random.nextInt(3)
    if (offset >= os.length) {
      offset = 0
    }
    os(offset)
  }

  // advance the shared offset by a random step and pick a user ID
  def getUserID(): String = {
    offset = offset + random.nextInt(10)
    if (offset >= users.length) {
      offset = 0
    }
    users(offset)
  }

  // random "times" value in [0, 5)
  def times(): Double = {
    random.nextInt(5)
  }

  def main(args: Array[String]): Unit = {
    val topic = "mykafka01"
    val brokers = "hadoop2:9095,hadoop3:9095,hadoop4:9095"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", getUserID)
        .put("event_time", System.currentTimeMillis.toString)
        .put("os", getOS)
        .put("times", times)

      // produce event message
      producer.send(new KeyedMessage[String, String](topic, "key", event.toString))
      println(event)

      Thread.sleep(500)
    }
  }
}
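
Each loop iteration prints the event it just sent, one JSON object per line. With the fields above, a line looks something like the following (the values are random and org.json does not guarantee field order, so yours will differ):

{"uid":"jack","event_time":"1513267200000","os":"Android","times":3}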

3 Spark Streaming code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object MyTest {
  def main(args: Array[String]): Unit = {

    // create the Spark application and a streaming context with 5-second batches
    val conf = new SparkConf().setMaster("local[2]").setAppName("hello")
    val ssc = new StreamingContext(conf, Seconds(5))

    val topics = Set("mykafka01") // Kafka topics to subscribe to
    val brokers = "hadoop2:9095,hadoop3:9095,hadoop4:9095"
    // Kafka parameters
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder" // Kafka serializer class
    )

    // create the direct stream; each record is a (key, value) pair of Strings
    val kafkaStreamRDDs = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // print every record
    kafkaStreamRDDs.foreachRDD(rdd =>
      rdd.foreach(line =>
        println(line)
      )
    )

    ssc.start()
    ssc.awaitTermination()
  }
}
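
To do something more useful than printing the raw (key, value) tuples, here is a minimal sketch of parsing the JSON value back into fields. It reuses the org.json dependency from the producer, and the field names match the producer code above; it would replace the print block and, like it, must be registered before ssc.start():

// sketch: take the message value and parse the JSON emitted by the producer
kafkaStreamRDDs.map(_._2).foreachRDD { rdd =>
  rdd.foreach { json =>
    val event = new org.json.JSONObject(json)
    println(s"uid=${event.getString("uid")} os=${event.getString("os")} times=${event.getDouble("times")}")
  }
}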

4 Dependencies

<dependency><!-- Spark Streaming Kafka -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>2.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>2.1.0</version>
  <scope>provided</scope>
</dependency>
<!-- note: ideally keep all Spark artifact versions aligned (e.g. all 2.1.x) -->

<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
  <groupId>org.json</groupId>
  <artifactId>json</artifactId>
  <version>20170516</version>
</dependency>
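
To run the streaming job, a sketch of the submit command (the jar name here is an assumption; since setMaster("local[2]") is hardcoded the job runs locally, and the Kafka connector must be on the classpath, e.g. packaged into an assembly jar or passed with --jars):

spark-submit --class MyTest my-streaming-app.jar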