Integrating the Kafka producer/consumer API with Spark Streaming (Scala)
2018-03-07 11:16
Maven dependencies:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  <version>2.1.1</version>
  <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.1.1</version>
  <scope>provided</scope>
</dependency>
1. Kafka producer
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source
import scala.reflect.io.Path

class KafkaProduceMsg extends Runnable {

  private val BROKER_LIST = "slave6:9092,slave7:9092"
  private val TOPIC = "kafka"
  private val DIR = "C:\\Users\\admin\\Desktop\\kafka-data.txt"

  /**
   * 1. Configure the producer:
   * bootstrap.servers    : the Kafka brokers to connect to
   * key/value.serializer : how to serialize the message key and value
   * acks                 : "1" means the partition leader must acknowledge each write
   */
  private val props = new Properties()
  props.put("bootstrap.servers", BROKER_LIST)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "1")

  private val producer = new KafkaProducer[String, String](props)

  def run(): Unit = {
    println("Starting to produce messages...")
    while (true) {
      // DIR points at a single file here, but walkFilter also handles a directory of files
      val files = Path(this.DIR).walkFilter(p => p.isFile)
      try {
        for (file <- files) {
          val reader = Source.fromFile(file.toString(), "UTF-8")
          for (line <- reader.getLines()) {
            // send each line 10 times, then pause for 3 seconds
            var m = 0
            while (m < 10) {
              val record = new ProducerRecord[String, String](this.TOPIC, "key", line)
              m = m + 1
              println(m + " " + record)
              producer.send(record)
            }
            try {
              Thread.sleep(3000)
            } catch {
              case e: Exception => println(e)
            }
          }
          reader.close()
        }
      } catch {
        case e: Exception => println(e)
      }
    }
  }
}
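The producer configuration above can be factored into a small helper so the same settings are reusable and testable without a running broker. A minimal sketch using only java.util.Properties; the broker list and the `producerProps` name are illustrative assumptions, not part of the Kafka API:

```scala
import java.util.Properties

// Hypothetical helper: builds the same settings the producer above uses.
// No Kafka connection is made here; this only assembles configuration.
def producerProps(brokers: String): Properties = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "1") // wait for the partition leader's acknowledgment only
  props
}

val props = producerProps("slave6:9092,slave7:9092")
println(props.getProperty("bootstrap.servers"))
```

Keeping configuration in one function makes it easy to swap broker addresses between environments without touching the producer logic.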
Producer entry point:
object Msg {
  def main(args: Array[String]): Unit = {
    // run the producer on its own thread
    new Thread(new KafkaProduceMsg()).start()
  }
}
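Note that the `while (true)` loop in KafkaProduceMsg never exits, so `producer.close()` is never called and the thread cannot be stopped cleanly. A minimal stop-flag pattern, sketched with plain JVM classes (no Kafka needed; the `StoppableProducer` name and `work` callback are illustrative assumptions):

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Sketch only: in the real producer, `work()` would call producer.send(...)
// and the loop would be followed by producer.close().
class StoppableProducer(work: () => Unit) extends Runnable {
  private val running = new AtomicBoolean(true)
  def stop(): Unit = running.set(false)
  override def run(): Unit = {
    while (running.get()) work()
    // a real producer would flush and close here
  }
}

// demonstrate start/stop with a simple counter standing in for send()
val counter = new AtomicInteger(0)
val task = new StoppableProducer(() => { counter.incrementAndGet(); Thread.sleep(1) })
val t = new Thread(task)
t.start()
Thread.sleep(50)
task.stop()
t.join()
println(s"iterations before stop: ${counter.get()}")
```

The driver can then call `task.stop()` (for example from a JVM shutdown hook) instead of killing the process mid-send.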
2. Spark Streaming consumer
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 2. Consume the data with Spark Streaming and check whether each line
 * contains a given keyword: keep the lines that match, discard the rest.
 */
object KafkaConsumer {

  def main(args: Array[String]): Unit = {
    // create the streaming context with a 5-second batch interval
    val conf = new SparkConf().setAppName("Consumer")
    val ssc = new StreamingContext(conf, Seconds(5))

    // set a checkpoint directory to enable stateful (cumulative) computations
    // ssc.checkpoint("hdfs://master:9000/xxx")

    // read from Kafka with the direct (receiver-less) API
    val kafkaParam = Map("metadata.broker.list" -> "slave6:9092,slave7:9092")
    val topic = "kafka".split(",").toSet

    // each Kafka message is a (key, value) pair; keep only the value
    val logDStream: DStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topic)
        .map(_._2)

    logDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
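The comment above mentions filtering by keyword, but the code never applies such a filter. On a DStream it would be a one-liner like `logDStream.filter(_.contains(keyword))`; the same logic is sketched here on plain strings so it can run without a Spark cluster (the keyword "ERROR" and the `keepMatching` name are assumed examples):

```scala
// Keep only the lines that contain the given keyword, drop the rest.
// This mirrors what DStream.filter would do on each micro-batch.
def keepMatching(lines: Seq[String], keyword: String): Seq[String] =
  lines.filter(_.contains(keyword))

val sample = Seq("INFO start", "ERROR disk full", "WARN slow", "ERROR timeout")
println(keepMatching(sample, "ERROR"))
```

Since `filter` is a per-record transformation, adding it to the DStream pipeline requires no checkpointing or state.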