Spark Structured Streaming如何操作数据集?10分钟案例入门
从Spark2.0版本以来,DataFrames和Datasets类型既可以表示静态的有界数据,也可以表示流式的***数据。与静态Datasets/DataFrames类似,都可以使用最基本的SparkSession类从数据源创建流数据集,并执行与静态数据集相同的操作,比如分组、转换或者聚合操作等。
如何创建流式数据流和流式数据集?先从了解输入源入手。流数据集可以通过SparkSession.readStream()返回的DataStreamReader接口创建。具体定义数据源时,可以指定源数据的格式、模式和选项等详细信息。
以下是几个内置的数据输入源:
- 文件:以数据流的形式读取文件,文件格式支持文本/csv/json/orc/parquet。
- Kafka:从Kafka读取数据。它与Kafka版本0.10.0或更高版本兼容。
- Socket套接字(用于测试目的):从套接字连接中读取UTF8文本数据。监听服务器在Driver端运行。一般这种模式仅用于测试,原因是它不提供端到端的容错保证,不具备高可用性。
- 速率源(RateSource,用于测试):以固定频率生成数据,比如每秒N行生成数据,每个输出行包含时间戳和值。其中timestamp是消息分发时间,value是消息长度。
如上所述,有些输入源不具备容错性,因为它们不能保证在发生故障后,可以通过检查点偏移量重放数据。
图1:Structured Streaming结构化流编程模型
以下是从不同源读取数据的示例,简单看一下大致能了解其脉络:
例1:从Socket源读取数据
// 构建SparkSession作为操作数据集的入口 SparkSession spark = SparkSession.builder() .appName("SocketProcessor").getOrCreate(); // 从Socket中读取数据 Dataset<Row> socketDF = spark .readStream() .format("socket") //此处定义输入源格式 .option("host", "localhost") .option("port", 9999) .load(); // 如有流式输入源则返回True socketDF.isStreaming(); // 输入当前数据集中的Schema源信息 socketDF.printSchema();
例2:从Kafka数据源读取数据
// 构建SparkSession作为操作数据集的入口 SparkSession spark =SparkSession.builder() .appName("KafkaProcessor").getOrCreate(); // 定义StructType用于描述自定义类型 StructType reportMsgSchema = newStructType() .add("token", "string") .add("content", "string"); // 定义主机端口与Topic,从Kafka流式读取数据 Dataset<Row> dataset =spark.readStream().format("kafka") .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") .option("subscribe", "sysalert").load(); // 对Dataset类型进行转换 // 通过预定义from_json函数与Schema将读取到的字符串转化为JSON Dataset<Row> untypedDs = dataset .select(functions.from_json(functions.col("value") .cast("string"),reportMsgSchema).alias("msg")) .select("msg.token", "msg.content");
例3:从JSON文件中读取数据
spark.read().json("/demo/msg.json").show();
一般默认情况下,比如例子2中读取Kafka流数据,需要通过StructType对输入源指定模式(避免Spark自动推断)。只有在某些特殊情况下,才通过设置spark.sql.streaming=true来启用模式推理。
对流式数据的常用操作
编写作业时,可以在数据流上进行各种操作,比如无类型的SQL操作(select/where/groupBy)、类型化的RDD操作(map/filter/flatMap)。下面例子中介绍了几个使用频率比较高的操作。
// 定义数据模型 public class DeviceData { private String device; private String deviceType; private Double signal; private java.sql.Date time; ... // Getter/setter方法 }
从输入源中读取数据集(具体代码省略,可参考例2)
Dataset<Row> df = ...;
对数据集的类型进行转换,从Row转换为具体自定义类型
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class));
Select操作示例:选择signal大于10的数据集(注:针对df和ds结果相同)
df.select("device").where("signal> 10"); // 针对Row类型数据集操作 ds.filter((FilterFunction<DeviceData>)value -> value.getSignal() > 10) .map((MapFunction<DeviceData, String>) value ->value.getDevice(), Encoders.STRING());
针对deviceType字段实现分组计数
df.groupBy("deviceType").count();// 针对Row类型数据集操作
根据deviceType字段分组并求signal字段的平均值
ds.groupByKey((MapFunction<DeviceData,String>) value -> value.getDeviceType(), Encoders.STRING()) .agg(typed.avg((MapFunction<DeviceData, Double>) value ->value.getSignal()));
以上是关于StructuredStreaming结构化流中对于如何读取输入源,并对读取的流式数据集进行简单操作的示例。后续文章我们将继续探讨更深入的内容。
参考资料:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
- Spark 2.2.1 JSON 数据集操作的案例与解读
- Spark 2.0从入门到精通245讲——操作RDD(action案例实战)
- Spark 2.0从入门到精通245讲——操作RDD(transformation案例实战)
- python用字符串操作20行代码简单爬虫入门+案例(爬取一章《三体》小说)
- 如何画双层pcb板_双层pcb板布线规则(操作技巧与案例分析)
- (1) 如何用Apache POI操作Excel文件-----入门
- 如何使用git工具进行分支管理——git入门操作
- Spark SQL概述,DataFrames,创建DataFrames的案例,DataFrame常用操作(DSL风格语法),sql风格语法
- Spark快速入门的简单程序案例
- Spark DataFrames入门指南:创建和操作DataFrame
- Spark编程指南入门之Java篇二-基本操作
- 大数据Spark “蘑菇云”行动前传第2课:Scala零基础实战入门的第一堂课及如何成为Scala高手
- 如何画双层pcb板_双层pcb板布线规则(操作技巧与案例分析)
- spark大数据入门(一)如何在windows下部署spark开发环境
- 如何画双层pcb板_双层pcb板布线规则(操作技巧与案例分析)
- 第109讲: Spark Streaming电商广告点击综合案例动态黑名单基于数据库MySQL的真正操作代码实战
- kindle如何关闭“10分钟无任何操作进入屏幕保护”, 也就是常亮
- Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)
- Hibernate学习-03:入门案例(CRUD(增删改查)操作之添加记录)
- spark弹性分布式数据集基本操作