如何解决spark写hive慢的问题
2016-05-31 10:13
3291 查看
在使用spark写hive过程中,发现最耗时的部分是将产生的结果写入hive,举个例子,对3g*1G表的join来讲,将结果使用以下方式直接写入hive表需要超过半小时的时间:
dataframe.registerTempTable("result")
sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from result""")
而整个结果数据的产生只需要4分钟左右的时间,比如以下方式:将结果以textfile存入hdfs:
result.rdd.saveAsTextFile(output_tmp_dir)
由此可见,对hive的写入操作耗用了大量的时间。
对此现象的优化可以是,将文件存为符合hive table文件的格式,然后使用hive load将产生的结果文件直接move到指定目录下。代码如下:
result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir)
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")
详解:
result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir):
hive column默认分隔符在scala/java中的表示为“/001”,r.mkString("/001")既是将column以分隔符/001进行分割,hive在导入时会自动识别。
repartition(partitions)是为了防止hdfs中产生大量小文件。partitions的设定与最终结果大小有关,一般是result_size/hdfs_block_size。
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")
此处使用hive load data命令,将hdfs文件load到hive表中。后台操作为直接将目录下的文件移到hive table所在目录,所以只是hdfs move数据的过程,执行非常快。
需要注意的是,此处要求hive建表时,已textfile格式建表。orc的方式不支持。对orc的表,可以建立临时表使用textfile临时存储,然后用以下命令进行导入:
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $tmp_table partition (dt='$dt')""")
sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from $tmp_table where dt='$dt'""")
在资源配置为--num-executors 20 --executor-cores 4,结果数据为1.8g的情况下,需要额外耗时50s。好处是结果数据使用列式、压缩方式存储,压缩比12.7左右。
使用优化后的方式,原有test case的耗时从半小时降到4分钟,效率提升明显。
dataframe.registerTempTable("result")
sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from result""")
而整个结果数据的产生只需要4分钟左右的时间,比如以下方式:将结果以textfile存入hdfs:
result.rdd.saveAsTextFile(output_tmp_dir)
由此可见,对hive的写入操作耗用了大量的时间。
对此现象的优化可以是,将文件存为符合hive table文件的格式,然后使用hive load将产生的结果文件直接move到指定目录下。代码如下:
result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir)
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")
详解:
result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir):
hive column默认分隔符在scala/java中的表示为“/001”,r.mkString("/001")既是将column以分隔符/001进行分割,hive在导入时会自动识别。
repartition(partitions)是为了防止hdfs中产生大量小文件。partitions的设定与最终结果大小有关,一般是result_size/hdfs_block_size。
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")
此处使用hive load data命令,将hdfs文件load到hive表中。后台操作为直接将目录下的文件移到hive table所在目录,所以只是hdfs move数据的过程,执行非常快。
需要注意的是,此处要求hive建表时,已textfile格式建表。orc的方式不支持。对orc的表,可以建立临时表使用textfile临时存储,然后用以下命令进行导入:
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $tmp_table partition (dt='$dt')""")
sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from $tmp_table where dt='$dt'""")
在资源配置为--num-executors 20 --executor-cores 4,结果数据为1.8g的情况下,需要额外耗时50s。好处是结果数据使用列式、压缩方式存储,压缩比12.7左右。
使用优化后的方式,原有test case的耗时从半小时降到4分钟,效率提升明显。
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- 分享Hive的一份胶片资料
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs
- 使用java代码提交Spark的hive sql任务,run as java application
- Spark机器学习(一) -- Machine Learning Library (MLlib)
- Spark机器学习(二) 局部向量 Local-- Data Types - MLlib
- Spark机器学习(三) Labeled point-- Data Types
- Spark初探
- Spark Streaming初探
- Spark本地开发环境搭建
- 搭建hadoop/spark集群环境
- 将Hive的默认数据库Derby改为Postgresql