您的位置:首页 > 其它

2,Spark分桶sink、常用算子、以及自定义函数(UDF、UDAF)

2021-09-07 23:23 666 查看

Spark 实战

[toc]

一、Spark分桶写出到HDFS

  • **Spark没有分桶sink:**Spark并没有像Flink那样提供分桶sink,所以就需要自定义OutputFormat类;
  • **自定义output类:**MultipleTextOutputFormat
// 并行写出,每个分区同时写数据到磁盘文件,如果两个分区要写入数据到同一个文件,则后写入的会覆盖先写入的,因此要避免不同分区往同一个文件中写数据
class MultipleOutputFormat extends MultipleTextOutputFormat[String, String] {
val filePathSepChar = "/"

// 根据数据中的key和value来计算路径
override def generateFileNameForKeyValue(key: String, value: String, name: String): String = {
val dayHour = key.split(" ")
val file = dayHour(2) // 文件编号就是分区编号,这样一个分区就只写到一个文件中
val path = dayHour(0) + filePathSepChar + dayHour(1) + filePathSepChar + "part" + file
path
}

/* 不写入key */
override def generateActualKey(key: String, value: String): String = {
null
}
}
  • **使用saveAsHadoopFile:**并传入自定义的OutputFormat类
val midPart = source.mapPartitionsWithIndex((idx, items) => {
val num = idx % 2
val dir = addHour("2020-07-15 00", num) // 目录
val path = dir + " " + idx  // 拼上分区编号作为文件名,避免多个分区同时写入同一个文件,从而导致覆盖(数据丢失)
items.map(row => (path, row))
})
midPart.foreach(println)

// 由于MultipleTextOutputFormat是分区并行落盘,所以当两个分区写入同一个文件时,就会产生覆盖,所以这里通过HashPartitioner将相同key分到同一个分区,避免覆盖
// 新的分区个数最好与上面的目标分区分数相同(2),否则多出的分区是空的
//    val res = midPart.partitionBy(new HashPartitioner(2))
midPart.saveAsHadoopFile(outputPathStr, classOf[String], classOf[String], classOf[MultipleOutputFormat])


二、常用算子

2.1 DF与RDD之间的转换

  • **背景:**相对于spark core来说,spark sql 会生成执行计划,并且会做优化,因此最好用DataFrame代替RDD;而DSL分格相对于SQL来说,可重构和可扩展性都要强很多,所以下面就介绍DSL风格的常用算子;

  • RDD转DF:

import spark.implicits._
// 普通类型
val rdd: RDD[(Int, String, Int)] = null
val df = rdd.toDF("id", "name", "age")
val inputFields = new List("id", "name", "age")  // 实际开发中,最好将列名抽离出来
val df = rdd.toDF(inputFields: _*)
// 复杂类型
val rdd: RDD[((String, String, Int), Long)] = null
val df = rdd.toDF("key", "value")   // 其中key是struct(String,String,Int)的结构体
  • DF转RDD:
val df: DataFrame = rdd.toDF("id", "name", "age")
val rdd: RDD[Row] = df.rdd
val mid = rdd.map(row=>{
val id = row.getAs[Int]("id")
val name = row.getAs[String]("name")
val list = row.getAs[Seq[String]]("list")   // 不能使用 Array[String]
})
  • **DF转DS:**一般是使用样例类来作为ds的schema;
val ds: Dataset[(String, Int)] = df.as[(String, Int)]

2.2 RDD常用算子

2.2.1 转换算子
  • **map类:**对每条数据做转换;
val res = rdd.map(x=>x)  // 分区内每条数据调用一次;
// 一次将一个分区的数据读进内存,然后再一条条处理;如果map里面要创建(解析json对象、jdbc对象、时间
// 转换对象)大对象或者耗时的对象,最好是使用mapPartition算子,性能有提升,但是要注意OOM问题;
val res = rdd.mapPartition(iter=>iter.map(x=>x))
// 与mapPartition类似,只不过多了分区的编号index;
val res = rdd.mapPartitionWithIndwx((index, iter)=>iter.map(x=>(index, x)))
val res = rdd.flatMap(x=>x.split(" "))  // 将返回的Array[(String, Int)]类型压平为(String, Int)类型;
val res = rdd.mapValue(x=>x) // kv算子,对value做map操作;
  • 聚合类:
// kv算子,根据k分组;rdd中的groupByKey算子不会做combine优化;
val res: RDD[(String, Iterable[Int])] = rdd.groupByKey()
// kv算子,根据k分组,并聚合v;会使用combine优化;
val res = rdd.reduceByKey(_+_)
// kv算子,根据k分组,自定义分区内、分区间合并规则;用来优化分组topN问题,非常灵活;
// 其他聚合算子底层也是调用这个算子;
val res = rdd.combineByKey(craeteCombiner, mergeValue, mergeCombiners)
// 加了初始值,自定义分区间聚合、分区间聚合;
val res = rdd2.aggregateByKey(10)((part, value) => agg + 2 * value, (part1, part2) => agg + value)
val res = rdd.foldByKey(10)((agg, value)=>agg+value)  // 加了初始值
val res = rdd.cogroup(rdd2)   // 返回RDD[(key, (Iterable[v1], iterable[v2]))],是sparkCore的join实现方式
  • join类:
