SparkStreaming+Kafka样例代码
2018-02-09 14:43
351 查看
场景:SparkStreaming实时消费Kafka的消息队列,并将处理后的消息打印到控制台
假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能:
实时统计连续网购时间超过半个小时的女性网民信息。
周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。
log1.txt:周六网民停留日志
log2.txt:周日网民停留日志
本地新建两个文本文件input_data1.txt和input_data2.txt,将log1.txt的内容复制保存到input_data1.txt,将log2.txt的内容复制保存到input_data2.txt。
在客户端安装节点下创建文件目录:“/home/omm/data”。将上述两个文件上传到此“/home/data”目录下。
将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”值设置为“true”。
启动Kafka的Producer,向Kafka发送数据。
提交命令格式:java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList}
{Topic}
提交命令:java -cp /dev/shm/yanguangyu/spark/SparkStreamingExample.jar com.huawei.bigdata.spark.examples.StreamingExampleProducer 99.12.166.121:21005,99.12.166.122:21005,99.12.166.123:21005 SparkOnHBaseCustCostData
主要分为三个部分:
接收Kafka中数据,生成相应DStream。
筛选女性网民上网时间数据信息。
筛选连续上网时间超过阈值的用户,并获取结果。
【Scala样例代码】
// 参数解析:
// <checkPointDir>为checkpoint目录。
// <batchTime>为Spark Streaming分批的处理间隔。
// <topics>为Kafka中订阅的主题,多以逗号分隔。
// <brokers>为获取元数据的kafka地址。
val Array(checkPointDir, batchTime, topics, brokers) = args
val batchDuration = Seconds(batchTime.toInt)
// 建立Spark Streaming启动环境
val sparkConf = new SparkConf()
sparkConf.setAppName("DataSightStreamingExample")
val ssc = new StreamingContext(sparkConf, batchDuration)
// 设置Spark Streaming的CheckPoint目录
ssc.checkpoint(checkPointDir)
// 组装Kafka的主题列表
val topicsSet = topics.split(",").toSet
// 通过brokers和topics直接创建kafka stream
// 1.接收Kafka中数据,生成相应DStream
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet).map(_._2)
// 2.获取每一个行的字段属性
val records = lines.map(getRecord)
// 3.筛选女性网民上网时间数据信息
val femaleRecords = records.filter(_._2 == "female")
.map(x => (x._1, x._3))
// 4.筛选连续上网时间超过阈值的用户,并获取结果
femaleRecords.filter(_._2 > 30).print()
// 5.Spark Streaming系统启动
ssc.start()
ssc.awaitTermination()上述代码会引用以下函数
// 获取字段函数
def getRecord(line: String): (String, String, Int) = {
val elems = line.split(",")
val name = elems(0)
val sexy = elems(1)
val time = elems(2).toInt
(name, sexy, time)
}
打包后提交命令:
spark-submit --master yarn-client --jars /dev/shm/yanguangyu/hadoop_client/Spark2x/spark/jars/streamingClient/kafka-clients-0.8.2.1.jar,/dev/shm/yanguangyu/hadoop_client/Spark2x/spark/jars/streamingClient/kafka_2.11-0.8.2.1.jar,/dev/shm/yanguangyu/hadoop_client/Kafka/kafka/libs/kafka_2.10-0.10.0.0.jar
--class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint /dev/shm/yanguangyu/spark/SparkStreamingExample.jar /tmp 1 SparkOnHBaseCustCostData 99.12.166.121:21005,99.12.166.122:21005,99.12.166.123:21005
场景:SparkStreaming实时消费Kafka的消息队列,并将处理后的消息打印到控制台
假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能:
实时统计连续网购时间超过半个小时的女性网民信息。
周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。
log1.txt:周六网民停留日志
LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
log2.txt:周日网民停留日志
LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
数据规划
Spark Streaming样例工程的数据存储在Kafka组件中(需要有Kafka权限用户)。本地新建两个文本文件input_data1.txt和input_data2.txt,将log1.txt的内容复制保存到input_data1.txt,将log2.txt的内容复制保存到input_data2.txt。
在客户端安装节点下创建文件目录:“/home/omm/data”。将上述两个文件上传到此“/home/data”目录下。
将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”值设置为“true”。
启动Kafka的Producer,向Kafka发送数据。
提交命令格式:java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList}
{Topic}
提交命令:java -cp /dev/shm/yanguangyu/spark/SparkStreamingExample.jar com.huawei.bigdata.spark.examples.StreamingExampleProducer 99.12.166.121:21005,99.12.166.122:21005,99.12.166.123:21005 SparkOnHBaseCustCostData
开发思路
统计日志文件中本周末网购停留时间超过半小时的女性网民信息。主要分为三个部分:
接收Kafka中数据,生成相应DStream。
筛选女性网民上网时间数据信息。
筛选连续上网时间超过阈值的用户,并获取结果。
【Scala样例代码】
// 参数解析:
// <checkPointDir>为checkpoint目录。
// <batchTime>为Spark Streaming分批的处理间隔。
// <topics>为Kafka中订阅的主题,多以逗号分隔。
// <brokers>为获取元数据的kafka地址。
val Array(checkPointDir, batchTime, topics, brokers) = args
val batchDuration = Seconds(batchTime.toInt)
// 建立Spark Streaming启动环境
val sparkConf = new SparkConf()
sparkConf.setAppName("DataSightStreamingExample")
val ssc = new StreamingContext(sparkConf, batchDuration)
// 设置Spark Streaming的CheckPoint目录
ssc.checkpoint(checkPointDir)
// 组装Kafka的主题列表
val topicsSet = topics.split(",").toSet
// 通过brokers和topics直接创建kafka stream
// 1.接收Kafka中数据,生成相应DStream
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet).map(_._2)
// 2.获取每一个行的字段属性
val records = lines.map(getRecord)
// 3.筛选女性网民上网时间数据信息
val femaleRecords = records.filter(_._2 == "female")
.map(x => (x._1, x._3))
// 4.筛选连续上网时间超过阈值的用户,并获取结果
femaleRecords.filter(_._2 > 30).print()
// 5.Spark Streaming系统启动
ssc.start()
ssc.awaitTermination()上述代码会引用以下函数
// 获取字段函数
def getRecord(line: String): (String, String, Int) = {
val elems = line.split(",")
val name = elems(0)
val sexy = elems(1)
val time = elems(2).toInt
(name, sexy, time)
}
打包后提交命令:
spark-submit --master yarn-client --jars /dev/shm/yanguangyu/hadoop_client/Spark2x/spark/jars/streamingClient/kafka-clients-0.8.2.1.jar,/dev/shm/yanguangyu/hadoop_client/Spark2x/spark/jars/streamingClient/kafka_2.11-0.8.2.1.jar,/dev/shm/yanguangyu/hadoop_client/Kafka/kafka/libs/kafka_2.10-0.10.0.0.jar
--class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint /dev/shm/yanguangyu/spark/SparkStreamingExample.jar /tmp 1 SparkOnHBaseCustCostData 99.12.166.121:21005,99.12.166.122:21005,99.12.166.123:21005
相关文章推荐
- 整合Kafka到Spark Streaming——代码示例和挑战
- 整合Kafka到Spark Streaming——代码示例和挑战
- kafka+spark streaming代码实例(pyspark+python)
- 【问底】Michael G. Noll:整合Kafka到Spark Streaming——代码示例和挑战
- "Spark Streaming + Kafka direct + checkpoints + 代码改变" 引发的问题(一)
- 整合Kafka到Spark Streaming——代码示例和挑战
- 整合Kafka到Spark Streaming——代码示例和挑战
- Spark Streaming 中使用kafka低级api+zookeeper 保存 offset 并重用 以及 相关代码整合
- 整合Kafka到Spark Streaming——代码示例和挑战
- 整合Kafka到Spark Streaming——代码示例和挑战
- 整合Kafka到Spark Streaming——代码示例和挑战
- Scala创建SparkStreaming获取Kafka数据代码过程
- spark streaming 接收 kafka 数据java代码WordCount示例
- 整合Kafka到Spark Streaming——代码示例和挑战
- 整合Kafka到Spark Streaming——代码示例和挑战
- "Spark Streaming + Kafka direct + checkpoints + 代码改变" 引发的问题
- <转>整合Kafka到Spark Streaming——代码示例和挑战
- 整合Kafka到Spark Streaming——代码示例和挑战
- 整合Kafka到Spark Streaming——代码示例和挑战
- Kafka和Spark Streaming Java版本集成并将数据实时写入HBase及代码