2023年11月26日发(作者:)

Spark原理-SparkSql框架优化策略

有了SparkCore为什么还要有SparkSql呢?

有两⼤原因:

⼀是SparkCore只能⽤Api,这就把很多SqlBoy拒之门外,Spark就⽆法发扬光⼤了;

⼆是使⽤Api时⽤户编写的函数作为⼀个个闭包被序列化后分发到Executor执⾏,Spark⽆法对⽤户⾃定义的代码进⾏优化;

基于以上原因,SparkSql横空出世,并提供强⼤的、⼀篮⼦的优化⽅案,以便使⽤户专注于业务需求的实现,把性能优化交给spark框架。

SparkSql提供如下优化措施:

⼀,Catalyst 优化器

Catalyst 主要负责三个⼯作:

⼀是根据Sql⽣成语法树(执⾏计划);

⼆是执⾏计划的逻辑优化;

三是执⾏计划的物理优化

以如下Sql为例,来看看Catalyst 是如何⼯作的:

select

student_id,

count(1)

from

(

select

student_id,

age

from

score

where

age > 10

)tmp

这是Catalyst 解析出来的语法树,也可以认为是执⾏计划,从上往下并⾏执⾏。

按照这个计划,其实就可以去执⾏了,但是Catalyst 还会对其进⾏优化。

逻辑优化是基于规则的优化,是静态的,Spark并没有到此为⽌,接下来会对执⾏计划进⾏动态的调整优化,动态调整的依据是运⾏中的数

据统计。

第三,执⾏计划的物理优化

在执⾏过程中,Spark会根据数据集的⼤⼩进⾏计算策略的调整,以join为例,会根据数据集的⼤⼩选择Join的⽅式、数据分发的⽅式,⽐

如有⼀个数据集⽐较⼩,会选择Broadcast⽅式将数据分发出去,节省⽹络分发的时间,提⾼性能。

⼆,Tungsten

继Catalyst 的优化之后,Tungsten ⼜出场了,其主要在⽅⾯进⾏优化,主要的⽬的是为了更⾼效率的利⽤内存和

数据结构执⾏代码

CPU,如将空间利⽤率的java对象变为UnsafeRow;为了减少昂贵的⽅法调⽤,将⼀个Stage多个算⼦整合为⼀个函数;

Unsafe Row避免了⼤量的额外信息的存储,极⼤的提⾼了空间利⽤率,对于Spark这种重度内存依赖型计算引擎,有⾮常⼤的性能提升作

⽤。

通常我们在写代码的过程中,并不经常直接使⽤Unsafe Row,Spark计算产⽣的中间结果和输出会使⽤到。

参考:

2,全阶段代码⽣成(WSCGWhole Stage Code Generation

对于同⼀个Stage的多个算⼦,本质上是多个函数的链式调⽤,伴随着很多基本类型的装箱操作,Tungsten对这些代码进⾏分析,将多个

函数融合为⼀个函数,将多次输⼊输出变为⼀次输⼊输出,减少了函数调⽤和参数封装。

三,AQEAdaptive Query Execution

AQE 的全称是 Adaptive Query Execution,⾃适应查询执⾏。

AQE主要是针对Shuffle进⾏的优化,包含了 3 个动态优化特性:

Join 策略调整

⾃动分区合并

⾃动倾斜处理

1,开启

AQE 机制默认是未开启的,要想充分利⽤上述的 3 个特性,通过如下配置开启:

spark.sql.adaptive.enabled=true

2 Join 策略调整

在运⾏的过程中,AQE会动态跟踪数据的变化,如A/B两个表Join,如果这两个表都是⼤表,在⽣成执⾏计划时,只能选择Shuffle Join,

但后续对B进⾏过滤或者聚合后,数据量⼤幅减少,AQE将会动态的将Shuffle Join调整为Broadcast Join。

这⾥看的出AQE的Join调整策略依赖于Shuffle的中间⽂件,因为其需要根据中间⽂件的⼤⼩去决策是否调整Join⽅式。

3,⾃动分区合并

第⼀个参数设置分区的最⼩尺⼨,AQE会根据这个参数确定是否需要合并。

第⼆个参数确定最⼩分区数,合并后的分区数不能低于该配置。

d 设置为 true 即可⾃动处理 Join 时数据倾斜

PartitionMaxSplits 控制处理⼀个倾斜 Partition 的 Task 个数上限,默认值为 5

PartitionRowCountThreshold 设置了⼀个 Partition 被视为倾斜 Partition 的⾏数下限,也即⾏数低于

该值的 Partition 不会被当作倾斜 Partition 处理。其默认值为 10L * 1000 * 1000 即⼀千万