// 只有kv类型的rdd才可以join,并且会自动根据key连接;
val res = rdd.join(rdd2)  // 内连接,(key, (v1, v2))
val res = rdd.leftOuterJoin(rdd2)  // 左连接
val res = rdd.fullOuterJoin(rdd2)  // 全连接
val res = rdd.cartesian(rdd2)  // 笛卡尔积;分区数是 rdd分区数 * rdd2分区数;不是shuffle算子;
  • 重分区:
// 不触发shuffle的重分区,不能扩大分区数;
// 如果上一个stage0的shuffle write是1000个分区,那么当前stage1会启动1000个task来处理,一个task
// 处理一个分区;但是如果当前stage1有coalesce(100),那么当前stage1只会启动100个task,一个task处理
//10个分区(10个分区作为一组);因此coalesce后的分区数不要与原分区数相差太大,否则运行很慢甚至OOM;
val res = rdd.coalesce(100)
val res = rdd.repartition(100)  // 触发shuffle的重分区,根据随机key分区,等价于rdd.coalesce(100, true);
val res = rdd.partitionBy(new HashPartitioner(100)) // kv算子,根据key的hash值分区,可以自定义分区器;
// 重分区,并使分区内有序;比先repartition再sort的性能高,因为这个是在shuffle时边shuffle边排序;
val res = rdd.repartitionAndSortWithinPartitions(new HashPartitioner(3))  // kv算子
  • 其他:
val res = rdd.filter(x=>x=="name")  // 过滤
val res = rdd.distinct()  // 去重
// 先是分区间有序,然后分区内有序,相同key会放到同一个分区;
// sortBy采用抽样来确定分区的边界,从而使分区数据均衡;抽样时用到了collect会生成job;
// 1,sample:创建RangePartitioner,对输入数据的key做sample来估算key的分布,然后排序切分出range
// 2,shuffle write:用上面的RangePartitioner对数据重分区,使相同范围数据在同一分区,即分区间有序;
// 3,shuffle read:每个reduce拉取自己分区的数据,然后分区内排序;
// 最后就达到了分区间有序和分区内有序,也就是全局有序;
// 难点就在于如何确定range的边界,以及快速将一个值映射到partition里,解决思路是:通过抽样来确定
// range的边界,通过字典树来构建索引,来快速找到分区;
val res = rdd.sortBy(x=>x._1, ascending=false)  // 排序,第二个参数:是否升序;
// 由行动算子触发;task在读取分区时,先从cacheManager判断是否有缓存,有就直接获取,没有就计算
// 获取缓存时,先通过(rddId, 分区Id)到blockManager中查找block信息,有就直接获取,没有就计算
// 此时重新计算会加锁,因为可能会有多个线程同时读取(例如笛卡尔积),获取到锁的线程会计算该分区的数据
// 并缓存起来,后续的线程直接读缓存;用show和take触发的缓存,只会缓存计算的分区,不会缓存所有分区
val res = rdd.persist(StorageLevel)  // 缓存,默认是内存
val res = rdd.union(rdd2)  // 并集,不会触发shuffle
val res = rdd.intersection(rdd2)  // 交集
val res = rdd.subtract(rdd2)  // 差集
val res = rdd.glom()  // 将一个分区的数据合并为一个数组
val res = rdd.sample(false, 0.1)  // 抽样,(是否可放回,抽样比例[0, 1]),0.1=10%;
val res = rdd.toDebugString  // 打印rdd的

2.2.2 行动算子
  • 有返回值:
val res = rdd.count()  // 求数量
val res = rdd.collect()  // 将数据拉取到driver端
val res = rdd.take(10)  // 返回前10条数据,只计算部分分区
val res = rdd.reduce((x, y)=>x+y)  // 聚合算子,直接返回聚合后的结果
val res = rdd.countByKey()  // kv算子,统计key出现的次数,返回返回Map[(String, Long)]
val res = rdd.countByValue()  // 统计数据出现的次数,返回Map[(String, Long)]
  • 无返回值:
val res = rdd.foreach()  // 遍历每条数据,类似于map,但是没有返回值
val res = rdd.saveAsTextFile("path")  // 保存到磁盘


2.3 DataFrame常用算子

  • **select:**select中只能有列名,因为参数类型为Column;
df.select("name", "age")
df.select(col("name").as("rename"))    // col("name") 等价于 $"name"
df.select($"name".as("rename"))
df.select(Column("*"))  // 获取所有列
// 可以在select中使用的方法
df.select(concat($"name", $"id"))     // concat中只能是列名,不能是字符串;concat_ws同理
// 获取struct类型中的字段;例如:(id, (name, age))类型的RDD转为(id, info)的DF,则DF的info字段就是
// struct类型,此时struct中的字段名默认是 '_1', '_2';可以通过col("info").getField("_1")来获取name字段;
df.select(col("struct").getField("id"))
  • **selectExpr:**执行字符串格式的SQL表达式;
