最近在做Spark 3.1
升级 Spark 3.5
的过程中,遇到了一批SQL在运行的过程中 Driver OOM的情况,排查到是AQE开启导致的问题,再次分析记录一下,顺便了解一下Spark中指标的事件处理情况
SQLAppStatusListener
类在内存中存放着 一个整个SQL查询链的所有stage以及stage的指标信息,在AQE中 一个job会被拆分成很多job,甚至几百上千的job,这个时候 stageMetrics的数据就会成百上倍的被存储在内存中,从而导致Driver OOM
。
解决方法:
spark.sql.adaptive.enabled false
背景知识:对于一个完整链接的sql语句来说(比如说从 读取数据源,到 数据处理操作,再到插入hive表),这可以称其为一个最小的SQL执行单元,这最小的数据执行单元在Spark内部是可以跟踪的,也就是用executionId
来进行跟踪的。
对于一个sql,举例来说 :
insert into TableA select * from TableB;
在生成 物理计划的过程中会调用 QueryExecution.assertOptimized 方法,该方法会触发eagerlyExecuteCommands调用,最终会到SQLExecution.withNewExecutionId
方法:
def assertOptimized(): Unit = optimizedPlan
...
lazy val commandExecuted: LogicalPlan = mode match {
case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)
case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)
case CommandExecutionMode.SKIP => analyzed
}
...
lazy val optimizedPlan: LogicalPlan = {
// We need to materialize the commandExecuted here because optimizedPlan is also tracked under
// the optimizing phase
assertCommandExecuted()
executePhase(QueryPlanningTracker.OPTIMIZATION) {
// clone the plan to avoid sharing the plan instance between different stages like analyzing,
// optimizing and planning.
val plan =
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
// We do not want optimized plans to be re-analyzed as literals that have been constant
// folded and such can cause issues during analysis. While `clone` should maintain the
// `analyzed` state of the LogicalPlan, we set the plan as analyzed here as well out of
// paranoia.
plan.setAnalyzed()
plan
}
def assertCommandExecuted(): Unit = commandExecuted
...
private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
case c: Command =>
// Since Command execution will eagerly take place here,
// and in most cases be the bulk of time and effort,
// with the rest of processing of the root plan being just outputting command results,
// for eagerly executed commands we mark this place as beginning of execution.
tracker.setReadyForExecution()
val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
val name = commandExecutionName(c)
val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
SQLExecution.withNewExecutionId(qe, Some(name)) {
qe.executedPlan.executeCollect()
}
}
而SQLExecution.withNewExecutionId
主要的作用是设置当前计划的所属的executionId
:
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
该EXECUTION_ID_KEY
的值会在JobStart的时候传递给Event,以便记录跟踪整个执行过程中的指标信息。
同时我们在方法中eagerlyExecuteCommands
看到qe.executedPlan.executeCollect()
这是具体的执行方法,针对于insert into
操作来说,物理计划就是
InsertIntoHadoopFsRelationCommand
,这里的run方法最终会流转到DAGScheduler.submitJob
方法:
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
JobArtifactSet.getActiveOrDefault(sc),
Utils.cloneProperties(properties)))
最终会被DAGScheduler.handleJobSubmitted
处理,其中会发送SparkListenerJobStart
事件:
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
Utils.cloneProperties(properties)))
该事件会被SQLAppStatusListener
捕获,从而转到onJobStart
处理,这里有会涉及到指标信息的存储,这里我们截图出dump的内存占用情况:
可以看到 SQLAppStatusListener 的 LiveStageMetrics 占用很大,也就是 accumIdsToMetricType占用很大
那在AQE中是怎么回事呢?
我们知道再AQE中,任务会从source节点按照shuffle进行分割,从而形成单独的job,从而生成对应的shuffle指标,具体的分割以及执行代码在AdaptiveSparkPlanExec.getFinalPhysicalPlan
中,如下:
var result = createQueryStages(currentPhysicalPlan)
val events = new LinkedBlockingQueue[StageMaterializationEvent]()
val errors = new mutable.ArrayBuffer[Throwable]()
var stagesToReplace = Seq.empty[QueryStageExec]
while (!result.allChildStagesMaterialized) {
currentPhysicalPlan = result.newPlan
if (result.newStages.nonEmpty) {
stagesToReplace = result.newStages ++ stagesToReplace
executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
// SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting
// for tasks to be scheduled and leading to broadcast timeout.
// This partial fix only guarantees the start of materialization for BroadcastQueryStage
// is prior to others, but because the submission of collect job for broadcasting is
// running in another thread, the issue is not completely resolved.
val reorderedNewStages = result.newStages
.sortWith {
case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => false
case (_: BroadcastQueryStageExec, _) => true
case _ => false
}
// Start materialization of all new stages and fail fast if any stages failed eagerly
reorderedNewStages.foreach { stage =>
try {
stage.materialize().onComplete { res =>
if (res.isSuccess) {
events.offer(StageSuccess(stage, res.get))
} else {
events.offer(StageFailure(stage, res.failed.get))
}
// explicitly clean up the resources in this stage
stage.cleanupResources()
}(AdaptiveSparkPlanExec.executionContext)
这里就是得看stage.materialize()
这个方法,这两个stage只有两类:BroadcastQueryStageExec 和 ShuffleQueryStageExec
,
这两个物理计划稍微分析一下如下:
broadcast.submitBroadcastJob
||
\/
promise.future
||
\/
relationFuture
||
\/
child.executeCollectIterator()
其中 promise
的设置在relationFuture
方法中,而relationFuture
会被doPrepare
调用,而submitBroadcastJob
会调用executeQuery
,从而调用doPrepare
,executeCollectIterator()
最终也会发送JobSubmitted
事件,分析和上面的一样 shuffle.submitShuffleJob
||
\/
sparkContext.submitMapStage(shuffleDependency)
||
\/
dagScheduler.submitMapStage
该submitMapStage
会发送MapStageSubmitted
事件:
eventProcessLoop.post(MapStageSubmitted(
jobId, dependency, callSite, waiter, JobArtifactSet.getActiveOrDefault(sc),
Utils.cloneProperties(properties)))
最终会被DAGScheduler.handleMapStageSubmitted
处理,其中会发送SparkListenerJobStart
事件:
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
Utils.cloneProperties(properties)))
该事件会被SQLAppStatusListener
捕获,从而转到onJobStart
处理:
private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
...
override def onJobStart(event: SparkListenerJobStart): Unit = {
val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
if (executionIdString == null) {
// This is not a job created by SQL
return
}
val executionId = executionIdString.toLong
val jobId = event.jobId
val exec = Option(liveExecutions.get(executionId))
该方法会获取事件中的executionId
,在AQE中,同一个执行单元的executionId
是一样的,所以stageMetrics
内存占用会越来越大。
而这里指标的更新是在AdaptiveSparkPlanExec.onUpdatePlan
等方法中。
这样整个事件的数据流以及问题的产生原因就应该很清楚了。
为啥AQE以后多个Job还是共享一个executionId呢?因为原则上来说,如果没有开启AQE之前,一个SQL执行单元的是属于同一个Job的,开启了AQE之后,因为AQE的原因,一个Job被拆成了了多个Job,但是从逻辑上来说,还是属于同一个SQL处理单元的所以还是得归属到一次执行中。
文章浏览阅读4.6k次。Value-Mapped Lists(值映射列表) 功能描述:上述财产的ComboList确保单元格的值是从名单中挑选。由用户选择的值转换成列的适当类型和存储在网格,完全一样,如果用户已输入的值。在许多情况下,细胞能够承担来自明确列出的值,但是你想显示一个用户的实际价值的版本。例如,如果一个列包含的产品代码,您可能要存储的代码,但显示的产品名称来代替。这是通过的DataMa_c# c1flexgrid
文章浏览阅读199次。ArbitrageProblem Description Arbitrage is the use of discrepancies in currency exchange rates to transform one unit of a currency into more than one unit of the same currency. For example, suppose tha_arbitrage is the use of discrepancies in currency
文章浏览阅读1w次,点赞2次,收藏12次。libjpeg库的交叉编译libjpeg库主要用于jpeg格式图片的编解码,其交叉编译过程如下1. 下载源码从官方网站http://www.ijg.org/files/ 下载libjpeg库的源码,本次编译过程使用的是 jpegsrc.v9a.tar.gz2. 解压源码2.1 切换到下载目录,执行 tar -xzvf jpegsrc.v9a.tar.g_libjpeg缩略图
文章浏览阅读209次。2019独角兽企业重金招聘Python工程师标准>>> ..._mysql 时间戳用什么类型合适
文章浏览阅读221次。ChenAndyMNCsignalchainFAE摘要随着数模转换器的转换速率越来越高,JESD204B串行接口已经越来越多地广泛用在数模转换器上,其对器件时钟和同步时钟之间的时序关系有着严格需求。本文就重点讲解了JESD204B数模转换器的时钟规范,以及利用TI公司的芯片实现其时序要求。关键字:LMK04800,LMK04828,LMK1802,LMK01010,JESD204内容1.J..._204b接口支持哪种时钟
文章浏览阅读2.3k次,点赞7次,收藏69次。显卡是一个硬件,需要有一个驱动才能够被我们计算机识别出来,在安装驱动的时候,会随着驱动安装一个叫做cuda driver的东西,cuda是可以让显卡进行并行运算的一个平台,当我们的计算机想利用显卡做一些并行运算的时候,它就可以通过cuda driver去操作显卡。那为什么需要虚拟环境呢,一个直接的原因,例如我们一个项目要用pytorch开发,而另一个要用tensorflow开发,这样,我们可以创建两个虚拟环境,在里面分别安装pytorch和tensorflow,两个虚拟环境中的包和库不会互相冲突。_mx330显卡能玩跑深度学习程序吗
文章浏览阅读359次。新版驱动惹得锅,默认为美国时区,修改如下spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/mybatis?serverTimezone=UTC username: root password: 123 typ...
文章浏览阅读365次。python中自定义模块的简述模块 => python文件包 => 目录初始化 __init__.py => 初始化文件,当导入包的时候会自动执行python包中的文件是独立的,(与go区分)注意:当模块被导入的时候,模块中的代码都会被执行一次,建议每次导入模块的时候就导入模块的某个函数即可,否则很容易出现错误链接:https://pan.baidu.com/s/12jZiYPEmHDpWOQMlGTGEUQ?pwd=zouh提取码:zouh。_python里的go是模块吗?
文章浏览阅读68次。为什么80%的码农都做不了架构师?>>> ..._typeof content == 'boolean
文章浏览阅读3.1k次,点赞9次,收藏31次。Vue前端与Django后端实现前后端分离连接_vuedjango前后端分离
文章浏览阅读2.2w次,点赞11次,收藏66次。问题描述在深度学习环境 GPU 版 pytorch 下,运行代码出现报错,关键报错信息如下:RuntimeError: Couldn't load custom C++ ops. This can happen if your PyTorch and torchvision versions are incompatible, 大致的意思是说当前环境的 PyTorch 和 torchvision 版本不匹配,建议重新安装 PyTorch 和 torchvision。具体报错信息如下:Traceb_runtimeerror: couldn't load custom c++ ops. this can happen if your pytorch
文章浏览阅读1.2k次,点赞2次,收藏8次。本教程详细记录了华为海思Hi35xx系列ARM32交叉编译opencv、zlib、libpng的方法。是上一篇x86环境源码编译opencv的姊妹篇。_海思 opencv 交叉编译