spark使用udf给dataFrame新增列
2017-07-14 11:23
316 查看
在
打印结果如下:
可以看到
这样可以用
得到结果:
还可以写下更多的逻辑判断:
传入多个参数:
spark中给
dataframe增加一列的方法一般使用
withColumn
// 新建一个dataFrame val sparkconf = new SparkConf() .setMaster("local") .setAppName("test") val spark = SparkSession.builder().config(sparkconf).getOrCreate() val tempDataFrame = spark.createDataFrame(Seq( (1, "asf"), (2, "2143"), (3, "rfds") )).toDF("id", "content") // 增加一列 val addColDataframe = tempDataFrame.withColumn("col", tempDataFrame("id")*0) addColDataframe.show(10,false)
打印结果如下:
+---+-------+---+ |id |content|col| +---+-------+---+ |1 |asf |0 | |2 |2143 |0 | |3 |rfds |0 | +---+-------+---+
可以看到
withColumn很依赖原来
dataFrame的结构,但是假设没有
id这一列,那么增加列的时候灵活度就降低了很多,假设原始
dataFrame如下:
+---+-------+ | id|content| +---+-------+ | a| asf| | b| 2143| | b| rfds| +---+-------+
这样可以用
udf写自定义函数进行增加列:
import org.apache.spark.sql.functions.udf // 新建一个dataFrame val sparkconf = new SparkConf() .setMaster("local") .setAppName("test") val spark = SparkSession.builder().config(sparkconf).getOrCreate() val tempDataFrame = spark.createDataFrame(Seq( ("a, "asf"), ("b, "2143"), ("c, "rfds") )).toDF("id", "content") // 自定义udf的函数 val code = (arg: String) => { if (arg.getClass.getName == "java.lang.String") 1 else 0 } val addCol = udf(code) // 增加一列 val addColDataframe = tempDataFrame.withColumn("col", addCol(tempDataFrame("id"))) addColDataframe.show(10, false)
得到结果:
+---+-------+---+ |id |content|col| +---+-------+---+ |a |asf |1 | |b |2143 |1 | |c |rfds |1 | +---+-------+---+
还可以写下更多的逻辑判断:
// 新建一个dataFrame val sparkconf = new SparkConf() .setMaster("local") .setAppName("test") val spark = SparkSession.builder().config(sparkconf).getOrCreate() val tempDataFrame = spark.createDataFrame(Seq( (1, "asf"), (2, "2143"), (3, "rfds") )).toDF("id", "content") val code :(Int => String) = (arg: Int) => {if (arg < 2) "little" else "big"} val addCol = udf(code) val addColDataframe = tempDataFrame.withColumn("col", addCol(tempDataFrame("id"))) addColDataframe.show(10, false)
+---+-------+------+ |1 |asf |little| |2 |2143 |big | |3 |rfds |big | +---+-------+------+
传入多个参数:
val sparkconf = new SparkConf() .setMaster("local") .setAppName("test") val spark = SparkSession.builder().config(sparkconf).getOrCreate() val tempDataFrame = spark.createDataFrame(Seq( ("1", "2"), ("2", "3"), ("3", "1") )).toDF("content1", "content2") val code = (arg1: String, arg2: String) => { Try(if (arg1.toInt > arg2.toInt) "arg1>arg2" else "arg1<=arg2").getOrElse("error") } val compareUdf = udf(code) val addColDataframe = tempDataFrame.withColumn("compare", compareUdf(tempDataFrame("content1"),tempDataFrame("content2"))) addColDataframe.show(10, false)
+--------+--------+----------+ |content1|content2|compare | +--------+--------+----------+ |1 |2 |arg1<=arg2| |2 |3 |arg1<=arg2| |3 |1 |arg1>arg2 | +--------+--------+----------+
相关文章推荐
- spark scala 对dataframe进行过滤----filter方法使用
- 『 Spark 』7. 使用 Spark DataFrame 进行大数据分析
- Spark单机模式下使用mysql和DataFrame API
- spark dataframe新增列的处理
- spark dataFrame 新增一列函数withColumn
- DataFrame和SparkSql使用区别
- 『 Spark 』7. 使用 Spark DataFrame 进行大数据分析
- 本地模式使用JAVA SACLA 开发 Spark SQL DataFrame
- SPARK 使用Java 在IDE中实战RDD和DataFrame动态转换操作
- Spark注册UDF函数,用于DataFrame DSL or SQL
- 使用Spark DataFrame进行大数据处理
- 使用JAVA SACLA 开发 Spark SQL DataFrame IMF内部课程
- spark dataframe 新增列
- Spark DataFrame中基于List的排序UDF
- Spark注册UDF函数,用于DataFrame DSL or SQL
- Spark组件之SparkR学习2--使用spark-submit向集群提交R代码文件dataframe.R
- spark查询任意字段,并使用dataframe输出结果
- Spark 之 RDD、DataFrame和DataSet的区别是什么
- 使用pandas对两个dataframe进行join
- 一起学spark(12)-- 关于RDD和DataFrame 的缓存