2024年4月11日发(作者:)

大数据:Spark Shuffle(三)Executor是如何fetch shuffle的数据文件

1. 前言

Executor是如何获取到Shuffle的数据文件进行Action的算子的计算呢?在

ResultTask中,Executor通过MapOutPutTracker向Driver获取了ShuffID的Shuffle

数据块的结构,整理成以BlockManangerId为Key的结构,这样可以更容易区分究竟是

本地的Shuffle还是远端executor的Shuffle

2. Fetch数据

在MapOutputTracker中获取到的BlockID的地址,是以BlockManagerId的seq

数组

[plain] view plain copy

Seq[(BlockManagerId, Seq[(BlockId, Long)])]

BlockManagerId结构

[plain] view plain copy

class BlockManagerId private (

private var executorId_ : String,

private var host_ : String,

private var port_ : Int,

private var topologyInfo_ : Option[String])

extends Externalizable

是以ExecutorId,Executor Host IP, Executor Port 标示从哪个Executor获取

Shuffle的数据文件,通过Seq[BlockManagerId, Seq(BlockID,Long)]的结构,当前

executor很容易区分究竟哪些是本地的数据文件,哪些是远端的数据,本地的数据可以直

接本地读取,而需要不通过网络来获取。

2.1 读取本Executor文件

如何认为是本地数据?

Spark认为区分是通过相同的ExecutorId来区别的,如果ExecutorId和自己的

ExecutorId相同,认为是本地Local,可以直接读取文件。

[plain] view plain copy

for ((address, blockInfos) <- blocksByAddress) {

totalBlocks +=