【Flink】详解JobGraph

我们好,咱们的gzh是朝阳三只大明白,满满满是干货,共享近期的学习常识以及个人总结(包括读研和IT),跪求一波关注,期望和我们一起努力、进步!!

概述

JobGraph 是 StreamGraph 优化后的产品,客户端会将优化后的 JobGraph 发送给 JM。接下来的文章涉及到一些前置常识点,没有看前几期的小伙伴最好看一下前几期:

  1. 【Flink】详解StreamGraph
  2. 【Flink】浅谈Flink架构和调度
  3. 【Flink】详解Flink的八种分区

Flink 在客户端将 StreamGraph 目标转换成 JobGraph 目标,这个转换的中心在于将多个契合条件的 StreamNode 节点兼并在一起,构成一个 JobVertex 节点,这样的优化办法称之为算子链兼并,这样做能够有用减少数据在节点间传递所需的序列化、反序列化操作。同一个算子链中的算子运行在同一个 TaskSlot 中,也可由理解为运行在一个线程中,这样能够明显下降线程切换的功能开支,并且能增大吞吐量和下降延迟。

源码剖析

JobGraph 的构建

JobGraph 的相关代码主要在【flink-runtime】模块下的 org.apache.flink.runtime.JobGraph 中。其调用链路是 StreamGraph#getJobGraphStreamingJobGraphGenerator#createJobGraph()

/*------------------------ StreamGraph ---------------------------*/
// 结构进口
public JobGraph getJobGraph() {  
    return getJobGraph(null);  
}  
public JobGraph getJobGraph(@Nullable JobID jobID) {  
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);  
}
/*---------------------------------------------------------*/
/*--------------- StreamingJobGraphGenerator ------------------*/
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {  
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();  
}
private JobGraph createJobGraph() {  
	// 前置校验
    preValidate();  
    // 获取StreamGraph的调度形式
    // 设置JobGraph的调度形式
    jobGraph.setJobType(streamGraph.getJobType());  
	// jobGraph设置是否发动本地近似康复战略
    jobGraph.enableApproximateLocalRecovery( streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
    // 为每一个StreamNode生成一个确认的哈希值
defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);  
    // 为兼容问题生成哈希值
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());  
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {  
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));  
    }  
	// 这里是重点,JobGraph的极点和边在这个办法中创立。
	// 测验将尽或许多的StreamNode聚合在一个JobGraph节点中。
	// 判别算子chain,兼并创立JobVertex,并生成JobEdge。
    setChaining(hashes, legacyHashes);  
    // 设置物理鸿沟
    setPhysicalEdges();  
    // 设置jobGraph的SlotSharingGroup和CoLocationGroup
    setSlotSharingAndCoLocation();  
    setManagedMemoryFraction(  
            Collections.unmodifiableMap(jobVertices),  
            Collections.unmodifiableMap(vertexConfigs),  
            Collections.unmodifiableMap(chainedConfigs),  
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),  
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());  
    // 设置jobGraph的各个 JobVertex 的checkpoint 信息
    // 比如说source JobVertex 需求trigger checkpoint
    // 一切的JobVertex需求commit和ack checkpoint
    configureCheckpointing();
    // 设置保存点装备
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());  
    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =  
            JobGraphUtils.prepareUserArtifactEntries(  
                    streamGraph.getUserArtifacts().stream()  
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),  
                    jobGraph.getJobID());  
    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :  
            distributedCacheEntries.entrySet()) {  
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());  
    }  
    // 设置运行时装备信息
    try {  
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());  
    } catch (IOException e) {  
        throw new IllegalConfigurationException(  
                "Could not serialize the ExecutionConfig."  
                        + "This indicates that non-serializable types (like custom serializers) were registered");  
    }  
	// 回来JobGraph目标
    return jobGraph;  
}
/*---------------------------------------------------------*/
// 界说Flink-Job调度枚举类型
public enum JobType {  
	// 批处理形式
    BATCH,  
    // 流处理形式
	STREAMING  
}

