DataSet数据集在使用sql()时,无法使用map,flatMap等转换算子的解决办法
2017-08-08 16:40
363 查看
摘要我们在使用spark的一个流程是:利用spark.sql()函数把数据读入到内存形成DataSet[Row](DataFrame)由于Row是新的spark数据集中无法实现自动的编码,需要对这个数据集进行编码,才能利用这些算子进行相关的操作,如何编码是一个问题,在这里就把这几个问题进行总结一下。报的错误:error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.报这个错误一般就是我们在使用算子时其返回值的数据类型往往不是spark通过自身的反射能完成的自动编码部分,比如通过map算子,我们在map算子的函数的返回值类型是Map类型的,就会出现上面的问题,因为Map集合类不在:基本的类型和String,case class和元组的范围之内,spark内部不能通过反射完成自动编码。
出现这个问题的原因spark2.0以后的版本采用的是新的分布式数据集DataSet,其中DataFrame是DataSet[Row]的别名形式。而新的数据集采用了很多的优化,其中一个就是利用了Tungsten execution engine的计算引擎,这个计算引擎采用了很多的优化。其中一个就是自己维护了一个内存管理器,从而使计算从java jvm解脱出来了,使得内存的优化得到了很大的提升。同时新的计算引擎,把数据存储在内存中是以二进制的形式存储的,大部分所有的计算都是在二进制数据流上进行的,不需要把二进制数据流反序列化成java对象,然后再把计算的结果序列化成二进制数据流,而是直接在二进制流上进行操作,这样的情况就需要我们存在一种机制就是java对象到二进制数据流的映射关系,不然我们不知道二进制流对应的数据对象是几个字节,spark这个过程是通过Encoders来完成的,spark自身通过反射完成了一部分的自动编码过程:基本的类型和String,case class和元组,对于其他的集合类型或者我们自定义的类,他是无法完成这样的编码的。需要我们自己定义这样的编码也就是让其拥有一个schema。
解决这个问题方式方法一:
这样就是把其转化为RDD,利用RDD进行操作,但是不建议用这个,相对于RDD,DataSet进行了很多的底层优化,拥有很不错性能
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).rdd.map(myfunction)
方法二:
让其自动把DataSet[Row]转化为DataSet[P],如果Row里面有复杂的类型出现的话。
case class Orders(id: String, user_id: String)
//这个case class要定义在我们的单例对象的外面
object a {
def main(args: Array[String]): Unit ={
import spark.implicits._
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).as[Orders].map(myfunction)
}
}
方式三:
自定义一个schema,然后利用RowEncoder进行编码。这只是一个例子,里面的类型其实都可以通过spark的反射自动完成编码过程。
import spark.implicits._
val schema = StructType(StructType(Seq(StructField("id",StringType,true),StructField("user_id",StringType,true))))
val encoders = RowEncoder(schema)
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).map(row => row)(encoders)
方法四:
直接利用scala的模式匹配的策略case Row来进行是可以通过的,原因是case Row()scala模式匹配的知识,这样可以知道集合Row里面拥有多少个基本的类型,则可以通过scala就可以完成对Row的自动编码,然后可以进行相应的处理。
import spark.implicits._
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).map{case Row(id: String, user_id: String) => (id,user_id)}
这个得到的schema为:
orderInfo1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
如果换成这样:
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).map{case Row(id: String, user_id: String) => List(id,user_id)}
得到的schema为:
orderInfo1: org.apache.spark.sql.Dataset[List[String]] = [value: array<string>]
可以看出:spark是把元祖看成case class一种特殊形式拥有,schame的字段名称为_1,_2这样的特殊case clase
出现这个问题的原因spark2.0以后的版本采用的是新的分布式数据集DataSet,其中DataFrame是DataSet[Row]的别名形式。而新的数据集采用了很多的优化,其中一个就是利用了Tungsten execution engine的计算引擎,这个计算引擎采用了很多的优化。其中一个就是自己维护了一个内存管理器,从而使计算从java jvm解脱出来了,使得内存的优化得到了很大的提升。同时新的计算引擎,把数据存储在内存中是以二进制的形式存储的,大部分所有的计算都是在二进制数据流上进行的,不需要把二进制数据流反序列化成java对象,然后再把计算的结果序列化成二进制数据流,而是直接在二进制流上进行操作,这样的情况就需要我们存在一种机制就是java对象到二进制数据流的映射关系,不然我们不知道二进制流对应的数据对象是几个字节,spark这个过程是通过Encoders来完成的,spark自身通过反射完成了一部分的自动编码过程:基本的类型和String,case class和元组,对于其他的集合类型或者我们自定义的类,他是无法完成这样的编码的。需要我们自己定义这样的编码也就是让其拥有一个schema。
解决这个问题方式方法一:
这样就是把其转化为RDD,利用RDD进行操作,但是不建议用这个,相对于RDD,DataSet进行了很多的底层优化,拥有很不错性能
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).rdd.map(myfunction)
方法二:
让其自动把DataSet[Row]转化为DataSet[P],如果Row里面有复杂的类型出现的话。
case class Orders(id: String, user_id: String)
//这个case class要定义在我们的单例对象的外面
object a {
def main(args: Array[String]): Unit ={
import spark.implicits._
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).as[Orders].map(myfunction)
}
}
方式三:
自定义一个schema,然后利用RowEncoder进行编码。这只是一个例子,里面的类型其实都可以通过spark的反射自动完成编码过程。
import spark.implicits._
val schema = StructType(StructType(Seq(StructField("id",StringType,true),StructField("user_id",StringType,true))))
val encoders = RowEncoder(schema)
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).map(row => row)(encoders)
方法四:
直接利用scala的模式匹配的策略case Row来进行是可以通过的,原因是case Row()scala模式匹配的知识,这样可以知道集合Row里面拥有多少个基本的类型,则可以通过scala就可以完成对Row的自动编码,然后可以进行相应的处理。
import spark.implicits._
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).map{case Row(id: String, user_id: String) => (id,user_id)}
这个得到的schema为:
orderInfo1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
如果换成这样:
val orderInfo1 = spark.sql(
"""
|SELECT
|o.id,
|o.user_id
|FROM default.api_order o
|limit 100
""".stripMargin).map{case Row(id: String, user_id: String) => List(id,user_id)}
得到的schema为:
orderInfo1: org.apache.spark.sql.Dataset[List[String]] = [value: array<string>]
可以看出:spark是把元祖看成case class一种特殊形式拥有,schame的字段名称为_1,_2这样的特殊case clase
相关文章推荐
- 使用PL/SQL Developer 报错:ORA-01460 :转换请求无法实现或不合理 解决办法!!!
- 使用PL/SQL Developer 报错:ORA-01460 :转换请求无法实现或不合理 解决办法!!!
- RedGate SQL Prompt 5.3.4.1 Format SQL功能无法使用解决办法
- Mybatis 中sql 使用in(#{item})无法查询解决办法
- Mybaties 的sql语句无法使用聚合函数的解决办法
- 关于PL\SQL无法在64位Client下使用的原因及解决办法
- SQL Server Management Studio 过期无法使用解决办法
- 将数据库从SQL2000迁移到SQL2005时,无法查看关系图的解决办法
- 窗体样式使用WS_EX_LAYERED后,无法绘制windows控件的解决办法
- 使用脚本使IE无法打开INTERNET站点的解决办法
- PL/SQL Developer中,存储过程无法调试的问题解决办法
- 转摘--MS SQL Server 2000 数据库使用备份还原造成的孤立用户和对象名‘xxx’无效的错误的解决办法
- 无法使用ping或ipconfig命令的解决办法
- sql 2000 "无法执行查询"的解决办法
- 使用PLSQL Developer时,“ORA-12154: TNS:无法解析指定的连接标识符”问题的一个解决办法
- winxp[sp2]下Apache无法正常使用实战得出的解决办法
- sql 将 varchar 值转换为数据类型为 int 的列时发生语法错误 的解决办法
- PhpMyAdmin中无法导入sql文件的解决办法
- 窗体样式使用WS_EX_LAYERED后,无法绘制windows控件的解决办法
- iis提示“另一个程序正在使用此文件,进程无法访问”解决办法