浅析 Spark Shuffle 内存使用(解决Spark Shuffle OOM问题)_zhifeng687的博客-程序员秘密

技术标签: spark  

在使用 Spark 进行计算时,我们经常会碰到作业 (Job) Out Of Memory(OOM) 的情况,而且很大一部分情况是发生在 Shuffle 阶段。那么在 Spark Shuffle 中具体是哪些地方会使用比较多的内存而有可能导致 OOM 呢? 为此,本文将围绕以上问题梳理 Spark 内存管理和 Shuffle 过程中与内存使用相关的知识;然后,简要分析下在 Spark Shuffle 中有可能导致 OOM 的原因。

一、Spark 内存管理和消费模型

在分析 Spark Shuffle 内存使用之前。我们首先了解下以下问题:当一个 Spark 子任务 (Task) 被分配到 Executor 上运行时,Spark 管理内存以及消费内存的大体模型是什么样呢?(注:由于 OOM 主要发生在 Executor 端,所以接下来的讨论主要针对 Executor 端的内存管理和使用)。

1,在 Spark 中,使用抽象类 MemoryConsumer 来表示需要使用内存的消费者。在这个类中定义了分配,释放以及 Spill 内存数据到磁盘的一些方法或者接口。具体的消费者可以继承 MemoryConsumer 从而实现具体的行为。 因此,在 Spark Task 执行过程中,会有各种类型不同,数量不一的具体消费者。如在 Spark Shuffle 中使用的 ExternalAppendOnlyMap, ExternalSorter 等等(具体后面会分析)。

2,MemoryConsumer 会将申请,释放相关内存的工作交由 TaskMemoryManager 来执行。当一个 Spark Task 被分配到 Executor 上运行时,会创建一个 TaskMemoryManager。在 TaskMemoryManager 执行分配内存之前,需要首先向 MemoryManager 进行申请,然后由 TaskMemoryManager 借助 MemoryAllocator 执行实际的内存分配。 

3,Executor 中的 MemoryManager 会统一管理内存的使用。由于每个 TaskMemoryManager 在执行实际的内存分配之前,会首先向 MemoryManager 提出申请。因此 MemoryManager 会对当前进程使用内存的情况有着全局的了解。

MemoryManager,TaskMemoryManager 和 MemoryConsumer 之前的对应关系,如下图。总体上,一个 MemoryManager 对应着至少一个 TaskMemoryManager (具体由 executor-core 参数指定),而一个 TaskMemoryManager 对应着多个 MemoryConsumer (具体由任务而定)。

clipboard.png

了解了以上内存消费的整体过程以后,有两个问题需要注意下:

1,当有多个 Task 同时在 Executor 上执行时, 将会有多个 TaskMemoryManager 共享 MemoryManager 管理的内存。那么 MemoryManager 是怎么分配的呢?答案是每个任务可以分配到的内存范围是 [1 / (2 * n), 1 / n],其中 n 是正在运行的 Task 个数。因此,多个并发运行的 Task 会使得每个 Task 可以获得的内存变小。

2,前面提到,在 MemoryConsumer 中有 Spill 方法,当 MemoryConsumer 申请不到足够的内存时,可以 Spill 当前内存到磁盘,从而避免无节制的使用内存。但是,对于堆内内存的申请和释放实际是由 JVM 来管理的。因此,在统计堆内内存具体使用量时,考虑性能等各方面原因,Spark 目前采用的是抽样统计的方式来计算 MemoryConsumer 已经使用的内存,从而造成堆内内存的实际使用量不是特别准确。从而有可能因为不能及时 Spill 而导致 OOM。

二、Spark Shuffle 过程

整体上 Spark Shuffle 具体过程如下图,主要分为两个阶段:Shuffle Write 和 Shuffle Read。


Write 阶段大体经历排序(最低要求是需要按照分区进行排序),可能的聚合 (combine) 和归并(有多个文件 spill 磁盘的情况 ),最终每个写 Task 会产生数据和索引两个文件。其中,数据文件会按照分区进行存储,即相同分区的数据在文件中是连续的,而索引文件记录了每个分区在文件中的起始和结束位置。

而对于 Shuffle Read, 首先可能需要通过网络从各个 Write 任务节点获取给定分区的数据,即数据文件中某一段连续的区域,然后经过排序,归并等过程,最终形成计算结果。

clipboard.png