从上面的剖析能够看出,由 StreamGraph 到 JobGraph 最重要的一步是创立算子链 setChaining(hashes, legacyHashes),这样做能够尽或许的多整合一些操作在同一个节点中完结,避免不必要的线程切换和网络通信。举一个简单一点的例子,DataStream.map(a -> a+1).filter(a -> a > 2),此刻数据流有两个处理过程,也就是两个算子组成,即 mapfilter,这两个算子会组成不同的 StreamNode 目标和 Task 目标,假如这两个 Task 不在一个 TaskSlot 或者一个 TM 中,那么必然涉及到网络传输,这样的履行功能会很差,为了优化这一点,Flink 引入了算子链的概念,一个算子链代表一组能够在同一个 TaskSlot 中履行的算子串。

/*--------------- StreamingJobGraphGenerator ------------------*/
// 从StreamNode递归创立JobVertex目标
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {  
	final Map<Integer, OperatorChainInfo> chainEntryPoints =  
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);  
    final Collection<OperatorChainInfo> initialEntryPoints =  
            chainEntryPoints.entrySet().stream()  
                    .sorted(Comparator.comparing(Map.Entry::getKey))  
                    .map(Map.Entry::getValue)  
                    .collect(Collectors.toList());  
    // 创立算子链
    for (OperatorChainInfo info : initialEntryPoints) {  
        createChain(  
                info.getStartNodeId(),  
                1, // 索引从1开端,0是Source
                info,  
                chainEntryPoints);  
    }  
}
// 创立算子链
private List<StreamEdge> createChain(
            final Integer currentNodeId,
            final int chainIndex,
            final OperatorChainInfo chainInfo,
            final Map<Integer, OperatorChainInfo> chainEntryPoints) {
		// 获取开端Node-ID
        Integer startNodeId = chainInfo.getStartNodeId();
        // builtVertices用于寄存现已进行构建的StreamNode ID,避免重复结构
        if (!builtVertices.contains(startNodeId)) {
			// transitiveOutEdges 存储整个算子链的出边
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
			// chainableOutputs 存储一切能够构成算子链的StreamEdge
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            // nonChainableOutputs 存储不能够构成算子链的StreamEdge
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
			// 获取当时处理的SteamNode
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
			// 对一切的StreamEdge进行处理,分为能够构成算子链和不能够构成算子链两类
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }
			// 假如是能够构成算子链的StreamEdge目标,递归调用createChain,并添加到transitiveOutEdges
			// 递归完毕条件:
			// 1. 当时节点不再有出边;
			// 2. 当时节点现已完结转换
            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(
                        createChain(
                                chainable.getTargetId(),
                                chainIndex + 1,
                                chainInfo,
                                chainEntryPoints));
            }
			// 假如是不可被chain的StreamEdge,添加到transitiveOutEdges集合中
            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                createChain(
                        nonChainable.getTargetId(),
                        1, // operators start at position 1 because 0 is for chained source inputs
                        chainEntryPoints.computeIfAbsent(
                                nonChainable.getTargetId(),
                                (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                        chainEntryPoints);
            }
			// 设置算子链称号
            chainedNames.put(
                    currentNodeId,
                    createChainedName(
                            currentNodeId,
                            chainableOutputs,
                            Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
            // 设置算子链所需最小资源
            chainedMinResources.put(
                    currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            // 设置算子链所需最佳资源
            chainedPreferredResources.put(
                    currentNodeId,
                    createChainedPreferredResources(currentNodeId, chainableOutputs));
			// 
            OperatorID currentOperatorId =
                    chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId)
                        .addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }
            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId)
                        .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }
			// 假如currentNodeId和startNodeId持平,阐明需求创立一个新的chain,会生成一个JobVertex
            StreamConfig config =
                    currentNodeId.equals(startNodeId)
                            ? createJobVertex(startNodeId, chainInfo)
                            : new StreamConfig(new Configuration());
			// 设置的极点特点到config中
            setVertexConfig(
                    currentNodeId,
                    config,
                    chainableOutputs,
                    nonChainableOutputs,
                    chainInfo.getChainedSources());
            if (currentNodeId.equals(startNodeId)) {
				// 开端一个新的算子链的衔接
                config.setChainStart();
                config.setChainIndex(chainIndex);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
				// 对于每一个算子链,把它和指向下一个算子链的出边衔接起来
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }
				// 
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
            } else {
                chainedConfigs.computeIfAbsent(
                        startNodeId, k -> new HashMap<Integer, StreamConfig>());
                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }
            config.setOperatorID(currentOperatorId);
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            return transitiveOutEdges;
        } else {
            return new ArrayList<>();
        }
    }
