Apache Hudi + AWS S3 + Athena in Practice
2020-08-03 19:25
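Apache Hudi is used at Alibaba Group, EMIS Health, LinkNovate, Tathastu.AI, Tencent, and Uber, and is supported by Amazon AWS EMR and Google Cloud Platform. Amazon Athena recently added support for querying Apache Hudi datasets on Amazon S3. This post tests querying a Hudi-format dataset on S3 with Athena.
1. Preparation: Spark Environment and S3 Bucket
Spark is needed to write the Hudi data. Log in to Amazon EMR and start spark-shell: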
$ export SCALA_VERSION=2.12
$ export SPARK_VERSION=2.4.4
$ spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_${SCALA_VERSION}:0.5.3,org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION} \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
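Next, use the following Scala code to set the table name, the base path, and a data generator that produces the test records. The base path is set to s3://hudi_athena_test/hudi_trips so that the data can be queried later.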
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips"
val basePath = "s3://hudi_athena_test/hudi_trips"
val dataGen = new DataGenerator
2. Inserting Data
Generate some new trip records, load them into a DataFrame, and write the DataFrame to the Hudi table:
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
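3. Creating the Athena Database and Table
Hudi has built-in support for table partitioning, so the partitions must be added after the table is created. Install the athenareader tool, which can run multiple Athena queries and offers other useful features: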
go get -u github.com/uber/athenadriver/athenareader
Next, create a file named hudi_athena_test.sql with the following content:
DROP DATABASE IF EXISTS hudi_athena_test CASCADE;
CREATE DATABASE hudi_athena_test;
CREATE EXTERNAL TABLE `trips`(
  `begin_lat` double,
  `begin_lon` double,
  `driver` string,
  `end_lat` double,
  `end_lon` double,
  `fare` double,
  `rider` string,
  `ts` double,
  `uuid` string
)
PARTITIONED BY (`partitionpath` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://hudi_athena_test/hudi_trips';
ALTER TABLE trips ADD
  PARTITION (partitionpath = 'americas/united_states/san_francisco') LOCATION 's3://hudi_athena_test/hudi_trips/americas/united_states/san_francisco'
  PARTITION (partitionpath = 'americas/brazil/sao_paulo') LOCATION 's3://hudi_athena_test/hudi_trips/americas/brazil/sao_paulo'
  PARTITION (partitionpath = 'asia/india/chennai') LOCATION 's3://hudi_athena_test/hudi_trips/asia/india/chennai';
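Writing one PARTITION clause per partition by hand gets tedious as the table grows. As a rough sketch, the ALTER TABLE statement can be generated from a list of partition paths. The helper name addPartitionsDdl is hypothetical (it is not part of Hudi or athenareader), and the snippet is meant to be pasted into spark-shell or any Scala REPL:

```scala
// Hypothetical helper, not part of Hudi or athenareader: builds a single
// ALTER TABLE ... ADD PARTITION statement covering all given partition paths.
def addPartitionsDdl(table: String, basePath: String, partitions: Seq[String]): String = {
  // One "PARTITION (...) LOCATION '...'" clause per partition path.
  val clauses = partitions.map { p =>
    s"  PARTITION (partitionpath = '$p') LOCATION '$basePath/$p'"
  }
  (s"ALTER TABLE $table ADD" +: clauses).mkString("\n") + ";"
}

val ddl = addPartitionsDdl(
  "trips",
  "s3://hudi_athena_test/hudi_trips",
  Seq(
    "americas/united_states/san_francisco",
    "americas/brazil/sao_paulo",
    "asia/india/chennai"))

println(ddl)
```

In spark-shell the partition list could presumably be taken from the DataFrame that was written, for example with df.select("partitionpath").distinct().collect().map(_.getString(0)), instead of being typed out.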
Run the SQL statements with the following command:
$ athenareader -q hudi_athena_test.sql
4. Querying Hudi with Athena
If no errors are reported, the database and table have been created in Athena and the Hudi dataset can be queried there. Query it with athenareader:
athenareader -q "select * from trips" -o markdown
Queries with a filter condition also work:
athenareader -q "select fare,rider from trips where fare>20" -o markdown
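5. Updating the Hudi Table and Querying Again
Hudi supports updating data stored in S3. Go back to spark-shell and update some of the records with the following code: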
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
Once the write completes, query the table again with athenareader:
athenareader -q "select * from trips" -o markdown
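The results show that the data has been updated.
6. Limitations
Athena does not support snapshot queries or incremental queries, whereas Hive and Spark SQL do. To verify this, create a snapshot view in spark-shell: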
spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")
Query it with the following code:
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").
  map(k => k.getString(0)).
  take(50)
val beginTime = commits(commits.length - 2)
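The beginTime value computed above is the starting point for an incremental query on the Spark side. The following is a minimal sketch of how that might look, assuming the Hudi 0.5.3 DataSourceReadOptions constants imported earlier and the spark, basePath, and beginTime values from the same spark-shell session. The view name hudi_trips_incremental is an illustrative choice, not something from the original walkthrough:

```scala
// Sketch only: runs inside the spark-shell session used throughout this post.
// Assumes org.apache.hudi.DataSourceReadOptions._ is already imported and that
// basePath and beginTime were defined in the earlier steps.
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). // only commits after this instant
  load(basePath)

// Illustrative view name; it exists only in this Spark session.
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, rider, ts from hudi_trips_incremental where fare > 20.0").show()
```

Like hudi_trips_snapshot, this view is registered only in Spark and is not visible to Athena.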
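Running the commit-time query against hudi_trips_snapshot through Athena fails, because hudi_trips_snapshot is a temporary view that exists only in the Spark session and is never materialized: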
$ athenareader -q "select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime"
SYNTAX_ERROR: line 1:57: Table awsdatacatalog.hudi_athena_test.hudi_trips_snapshot does not exist
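According to the official documentation, Athena supports querying the Read-Optimized view of a Hudi dataset. Views can also be created and queried through Athena itself. Create a view on the Hudi table with Athena: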
$ athenareader -q "create view fare_greater_than_40 as select * from trips where fare>40" -a
Query the view:
$ athenareader -q "select fare,rider from fare_greater_than_40"
FARE                 RIDER
43.4923811219014     rider-213
63.72504913279929    rider-284
90.25710109008239    rider-284
93.56018115236618    rider-213
49.527694252432056   rider-284
90.9053809533154     rider-284
98.3428192817987     rider-284