2023年11月26日发(作者:)
Spark⼩⽂件合并优化实践
⽂章⽬录
对 spark 任务数据落地(HDFS) 碎⽚⽂件过多的问题的优化实践及思考。
背景
此⽂是关于公司在 Delta Lake 上线之前对Spark任务写⼊数据产⽣碎⽚⽂件优化的⼀些实践。
形成原因
数据在流转过程中经历 filter/shuffle 等过程后,开发⼈员难以评估作业写出的数据量。即使使⽤了 Spark 提供的AE功能,⽬前也只
能控制 shuffle read 阶段的数据量,写出数据的⼤⼩实际还会受压缩算法及格式的影响,因此在任务运⾏时,对分区的数据评估⾮常
困难。
shuffle 分区过多过碎,写⼊性能会较差且⽣成的⼩⽂件会⾮常多。
shuffle 分区过少过⼤,则写⼊并发度可能会不够,影响任务运⾏时间。
不利影响
在产⽣⼤量碎⽚⽂件后,任务数据读取的速度会变慢(需要寻找读⼊⼤量的⽂件,如果是机械盘更是需要⼤量的寻址操作),同时会对
hdfs namenode 内存造成很⼤的压⼒。
在这种情况下,只能让业务/开发⼈员主动的合并下数据或者控制分区数量,提⾼了⽤户的学习及使⽤成本,往往效果还⾮常不理想。
既然在运⾏过程中对最终落地数据的评估如此困难,是否能将该操作放在数据落地后进⾏?对此我们进⾏了⼀些尝试,希望能⾃动化的解
决/缓解此类问题。
⼀些尝试
⼤致做了这么⼀些⼯作:
1. 修改 Spark FileFormatWriter 源码,数据落盘时,记录相关的 metrics,主要是⼀些分区/表的记录数量和⽂件数量信息。
2. 在发⽣落盘操作后,会⾃动触发碎⽚⽂件检测,判断是否需要追加合并数据任务。
3. 实现⼀个 MergeTable 语法⽤于合并表/分区碎⽚⽂件,通过系统或者⽤户直接调⽤。
第1和第2点主要是平台化的⼀些⼯作,包括监测数据落盘,根据采集的 metrics 信息再判断是否需要进⾏ MergeTable 操作,下⽂是关于
MergeTable 的⼀些细节实现。
MergeTable
功能:
1. 能够指定表或者分区进⾏合并
2. 合并分区表但不指定分区,则会递归对所有分区进⾏检测合并
3. 指定了⽣成的⽂件数量,就会跳过规则校验,直接按该数量进⾏合并
语法:
merge table [表名] [options (fileCount=合并后⽂件数量)] --
⾮分区表
merge table [表名] PARTITION (分区信息) [options (fileCount=合并后⽂件数量)] --
分区表
碎⽚⽂件校验及合并流程图:
性能优化
1. 只合并碎⽚⽂件
如果设置的碎⽚阈值是128M,那么只会将该表/分区内⼩于该阈值的⽂件进⾏合并,同时如果碎⽚⽂件数量⼩于⼀定阈值,将不会触
发合并,这⾥主要考虑的是合并任务存在⼀定性能开销,因此允许系统中存在⼀定量的⼩⽂件。
2. 分区数量及合并⽅式
定义了⼀些规则⽤于计算输出⽂件数量及合并⽅式的选择,获取任务的最⼤并发度 maxConcurrency ⽤于计算数据的分块⼤⼩,再
根据数据碎⽚⽂件的总⼤⼩选择合并(coalesce/repartition)⽅式。
1. 开启 dynamicAllocation
maxConcurrency = cutors *
2. 未开启 dynamicAllocation
maxConcurrency = ces *
对 MergeTable 操作做了上述的相关优化后,根据不同的数据场景下,能带来数倍⾄数⼗倍的性能提升。
缺陷
因为采⽤的是同步合并的⽅式,由于没有事务控制,所以在合并过程中数据不可⽤,这也是我们后来开始引⼊ Delta Lake 的⼀个原因。


发布评论