您的位置:首页 > 其它

sparkstreaming消费kafka中的数据

2016-06-04 18:56 399 查看
版本一

package com.kafka.my.scala

import org.apache.spark.SparkConf

import org.apache.spark.streaming.Durations

import org.apache.spark.streaming.StreamingContext

import org.apache.spark.streaming.kafka.KafkaUtils

import java.util.Properties

/**

 * 

 * @author root

 *  sparkstreaming获取kafka方式一

 *  测试结果:通过

 *测试:1\需要先在h15\h16\h17上启动zookeeper,再启动kafka,创建kafka的topic

 *          2\在h15kafka的bin目录下执行

 *          #sh kafka-console-producer.sh --topic 20160510a --broker-list h15:9092,h16:9092,h17:9092

 *          让其等待输入

 *          3\启动本程序接受数据

 *          4\在h15上刚才的窗口输入数据

 *          5\查看本程序是否正常接收

 *          

 *错误:java.nio.channels.ClosedChannelException

 * fetching topic metadata for topics [Set(20160510aa)] from broker [ArrayBuffer(id:2,host:h17,port:9092, id:1,host:h16,port:9092, id:0,host:h15,port:9092)] failed

   原因:server.propertis中的host.name=h15或者注释掉,否则报错

 */

object KafkaReceiverCountWord {

  def main(args: Array[String]): Unit = {

    //创建ssc

    val ssc=new StreamingContext(new SparkConf().setAppName("wordCount").setMaster("local[2]"),Durations.seconds(5))

    //创建properties

    val topicThreadMap=Array("20160510a").map { (_,1) }.toMap

    //创建客户端接收

     val lines =KafkaUtils.createStream(ssc, 

            "192.168.142.115:2181,192.168.142.116:2181,192.168.142.117:2181"

            , "WordcountConsumerGroup"

            , topicThreadMap)

      //切割

      val words =lines.flatMap(_._2.split(" "))

      //tuple

      val pairs=words.map { (_,1) }

     //reduceByKey

    val wordcounts=pairs.reduceByKey(_+_)

    //必须触发

    wordcounts.print()

    //开启

    ssc.start()

    //等待

    ssc.awaitTermination()

    //关闭

    ssc.stop()

  }

}

第二版本

package com.kafka.my.scala

import org.apache.spark.streaming.StreamingContext

import org.apache.spark.streaming.Durations

import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder

/**

 * 

 * @author root

 *  sparkstreaming获取kafka方式二

 *  测试结果:通过

 *  区别于第一种方式:

 *                  1、offset不会更新到zookeeper

 *                  2、使用的节点端口是9092kafka的broker端口

 *                  3、不存在group

 *  

 */

object KafkaDirectCountWord {

  def main(args: Array[String]): Unit = {

    //获取sparkstreaming

    val ssc=new StreamingContext(new SparkConf().setAppName("directCount").setMaster("local[2]"),Durations.seconds(5))

    //创建kafkaParams

    val kafkaParams=Array("metadata.broker.list").map {(_,"192.168.142.115:9092,192.168.142.116:9092,192.168.142.117:9092") }.toMap

    //创建topic

    val topics=Array("20160510a").toSet

//    val topics=Set("20160510a")

    //获取lines

    val lines=KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)

     //切割

      val words =lines.flatMap(_._2.split(" "))

      //tuple

      val pairs=words.map { (_,1) }

     //reduceByKey

    val wordcounts=pairs.reduceByKey(_+_)

    //必须触发

    wordcounts.print()

    //开启

    ssc.start()

    //等待

    ssc.awaitTermination()

    //关闭

    ssc.stop()

  }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: