Day 61: Spark SQL Data Loading and Saving, Internals and Hands-On Practice

2016-05-08 13:13
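Loading data with Spark SQL

Spark SQL's data input and output center on DataFrame, which provides generic load and save operations.

load creates a DataFrame; save writes a DataFrame's data out to files. Both can be given an explicit format, which states what format the file being read is in or what format the output should be written in. A file of a specified type can also be read directly: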

 

SQLContext source:

The load and save methods (the deprecated load overloads are shown here):

@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
  read.load(path)
}

/**
 * Returns the dataset stored at path as a DataFrame, using the given data source.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
 *             This will be removed in Spark 2.0.
 */
@deprecated("Use read.format(source).load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String, source: String): DataFrame = {
  read.format(source).load(path)
}
 

DataFrameReader source:

/**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
  this.source = source
  this
}

 

/**
 * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}
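
The two excerpts above fit together as a fluent builder: format records the source name and returns this, and load() is the terminal call that hands everything recorded so far to the data source lookup. The following is only a rough plain-Java model of that pattern, not Spark code. The class name FluentReaderSketch, the option method, the "parquet" default and the string returned by load() are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the reader shown above; not Spark code.
final class FluentReaderSketch {
    // Default source name is an assumption made for this sketch.
    private String source = "parquet";
    private final Map<String, String> extraOptions = new LinkedHashMap<>();

    // Like format(source): record the setting, then return this so calls can be chained.
    FluentReaderSketch format(String source) {
        this.source = source;
        return this;
    }

    // Hypothetical option setter that follows the same record-and-return-this pattern.
    FluentReaderSketch option(String key, String value) {
        extraOptions.put(key, value);
        return this;
    }

    // Like load(): the terminal call that consumes what the earlier calls recorded.
    // Here it only describes the request instead of building a DataFrame.
    String load() {
        return "provider=" + source + ", options=" + extraOptions;
    }

    public static void main(String[] args) {
        String request = new FluentReaderSketch()
                .format("json")
                .option("path", "people.json")
                .load();
        System.out.println(request); // prints provider=json, options={path=people.json}
    }
}
```

Because every setter returns the same object, the order of the intermediate calls does not matter; only the terminal load() reads the accumulated state.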
 

ResolvedDataSource source:

object ResolvedDataSource extends Logging {

  /** A map to maintain backward compatibility in case we move data sources around. */
  private val backwardCompatibilityMap = Map(
    "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
    "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
  )

Data formats that can be read directly, as the map shows: jdbc, json, parquet.
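
The map above sends both the old package name and the old DefaultSource class name of each format to one current class. How the lookup code consumes the map is not part of the excerpt, so the sketch below is a guess at the general idea in plain Java rather than Spark's implementation. The class name ProviderAliasSketch, the relocated class name, and the pass-through behaviour for unknown names are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a backward-compatibility alias map; not Spark code.
final class ProviderAliasSketch {
    private static final Map<String, String> BACKWARD_COMPATIBILITY = new HashMap<>();

    static {
        // Hypothetical new location. In the excerpt the real value comes from
        // classOf[json.DefaultSource].getCanonicalName.
        String relocated = "example.relocated.json.DefaultSource";
        BACKWARD_COMPATIBILITY.put("org.apache.spark.sql.json", relocated);
        BACKWARD_COMPATIBILITY.put("org.apache.spark.sql.json.DefaultSource", relocated);
    }

    // Old names are rewritten to the current class name.
    // Assumption: names that are not in the map pass through unchanged.
    static String resolve(String provider) {
        return BACKWARD_COMPATIBILITY.getOrDefault(provider, provider);
    }

    public static void main(String[] args) {
        // Both legacy spellings end up at the same class name.
        System.out.println(resolve("org.apache.spark.sql.json"));
        System.out.println(resolve("org.apache.spark.sql.json.DefaultSource"));
        // A name without an alias is returned as given.
        System.out.println(resolve("com.example.CustomSource"));
    }
}
```

Keeping the aliases in one map means code written against the old names keeps working after a data source class is moved.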

  def apply(
      sqlContext: SQLContext,
      provider: String,
      partitionColumns: Array[String],
      mode: SaveMode,
      options: Map[String, String],
      data: DataFrame): ResolvedDataSource = {

DataFrameWriter source:

/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `SaveMode.Overwrite`: overwrite the existing data.
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
 

Hands-on example (Java):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Loads a JSON file into a DataFrame and saves one column back out as JSON.
 *
 * @version Created 2016-05-08 07:54:28
 */
public class SparkSQLLoadSaveOps {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("rdd2d");
        // Pass the configuration in; a no-argument JavaSparkContext would not see it.
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Load: name the format explicitly, then read the path into a DataFrame.
        DataFrame peopleDF = sqlContext.read().format("json").load("D://person.json");

        // Save: write only the "name" column, again as JSON.
        peopleDF.select("name").write().format("json").save("D://logs//personName.json");

        sc.stop();
    }
}

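Writing when output already exists: whether a new file is created or the data is appended is decided by the SaveMode passed to DataFrameWriter.mode, whose source is shown above.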
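
To make the four options listed in the mode comment concrete, here is a small plain-Java model of how each one treats a target that already exists. It is a sketch under heavy simplification, not Spark code: the class SaveModeSketch, its Mode enum and the in-memory map standing in for storage are all hypothetical, and each "file" is just a string.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified in-memory model of the four save modes; not Spark code.
final class SaveModeSketch {
    enum Mode { OVERWRITE, APPEND, IGNORE, ERROR_IF_EXISTS }

    // Hypothetical storage: path -> contents.
    private final Map<String, String> store = new HashMap<>();

    void save(String path, String data, Mode mode) {
        String existing = store.get(path);
        if (existing == null) {
            store.put(path, data); // nothing there yet: every mode simply writes
            return;
        }
        switch (mode) {
            case OVERWRITE:
                store.put(path, data); // replace the existing data
                break;
            case APPEND:
                store.put(path, existing + data); // keep the old data, add the new
                break;
            case IGNORE:
                break; // keep the old data and silently do nothing
            case ERROR_IF_EXISTS:
                throw new IllegalStateException(path + " already exists");
        }
    }

    String read(String path) {
        return store.get(path);
    }

    public static void main(String[] args) {
        SaveModeSketch fs = new SaveModeSketch();
        fs.save("names", "a", Mode.ERROR_IF_EXISTS); // first write succeeds
        fs.save("names", "b", Mode.APPEND);
        fs.save("names", "c", Mode.IGNORE);
        System.out.println(fs.read("names")); // prints ab
        fs.save("names", "d", Mode.OVERWRITE);
        System.out.println(fs.read("names")); // prints d
    }
}
```

With the Spark 1.x Java API used in the program above, the mode would be chosen on the writer itself, along the lines of peopleDF.select("name").write().mode(SaveMode.Append).format("json").save(path), with SaveMode imported from org.apache.spark.sql. Since ErrorIfExists is the default, rerunning the program against the same output path without setting a mode should be expected to fail.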