SparkSQL操作Hive Table(enableHiveSupport())
2017-11-03 14:23
537 查看
Spark SQL支持对Hive的读写操作。然而因为Hive有很多依赖包,所以这些依赖包没有包含在默认的Spark包里面。如果Hive依赖的包能在classpath找到,Spark将会自动加载它们。需要注意的是,这些Hive依赖包必须复制到所有的工作节点上,因为它们为了能够访问存储在Hive的数据,会调用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xml、core-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目录下面。
当使用Hive时,必须初始化一个支持Hive的SparkSession,用户即使没有部署一个Hive的环境仍然可以使用Hive。当没有配置hive-site.xml时,Spark会自动在当前应用目录创建metastore_db和创建由spark.sql.warehouse.dir配置的目录,如果没有配置,默认是当前应用目录下的spark-warehouse目录。
注意:从Spark 2.0.0版本开始,hive-site.xml里面的hive.metastore.warehouse.dir属性已经被spark.sql.warehouse.dir替代,用于指定warehouse的默认数据路径(必须有写权限)。
如果使用eclipse运行上述代码的话需要添加spark-hive的jars,下面是maven的配置:
否则的话会遇到下面错误:
与不同版本Hive Metastore的交互
Spark SQL对Hive的支持其中一个最重要的部分是与Hive metastore的交互,使得Spark SQL可以访问Hive表的元数据。从Spark 1.4.0版本开始,Spark SQL使用下面的配置可以用于查询不同版本的Hive metastores。需要注意的是,本质上Spark SQL会使用编译后的Hive 1.2.1版本的那些类来用于内部操作(serdes、UDFs、UDAFs等等)。
当使用Hive时,必须初始化一个支持Hive的SparkSession,用户即使没有部署一个Hive的环境仍然可以使用Hive。当没有配置hive-site.xml时,Spark会自动在当前应用目录创建metastore_db和创建由spark.sql.warehouse.dir配置的目录,如果没有配置,默认是当前应用目录下的spark-warehouse目录。
注意:从Spark 2.0.0版本开始,hive-site.xml里面的hive.metastore.warehouse.dir属性已经被spark.sql.warehouse.dir替代,用于指定warehouse的默认数据路径(必须有写权限)。
import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public static class Record implements Serializable { private int key; private String value; public int getKey() { return key; } public void setKey(int key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } } // warehouseLocation points to the default location for managed databases and tables String warehouseLocation = "/spark-warehouse"; // init spark session with hive support SparkSession spark = SparkSession .builder() .appName("Java Spark Hive Example") .master("local[*]") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate(); spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show(); // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // only showing top 20 rows // Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show(); // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key"); // The items in DaraFrames are of type Row, which lets you to access each column by ordinal. Dataset<String> stringsDS = sqlDF.map(row -> "Key: " + row.get(0) + ", Value: " + row.get(1), Encoders.STRING()); stringsDS.show(); // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. List<Record> records = new ArrayList<Record>(); for (int key = 1; key < 100; key++) { Record record = new Record(); record.setKey(key); record.setValue("val_" + key); records.add(record); } Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class); recordsDF.createOrReplaceTempView("records"); // Queries can then join DataFrames data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show(); // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // ... // only showing top 20 rows
如果使用eclipse运行上述代码的话需要添加spark-hive的jars,下面是maven的配置:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.11 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.1.0< 4000 span class="hljs-tag"></version> </dependency>
否则的话会遇到下面错误:
Exception in thread "main" java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found. at org.apache.spark.sql.SparkSession$Builder.enableHiveSupport(SparkSession.scala:815) at JavaSparkHiveExample.main(JavaSparkHiveExample.java:17)
与不同版本Hive Metastore的交互
Spark SQL对Hive的支持其中一个最重要的部分是与Hive metastore的交互,使得Spark SQL可以访问Hive表的元数据。从Spark 1.4.0版本开始,Spark SQL使用下面的配置可以用于查询不同版本的Hive metastores。需要注意的是,本质上Spark SQL会使用编译后的Hive 1.2.1版本的那些类来用于内部操作(serdes、UDFs、UDAFs等等)。
相关文章推荐
- SparkSQL On Yarn with Hive,操作和访问Hive表
- Spark SQL1.2与Hive互通操作
- Spark SQL Hive Support Demo
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表
- 第76课:Spark SQL基于网站Log的综合案例实战之Hive数据导入、Spark SQL对数据操作每天晚上20:00YY频道现场授课频道68917580
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL之Hive操作
- SparkSql1.5.2操作hive1.2
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表
- sparksql 操作hive
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表
- SparkSQL On Yarn with Hive,操作和访问Hive表