Consuming Kafka data with Spark Streaming
2017-12-15 00:00
Summary: consuming Kafka data with Spark Streaming
1. Environment
CDH 5.8.2
JDK 8
Scala 2.10.6
Scala IDE for Eclipse (Linux)
Spark 2.x (exact minor version not recorded)
ZooKeeper quorum:
hadoop1:2181,
hadoop2:2181,
hadoop3:2181
Kafka broker list:
hadoop2:9095,
hadoop3:9095,
hadoop4:9095
Note: this cluster uses two sets of IPs, and a separate listener is configured for external access, which is why the brokers use port 9095. How to configure that listener will be covered in a follow-up post.
2. Kafka
(1) Create the topic
kafka-topics --zookeeper hadoop3:2181,hadoop2:2181,hadoop1:2181 --create --topic mykafka01 --replication-factor 3 --partitions 3
(2) Inspect topics
kafka-topics --zookeeper hadoop3:2181,hadoop2:2181,hadoop1:2181 --list
kafka-topics --zookeeper hadoop3:2181,hadoop2:2181,hadoop1:2181 --describe --topic mykafka01
(3) Console producer
kafka-console-producer --topic mykafka01 --broker-list hadoop2:9095,hadoop3:9095,hadoop4:9095
(4) Console consumer
kafka-console-consumer --zookeeper hadoop3:2181,hadoop2:2181,hadoop1:2181 --topic mykafka01 --from-beginning
(5) Kafka producer code
import java.util.Properties

import scala.util.Random

import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import org.json.JSONObject

object KafkaProducerTest {
  private val users = Array(
    "lin", "jack",
    "cai", "han",
    "zhao", "wang",
    "li", "zhou",
    "chu", "xu")
  private val os = Array("IOS", "Android", "YunOS")
  private val random = new Random()
  // shared cursor into the users/os arrays; each call advances it randomly
  private var offset = 0

  def getOS(): String = {
    offset = offset + random.nextInt(3)
    if (offset >= os.length) offset = 0
    os(offset)
  }

  def getUserID(): String = {
    offset = offset + random.nextInt(10)
    if (offset >= users.length) offset = 0
    users(offset)
  }

  // random value in [0, 5), widened to Double
  def times(): Double = {
    random.nextInt(5)
  }

  def main(args: Array[String]): Unit = {
    val topic = "mykafka01"
    val brokers = "hadoop2:9095,hadoop3:9095,hadoop4:9095"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)
    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", getUserID)
        .put("event_time", System.currentTimeMillis.toString)
        .put("os", getOS)
        .put("times", times)
      // produce event message
      producer.send(new KeyedMessage[String, String](topic, "key", event.toString))
      println(event)
      Thread.sleep(500)
    }
  }
}
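The producer above uses the legacy kafka.javaapi producer, which was removed in later Kafka releases. A minimal sketch of sending the same kind of message with the newer org.apache.kafka.clients.producer API might look like this (the object name NewApiProducerSketch is made up for illustration; the topic and broker list are reused from above, and running it still requires the live cluster):

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object NewApiProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // in the new client, bootstrap.servers replaces metadata.broker.list
    props.put("bootstrap.servers", "hadoop2:9095,hadoop3:9095,hadoop4:9095")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      // send one JSON-shaped message to the same topic as before
      producer.send(new ProducerRecord[String, String](
        "mykafka01", "key", """{"uid":"lin","times":1.0}"""))
    } finally {
      producer.close() // flush buffered records and release resources
    }
  }
}
```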
3. Spark Streaming code
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object MyTest {
  def main(args: Array[String]): Unit = {
    // create the Spark application with a 5-second batch interval
    val conf = new SparkConf().setMaster("local[2]").setAppName("hello")
    val ssc = new StreamingContext(conf, Seconds(5))
    val topics = Set("mykafka01") // Kafka topics to subscribe to
    val brokers = "hadoop2:9095,hadoop3:9095,hadoop4:9095"
    // Kafka parameters
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder" // Kafka serializer class
    )
    // create the direct stream; each record is a (key, value) tuple
    val kafkaStreamRDDs = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // print every record
    kafkaStreamRDDs.foreachRDD(rdd =>
      rdd.foreach(line =>
        println(line)
      )
    )
    ssc.start()
    ssc.awaitTermination()
  }
}
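Printing each record only shows the raw (key, value) tuples. As a sketch of actually using the payload, the fragment below counts events per OS in each batch, assuming the JSON schema emitted by the producer above. This is a hypothetical extension, not part of the original program; it would go inside main, after the stream is created and before ssc.start():

```scala
import org.json.JSONObject

// kafkaStreamRDDs is the DStream created above; each element is (key, jsonString)
val osCounts = kafkaStreamRDDs
  .map { case (_, value) => new JSONObject(value).getString("os") } // extract the "os" field
  .map(os => (os, 1L))
  .reduceByKey(_ + _) // count per OS within each 5-second batch

osCounts.print()
```

Because reduceByKey runs per batch, the printed counts reset every 5 seconds; a stateful operator such as updateStateByKey would be needed for running totals.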
4. Dependencies
<!-- Spark Streaming Kafka integration -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>2.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>2.1.0</version>
  <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
  <groupId>org.json</groupId>
  <artifactId>json</artifactId>
  <version>20170516</version>
</dependency>