您的位置:首页 > 数据库

spark SQL读取ORC文件从Driver启动到开始执行Task(或stage)间隔时间太长(计算Partition时间太长)且产出orc单个文件中stripe个数太多问题解决方案

2017-11-23 16:37 766 查看

1、背景:

    控制上游文件个数每天7000个,每个文件大小小于256M,50亿条+,orc格式。查看每个文件的stripe个数,500个左右,查询命令:hdfs fsck viewfs://hadoop/nn01/warehouse/…….db/……/partition_date=2017-11-11/part-06999
-files -blocks;

stripe个数查看命令:hive --orcfiledump viewfs://hadoop/nn01/warehouse/…….db/table/partition_date=2017-11-11/part-06999 | less

2、问题出现:

    通过Spark SQL读取orc格式文件,从spark作业提交到计算出Partition,开始执行Task,间隔时间太长。

    频繁打印如下日志:

17/11/11 03:52:01 INFO BlockManagerMasterEndpoint: Registering block manager gh-data-hdp-dn0640.---:11942 with 6.1 GB RAM, BlockManagerId(554, ----, 11942)

17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: DatanodeInfoWithStorage[10.20.--.--:50010,DS-32f8aaa5-c6ce-48a9-a2b1-3b169df193b9,DISK], --

17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: 

    问题抽象:如果执行如下简单SQL 也会出现作业提交后ApplicationMaster(Driver)启动了,作业Task迟迟不执行,Partition不能计算出来。SparkUI刷不出来DAU图,看不到Stage相关信息。

SELECT * from table where partition_date=2017-11-11 limit 1;

3、问题分析

    初步分析:Driver读取DataNode的数据,通过分析GC日志发现:确认Driver读取了DataNode上的数据(orc文件的head信息),导致Driver产生了full GC。

    源码跟踪分析:发现和spark读取orc文件的策略有关系。

    查看HiveConf.java发现Spark读取orc文件默认采用HYBRID策略。

HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID", new StringSet(new String[]{"HYBRID", "BI", "ETL"}),

"This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed

to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in

split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based

on heuristics."),


    查看OrcInputFormat.java文件发现HYBRID切分策略代码如下:

public SplitStrategy call() throws IOException {
final SplitStrategy splitStrategy;
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
context.conf, context.transactionList);
List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
Path base = dirInfo.getBaseDirectory();
List<FileStatus> original = dirInfo.getOriginalFiles();
boolean[] covered = new boolean[context.numBuckets];
boolean isOriginal = base == null;
// if we have a base to work from
if (base != null || !original.isEmpty()) {
// find the base files (original or new style)
List<FileStatus> children = original;
if (base != null) {
children = SHIMS.listLocatedStatus(fs, base,
AcidUtils.hiddenFileFilter);
}
long totalFileSize = 0;
for (FileStatus child : children) {
totalFileSize += child.getLen();
AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
(child.getPath(), context.conf);
int b = opts.getBucket();
// If the bucket is in the valid range, mark it as covered.
// I wish Hive actually enforced bucketing all of the time.
if (b >= 0 && b < covered.length) {
covered = [b]true;
}
}
int numFiles = children.size();
long avgFileSize = totalFileSize / numFiles;
switch(context.splitStrategyKind) {
case BI:
// BI strategy requested through config
splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
deltas, covered);
break;
case ETL:
// ETL strategy requested through config
splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
deltas, covered);
break;
default:
// HYBRID strategy
if (avgFileSize > context.maxSize) {
splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
covered);
} else {
splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
covered);
}
break;
}
} else {
// no base, only deltas
splitStrategy = new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
}
return splitStrategy;
}
}


    HYBRID策略:Spark Driver启动的时候,会去nameNode读取元数据,根据文件总大小和文件个数计算一个文件的平均大小,如果这个平均值大于默认256M的时候就会触发ETL策略。ETL策略就会去DataNode上读取orc文件的head等信息,如果stripe个数多或元数据信息太大就会导致Driver 产生FUll GC,这个时候就会表现为Driver启动到Task执行间隔时间太久的现象。

4、解决方案:

spark 1.6.2:

val hiveContext = new HiveContext(sc)
// 默认64M,即代表在压缩前数据量累计到64M就会产生一个stripe。与之对应的hive.exec.orc.default.row.index.stride=10000可以控制有多少行是产生一个stripe。
// 调整这个参数可控制单个文件中stripe的个数,不配置单个文件stripe过多,影响下游使用,如果配置了ETL切分策略或启发式触发了ETL切分策略,就会使得Driver读取DataNode元数据太大,进而导致频繁GC,使得计算Partition的时间太长难以接受。
hiveContext.setConf("hive.exec.orc.default.stripe.size","268435456")
// 总共有三种策略{"HYBRID", "BI", "ETL"}), 默认是"HYBRID","This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time insplit generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies basedon heuristics."),
// 如果不配置,当orc文件大小大于spark框架估算的平均值256M时,会触发ETL策略,导致Driver读取DataNode数据切分split花费大量的时间。
hiveContext.setConf("hive.exec.orc.split.strategy", "BI")


spark2.2.0:

// 创建一个支持Hive的SparkSession
val sparkSession = SparkSession
.builder()
.appName("PvMvToBase")
// 默认64M,即代表在压缩前数据量累计到64M就会产生一个stripe。与之对应的hive.exec.orc.default.row.index.stride=10000可以控制有多少行是产生一个stripe。
// 调整这个参数可控制单个文件中stripe的个数,不配置单个文件stripe过多,影响下游使用,如果配置了ETL切分策略或启发式触发了ETL切分策略,就会使得Driver读取DataNode元数据太大,进而导致频繁GC,使得计算Partition的时间太长难以接受。
.config("hive.exec.orc.default.stripe.size", 268435456L)
// 总共有三种策略{"HYBRID", "BI", "ETL"}), 默认是"HYBRID","This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time insplit generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies basedon heuristics."),
// 如果不配置,当orc文件大小大于spark框架估算的平均值256M时,会触发ETL策略,导致Driver读取DataNode数据切分split花费大量的时间。
.config("hive.exec.orc.split.strategy", "BI")
.enableHiveSupport()
.getOrCreate()

Spark Shuffle六大问题 fetch操作、数据存储、文件个数、什么排序算法简单介绍
MapReduce过程详解及其性能优化
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