对于 Shuffle Write,Spark 当前有三种实现,具体分别为 BypassMergeSortShuffleWriter, UnsafeShuffleWriter 和 SortShuffleWriter (具体使用哪一个实现有一个判断条件,此处不表)。而 Shuffle Read 只有一种实现。

2.1 Shuffle Write 阶段分析

2.1.1 BypassMergeSortShuffleWriter 分析

对于 BypassMergeSortShuffleWriter 的实现,大体实现过程是首先为每个分区创建一个临时分区文件,数据写入对应的分区文件,最终所有的分区文件合并成一个数据文件,并且产生一个索引文件。由于这个过程不做排序,combine(如果需要 combine 不会使用这个实现)等操作,因此对于 BypassMergeSortShuffleWriter,总体来说是不怎么耗费内存的。

2.1.2 SortShuffleWriter 分析

SortShuffleWriter 是最一般的实现,也是日常使用最频繁的。SortShuffleWriter 主要委托 ExternalSorter 做数据插入,排序,归并 (Merge),聚合 (Combine) 以及最终写数据和索引文件的工作。ExternalSorter 实现了之前提到的 MemoryConsumer 接口。下面分析一下各个过程使用内存的情况:

1,对于数据写入,根据是否需要做 Combine,数据会被插入到 PartitionedAppendOnlyMap 这个 Map 或者 PartitionedPairBuffer 这个数组中。每隔一段时间,当向 MemoryManager 申请不到足够的内存时,或者数据量超过 spark.shuffle.spill.numElementsForceSpillThreshold 这个阈值时 (默认是 Long 的最大值,不起作用),就会进行 Spill 内存数据到文件。假设可以源源不断的申请到内存,那么 Write 阶段的所有数据将一直保存在内存中,由此可见,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比较吃内存的。 

2,无论是 PartitionedAppendOnlyMap 还是 PartitionedPairBuffer, 使用的排序算法是 TimSort。在使用该算法是正常情况下使用的临时额外空间是很小,但是最坏情况下是 n / 2,其中 n 表示待排序的数组长度(具体见 TimSort 实现)。

3,当插入数据因为申请不到足够的内存将会 Spill 数据到磁盘,在将最终排序结果写入到数据文件之前,需要将内存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已经 spill 到磁盘的 SpillFiles 进行合并。Merge 的大体过程如下图。

clipboard.png

从上图可见,大体差不多就是归并排序的过程,由此可见这个过程是没有太多额外的内存消耗。归并过程中的聚合计算大体也是差不多的过程,唯一需要注意的是键值碰撞的情况,即当前输入的各个有序队列的键值的哈希值相同,但是实际的键值不等的情况。这种情况下,需要额外的空间保存所有键值不同,但哈希值相同值的中间结果。但是总体上来说,发生这种情况的概率并不是特别大。 


4,写数据文件的过程涉及到不同数据流之间的转化,而在流的写入过程中,一般都有缓存,主要由参数 spark.shuffle.file.buffer 和 spark.shuffle.spill.batchSize 控制,总体上这部分开销也不大。

以上分析了 SortShuffleWriter write 阶段的主要过程,从中可以看出主要的内存消耗在写入 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 这个阶段。

2.1.3 UnsafeShuffleWriter

UnsafeShuffleWriter 是对 SortShuffleWriter 的优化,大体上也和 SortShuffleWriter 差不多,在此不再赘述。从内存使用角度看,主要差异在以下两点:

一方面,在 SortShuffleWriter 的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 中,存储的是键值或者值的具体类型,也就是 Java 对象,是反序列化过后的数据。而在 UnsafeShuffleWriter 的 ShuffleExternalSorter 中数据是序列化以后存储到实际的 Page 中,而且在写入数据过程中会额外写入长度信息。总体而言,序列化以后数据大小是远远小于序列化之前的数据。 


另一方面,UnsafeShuffleWriter 中需要额外的存储记录(LongArray),它保存着分区信息和实际指向序列化后数据的指针(经过编码的Page num 以及 Offset)。相对于 SortShuffleWriter, UnsafeShuffleWriter 中这部分存储的开销是额外的。

2.2 Shuffle Read 阶段分析

Spark Shuffle Read 主要经历从获取数据,序列化流,添加指标统计,可能的聚合 (Aggregation) 计算以及排序等过程。大体流程如下图。

clipboard.png

