spark-sql job fails with org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle

Tags: hive, spark, bug, scala, big data

A spark-SQL job fails while running.

The error message is as follows:

19/10/17 18:06:50 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_e122_1568025476000_38356_01_000022 on host: node02. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal

19/10/17 18:06:50 INFO BlockManagerMasterEndpoint: Trying to remove executor 21 from BlockManagerMaster.
19/10/17 18:06:50 INFO BlockManagerMaster: Removal of executor 21 requested
19/10/17 18:06:50 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asked to remove non-existent executor 21
19/10/17 18:06:50 INFO TaskSetManager: Starting task 1.1 in stage 28.3 (TID 46, node02, executor 4, partition 1, NODE_LOCAL, 5066 bytes)
19/10/17 18:06:51 INFO BlockManagerInfo: Added broadcast_23_piece0 in memory on node02:27885 (size: 106.5 KB, free: 5.2 GB)
19/10/17 18:06:51 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 6 to xx.xx.xx.xx:30178
19/10/17 18:06:51 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 6 is 166 bytes
19/10/17 18:06:51 WARN TaskSetManager: Lost task 1.1 in stage 28.3 (TID 46, node02, executor 4): FetchFailed(null, shuffleId=6, mapId=-1, reduceId=166, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

)

Immediately after the error above, the following error appears:
19/10/17 17:10:32 WARN TaskSetManager: Lost task 0.0 in stage 82.0 (TID 1855, node01, executor 5): FetchFailed(BlockManagerId(1, node01, 26969, None), shuffleId=13, mapId=1, reduceId=10, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to hostname/xx.xx.xx.xx:26969
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:513)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:444)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:776)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:737)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:899)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:935)
    at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:313)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to node01/xx.xx.xx.xx:26969
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.lambda$initiateRetry$0(RetryingBlockFetcher.java:169)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: node01/xx.xx.xx.xx:26969
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    ... 2 more

)

Solution:

Increase the executor memory: after raising spark.executor.memory from 10G to 20G, the exception no longer occurred. This is consistent with the container exit code 137 (128 + SIGKILL) in the log above, which typically means the process was killed externally, often for exceeding memory limits.
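A minimal sketch of where that setting lives, assuming the job is launched programmatically (the app name is hypothetical). In practice the same value is usually passed as --conf spark.executor.memory=20g to spark-submit or spark-sql, since executor memory is fixed once the executors launch:

import org.apache.spark.sql.SparkSession

// Minimal sketch: executor memory cannot be changed after the executors
// start, so it must be set at submit time or when the session (and its
// SparkContext) is first created.
val spark = SparkSession.builder()
  .appName("shuffle-oom-fix")              // hypothetical app name
  .config("spark.executor.memory", "20g")  // raised from 10g per the fix above
  .enableHiveSupport()                     // assumed: the job reads Hive tables
  .getOrCreate()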

Root cause analysis

A shuffle consists of two phases: shuffle write and shuffle read.
The number of shuffle write partitions is determined by the partition count of the previous stage's RDD, while the number of shuffle read partitions is controlled by Spark configuration parameters.
Shuffle write can be loosely understood as a saveAsLocalDiskFile-style operation: intermediate results are temporarily written, according to a partitioning rule, to the local disks of the executors.
During shuffle read, the partition count is again governed by those Spark parameters. If that value is set too small while the volume of shuffle read data is large, each task ends up having to process an enormous amount of data. The result can be a JVM crash, so fetching the shuffle data fails and the executor is lost, which is exactly what the "Failed to connect to host" error means (executor lost). Even when the JVM survives, the pressure can cause long GC pauses.
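A back-of-the-envelope illustration of that effect; the 100 GiB shuffle size is assumed for the sake of the arithmetic (the real figure appears in the Spark UI as the stage's shuffle read size):

// Assumed total shuffle-read volume: 100 GiB.
val shuffleBytes = 100L * 1024 * 1024 * 1024

// Average bytes a single reduce task must fetch and process.
def bytesPerTask(readPartitions: Int): Long = shuffleBytes / readPartitions

println(bytesPerTask(200))   // 512 MiB per task: risks OOM and long GC
println(bytesPerTask(2000))  // ~51 MiB per task: comfortably small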

Approach

Once the cause is understood, the problem is straightforward to fix. Attack it from two angles: the amount of shuffled data, and the number of partitions used to process it.

  • Reduce the amount of shuffled data
    Consider whether a map-side join or broadcast join could avoid the shuffle entirely (see the broadcast-join sketch after this list).
    Filter out unneeded data before the shuffle: for example, if the raw data has 20 columns but only a few are required, select just those columns to cut the shuffle volume.
  • SparkSQL and DataFrame operations such as join and group by
    The partition count is controlled by spark.sql.shuffle.partitions (default 200); raise it according to the shuffle volume and the complexity of the computation (see the partition-tuning sketch after this list).
  • RDD operations such as join, groupBy, and reduceByKey
    The partition count for shuffle read and reduce processing is controlled by spark.default.parallelism, which defaults to the total number of cores running the job (8 under Mesos fine-grained mode, the local core count in local mode). The official recommendation is 2-3 times the number of cores (also covered in the partition-tuning sketch below).
  • Increase executor memory
    Raise spark.executor.memory to an appropriate value, as in the configuration sketch in the Solution section above.
  • Check for data skew
    Have null keys been filtered out? Can abnormal data (a key with a disproportionately large amount of data) be handled separately? Consider changing the data partitioning rule (see the skew-handling sketch after this list).
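The broadcast-join sketch referenced in the first bullet, with hypothetical table and column names. broadcast() ships the small side to every executor, so the large table is never shuffled for the join, and pruning columns early trims whatever shuffle remains:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().getOrCreate()

// Hypothetical tables: a large fact table and a small dimension table.
val orders = spark.table("dw.orders")
val region = spark.table("dw.dim_region")

val joined = orders
  .select("order_id", "region_id", "amount")  // prune columns before any shuffle
  .join(broadcast(region), Seq("region_id"))  // small side copied to each executor

Spark also broadcasts automatically when the smaller side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).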
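The partition-tuning sketch referenced in the second and third bullets; the value 800 is illustrative, not prescriptive. Note the asymmetry: spark.sql.shuffle.partitions can be changed per session at runtime, while spark.default.parallelism must be set before the SparkContext starts (for example via --conf on spark-submit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// SQL / DataFrame side: raise the shuffle-read partition count from the default 200.
spark.conf.set("spark.sql.shuffle.partitions", "800")

// RDD side: most shuffling operations also accept an explicit partition count,
// which overrides spark.default.parallelism for that one operation.
val counts = spark.sparkContext
  .textFile("/tmp/events")               // hypothetical input path
  .map(line => (line.split(",")(0), 1L))
  .reduceByKey(_ + _, 800)               // explicit reduce-side partition count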
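Finally, the skew-handling sketch referenced in the last bullet, again with hypothetical names: drop null join keys first (they all hash to the same reduce task), then salt a hot key so its rows spread across several tasks instead of one:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().getOrCreate()
val orders = spark.table("dw.orders")  // same hypothetical table as above

// Null keys all land in one partition; drop them if they carry no meaning.
val cleaned = orders.filter(col("region_id").isNotNull)

// 16-way salt: append a random suffix to the hot key. The other side of the
// join must be expanded with the same 16 suffixes before joining on salted_key.
val salted = cleaned
  .withColumn("salt", (rand() * 16).cast("int"))
  .withColumn("salted_key", concat_ws("_", col("region_id"), col("salt")))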
Copyright notice: This is an original post by the blogger, licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/qq_26502245/article/details/108510730
