Spark的StructuredStream实例(python版)
2020-02-02 17:12
633 查看
基于python的StructuredStream开发实例示例如下:
from pyspark.sql import Row, SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split from pyspark.sql.types import * from pyspark.sql.functions import window spark = SparkSession \ .builder \ .master("local[2]") \ .appName("StructuredStreamTest") \ .getOrCreate() lines = spark \ .readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9984) \ .load() listLines = lines.select(split(lines.value, ",")).toDF("value") pdwData = listLines.select(listLines.value[0].cast(FloatType()), listLines.value[1].cast(FloatType()), listLines.value[2].cast(DoubleType()), \ listLines.value[3].cast(ByteType()), listLines.value[4].cast(FloatType()), listLines.value[5].cast(TimestampType())).toDF("rf", \ "pw", "toa", "inpulseType", "doa", "timestamp") #inpulseTypeCount = pdwData.groupBy("inpulseType").count() #inpulseTypeCount = pdwData.groupBy("timestamp").count() inpulseTypeWindowCount = pdwData.groupBy(window(pdwData.timestamp, "1 second", "1 second"), pdwData.inpulseType).count() query = inpulseTypeWindowCount \ .writeStream \ .outputMode("complete") \ .format("console") \ .start() query.awaitTermination()
- 点赞
- 收藏
- 分享
- 文章举报
相关文章推荐
- 使用docker安装部署Spark集群来训练CNN(含Python实例)
- spark编程python实例
- 使用docker安装部署Spark集群来训练CNN(含Python实例)
- 使用docker安装部署Spark集群来训练CNN(含Python实例)
- spark编程python实例
- spark--二十种特征变换方法及Spark MLlib调用实例(Scala/Java/python)(一)
- 使用docker安装部署Spark集群来训练CNN(含Python实例)
- Spark MLlib TF-IDF算法原理及调用实例(Scala/Java/python)
- kafka+spark streaming代码实例(pyspark+python)
- 我的spark python 决策树实例
- Spark及HDFS环境下使用python的wordcount实例
- 使用docker安装部署Spark集群来训练CNN(含Python实例)
- Spark--python开发实例
- Python 正则表达式---概念基础,匹配过程,表达式元字符和语法以及实例
- Python FTP两个文件夹间的同步实例代码
- knn算法实例(python)
- python发邮件实例
- python redis 删除key脚本的实例
- Spark实例演示之SparkPi
- Python开发实例分享bt种子爬虫程序和种子解析