以上计算主要都是迭代进行。在以上步骤中,比较复杂的操作是从远程获取数据,聚合和排序操作。接下来,依次分析这三个步骤内存的使用情况。


1,数据获取分为远程获取和本地获取。本地获取将直接从本地的 BlockManager 取数据, 而对于远程数据,需要走网络。在远程获取过程中,有相关参数可以控制从远程并发获取数据的大小,正在获取数据的请求数,以及单次数据块请求是否放到内存等参数。具体参数包括 spark.reducer.maxSizeInFlight (默认 48M),spark.reducer.maxReqsInFlight, spark.reducer.maxBlocksInFlightPerAddress 和 spark.maxRemoteBlockSizeFetchToMem。考虑到数据倾斜的场景,如果 Map 阶段有一个 Block 数据特别的大,默认情况由于 spark.maxRemoteBlockSizeFetchToMem 没有做限制,所以在这个阶段需要将需要获取的整个 Block 数据放到 Reduce 端的内存中,这个时候是非常的耗内存的。可以设置 spark.maxRemoteBlockSizeFetchToMem 值,如果超过该阈值,可以落盘,避免这种情况的 OOM。 另外,在获取到数据以后,默认情况下会对获取的数据进行校验(参数 spark.shuffle.detectCorrupt 控制),这个过程也增加了一定的内存消耗。

2,对于需要聚合和排序的情况,这个过程是借助 ExternalAppendOnlyMap 来实现的。整个插入,Spill 以及 Merge 的过程和 Write 阶段差不多。总体上,这块也是比较消耗内存的,但是因为有 Spill 操作,当内存不足时,可以将内存数据刷到磁盘,从而释放内存空间。

三、Spark Shuffle OOM 可能性分析

围绕内存使用,前面比较详细的分析了 Spark 内存管理以及在 Shuffle 过程可能使用较多内存的地方。接下来总结的要点如下:


1,首先需要注意 Executor 端的任务并发度,多个同时运行的 Task 会共享 Executor 端的内存,使得单个 Task 可使用的内存减少。

2,无论是在 Map 还是在 Reduce 端,插入数据到内存,排序,归并都是比较都是比较占用内存的。因为有 Spill,理论上不会因为数据倾斜造成 OOM。 但是,由于对堆内对象的分配和释放是由 JVM 管理的,而 Spark 是通过采样获取已经使用的内存情况,有可能因为采样不准确而不能及时 Spill,导致OOM。

3,在 Reduce 获取数据时,由于数据倾斜,有可能造成单个 Block 的数据非常的大,默认情况下是需要有足够的内存来保存单个 Block 的数据。因此,此时极有可能因为数据倾斜造成 OOM。 可以设置 spark.maxRemoteBlockSizeFetchToMem 参数,设置这个参数以后,超过一定的阈值,会自动将数据 Spill 到磁盘,此时便可以避免因为数据倾斜造成 OOM 的情况。在我们的生产环境中也验证了这点,在设置这个参数到合理的阈值后,生产环境任务 OOM 的情况大大减少了。

4,在 Reduce 获取数据后,默认情况会对数据流进行解压校验(参数 spark.shuffle.detectCorrupt)。正如在代码注释中提到,由于这部分没有 Spill 到磁盘操作,也有很大的可性能会导致 OOM。在我们的生产环境中也有碰到因为检验导致 OOM 的情况。

转载自:有赞技术

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_26222859/article/details/51221083

智能推荐

《Cortex-M0权威指南》之Cortex-M0编程入门_cortex-m0参考手册学习_大吉机器人的博客-程序员秘密

《Cortex-M0权威指南》之Cortex-M0编程入门转载请注明来源:cuixiaolei的技术博客嵌入式系统编程入门微控制器是如何启动的  为了保存编译号的二进制程序代码,大多数的现代微控制器都会包含片上flash存储器。有些微控制器还可能有一个独立的启动ROM,里面装有Bootloader程序。微控制器启动后,再执行flash的用户程序前,Bootloader会首先运行。...

python,pycharm的安装及其环境变量的配置_pycharm环境变量配置_zh_huan的博客-程序员秘密

python安装:软件的下载:1.去官网下载:python下载2.在浏览器搜索栏输入“python下载”,然后下载3.当然也可以在第三方平台下载软件的安装:找到并打开软件安装包,双击1.默认安装路径:默认在C盘下2,自定义路径3,打勾,表示自动设置环境变量。我选择自定义安装,点击进入下个界面全部打勾,不用管他什么意思,感兴趣的可以自行百度意思。点击next,下一步,下面的界面...