// 判别是否能够构成算子链
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}
private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {  
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);  
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);  
    if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)  
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)  
            && (edge.getPartitioner() instanceof ForwardPartitioner)  
            && edge.getShuffleMode() != ShuffleMode.BATCH  
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()  
            && streamGraph.isChainingEnabled())) {  
        return false;  
    }  
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {  
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {  
            return false;  
        }  
    }  
    return true;  
}
public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {  
    return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null)  
            || (slotSharingGroup != null  
                    && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));  
}
private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(  
        final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {  
    final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();  
    final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();  
	// 遍历一切的Source-StreamNode
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {  
	    // 依据ID获取StreamNode目标
        final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);  
        if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory  
                && sourceNode.getOutEdges().size() == 1) {  
            final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);  
            final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());  
            final ChainingStrategy targetChainingStrategy =  
                    target.getOperatorFactory().getChainingStrategy();  
            if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES  
                    && isChainableInput(sourceOutEdge, streamGraph)) {  
                final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));  
                final StreamConfig.SourceInputConfig inputConfig =  
                        new StreamConfig.SourceInputConfig(sourceOutEdge);  
                final StreamConfig operatorConfig = new StreamConfig(new Configuration());  
                setVertexConfig(  
                        sourceNodeId,  
                        operatorConfig,  
                        Collections.emptyList(),  
                        Collections.emptyList(),  
                        Collections.emptyMap());  
                operatorConfig.setChainIndex(0); // sources are always first  
                operatorConfig.setOperatorID(opId);  
                operatorConfig.setOperatorName(sourceNode.getOperatorName());  
                chainedSources.put(  
                        sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));  
                final SourceOperatorFactory<?> sourceOpFact =  
                        (SourceOperatorFactory<?>) sourceNode.getOperatorFactory();  
                final OperatorCoordinator.Provider coord =  
                        sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);  
                final OperatorChainInfo chainInfo =  
                        chainEntryPoints.computeIfAbsent(  
                                sourceOutEdge.getTargetId(),  
                                (k) ->  
                                        new OperatorChainInfo(  
                                                sourceOutEdge.getTargetId(),  
                                                hashes,  
                                                legacyHashes,  
                                                chainedSources,  
                                                streamGraph));  
                chainInfo.addCoordinatorProvider(coord);  
                continue;  
            }  
        }  
		// 将SourceID-OperatorChainInfo添加到HashMap中
        chainEntryPoints.put(  
                sourceNodeId,  
                new OperatorChainInfo(  
                        sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));  
    }  
    return chainEntryPoints;  
}
/*---------------------------------------------------------*/
/*--------------- ChainingStrategy ------------------*/
public enum ChainingStrategy {  
	// 最大程度衔接前后算子
    ALWAYS,  
    // 算子不会衔接前后的算子构成算子链
    NEVER,  
    // 算子只会衔接后面的算子可是不会衔接前面的算子
    HEAD,  
    // 头部算子,尽或许衔接多个source算子
    HEAD_WITH_SOURCES;  
	// 默认衔接战略是【ALWAYS】
    public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;  
}
/*--------------------------------------------------*/

