The org.apache.spark.shuffle.FetchFailedException: Failed to connect to exception
2017-08-01 00:41
I have recently been doing Spark performance tuning, testing how different CPU core counts and memory sizes affect computation performance. Because the tests ran on a test cluster whose hardware is smaller and weaker than production, I ran into quite a few problems. One worth writing up is org.apache.spark.shuffle.FetchFailedException: Failed to connect to /xxx:43301.
1. Environment
1.1 Hardware
Three test servers, A, B, and C, each with 4 cores and 16GB of RAM
Each runs an HDFS DataNode and a Spark Worker
A also runs the HDFS NameNode
B also runs the Spark Master
C runs the Spark Driver
1.2 Software
HDFS 2.7.3, cluster mode
Spark 2.1.0, standalone cluster mode
Java 1.8.0_131
2. Spark launch parameters
2.1 Test 1
2.1.1 Parameters
spark.driver.cores: not set, defaults to 1
spark.driver.maxResultSize: set to 2g (default is 1g)
spark.driver.memory: set to 3g (default is 1g)
spark.executor.memory: set to 8g (default is 1g)
spark.executor.cores: not set; in standalone mode it defaults to all cores on the Worker, here 4
2.1.2 Result
Each Worker launched 1 Executor, each using 4 cores and 8g of memory. The job completed successfully, taking 2 hours.
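The Test 1 settings above can be supplied at submit time; a sketch, where the master URL and application jar name are placeholders, not values from this cluster:

```
# Hypothetical submit command for Test 1 (master URL and jar are placeholders).
spark-submit \
  --master spark://B:7077 \
  --conf spark.driver.maxResultSize=2g \
  --conf spark.driver.memory=3g \
  --conf spark.executor.memory=8g \
  app.jar
```

Leaving spark.executor.cores unset is what gives each Executor all 4 cores of its Worker.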
2.2 Test 2
2.2.1 Parameters
spark.driver.cores: not set, defaults to 1
spark.driver.maxResultSize: set to 2g (default is 1g)
spark.driver.memory: set to 3g (default is 1g)
To launch more than one Executor per Worker and test whether multiple Executors improve performance, the following parameters were changed:
spark.executor.memory: set to 4g, half of Test 1
spark.executor.cores: set to 2, half of Test 1
2.2.2 Result
Each Worker launched 2 Executors (spark.cores.max / spark.executor.cores = 4 / 2 = 2), each using 2 cores and 4g of memory. The total resources were the same as in Test 1, i.e. the 2 Executors on each server together used 4 cores and 8g of memory, but the job hit the following exception:
[WARN][TaskSetManager] Lost task 6.0 in stage 4.0 (TID 307, xxx, executor 0): FetchFailed(BlockManagerId(1, xxx, 33557, None), shuffleId=0, mapId=7, reduceId=6, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /xxx:43301
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
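The executor layout in Test 2 follows from the division the standalone scheduler performs per Worker; a minimal sketch (the function name is mine, not a Spark API):

```python
def executors_per_worker(worker_cores: int, executor_cores: int) -> int:
    """Each Worker launches as many Executors as fit within its core count."""
    return worker_cores // executor_cores

# Test 1: spark.executor.cores defaults to all 4 Worker cores -> 1 Executor per Worker.
print(executors_per_worker(4, 4))  # 1
# Test 2: spark.executor.cores = 2 -> 2 Executors per Worker, each with 4g of the 8g.
print(executors_per_worker(4, 2))  # 2
```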
3. Analysis
Because each Executor's available resources were halved, the shuffle took longer to execute, and heavy memory use left Executors unable to respond to heartbeats. Once the default spark.network.timeout=120s was exceeded, the affected Executor was removed and its tasks were lost:
[WARN][HeartbeatReceiver] Removing executor 5 with no recent heartbeats: 120504 ms exceeds timeout 120000 ms
[ERROR][TaskSchedulerImpl] Lost executor 5 on xxx: Executor heartbeat timed out after 120504 ms
[WARN][TaskSetManager] Lost task 8.0 in stage 4.0 (TID 309, xxx, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 122504 ms
Spark's DAGScheduler then resubmits the failed tasks to other Executors, but since those Executors have the same resource configuration, the job ultimately still fails.
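The removal decision in the log above boils down to a timeout check; the sketch below mirrors the idea behind HeartbeatReceiver, not its actual implementation:

```python
NETWORK_TIMEOUT_MS = 120_000  # default spark.network.timeout = 120s

def should_remove_executor(last_heartbeat_ms: int, now_ms: int,
                           timeout_ms: int = NETWORK_TIMEOUT_MS) -> bool:
    """An executor is expired once its last heartbeat is older than the timeout."""
    return now_ms - last_heartbeat_ms > timeout_ms

# Executor 5 in the log: 120504 ms without a heartbeat exceeds the 120000 ms timeout.
print(should_remove_executor(last_heartbeat_ms=0, now_ms=120_504))  # True
```

Raising spark.network.timeout simply widens this window, which is why it appears among the fixes below.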
4. Solutions
Reduce the use of operations that trigger a shuffle, such as reduceByKey, to lower memory usage
Increase spark.network.timeout to allow more time to wait for heartbeat responses
Increase spark.executor.cores to reduce the number of Executors created, lowering total memory usage
At the same time, increase spark.executor.memory so each Executor has enough available memory
Increase spark.shuffle.memoryFraction, default 0.2 (requires spark.memory.useLegacyMode=true; applies to Spark 1.5 and earlier and is deprecated)
See also the official shuffle configuration parameters: http://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
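The configuration-side fixes above could look like the following spark-defaults.conf entries; the values are illustrative for this 4-core/16GB cluster, not prescriptions:

```
# Illustrative spark-defaults.conf entries for the fixes above.
spark.network.timeout      300s   # more slack for heartbeats under shuffle pressure
spark.executor.cores       4      # back to one Executor per Worker
spark.executor.memory      8g     # keep each Executor's memory proportional
```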