df.selectExpr("concat(name, 'sd') as name", "id / 10 as id")   // 直接解析SQL字符串
  • **where / filter:**过滤
df.where($"id" =!= 2)
df.where($"id" === 3)
df.where($"id">=3 and $"name" =!= "a")  // and 等价于 &&
df.where("id>=2 and name != 'a' ")   // 可以直接执行SQL字符串
  • **sort:**排序
// 全局排序;先是分区有序,然后分区内有序,相同key会放到同一个分区;
df.sort($"name".desc)
df.sortWithinPartitions($"id")   // 分区内排序
  • **group by:**分组,分组后只能接聚合算子
df.groupBy($"name").agg("id" -> "max", "id" -> "sum")  //  "id"->"max" 等价于 max(id)
df.groupBy($"name").agg(collect_set($"id").as("ids"), count(lit(1)).as("cnt"))
  • **join:**连接
// 注意,这里不能把 Seq("name")改为  $"name",会报错;使用Seq("key")时,右表的key会自动去掉;
df1.join(df2, Seq("name"), "left")
df2.as("dfa").join(df2.as("dfb"), $"dfa.name" === $"dfb.name", "left")  // 给表取别名,然后自定义连接字段
df1.join(df2)  // 不加连接条件的就是笛卡尔积,必须设spark.sql.crossJoin.enabled为true;
  • **over():**开窗函数
val window = Window.partitionBy("name").orderBy($"id".desc)
val mid = df.withColumn("rnk", row_number().over(window))
  • 对列的操作
df.withColumn("gid", $"id"+1)   // 增加一列,列名为gid,如果gid列名存在,则会更新该列;
df.withColumnRenamed("name", "new_name")   // 重命名
df.drop("name")  // 删除一列
  • read:
// 读取CSV文件,option("header", "true")表示csv文件中包含header信息,自动加载第一行作为header
val csv = spark.read.option("header", "true").csv("path")
// 读取resource中的文件,并转为rdd;spark的read只支持读取本地和分布式文件,所以用下面的方式;
val source = this.getClass.getClassLoader.getResourceAsStream("sample.csv")
val lines = Source.fromInputStream(source)("UTF-8").getLines()
val rdd = spark.sparkContext.parallelize(lines)


三、自定义函数

  • **UDF:**自定义基本函数
// 通过实名函数创建
spark.sqlContext.udf.register("myudf", myUdf _)
// 通过匿名函数创建
spark.sqlContext.udf.register("myudf", (a: String, b: Int)=>{"sd_" + a + b})
// 使用
df.selectExpr("myudf(name, id) as str")

def myUdf(a: String, b: Int): String ={   // 自定义函数最多支持22个参数
val res = "sd_" + a + b
res
}
  • **UDAF:**自定义聚合函数;需要实现UserDefinedAggregateFunction或者 类型安全的Aggregator接口
// 注册自定义聚合函数
spark.sqlContext.udf.register("avgudaf", new AvgUDAF)
df.groupBy().agg("id" -> "avgudaf")

// 定义自定义聚合函数
class AvgUDAF extends UserDefinedAggregateFunction{
//输入的数据类型
override def inputSchema: StructType = {
StructType(Array(StructField("id",IntegerType)))
}
//中间聚合处理时,所处理的数据类型
override def bufferSchema: StructType = {
StructType(Array(StructField("sum", DoubleType), StructField("count", IntegerType)))
}
//函数的返回类型
override def dataType: DataType = {
DoubleType
}
// 是否为幂等函数
override def deterministic: Boolean = {
true
}
//为每个分组的数据初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0)=0.0  // 初始化sum
buffer(1)=0    // 初始化count
}
//指的是,每个分组,有新的值进来时,如何进行分组的聚合计算
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val in = input.getInt(0).toDouble   // 获取输入的id
buffer(0) = buffer.getDouble(0) + in   // buffer(0)就是sum,buffer(1)就是count
buffer(1) = buffer.getInt(1) + 1
}
//由于Spark是分布式的,所以一个分组的数据,可能会在不同的节点上进行局部聚合,就是update
//但是最后一个分组,在各节点上的聚合值,要进行Merge,也就是合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val sum1 = buffer1.getDouble(0)
val sum2 = buffer2.getDouble(0)
val count1 = buffer1.getInt(1)
val count2 = buffer2.getInt(1)
buffer1(0) = sum1 + sum2
buffer1(1) = count1 + count2
}
//一个分组的聚合值,如何通过中间的聚合值,最后返回一个最终的聚合值
override def evaluate(buffer: Row): Any = {
val sum = buffer.getDouble(0)
val count = buffer.getInt(1)
val avg = (sum / count).formatted("%.2f").toDouble
avg
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: