
Spark Structured Streaming Example (Python)

2020-02-02 17:12

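The Python example below uses Structured Streaming to read comma-separated records from a TCP socket, parse each record into typed columns, and count the records in every 1-second window for each inpulseType value.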
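The job connects to localhost:9984 as a TCP client, so a server must already be listening on that port when the query starts. Spark's socket source is meant for testing only. As an alternative to typing lines into `nc -lk 9984`, here is a minimal stdlib-only feeder sketch. The six-field layout (rf, pw, toa, inpulseType, doa, timestamp) is taken from the column names in the job. The value ranges and the helper names `make_record` and `serve` are hypothetical.

```python
import random
import socket
import time
from datetime import datetime

# Must match the host/port passed to spark.readStream.format("socket")
HOST, PORT = "localhost", 9984


def make_record(now: datetime) -> str:
    """Build one hypothetical line: rf,pw,toa,inpulseType,doa,timestamp."""
    rf = random.uniform(1000.0, 18000.0)   # illustrative value ranges only
    pw = random.uniform(0.1, 100.0)
    toa = now.timestamp()                  # seconds; the job casts it to DoubleType
    pulse_type = random.randint(0, 3)      # must fit in ByteType (-128..127)
    doa = random.uniform(0.0, 360.0)
    # "yyyy-MM-dd HH:mm:ss.SSS" is a string Spark can cast to TimestampType
    ts = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return f"{rf:.3f},{pw:.3f},{toa:.6f},{pulse_type},{doa:.2f},{ts}"


def serve(lines_per_second: int = 20) -> None:
    """Wait for one client (the Spark job) and stream newline-terminated records to it."""
    with socket.create_server((HOST, PORT)) as server:
        print(f"waiting for a connection on {HOST}:{PORT}")
        conn, _addr = server.accept()
        with conn:
            # Runs until the client disconnects (raising a connection error) or Ctrl+C
            while True:
                line = make_record(datetime.now()) + "\n"
                conn.sendall(line.encode("utf-8"))
                time.sleep(1.0 / lines_per_second)
```

Start the feeder first, then launch the Spark job. For example, save it as pdw_feeder.py (a hypothetical file name) and run `python -c "import pdw_feeder; pdw_feeder.serve()"`. With netcat you would instead type lines such as `9500.0,12.5,1580634720.1,1,45.0,2020-02-02 17:12:00.100` by hand.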

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, window
from pyspark.sql.types import ByteType, DoubleType, FloatType, TimestampType

spark = SparkSession \
.builder \
.master("local[2]") \
.appName("StructuredStreamTest") \
.getOrCreate()

# Each line received on localhost:9984 becomes one row with a single string column "value"
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9984) \
.load()

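# Split each line on commas into an array column, renamed to "value"
listLines = lines.select(split(lines.value, ",")).toDF("value")

# Take the six fields by position, cast each one to its type, then name the columns
pdwData = listLines.select(
    listLines.value[0].cast(FloatType()),      # rf
    listLines.value[1].cast(FloatType()),      # pw
    listLines.value[2].cast(DoubleType()),     # toa
    listLines.value[3].cast(ByteType()),       # inpulseType
    listLines.value[4].cast(FloatType()),      # doa
    listLines.value[5].cast(TimestampType())   # timestamp
).toDF("rf", "pw", "toa", "inpulseType", "doa", "timestamp")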

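# Alternatives: count per inpulseType over the whole stream, or per exact timestamp
#inpulseTypeCount = pdwData.groupBy("inpulseType").count()
#inpulseTypeCount = pdwData.groupBy("timestamp").count()

# Count records per 1-second tumbling window (slide == window length) and per inpulseType
inpulseTypeWindowCount = pdwData.groupBy(window(pdwData.timestamp, "1 second", "1 second"), pdwData.inpulseType).count()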

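# "complete" mode prints the entire aggregated table to the console after every micro-batch;
# truncate=False keeps the window column from being cut off in the output
query = inpulseTypeWindowCount \
.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", False) \
.start()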

query.awaitTermination()
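The aggregation uses window(pdwData.timestamp, "1 second", "1 second"). Because the slide equals the window length, the windows are tumbling: each record falls into exactly one window [start, start + 1 s), and the query counts records per (window, inpulseType) pair. The stdlib-only sketch below reproduces that grouping on a small batch of already-parsed records. It illustrates the semantics, not how Spark executes it. The function name `tumbling_window_counts` and the sample data are hypothetical. Windows are aligned to the Unix epoch, which matches the default startTime of Spark's window().

```python
from collections import Counter
from datetime import datetime, timedelta


def tumbling_window_counts(records, width=timedelta(seconds=1)):
    """Count records per (window_start, window_end, pulse_type).

    records: iterable of (timestamp, pulse_type) pairs with naive datetimes.
    Each window is [start, start + width), aligned to 1970-01-01 00:00:00.
    """
    epoch = datetime(1970, 1, 1)
    counts = Counter()
    for ts, pulse_type in records:
        offset = (ts - epoch) % width   # time elapsed since the enclosing window started
        start = ts - offset
        counts[(start, start + width, pulse_type)] += 1
    return counts


# Hypothetical sample: three records in 17:12:00-17:12:01, one in the next window
sample = [
    (datetime(2020, 2, 2, 17, 12, 0, 100000), 1),
    (datetime(2020, 2, 2, 17, 12, 0, 900000), 1),
    (datetime(2020, 2, 2, 17, 12, 0, 950000), 2),
    (datetime(2020, 2, 2, 17, 12, 1, 50000), 1),
]

for (start, end, ptype), n in sorted(tumbling_window_counts(sample).items()):
    print(start.time(), end.time(), ptype, n)
# 17:12:00 17:12:01 1 2
# 17:12:00 17:12:01 2 1
# 17:12:01 17:12:02 1 1
```

In complete mode, Spark has to keep every window it has seen, so this state grows for as long as the job runs. For a long-running job, update or append output mode combined with withWatermark on the timestamp column is the usual way to let Spark drop old windows.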
Author: 常思大妹子