简单总结一下 JobGraph 的构建过程,进口办法是 setChaining(),该办法会结构一个 Collection<OperatorChainInfo> 目标,该目标是一切 Source 节点的信息集合,遍历该集合调用 createChain() 办法,该办法会递归调用下流节点,构建算子链。在改办法中会对每一个 Operator 调用 isChainable 办法,将一切的出边分成两类:chainalbeOutputs 和 noChainableOutputs,递归遍历二者进行算子链的构建,同时将 StreamNode 的装备信息序列化到 StreamConfig 目标中,这里会有一个分支,假如当时节点是算子链的头结点,则会调用 createJobVertex 构建 JobVertex 目标和 JobEdge 目标相连;假如当时节点不是算子链的头节点,则构建一个新的 StreamConfig 目标。

能够构成算子链的依据是 isChainable 办法和 isChainableInput 办法,详细判别条件如下:

  1. 下流节点的前置节点只要一个;
  2. 分区器有必要是ForwardPartitioner
  3. Shuffle 形式有必要是 Pipeline 形式;
  4. 上下流的并行度有必要共同;
  5. StreamGraph 启用算子链优化;
  6. 上游算子的算子链战略有必要是【ALWAYS | HEAD | HEAD_WITH_SOURCES】;
  7. 下流算子的算子链战略有必要是【ALWAYS | 上游是 Source 算子的情况下 HEAD_WITH_SOURCES】;
  8. 上、下流算子都分配了 SlotSharingGroup 而且二者共同;

createJobVertex 办法中,首要创立一个 JobVertex 目标,然后调用 jobVertex.setInvokableClass() 设置履行类,然后设置运行资源和并行度。最后传递 JobVertex 目标的装备信息构建一个 StreamConfig 目标并回来。

遍历 transitiveOutEdges,调用 connect() 办法,在 connect 办法中,依据 StreamEdge 目标得到上下流 JobVertex 节点信息;通过 StreamEdge.getPartitioner() 办法得到 StreamPartitioner 特点,假如分区器的 isPointwise() 办法回来 True(ForwardPartitioner 和 RescalePartitioner 分区器都是由明确指向的),那么构建 DistributionPattern.POINTWISE 类型的 JobEdge 目标,其他的分区器构建 DistributionPattern.ALL_TO_ALL 类型的 JobEdge 目标,JobEdge 目标就是各个 JobVertex 之间的衔接目标,也就是说在 connect 办法中就完结了 JobGraph 的各个节点之间的衔接作业。

// connect办法
private void connect(Integer headOfChain, StreamEdge edge) {  
    physicalEdgesInOrder.add(edge);  
    Integer downStreamVertexID = edge.getTargetId();  
	// 获取算子链头JobVertex目标
    JobVertex headVertex = jobVertices.get(headOfChain);  
    // 获取下流JobVertex目标
    JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);  
    StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());  
    downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);  
	// 获取分区器
    StreamPartitioner<?> partitioner = edge.getPartitioner();  
	// 获取成果分区类型
    ResultPartitionType resultPartitionType;  
    switch (edge.getShuffleMode()) {  
        case PIPELINED:  
            resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;  
            break;  
        case BATCH:  
            resultPartitionType = ResultPartitionType.BLOCKING;  
            break;  
        case UNDEFINED:  
            resultPartitionType = determineResultPartitionType(partitioner);  
            break;  
        default:  
            throw new UnsupportedOperationException(  
                    "Data exchange mode " + edge.getShuffleMode() + " is not supported yet.");  
    }  
    checkAndResetBufferTimeout(resultPartitionType, edge);  
	// 依据分区器的不同构建JobEdge目标
    JobEdge jobEdge;  
    if (partitioner.isPointwise()) {  
        jobEdge =  
                downStreamVertex.connectNewDataSetAsInput(  
                        headVertex, DistributionPattern.POINTWISE, resultPartitionType);  
    } else {  
        jobEdge =  
                downStreamVertex.connectNewDataSetAsInput(  
                        headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);  
    }  
    // 设置战略称号,这些都能够在Web上看到 
    jobEdge.setShipStrategyName(partitioner.toString());  
    jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());  
    jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());  
	// 打印日志【分区器称号】-【算子链头】->【下流VertexID】
    if (LOG.isDebugEnabled()) {  
        LOG.debug(  
                "CONNECTED: {} - {} -> {}",  
                partitioner.getClass().getSimpleName(),  
                headOfChain,  
                downStreamVertexID);  
    }  
}

