SparkSQL 00 Overview 04: Conversion Between Scala Types and Catalyst Types (Part 1)
2015-10-10 11:59
1. Conversion between Scala types and Catalyst types covers simple types, such as String, Date, and numeric types, as well as collection types, such as Array and Map.
2. What do Scala Type and Catalyst Type refer to?
Scala types are the data types provided by the Scala and Java languages, such as String, Date, Decimal, Array, and Map.
Catalyst types are the data types the Catalyst framework provides for SQL processing. They are all subclasses of DataType, and DataType itself extends the abstract class AbstractDataType. [Class hierarchy diagram from the original post omitted.]
3. The CatalystTypeConverter code
CatalystTypeConverter defines three abstract methods: toScala, toCatalystImpl, and toScalaImpl.
/**
 * Converts a Scala type to its Catalyst equivalent (and vice versa).
 *
 * @tparam ScalaInputType The type of Scala values that can be converted to Catalyst.
 * @tparam ScalaOutputType The type of Scala values returned when converting Catalyst to Scala.
 * @tparam CatalystType The internal Catalyst type used to represent values of this Scala type.
 */
private abstract class CatalystTypeConverter[ScalaInputType, ScalaOutputType, CatalystType]
  extends Serializable {

  /**
   * Converts a Scala type to its Catalyst equivalent while automatically handling nulls
   * and Options.
   * This method is final: the null/Option handling cannot be overridden by subclasses.
   * The parameter is declared Any, but in practice it is either a ScalaInputType or an
   * Option[ScalaInputType].
   */
  final def toCatalyst(@Nullable maybeScalaValue: Any): CatalystType = {
    if (maybeScalaValue == null) {
      null.asInstanceOf[CatalystType]
    } else if (maybeScalaValue.isInstanceOf[Option[ScalaInputType]]) {
      // maybeScalaValue is an Option whose element type is ScalaInputType
      val opt = maybeScalaValue.asInstanceOf[Option[ScalaInputType]]
      if (opt.isDefined) {
        // Some: unwrap the value via get and hand it to toCatalystImpl
        toCatalystImpl(opt.get)
      } else {
        // None is represented by a CatalystType null
        null.asInstanceOf[CatalystType]
      }
    } else {
      toCatalystImpl(maybeScalaValue.asInstanceOf[ScalaInputType])
    }
  }

  /**
   * Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
   * Final like toCatalyst; its input is not a CatalystType value but a value located by
   * the (row, column) pair.
   */
  final def toScala(row: InternalRow, column: Int): ScalaOutputType = {
    if (row.isNullAt(column)) null.asInstanceOf[ScalaOutputType] else toScalaImpl(row, column)
  }

  /**
   * Convert a Catalyst value to its Scala equivalent.
   * Unlike toCatalyst, this is abstract: each converter must provide its own conversion;
   * it cannot be implemented once at the framework level by CatalystTypeConverter.
   */
  def toScala(@Nullable catalystValue: CatalystType): ScalaOutputType

  /**
   * Converts a Scala value to its Catalyst equivalent.
   * @param scalaValue the Scala value, guaranteed not to be null.
   * @return the Catalyst value.
   */
  protected def toCatalystImpl(scalaValue: ScalaInputType): CatalystType

  /**
   * Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
   * This method will only be called on non-null columns.
   */
  protected def toScalaImpl(row: InternalRow, column: Int): ScalaOutputType
}

private case class IdentityConverter(dataType: DataType)
  extends CatalystTypeConverter[Any, Any, Any] {
  override def toCatalystImpl(scalaValue: Any): Any = scalaValue
  override def toScala(catalystValue: Any): Any = catalystValue
  override def toScalaImpl(row: InternalRow, column: Int): Any = row.get(column, dataType)
}

private case class UDTConverter(udt: UserDefinedType[_])
  extends CatalystTypeConverter[Any, Any, Any] {
  override def toCatalystImpl(scalaValue: Any): Any = udt.serialize(scalaValue)
  override def toScala(catalystValue: Any): Any = udt.deserialize(catalystValue)
  override def toScalaImpl(row: InternalRow, column: Int): Any =
    toScala(row.get(column, udt.sqlType))
}
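The null/Option dispatch that the final toCatalyst method layers on top of toCatalystImpl can be illustrated with a minimal, self-contained sketch. MiniConverter and UpperConverter below are illustrative stand-ins, not Spark classes; they only mirror the dispatch logic: null and None map to a Catalyst null, Some is unwrapped, and plain values go straight to the per-converter implementation.

```scala
// Mirrors the shape of CatalystTypeConverter.toCatalyst (simplified sketch)
abstract class MiniConverter[In, Cat] {
  // final: the null/Option handling is fixed by the framework
  final def toCatalyst(maybeScalaValue: Any): Cat = maybeScalaValue match {
    case null    => null.asInstanceOf[Cat]             // null -> Catalyst null
    case Some(v) => toCatalystImpl(v.asInstanceOf[In]) // unwrap Some
    case None    => null.asInstanceOf[Cat]             // None -> Catalyst null
    case v       => toCatalystImpl(v.asInstanceOf[In]) // plain value, delegate
  }
  // each converter supplies only the non-null conversion
  protected def toCatalystImpl(scalaValue: In): Cat
}

// Example converter: "stores" strings upper-cased on the Catalyst side
object UpperConverter extends MiniConverter[String, String] {
  protected def toCatalystImpl(s: String): String = s.toUpperCase
}
```

With this, `UpperConverter.toCatalyst("abc")` and `UpperConverter.toCatalyst(Some("abc"))` both convert the payload, while `None` and `null` yield null without ever reaching toCatalystImpl.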
4. Concrete CatalystTypeConverter implementations: atomic type conversion
4.1 Source code:
Each concrete converter implementation is an object, so its methods can be called on it as a singleton. Note that the Date type does not belong to the primitive category.
private abstract class PrimitiveConverter[T] extends CatalystTypeConverter[T, Any, Any] {
  // For primitive types the Scala and Catalyst representations are identical,
  // so the value can be returned unchanged in both directions.
  final override def toScala(catalystValue: Any): Any = catalystValue
  final override def toCatalystImpl(scalaValue: T): Any = scalaValue
}

private object BooleanConverter extends PrimitiveConverter[Boolean] {
  override def toScalaImpl(row: InternalRow, column: Int): Boolean = row.getBoolean(column)
}

private object ByteConverter extends PrimitiveConverter[Byte] {
  override def toScalaImpl(row: InternalRow, column: Int): Byte = row.getByte(column)
}

private object ShortConverter extends PrimitiveConverter[Short] {
  override def toScalaImpl(row: InternalRow, column: Int): Short = row.getShort(column)
}

private object IntConverter extends PrimitiveConverter[Int] {
  override def toScalaImpl(row: InternalRow, column: Int): Int = row.getInt(column)
}

private object LongConverter extends PrimitiveConverter[Long] {
  override def toScalaImpl(row: InternalRow, column: Int): Long = row.getLong(column)
}

private object FloatConverter extends PrimitiveConverter[Float] {
  override def toScalaImpl(row: InternalRow, column: Int): Float = row.getFloat(column)
}

private object DoubleConverter extends PrimitiveConverter[Double] {
  override def toScalaImpl(row: InternalRow, column: Int): Double = row.getDouble(column)
}
4.2 Taking BooleanConverter as an example, let's look at how these atomic converters are used: what is passed into each method, and what each method returns.
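The usage pattern can be shown with a minimal, self-contained sketch. MockRow and MockBooleanConverter are illustrative stand-ins, not Spark classes (MockRow plays the role of InternalRow): on the read side, toScala takes a row plus a column index and checks for SQL NULL before reading the typed value; on the write side, the primitive passes through unchanged.

```scala
// Stand-in for Spark's InternalRow: a fixed array of column values
final class MockRow(values: Array[Any]) {
  def isNullAt(i: Int): Boolean = values(i) == null
  def getBoolean(i: Int): Boolean = values(i).asInstanceOf[Boolean]
}

object MockBooleanConverter {
  // Catalyst row -> Scala value: NULL columns surface as null, others as Boolean
  def toScala(row: MockRow, column: Int): Any =
    if (row.isNullAt(column)) null else row.getBoolean(column)
  // Scala -> Catalyst: primitives pass through unchanged
  def toCatalyst(scalaValue: Boolean): Any = scalaValue
}
```

So for a row holding `(true, NULL)`, reading column 0 yields `true` and reading column 1 yields `null` without ever calling getBoolean on the null slot.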
5. The DateConverter implementation
The Scala Date type maps to the Catalyst Int type (days since the Unix epoch).
/**
 * Scala input type: Date
 * Scala output type: Date
 * Catalyst type: declared Any, actually Int
 */
private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
  // Scala Date -> Catalyst Int
  override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
  // Catalyst value (actually an Int) -> Scala Date
  override def toScala(catalystValue: Any): Date =
    if (catalystValue == null) null
    else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
  // Convert the Int value in the given column of the row to a Scala Date
  override def toScalaImpl(row: InternalRow, column: Int): Date =
    DateTimeUtils.toJavaDate(row.getInt(column))
}
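The idea behind DateTimeUtils.fromJavaDate/toJavaDate can be sketched with the JDK alone: a date becomes the number of days since 1970-01-01. The helper names toDays/fromDays below are illustrative, not Spark's API, and this simplified version leans on java.time while Spark's real code also deals with time zones itself.

```scala
import java.sql.Date
import java.time.LocalDate

// Scala Date -> Catalyst Int: days since the Unix epoch (simplified sketch)
def toDays(d: Date): Int = d.toLocalDate.toEpochDay.toInt

// Catalyst Int -> Scala Date: rebuild the date from the day count
def fromDays(days: Int): Date = Date.valueOf(LocalDate.ofEpochDay(days.toLong))
```

For example, 1970-01-02 is day 1, and converting back recovers the same date.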
Question: the CatalystType parameter is instantiated as Any, so why can the method implementations use Int? Because Int is a subtype of Any, and Scala allows covariant return types: an overriding method may declare a return type that is a subtype of the inherited one. (Java has the same feature for reference types.) A simple example:
abstract class X[T] {
  def doIt(): T
}

class A1 extends X[Any] {
  override def doIt(): Int = 10
}
Java version:
abstract class X<T> {  // generic class
    abstract T doIt();
}

public class A1 extends X<Object> {
    @Override
    Integer doIt() {
        return null;
    }
}
6. TimestampConverter
Converts between the Scala Timestamp type and the Catalyst Long type (microseconds since the Unix epoch).
private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
  override def toCatalystImpl(scalaValue: Timestamp): Long =
    DateTimeUtils.fromJavaTimestamp(scalaValue)
  override def toScala(catalystValue: Any): Timestamp =
    if (catalystValue == null) null
    else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
  override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
    DateTimeUtils.toJavaTimestamp(row.getLong(column))
}
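The idea behind DateTimeUtils.fromJavaTimestamp/toJavaTimestamp can likewise be sketched: represent a Timestamp as microseconds since the Unix epoch. The names toMicros/fromMicros are illustrative, not Spark's API, and this simplified version keeps only millisecond precision, whereas Spark's real code also folds in the sub-millisecond part of getNanos.

```scala
import java.sql.Timestamp

// Scala Timestamp -> Catalyst Long: microseconds since the epoch
// (simplified: millisecond precision only)
def toMicros(ts: Timestamp): Long = ts.getTime * 1000L

// Catalyst Long -> Scala Timestamp
def fromMicros(us: Long): Timestamp = new Timestamp(us / 1000L)
```

A whole-second timestamp survives the round trip unchanged.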
7. DecimalConverter
Converts between Scala BigDecimal, JavaBigDecimal, and Decimal on the one hand and Catalyst's Decimal on the other. Converting from Decimal back to Scala always yields a JavaBigDecimal, which is an alias for the BigDecimal class defined in the Java standard library.
private class DecimalConverter(dataType: DecimalType)
  extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
  override def toCatalystImpl(scalaValue: Any): Decimal = {
    val decimal = scalaValue match {
      case d: BigDecimal => Decimal(d)
      case d: JavaBigDecimal => Decimal(d)
      case d: Decimal => d
    }
    if (decimal.changePrecision(dataType.precision, dataType.scale)) {
      decimal
    } else {
      null
    }
  }
  override def toScala(catalystValue: Decimal): JavaBigDecimal =
    catalystValue.toJavaBigDecimal
  override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal =
    row.getDecimal(column, dataType.precision, dataType.scale).toJavaBigDecimal
}
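The role of changePrecision can be sketched with java.math.BigDecimal alone: rescale the value to the target scale, then check that the result still fits in the allowed number of digits. `fitPrecision` is an illustrative helper under these assumptions, not Spark's API; Spark's Decimal class implements the equivalent check internally and mutates in place.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Simplified analogue of Decimal.changePrecision(precision, scale):
// returns Some(rescaled value) if it fits, None if the value overflows
def fitPrecision(d: JBigDecimal, precision: Int, scale: Int): Option[JBigDecimal] = {
  val rescaled = d.setScale(scale, RoundingMode.HALF_UP) // force the target scale
  if (rescaled.precision <= precision) Some(rescaled)    // fits within the digit budget
  else None                                              // overflow -> reject (Spark returns null)
}
```

So 12.345 fits DecimalType(4, 2) as 12.35, while 123.45 does not: after rescaling it still needs five digits.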
DecimalConverter takes one constructor parameter, dataType, of type DecimalType. DecimalType is a case class carrying two pieces of information: case class DecimalType(precision: Int, scale: Int)
8. ArrayConverter
The Scala input type is arbitrary (Any),
the Scala output type is Seq[Any],
and the Catalyst type is ArrayData, an abstract class with two concrete subclasses: GenericArrayData and UnsafeArrayData.
/** Converter for arrays, sequences, and Java iterables. */
// The elementType parameter specifies the type of the array's elements
private case class ArrayConverter(elementType: DataType)
  extends CatalystTypeConverter[Any, Seq[Any], ArrayData] {

  // Look up the converter matching elementType, i.e. one of the converters
  // defined above (e.g. BooleanConverter for BooleanType)
  private[this] val elementConverter = getConverterForType(elementType)

  override def toCatalystImpl(scalaValue: Any): ArrayData = {
    scalaValue match {
      case a: Array[_] =>
        // Feed each element of the array through elementConverter.toCatalyst
        new GenericArrayData(a.map(elementConverter.toCatalyst))
      case s: Seq[_] =>
        new GenericArrayData(s.map(elementConverter.toCatalyst).toArray)
      case i: JavaIterable[_] =>
        val iter = i.iterator
        val convertedIterable = scala.collection.mutable.ArrayBuffer.empty[Any]
        while (iter.hasNext) {
          val item = iter.next()
          convertedIterable += elementConverter.toCatalyst(item)
        }
        new GenericArrayData(convertedIterable.toArray)
    }
  }

  // Convert an ArrayData value to a Seq
  override def toScala(catalystValue: ArrayData): Seq[Any] = {
    if (catalystValue == null) {
      null
    } else if (isPrimitive(elementType)) {
      // Primitive element types are handled by toArray; analyzed below
      catalystValue.toArray[Any](elementType)
    } else {
      val result = new Array[Any](catalystValue.numElements())
      catalystValue.foreach(elementType, (i, e) => {
        result(i) = elementConverter.toScala(e)
      })
      result
    }
  }

  // The value at (row, column) is an array
  override def toScalaImpl(row: InternalRow, column: Int): Seq[Any] =
    toScala(row.getArray(column))
}
If elementType is a primitive type, catalystValue.toArray[Any](elementType) does the work:
def toArray[T: ClassTag](elementType: DataType): Array[T] = {
  val size = numElements()
  val values = new Array[T](size)
  var i = 0
  while (i < size) {
    if (isNullAt(i)) {
      values(i) = null.asInstanceOf[T]
    } else {
      values(i) = get(i, elementType).asInstanceOf[T]
    }
    i += 1
  }
  values
}
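The same null-aware copy loop can be written as a self-contained sketch without Spark's classes: walk the source by index, writing null for null slots and the element itself otherwise. `copyWithNulls` is an illustrative stand-in, not Spark's API; like toArray, it needs a ClassTag so it can allocate the typed target array.

```scala
import scala.reflect.ClassTag

// Self-contained analogue of ArrayData.toArray's loop:
// copy an untyped sequence into a typed array, preserving nulls
def copyWithNulls[T: ClassTag](src: IndexedSeq[Any]): Array[T] = {
  val values = new Array[T](src.length)
  var i = 0
  while (i < src.length) {
    values(i) =
      if (src(i) == null) null.asInstanceOf[T] // null slot stays null
      else src(i).asInstanceOf[T]              // non-null slot is cast to T
    i += 1
  }
  values
}
```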