zoj2112 主席树动态第k大 (主席树&&树状数组)_weixin_30826761的博客-程序员秘密

Dynamic RankingsTime Limit:10 Seconds Memory Limit:32768 KBThe Company Dynamic Rankings has developed a new kind of computer that is no longer satisfied with the query like to simply find the...

java.lang.ClassNotFoundException: com.microsoft.sqlserver.jdbc.SQLServerDriver_qq_37176957的博客-程序员秘密

sequenceDiagram(x)csdn博客有位网友说将驱动和url修改为4.0的jar驱动,还是不行,我甚至修改为4.1的jar驱动.也总是报错.(x)将 sqljdbc_auth.dll 拷贝至C:\Windows\System32中,还是不行.(x)然后我以为是jdk版本差异会影响,我换了jdk,还是不行.(x)然后将jar4.1的驱动包放到web下的Web-INF的lib...

简单的人脸识别系统_人脸识别报警记录怎么写入数据库_Logan_Lin的博客-程序员秘密

之前一直在做一个功能比较简单的人脸识别系统,当然实验里的程序有部分是参考别人的,但是在整个实验过程中,我学到了不少东西,所以就记录一下,也供其它有需要的人参考。实验的最终界面是这样的:在做MFC界面前首先是参考了下面这几个链接,自己也实现了一遍https://www.2cto.com/kf/201605/508553.htmlhttp://blog.csdn.net/xingchenbingbuy...

随便推点

sql中聚合函数和分组函数_学习SQL:聚合函数_culuo4781的博客-程序员秘密

sql中聚合函数和分组函数 SQL has many cool features and aggregate functions are definitely one of these features, actually functions. While they are not specific to SQL, they are used often. They are part o...

构建哈夫曼树及求它的带权路径长度_使用顺序结构来实现哈夫曼树,并求出最小带权路径长度_可乐学算法的博客-程序员秘密

给定N个权值作为N个叶子结点,构造一棵二叉树,若该树的带权路径长度达到最小,称这样的二叉树为最优二叉树,也称为哈夫曼树(Huffman Tree)。要构成哈夫曼树,值比较大的叶子节点高度越低越好。(1) 将n个权值看出n颗只有根节点的树,构建n颗树。(2) 在森林中选出两个根结点的权值最小的树合并,作为一棵新树的左、右子树,且新树的根结点权值为其左、右子树根结点权值之和;(3)从森林中删除选...

数据库单行长度限制_数据库表单行记录会超过60k,一定要做裁剪_Scott_ora的博客-程序员秘密

不同数据库单行记录长度限制1.MySQL单行长度限制为655352.Oracle对单行记录长度不做限制,可通过行迁移或者行链接(如果单个block无法存放单条记录的时候,通过多个block链接起来)扩展单行的大小限制3.4.分布式数据库tidb行大小限制为...

CRC循环校验怎么计算 循环的性质以及CRC如何进行纠错_crc校验码怎么判断哪位出错_...xiexiaohan的博客-程序员秘密

CRC是循环冗余校验,关于其是怎么计算校验码的,课程上都讲的很清楚,但是对于冗余的特性以及CRC如何进行纠错,很多地方都没有提到在复习组原的时候看到了秦老师的mooc,他讲了一个CRC计算检验的例题,但是关于纠错的部分也是一笔带过,这里针对这道例题,进行一下补充例题真值为1100,计算其CRC校验码并讨论如何纠错发送方进行CRC计算如图,其中G(x)可以通过查表得知在1100后面补上3个0,为1100 000到这一步CRC校验码就算出来了,那么对于接收方来说,要如何判断接受的数据是.

配置DNS、安装MYSQL、安装APACHE_阿里云dns 装mysql 客户端_一一弓也的博客-程序员秘密

配置DNS、安装MYSQL、安装APACHE一、配置DNS DNS原理 DNS 为Domain Name System(域名系统)的缩写,它是一种将ip地址转换成对应的主机名或将主机名转换成与之相对应ip地址的一种服务机制。 其中通过域名解析出ip地址的叫做正向解析,通过ip地址解析出域名的叫做反向解析。 DNS使用TCP和UDP, 端口号都是53, 但它主要使用UDP,服务...

推荐文章

热门文章

相关标签