Spark SQL 2.0 New Features
2017-05-24 18:23
1- SparkSession
The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder():
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
SparkSession in Spark 2.0 provides built-in support for Hive features, including the ability to write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables. To use these features, you do not need to have an existing Hive setup.
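Hive support is opt-in and must be enabled on the builder. A minimal sketch, assuming a table named src already exists in the metastore and that the warehouse path below suits your deployment:

import org.apache.spark.sql.SparkSession

// Hypothetical warehouse location; adjust for your cluster
val warehouseLocation = "spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()  // enables HiveQL, Hive UDFs and Hive table access
  .getOrCreate()

// HiveQL queries go through the same sql() entry point
// (assumes a Hive table `src` already exists)
spark.sql("SELECT * FROM src").show()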
2- Global Temporary View
Temporary views in Spark SQL are session-scoped and will disappear if the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database global_temp, and you must use the qualified name to refer to it, e.g. SELECT * FROM global_temp.people.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
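A plain temporary view, by contrast, is visible only within the session that created it, and a global view can be dropped through the Catalog API once it is no longer needed. A minimal sketch, reusing the df and spark values from the snippet above:

// Session-scoped view: visible only to the session that created it
df.createOrReplaceTempView("people_local")
// spark.newSession().sql("SELECT * FROM people_local")  // would fail: view not found

// Drop the global view explicitly when it is no longer needed
spark.catalog.dropGlobalTempView("people")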
3- Untyped User-Defined Aggregate Functions
Users have to extend the UserDefinedAggregateFunction abstract class to implement a custom untyped aggregate function. For example, a user-defined average can look like:
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.
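Once registered, the function is not limited to SQL strings; it can also be invoked through the DataFrame API. A short sketch using the built-in functions.callUDF helper, reusing spark and df from the example above:

import org.apache.spark.sql.functions.callUDF

// Same aggregation as the SQL query above, expressed with the DataFrame API;
// it should produce the identical single-row result
df.select(callUDF("myAverage", df("salary")).as("average_salary")).show()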
4- Type-Safe User-Defined Aggregate Functions
User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
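Since an Aggregator is an ordinary value, it also composes with grouped Datasets. A brief sketch of a per-name average via groupByKey, reusing ds and MyAverage from above (note that in this sample data each name occurs once, so each group's average is simply that employee's salary):

// Apply the same Aggregator per group: groupByKey keys the Dataset,
// and agg evaluates the TypedColumn within each key.
// Requires spark.implicits._ in scope (imported in section 1).
val perName = ds.groupByKey(_.name).agg(MyAverage.toColumn.name("avg_salary"))
perName.show()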