2023年11月26日发(作者:)
Spark原理-SparkSql框架优化策略
有了SparkCore为什么还要有SparkSql呢?
有两⼤原因:
⼀是SparkCore只能⽤Api,这就把很多SqlBoy拒之门外,Spark就⽆法发扬光⼤了;
⼆是使⽤Api时⽤户编写的函数作为⼀个个闭包被序列化后分发到Executor执⾏,Spark⽆法对⽤户⾃定义的代码进⾏优化;
基于以上原因,SparkSql横空出世,并提供强⼤的、⼀篮⼦的优化⽅案,以便使⽤户专注于业务需求的实现,把性能优化交给spark框架。
SparkSql提供如下优化措施:
⼀,Catalyst 优化器
Catalyst 主要负责三个⼯作:
⼀是根据Sql⽣成语法树(执⾏计划);
⼆是执⾏计划的逻辑优化;
三是执⾏计划的物理优化
以如下Sql为例,来看看Catalyst 是如何⼯作的:
select
student_id,
count(1)
from
(
select
student_id,
age
from
score
where
age > 10
)tmp
这是Catalyst 解析出来的语法树,也可以认为是执⾏计划,从上往下并⾏执⾏。
按照这个计划,其实就可以去执⾏了,但是Catalyst 还会对其进⾏优化。
逻辑优化是基于规则的优化,是静态的,Spark并没有到此为⽌,接下来会对执⾏计划进⾏动态的调整优化,动态调整的依据是运⾏中的数
据统计。
第三,执⾏计划的物理优化
在执⾏过程中,Spark会根据数据集的⼤⼩进⾏计算策略的调整,以join为例,会根据数据集的⼤⼩选择Join的⽅式、数据分发的⽅式,⽐
如有⼀个数据集⽐较⼩,会选择Broadcast⽅式将数据分发出去,节省⽹络分发的时间,提⾼性能。
⼆,Tungsten
继Catalyst 的优化之后,Tungsten ⼜出场了,其主要在和⽅⾯进⾏优化,主要的⽬的是为了更⾼效率的利⽤内存和
数据结构执⾏代码
CPU,如将空间利⽤率的java对象变为UnsafeRow;为了减少昂贵的⽅法调⽤,将⼀个Stage多个算⼦整合为⼀个函数;
Unsafe Row避免了⼤量的额外信息的存储,极⼤的提⾼了空间利⽤率,对于Spark这种重度内存依赖型计算引擎,有⾮常⼤的性能提升作
⽤。
通常我们在写代码的过程中,并不经常直接使⽤Unsafe Row,Spark计算产⽣的中间结果和输出会使⽤到。
参考:
2,全阶段代码⽣成(WSCG,Whole Stage Code Generation)
对于同⼀个Stage的多个算⼦,本质上是多个函数的链式调⽤,伴随着很多基本类型的装箱操作,Tungsten对这些代码进⾏分析,将多个
函数融合为⼀个函数,将多次输⼊输出变为⼀次输⼊输出,减少了函数调⽤和参数封装。
三,AQE(Adaptive Query Execution)
AQE 的全称是 Adaptive Query Execution,⾃适应查询执⾏。
AQE主要是针对Shuffle进⾏的优化,包含了 3 个动态优化特性:
Join 策略调整
⾃动分区合并
⾃动倾斜处理
1,开启
AQE 机制默认是未开启的,要想充分利⽤上述的 3 个特性,通过如下配置开启:
spark.sql.adaptive.enabled=true
2, Join 策略调整
在运⾏的过程中,AQE会动态跟踪数据的变化,如A/B两个表Join,如果这两个表都是⼤表,在⽣成执⾏计划时,只能选择Shuffle Join,
但后续对B进⾏过滤或者聚合后,数据量⼤幅减少,AQE将会动态的将Shuffle Join调整为Broadcast Join。
这⾥看的出AQE的Join调整策略依赖于Shuffle的中间⽂件,因为其需要根据中间⽂件的⼤⼩去决策是否调整Join⽅式。
3,⾃动分区合并
第⼀个参数设置分区的最⼩尺⼨,AQE会根据这个参数确定是否需要合并。
第⼆个参数确定最⼩分区数,合并后的分区数不能低于该配置。
d 设置为 true 即可⾃动处理 Join 时数据倾斜
PartitionMaxSplits 控制处理⼀个倾斜 Partition 的 Task 个数上限,默认值为 5
PartitionRowCountThreshold 设置了⼀个 Partition 被视为倾斜 Partition 的⾏数下限,也即⾏数低于
该值的 Partition 不会被当作倾斜 Partition 处理。其默认值为 10L * 1000 * 1000 即⼀千万


发布评论