物理鸿沟的设置

setPhysicalEdges 办法用于设置 JobVertex 的物理鸿沟,履行办法总结如下:

  1. 遍历 physicalEdgesInOrder 目标,该目标包括一切不能构成算子链的边,将边的目标节点的入边添加到一个 List 目标中;
  2. 遍历一切的 physicalInEdgesInOrder,经过上面的过程,该目标的内部结构为不能构成算子链的边的下流节点 ID-入边集合,将该节点的入边结合都设置为实际物理鸿沟。
// 设置物理鸿沟 
private void setPhysicalEdges() {  
    Map<Integer, List<StreamEdge>> physicalInEdgesInOrder =  
            new HashMap<Integer, List<StreamEdge>>();  
    for (StreamEdge edge : physicalEdgesInOrder) {  
        int target = edge.getTargetId();  
        List<StreamEdge> inEdges =  
                physicalInEdgesInOrder.computeIfAbsent(target, k -> new ArrayList<>());  
        inEdges.add(edge);  
    }  
    for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {  
        int vertex = inEdges.getKey();  
        List<StreamEdge> edgeList = inEdges.getValue();  
        vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);  
    }  
}

终究完结物理鸿沟划分的 JobGraph 如下图所示:

【Flink】详解JobGraph

JobGraph 的提交

JobGraph 的相关行为由ClusterClient接口进行界说,该接口界说了封装程序提交到远程集群的办法,要害源码剖析如下:

public interface ClusterClient<T> extends AutoCloseable {
	// 回来集群ID
	T getClusterId();
	// 回来Flink设置
	Configuration getFlinkConfiguration();
	// 封闭客户端此刻衔接的集群
	void shutDownCluster();
	// 获取一切的正在运行和完结的作业
	CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
	// 提交JobGraph给集群
	CompletableFuture<JobID> submitJob(JobGraph jobGraph);
	// 依据作业ID获取其运行状态
	CompletableFuture<JobResult> requestJobResult(JobID jobId);
	// 依据JobId触发保存点
	CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
	// 向协调器发出恳求并接纳回应
	CompletableFuture<CoordinationResponse> sendCoordinationRequest(  
        JobID jobId, OperatorID operatorId, CoordinationRequest request);
    // 依据JobID中止相关作业(仅适用于流式作业)
    // 发送中止指令后本质上仅仅Source中止发送数据,整个程序中止还需求一切的TM处理完当时数据
    // jobId:作业仅有标识符
    // advanceToEndOfEventTime:表明Source是否注入最大水位线
    CompletableFuture<String> stopWithSavepoint(  
        final JobID jobId,  
        final boolean advanceToEndOfEventTime,  
        @Nullable final String savepointDirectory);
    // 依据JobID获取Job成果
    CompletableFuture<JobResult> requestJobResult(JobID jobId);
    // 依据JobID获取累加器
    // jobId:作业仅有标识符
    // loader:用于反序列化
    CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
	// 依据JobID吊销作业
	CompletableFuture<Acknowledge> cancel(JobID jobId);
	...
}

也就是说 JobGraph 目标终究会由 ClusterClient#submitJob() 发送给集群,由 JM 的 JobMaster 进行接纳,之后的调用链路是 JobMaster#startJobExecution()JobMaster#startScheduling() 开端进行任务调集。

往期回顾

  1. 【Flink】详解StreamGraph
  2. 【Flink】浅谈Flink架构和调度
  3. 【Flink】详解Flink的八种分区
  4. 【Flink】浅谈Flink背压问题(1)
  5. 【分布式】浅谈CAP、BASE理论(1)

文中难免会出现一些描述不当之处(虽然我已重复查看屡次),欢迎在留言区指正,相关的常识点也可进行共享,期望我们都能有所收成!!