2,Spark分桶sink、常用算子、以及自定义函数(UDF、UDAF)
2021-09-07 23:23
666 查看
Spark 实战
[toc]
一、Spark分桶写出到HDFS
- **Spark没有分桶sink:**Spark并没有像Flink那样提供分桶sink,所以就需要自定义OutputFormat类;
- **自定义output类:**MultipleTextOutputFormat
// 并行写出,每个分区同时写数据到磁盘文件,如果两个分区要写入数据到同一个文件,则后写入的会覆盖先写入的,因此要避免不同分区往同一个文件中写数据 class MultipleOutputFormat extends MultipleTextOutputFormat[String, String] { val filePathSepChar = "/" // 根据数据中的key和value来计算路径 override def generateFileNameForKeyValue(key: String, value: String, name: String): String = { val dayHour = key.split(" ") val file = dayHour(2) // 文件编号就是分区编号,这样一个分区就只写到一个文件中 val path = dayHour(0) + filePathSepChar + dayHour(1) + filePathSepChar + "part" + file path } /* 不写入key */ override def generateActualKey(key: String, value: String): String = { null } }
- **使用saveAsHadoopFile:**并传入自定义的OutputFormat类
val midPart = source.mapPartitionsWithIndex((idx, items) => { val num = idx % 2 val dir = addHour("2020-07-15 00", num) // 目录 val path = dir + " " + idx // 拼上分区编号作为文件名,避免多个分区同时写入同一个文件,从而导致覆盖(数据丢失) items.map(row => (path, row)) }) midPart.foreach(println) // 由于MultipleTextOutputFormat是分区并行落盘,所以当两个分区写入同一个文件时,就会产生覆盖,所以这里通过HashPartitioner将相同key分到同一个分区,避免覆盖 // 新的分区个数最好与上面的目标分区分数相同(2),否则多出的分区是空的 // val res = midPart.partitionBy(new HashPartitioner(2)) midPart.saveAsHadoopFile(outputPathStr, classOf[String], classOf[String], classOf[MultipleOutputFormat])
二、常用算子
2.1 DF与RDD之间的转换
**背景:**相对于spark core来说,spark sql 会生成执行计划,并且会做优化,因此最好用DataFrame代替RDD;而DSL分格相对于SQL来说,可重构和可扩展性都要强很多,所以下面就介绍DSL风格的常用算子;
RDD转DF:
import spark.implicits._ // 普通类型 val rdd: RDD[(Int, String, Int)] = null val df = rdd.toDF("id", "name", "age") val inputFields = new List("id", "name", "age") // 实际开发中,最好将列名抽离出来 val df = rdd.toDF(inputFields: _*) // 复杂类型 val rdd: RDD[((String, String, Int), Long)] = null val df = rdd.toDF("key", "value") // 其中key是struct(String,String,Int)的结构体
- DF转RDD:
val df: DataFrame = rdd.toDF("id", "name", "age") val rdd: RDD[Row] = df.rdd val mid = rdd.map(row=>{ val id = row.getAs[Int]("id") val name = row.getAs[String]("name") val list = row.getAs[Seq[String]]("list") // 不能使用 Array[String] })
- **DF转DS:**一般是使用样例类来作为ds的schema;
val ds: Dataset[(String, Int)] = df.as[(String, Int)]
2.2 RDD常用算子
2.2.1 转换算子
- **map类:**对每条数据做转换;
val res = rdd.map(x=>x) // 分区内每条数据调用一次; // 一次将一个分区的数据读进内存,然后再一条条处理;如果map里面要创建(解析json对象、jdbc对象、时间 // 转换对象)大对象或者耗时的对象,最好是使用mapPartition算子,性能有提升,但是要注意OOM问题; val res = rdd.mapPartition(iter=>iter.map(x=>x)) // 与mapPartition类似,只不过多了分区的编号index; val res = rdd.mapPartitionWithIndwx((index, iter)=>iter.map(x=>(index, x))) val res = rdd.flatMap(x=>x.split(" ")) // 将返回的Array[(String, Int)]类型压平为(String, Int)类型; val res = rdd.mapValue(x=>x) // kv算子,对value做map操作;
- 聚合类:
// kv算子,根据k分组;rdd中的groupByKey算子不会做combine优化; val res: RDD[(String, Iterable[Int])] = rdd.groupByKey() // kv算子,根据k分组,并聚合v;会使用combine优化; val res = rdd.reduceByKey(_+_) // kv算子,根据k分组,自定义分区内、分区间合并规则;用来优化分组topN问题,非常灵活; // 其他聚合算子底层也是调用这个算子; val res = rdd.combineByKey(craeteCombiner, mergeValue, mergeCombiners) // 加了初始值,自定义分区间聚合、分区间聚合; val res = rdd2.aggregateByKey(10)((part, value) => agg + 2 * value, (part1, part2) => agg + value) val res = rdd.foldByKey(10)((agg, value)=>agg+value) // 加了初始值 val res = rdd.cogroup(rdd2) // 返回RDD[(key, (Iterable[v1], iterable[v2]))],是sparkCore的join实现方式
- join类:
// 只有kv类型的rdd才可以join,并且会自动根据key连接; val res = rdd.join(rdd2) // 内连接,(key, (v1, v2)) val res = rdd.leftOuterJoin(rdd2) // 左连接 val res = rdd.fullOuterJoin(rdd2) // 全连接 val res = rdd.cartesian(rdd2) // 笛卡尔积;分区数是 rdd分区数 * rdd2分区数;不是shuffle算子;
- 重分区:
// 不触发shuffle的重分区,不能扩大分区数; // 如果上一个stage0的shuffle write是1000个分区,那么当前stage1会启动1000个task来处理,一个task // 处理一个分区;但是如果当前stage1有coalesce(100),那么当前stage1只会启动100个task,一个task处理 //10个分区(10个分区作为一组);因此coalesce后的分区数不要与原分区数相差太大,否则运行很慢甚至OOM; val res = rdd.coalesce(100) val res = rdd.repartition(100) // 触发shuffle的重分区,根据随机key分区,等价于rdd.coalesce(100, true); val res = rdd.partitionBy(new HashPartitioner(100)) // kv算子,根据key的hash值分区,可以自定义分区器; // 重分区,并使分区内有序;比先repartition再sort的性能高,因为这个是在shuffle时边shuffle边排序; val res = rdd.repartitionAndSortWithinPartitions(new HashPartitioner(3)) // kv算子
- 其他:
val res = rdd.filter(x=>x=="name") // 过滤 val res = rdd.distinct() // 去重 // 先是分区间有序,然后分区内有序,相同key会放到同一个分区; // sortBy采用抽样来确定分区的边界,从而使分区数据均衡;抽样时用到了collect会生成job; // 1,sample:创建RangePartitioner,对输入数据的key做sample来估算key的分布,然后排序切分出range // 2,shuffle write:用上面的RangePartitioner对数据重分区,使相同范围数据在同一分区,即分区间有序; // 3,shuffle read:每个reduce拉取自己分区的数据,然后分区内排序; // 最后就达到了分区间有序和分区内有序,也就是全局有序; // 难点就在于如何确定range的边界,以及快速将一个值映射到partition里,解决思路是:通过抽样来确定 // range的边界,通过字典树来构建索引,来快速找到分区; val res = rdd.sortBy(x=>x._1, ascending=false) // 排序,第二个参数:是否升序; // 由行动算子触发;task在读取分区时,先从cacheManager判断是否有缓存,有就直接获取,没有就计算 // 获取缓存时,先通过(rddId, 分区Id)到blockManager中查找block信息,有就直接获取,没有就计算 // 此时重新计算会加锁,因为可能会有多个线程同时读取(例如笛卡尔积),获取到锁的线程会计算该分区的数据 // 并缓存起来,后续的线程直接读缓存;用show和take触发的缓存,只会缓存计算的分区,不会缓存所有分区 val res = rdd.persist(StorageLevel) // 缓存,默认是内存 val res = rdd.union(rdd2) // 并集,不会触发shuffle val res = rdd.intersection(rdd2) // 交集 val res = rdd.subtract(rdd2) // 差集 val res = rdd.glom() // 将一个分区的数据合并为一个数组 val res = rdd.sample(false, 0.1) // 抽样,(是否可放回,抽样比例[0, 1]),0.1=10%; val res = rdd.toDebugString // 打印rdd的
2.2.2 行动算子
- 有返回值:
val res = rdd.count() // 求数量 val res = rdd.collect() // 将数据拉取到driver端 val res = rdd.take(10) // 返回前10条数据,只计算部分分区 val res = rdd.reduce((x, y)=>x+y) // 聚合算子,直接返回聚合后的结果 val res = rdd.countByKey() // kv算子,统计key出现的次数,返回返回Map[(String, Long)] val res = rdd.countByValue() // 统计数据出现的次数,返回Map[(String, Long)]
- 无返回值:
val res = rdd.foreach() // 遍历每条数据,类似于map,但是没有返回值 val res = rdd.saveAsTextFile("path") // 保存到磁盘
2.3 DataFrame常用算子
- **select:**select中只能有列名,因为参数类型为Column;
df.select("name", "age") df.select(col("name").as("rename")) // col("name") 等价于 $"name" df.select($"name".as("rename")) df.select(Column("*")) // 获取所有列 // 可以在select中使用的方法 df.select(concat($"name", $"id")) // concat中只能是列名,不能是字符串;concat_ws同理 // 获取struct类型中的字段;例如:(id, (name, age))类型的RDD转为(id, info)的DF,则DF的info字段就是 // struct类型,此时struct中的字段名默认是 '_1', '_2';可以通过col("info").getField("_1")来获取name字段; df.select(col("struct").getField("id"))
- **selectExpr:**执行字符串格式的SQL表达式;
df.selectExpr("concat(name, 'sd') as name", "id / 10 as id") // 直接解析SQL字符串
- **where / filter:**过滤
df.where($"id" =!= 2) df.where($"id" === 3) df.where($"id">=3 and $"name" =!= "a") // and 等价于 && df.where("id>=2 and name != 'a' ") // 可以直接执行SQL字符串
- **sort:**排序
// 全局排序;先是分区有序,然后分区内有序,相同key会放到同一个分区; df.sort($"name".desc) df.sortWithinPartitions($"id") // 分区内排序
- **group by:**分组,分组后只能接聚合算子
df.groupBy($"name").agg("id" -> "max", "id" -> "sum") // "id"->"max" 等价于 max(id) df.groupBy($"name").agg(collect_set($"id").as("ids"), count(lit(1)).as("cnt"))
- **join:**连接
// 注意,这里不能把 Seq("name")改为 $"name",会报错;使用Seq("key")时,右表的key会自动去掉; df1.join(df2, Seq("name"), "left") df2.as("dfa").join(df2.as("dfb"), $"dfa.name" === $"dfb.name", "left") // 给表取别名,然后自定义连接字段 df1.join(df2) // 不加连接条件的就是笛卡尔积,必须设spark.sql.crossJoin.enabled为true;
- **over():**开窗函数
val window = Window.partitionBy("name").orderBy($"id".desc) val mid = df.withColumn("rnk", row_number().over(window))
- 对列的操作
df.withColumn("gid", $"id"+1) // 增加一列,列名为gid,如果gid列名存在,则会更新该列; df.withColumnRenamed("name", "new_name") // 重命名 df.drop("name") // 删除一列
- read:
// 读取CSV文件,option("header", "true")表示csv文件中包含header信息,自动加载第一行作为header val csv = spark.read.option("header", "true").csv("path") // 读取resource中的文件,并转为rdd;spark的read只支持读取本地和分布式文件,所以用下面的方式; val source = this.getClass.getClassLoader.getResourceAsStream("sample.csv") val lines = Source.fromInputStream(source)("UTF-8").getLines() val rdd = spark.sparkContext.parallelize(lines)
三、自定义函数
- **UDF:**自定义基本函数
// 通过实名函数创建 spark.sqlContext.udf.register("myudf", myUdf _) // 通过匿名函数创建 spark.sqlContext.udf.register("myudf", (a: String, b: Int)=>{"sd_" + a + b}) // 使用 df.selectExpr("myudf(name, id) as str") def myUdf(a: String, b: Int): String ={ // 自定义函数最多支持22个参数 val res = "sd_" + a + b res }
- **UDAF:**自定义聚合函数;需要实现UserDefinedAggregateFunction或者 类型安全的Aggregator接口
// 注册自定义聚合函数 spark.sqlContext.udf.register("avgudaf", new AvgUDAF) df.groupBy().agg("id" -> "avgudaf") // 定义自定义聚合函数 class AvgUDAF extends UserDefinedAggregateFunction{ //输入的数据类型 override def inputSchema: StructType = { StructType(Array(StructField("id",IntegerType))) } //中间聚合处理时,所处理的数据类型 override def bufferSchema: StructType = { StructType(Array(StructField("sum", DoubleType), StructField("count", IntegerType))) } //函数的返回类型 override def dataType: DataType = { DoubleType } // 是否为幂等函数 override def deterministic: Boolean = { true } //为每个分组的数据初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0)=0.0 // 初始化sum buffer(1)=0 // 初始化count } //指的是,每个分组,有新的值进来时,如何进行分组的聚合计算 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val in = input.getInt(0).toDouble // 获取输入的id buffer(0) = buffer.getDouble(0) + in // buffer(0)就是sum,buffer(1)就是count buffer(1) = buffer.getInt(1) + 1 } //由于Spark是分布式的,所以一个分组的数据,可能会在不同的节点上进行局部聚合,就是update //但是最后一个分组,在各节点上的聚合值,要进行Merge,也就是合并 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { val sum1 = buffer1.getDouble(0) val sum2 = buffer2.getDouble(0) val count1 = buffer1.getInt(1) val count2 = buffer2.getInt(1) buffer1(0) = sum1 + sum2 buffer1(1) = count1 + count2 } //一个分组的聚合值,如何通过中间的聚合值,最后返回一个最终的聚合值 override def evaluate(buffer: Row): Any = { val sum = buffer.getDouble(0) val count = buffer.getInt(1) val avg = (sum / count).formatted("%.2f").toDouble avg } }
相关文章推荐
- SparkSQL 自定义算子UDF、UDAF、UDTF
- 【Spark篇】---SparkSQL中自定义UDF和UDAF,开窗函数的应用
- Spark学习之路 (十九)SparkSQL的自定义函数UDF
- Hive数据仓库--UDF自定义函数以及其中的坑
- HIVE 自定义函数之UDF/UDAF/UDTF
- Spark SQL 解析-UDF,UDAF,开窗函数
- Hive 中的自定义函数(udf,udtf,udaf)
- GCC和IAR编译器内建函数以及C常用自定义函数
- Spark(十三)SparkSQL的自定义函数UDF与开窗函数
- HIVE自定义函数之UDF,UDAF和UDTF
- HIVE 自定义函数之UDF/UDAF/UDTF
- MySql常用函数以及自定义函数详解
- hive-2 自定义函数UDF、UDAF、UDTF介绍及区别
- Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版)
- hive 自定义函数UDF,UDAF
- 大数据之hive:自定义函数udf,udaf,udtf的区别
- Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版)
- Spark-sql 自定义UDF函数 UDAF 以及开窗函数 JAVA和SCALA版本
- hive----自定义UDF 函数-----时间格式化以及取出双引号的代码
- hive中添加自定义udf udaf udtf等函数的jar文件的三种方法