2024年2月10日发(作者:)
【Flink】Flink1.12.2源码浅析:TaskExecutor 1.概述 转载:TaskExecutor 是 TaskManager 的具体实现.
/** *
此任务管理器的连接信息。 * The connection information of this task manager. */ private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation; private final TaskManagerMetricGroup taskManagerMetricGroup; /** *
此任务的状态管理器,为每个插槽提供状态管理器。 * The state manager for this task, providing state managers per slot. */ private final TaskExecutorLocalStateStoresManager localStateStoresManager; /** Information provider for external resources. */ private final ExternalResourceInfoProvider externalResourceInfoProvider; /** The network component in the task manager. */ private final ShuffleEnvironment, ?> shuffleEnvironment; /** The kvState registration service in the task manager. */ private final KvStateService kvStateService; private final Executor ioExecutor;3.1.3. 任务slot分配表 private final TaskSlotTable
相关 @Nullable private ResourceManagerAddress resourceManagerAddress; @Nullable private EstablishedResourceManagerConnection establishedResourceManagerConnection; @Nullable private TaskExecutorToResourceManagerConnection resourceManagerConnection; @Nullable private UUID currentRegistrationTimeoutId; private Map
//
是否连接到 ResourceManager if (!isConnectedToResourceManager(resourceManagerId)) { final String message = ( "TaskManager is not connected to the resource manager %s.", resourceManagerId); (message); return tedExceptionally(new TaskManagerException(message)); } try { //[重点]
分配 slot allocateSlot(slotId, jobId, allocationId, resourceProfile); } catch (SlotAllocationException sae) { return tedExceptionally(sae); } final job; try { //
获取/构建 job =reateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress)); } catch (Exception e) { // free the allocated slot try { ot(allocationId); } catch (SlotNotFoundException slotNotFoundException) { // slot no longer existent, this should actually never happen, because we've // just allocated the slot. So let's fail hard in this case! onFatalError(slotNotFoundException); } // release local state under the allocation id. eLocalStateForAllocationId(allocationId); // sanity check if (!Free(tNumber())) { onFatalError(new Exception("Could not free slot " + slotId)); } return tedExceptionally( new SlotAllocationException("Could not create new job.", e)); } if (ected()) { //[重要]
向JobManager提供Slot offerSlotsToJobManager(jobId); } return tedFuture(()); }ecutor#allocateSlot(slotId, jobId, allocationId, resourceProfile); private void allocateSlot(
"The slot " + slotId + " has already been allocated for a different job."; (message); final AllocationID allocationID = rentAllocation(tNumber()); throw new SlotOccupiedException( message, allocationID, ingJob(allocationID)); } } TaskExecutor#offerSlotsToJobManager(jobId);
// ------------------------------------------------------------------------ // Internal job manager connection methods // ------------------------------------------------------------------------ private void offerSlotsToJobManager(final JobID jobId) { //
向JobManager提供Slot : internalOfferSlotsToJobManager nection(jobId).ifPresent(this::internalOfferSlotsToJobManager); } private void internalOfferSlotsToJobManager(tion jobManagerConnection) { //
获取JobID final JobID jobId = Id(); // JobID是否已经分配 if (ocatedSlots(jobId)) { // Offer reserved slots to the leader of job 694474d11da6100e82744c9e47e2f511. ("Offer reserved slots to the leader of job {}.", jobId); //
获取 JobMaster 的 Gateway final JobMasterGateway jobMasterGateway = ManagerGateway(); //
获取分配给 jobId 的所有 TaskSlot final Iterator
获取 JobMasterId final JobMasterId jobMasterId = MasterId(); //
保留的Slot final Collection
异步操作:处理响应请求,处理异常 || 标记 slot 状态为 active mpleteAsync( handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots), getMainThreadExecutor()); } else { ("There are no unassigned slots for the job {}.", jobId); } } 3.2.2. freeSlot Frees the slot with the given allocation ID.
final tion jobManagerConnection = nection(jobId) .orElseThrow( () -> { final String message = "Could not submit task because there is no JobManager " + "associated for the job " + jobId + '.'; (message); return new TaskSubmissionException(message); }); if (!(MasterId(), jobMasterId)) { final String message = "Rejecting the task submission because the job manager leader id " + jobMasterId + " does not match the expected job manager leader id " + MasterId() + '.'; (message); throw new TaskSubmissionException(message); } if (!kSlotActive(jobId, ocationId())) { final String message = "No task slot allocated for job ID " + jobId + " and allocation ID " + ocationId() + '.'; (message); throw new TaskSubmissionException(message); } // re-integrate offloaded data: try { gData(manentBlobService()); } catch (IOException | ClassNotFoundException e) { throw new TaskSubmissionException( "Could not re-integrate offloaded TaskDeploymentDescriptor data.", e); } // deserialize the pre-serialized information final JobInformation jobInformation; final TaskInformation taskInformation; try { jobInformation = ializedJobInformation() .deserializeValue(getClass().getClassLoader()); taskInformation = ializedTaskInformation() .deserializeValue(getClass().getClassLoader()); } catch (IOException | ClassNotFoundException e) { throw new TaskSubmissionException( "Could not deserialize the job or task information.", e); } if (!(Id())) { throw new TaskSubmissionException( "Inconsistent job ID information inside TaskDeploymentDescriptor (" + Id()
Id(), ducedPartitions(), minationFuture()); return tedFuture(()); } else { final String message = "TaskManager already contains a task for id " + cutionId() + '.'; (message); throw new TaskSubmissionException(message); } } catch (TaskSubmissionException e) { return tedExceptionally(e); } } 3.2.4. updatePartitions 根据分区信息调用 ShuffleEnvironment#updatePartitionInfo 同步分区相关信息.
@Override public CompletableFuture
获取Task final Task task = k(executionAttemptID); if (task == null) { return tedExceptionally( new TaskNotRunningException( "Task " + executionAttemptID + " not running on TaskManager")); } try { //
发送 OperatorEvent 给 task rOperatorEvent(operatorId, evt); return tedFuture(()); } catch (Throwable t) { wIfFatalError(t); return tedExceptionally(t); } } 3.2.13. requestThreadDump 请求获取 Thread 线程的信息… @Override public CompletableFuture


发布评论