您的位置:首页 > 编程语言

SparkStreaming+Kafka样例代码

2018-02-09 14:43 351 查看

场景:SparkStreaming实时消费Kafka的消息队列,并将处理后的消息打印到控制台

假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能:

实时统计连续网购时间超过半个小时的女性网民信息。
周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。
log1.txt:周六网民停留日志
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60

log2.txt:周日网民停留日志
LiuYang,female,20
YuanJing,male,10
CaiXuyu,female,50
FangBo,female,50
GuoYijun,male,5
CaiXuyu,female,50
Liyuan,male,20
CaiXuyu,female,50
FangBo,female,50
LiuYang,female,20
YuanJing,male,10
FangBo,female,50
GuoYijun,male,50
CaiXuyu,female,50
FangBo,female,60


数据规划

Spark Streaming样例工程的数据存储在Kafka组件中(需要有Kafka权限用户)。
本地新建两个文本文件input_data1.txt和input_data2.txt,将log1.txt的内容复制保存到input_data1.txt,将log2.txt的内容复制保存到input_data2.txt。
在客户端安装节点下创建文件目录:“/home/omm/data”。将上述两个文件上传到此“/home/data”目录下。
将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”值设置为“true”。
启动Kafka的Producer,向Kafka发送数据。

提交命令格式:java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList}
{Topic}

提交命令:java -cp /dev/shm/yanguangyu/spark/SparkStreamingExample.jar com.huawei.bigdata.spark.examples.StreamingExampleProducer 99.12.166.121:21005,99.12.166.122:21005,99.12.166.123:21005 SparkOnHBaseCustCostData

开发思路

统计日志文件中本周末网购停留时间超过半小时的女性网民信息。

主要分为三个部分:

接收Kafka中数据,生成相应DStream。
筛选女性网民上网时间数据信息。
筛选连续上网时间超过阈值的用户,并获取结果。

【Scala样例代码】

// 参数解析:

    // <checkPointDir>为checkpoint目录。

    // <batchTime>为Spark Streaming分批的处理间隔。

    // <topics>为Kafka中订阅的主题,多以逗号分隔。

    // <brokers>为获取元数据的kafka地址。

    val Array(checkPointDir, batchTime, topics, brokers) = args

    val batchDuration = Seconds(batchTime.toInt)

   

    // 建立Spark Streaming启动环境

    val sparkConf = new SparkConf()

    sparkConf.setAppName("DataSightStreamingExample")

    val ssc = new StreamingContext(sparkConf, batchDuration)

    // 设置Spark Streaming的CheckPoint目录

    ssc.checkpoint(checkPointDir)

    // 组装Kafka的主题列表

     val topicsSet = topics.split(",").toSet

    // 通过brokers和topics直接创建kafka stream

    // 1.接收Kafka中数据,生成相应DStream

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

      ssc, kafkaParams, topicsSet).map(_._2)

    // 2.获取每一个行的字段属性

    val records = lines.map(getRecord)

    // 3.筛选女性网民上网时间数据信息

    val femaleRecords = records.filter(_._2 == "female")

      .map(x => (x._1, x._3))

   

    // 4.筛选连续上网时间超过阈值的用户,并获取结果

    femaleRecords.filter(_._2 > 30).print()

    // 5.Spark Streaming系统启动

    ssc.start()

    ssc.awaitTermination()上述代码会引用以下函数

  // 获取字段函数

  def getRecord(line: String): (String, String, Int) = {

    val elems = line.split(",")

    val name = elems(0)

    val sexy = elems(1)

    val time = elems(2).toInt

    (name, sexy, time)

  }

打包后提交命令:

spark-submit --master yarn-client --jars /dev/shm/yanguangyu/hadoop_client/Spark2x/spark/jars/streamingClient/kafka-clients-0.8.2.1.jar,/dev/shm/yanguangyu/hadoop_client/Spark2x/spark/jars/streamingClient/kafka_2.11-0.8.2.1.jar,/dev/shm/yanguangyu/hadoop_client/Kafka/kafka/libs/kafka_2.10-0.10.0.0.jar
--class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint /dev/shm/yanguangyu/spark/SparkStreamingExample.jar /tmp 1 SparkOnHBaseCustCostData 99.12.166.121:21005,99.12.166.122:21005,99.12.166.123:21005
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: