
Kafka producer/consumer API integrated with Spark Streaming (Scala version)

Maven configuration

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
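
Note that the Spark artifacts above are declared with `provided` scope, so they are expected to come from the Spark installation at runtime and will not be packaged into the application jar. Because spark-streaming-kafka-0-8_2.11 is also marked `provided`, the connector has to be supplied at submit time (for example via spark-submit's `--packages` or `--jars` option), or its scope changed to the default `compile` so it ends up in an assembled fat jar.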


1. Kafka producer


import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.io.Source
import scala.reflect.io.Path

class KafkaProduceMsg extends Runnable {

  private val BROKER_LIST = "slave6:9092,slave7:9092"
  private val TOPIC = "kafka"
  private val DIR = "C:\\Users\\admin\\Desktop\\kafka-data.txt"

  /**
   * 1. Producer configuration
   * bootstrap.servers : the Kafka brokers to connect to
   * key.serializer / value.serializer : how message keys and values are serialized
   * acks : "1" means the leader broker must acknowledge the write before the send is considered successful
   */
  private val props = new Properties()
  props.put("bootstrap.servers", BROKER_LIST)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "1")

  private val producer = new KafkaProducer[String, String](props)

  def run(): Unit = {
    println("Start producing messages!")
    while (true) {
      // Walk DIR and keep only regular files (DIR may be a single file or a directory)
      val files = Path(this.DIR).walkFilter(p => p.isFile)
      try {
        for (file <- files) {
          val reader = Source.fromFile(file.toString(), "UTF-8")
          for (line <- reader.getLines()) {
            // Send each line ten times to generate some traffic
            var m = 0
            while (m < 10) {
              val record = new ProducerRecord[String, String](this.TOPIC, "key", line)
              m = m + 1
              println(m + " " + record)
              producer.send(record)
            }
            // Pause between lines so the stream is easy to follow
            try {
              Thread.sleep(3000)
            } catch {
              case e: Exception => println(e)
            }
          }
          reader.close()
        }
      } catch {
        case e: Exception => println(e)
      }
    }
  }
}


Producer driver program:

object Msg {
  def main(args: Array[String]): Unit = {
    new Thread(new KafkaProduceMsg()).start()
  }
}
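
The producer above fires send() in a loop and never checks whether a write actually reached the broker, nor does it close the producer. For debugging it can help to attach a delivery callback and shut down cleanly. A minimal sketch, assuming the same broker list and topic as above (the object name and message text are placeholders for illustration):

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object MsgWithCallback {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "slave6:9092,slave7:9092")   // same brokers as above
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "1")                                       // leader acknowledgment, as in the producer above

    val producer = new KafkaProducer[String, String](props)
    try {
      val record = new ProducerRecord[String, String]("kafka", "key", "hello from callback example")
      // The callback fires once the broker responds, so failed sends show up in the log
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          if (exception != null) println("send failed: " + exception)
          else println(s"sent to ${metadata.topic()}-${metadata.partition()} @ offset ${metadata.offset()}")
        }
      })
    } finally {
      producer.flush()   // push out any buffered records
      producer.close()   // release network resources
    }
  }
}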


2. Consumer: Spark Streaming


import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 2. Spark Streaming consumer: check whether each record contains the specified keyword;
 *    keep the record if it does, discard it otherwise.
 */
object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    // Create the streaming context with a 5-second batch interval
    val conf = new SparkConf().setAppName("Consumer")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Set a checkpoint directory if you need stateful (cumulative) computations
    // ssc.checkpoint("hdfs://master:9000/xxx")
    // Read data from Kafka with the direct (receiver-less) approach
    val kafkaParam = Map("metadata.broker.list" -> "slave6:9092,slave7:9092")
    val topic = "kafka".split(",").toSet
    // Each record is a (key, value) pair; keep only the value, i.e. the log line
    val logDStream: DStream[String] = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topic)
      .map(_._2)
    logDStream.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
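
The header comment talks about keeping only records that contain a given keyword, but the code above simply prints every record. A hedged sketch of that filtering step, with a placeholder keyword and output path (the object name KafkaKeywordConsumer is made up for illustration):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaKeywordConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KeywordConsumer")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParam = Map("metadata.broker.list" -> "slave6:9092,slave7:9092")
    val topics = "kafka".split(",").toSet
    val lines: DStream[String] = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
      .map(_._2)

    // Keep only lines containing the keyword; everything else is discarded
    val keyword = "ERROR"                                  // placeholder keyword
    val matched = lines.filter(_.contains(keyword))

    // Write each batch out as text files under the given prefix (placeholder HDFS path)
    matched.saveAsTextFiles("hdfs://master:9000/kafka-filtered/batch")

    ssc.start()
    ssc.awaitTermination()
  }
}

saveAsTextFiles writes one directory per batch under the given prefix; a production job would usually pick a less chatty sink, but it keeps the example self-contained.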