[Flink] JobGraph Explained
Overview
The JobGraph is the product of optimizing the StreamGraph; the client sends the optimized JobGraph to the JM. The following text relies on some prerequisite knowledge, so if you have not read the earlier posts it is best to read them first:
- [Flink] StreamGraph Explained
- [Flink] A Brief Look at Flink's Architecture and Scheduling
- [Flink] Flink's Eight Partitioning Strategies Explained
On the client side, Flink converts the StreamGraph object into a JobGraph object. The core of this conversion is merging multiple eligible StreamNode nodes into a single JobVertex node, an optimization known as operator chaining. It effectively reduces the serialization and deserialization required to pass data between nodes. Operators in the same chain run in the same TaskSlot, which can be understood as running in the same thread; this noticeably cuts the cost of thread switching while increasing throughput and lowering latency.
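As a minimal illustration (the pipeline below is made up, not taken from this post), the map and filter satisfy the chaining conditions, so they end up in the same operator chain and, after the conversion, in the same JobVertex; the Web UI of a running job shows them fused into a single task:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3)
        .map(a -> a + 1)     // forward connection, same parallelism ...
        .filter(a -> a > 2)  // ... so map and filter are chained into one JobVertex
        .print();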
Source Code Analysis
Building the JobGraph
The JobGraph class itself lives in the flink-runtime module, under org.apache.flink.runtime.jobgraph.JobGraph, while the conversion logic sits in org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator. The call chain is StreamGraph#getJobGraph → StreamingJobGraphGenerator.createJobGraph(StreamGraph, JobID) → StreamingJobGraphGenerator#createJobGraph():
/*------------------------ StreamGraph ---------------------------*/
// Entry point of the construction
public JobGraph getJobGraph() {
return getJobGraph(null);
}
public JobGraph getJobGraph(@Nullable JobID jobID) {
return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
/*---------------------------------------------------------*/
/*--------------- StreamingJobGraphGenerator ------------------*/
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
private JobGraph createJobGraph() {
// Pre-validation
preValidate();
// Read the job type (scheduling mode) from the StreamGraph
// and set it on the JobGraph
jobGraph.setJobType(streamGraph.getJobType());
// Set whether approximate local recovery is enabled on the JobGraph
jobGraph.enableApproximateLocalRecovery( streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
// Generate a deterministic hash for every StreamNode
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
// This is the key step: the vertices and edges of the JobGraph are created in this method.
// It tries to aggregate as many StreamNodes as possible into one JobGraph node.
// It decides operator chaining, merges nodes into JobVertex objects and generates the JobEdges.
setChaining(hashes, legacyHashes);
// Set the physical edges
setPhysicalEdges();
// Set the SlotSharingGroup and CoLocationGroup on the JobGraph
setSlotSharingAndCoLocation();
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
// Configure checkpoint information for each JobVertex of the JobGraph:
// e.g. source JobVertices need to trigger checkpoints,
// and all JobVertices need to commit and ack checkpoints
configureCheckpointing();
// Set the savepoint restore settings
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
JobGraphUtils.prepareUserArtifactEntries(
streamGraph.getUserArtifacts().stream()
.collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
jobGraph.getJobID());
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
distributedCacheEntries.entrySet()) {
jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
}
// Set the runtime execution configuration
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
} catch (IOException e) {
throw new IllegalConfigurationException(
"Could not serialize the ExecutionConfig."
+ "This indicates that non-serializable types (like custom serializers) were registered");
}
// Return the JobGraph object
return jobGraph;
}
/*---------------------------------------------------------*/
// Enum describing the Flink job type (scheduling mode)
public enum JobType {
// Batch mode
BATCH,
// Streaming mode
STREAMING
}
As the analysis above shows, the most important step in going from a StreamGraph to a JobGraph is building the operator chains via setChaining(hashes, legacyHashes). This packs as many operations as possible into the same node and avoids unnecessary thread switches and network communication. Take a simple example, DataStream.map(a -> a+1).filter(a -> a > 2): the data flow has two processing steps, i.e. two operators, map and filter. These two operators become separate StreamNode objects and Task objects; if the two Tasks do not run in the same TaskSlot or the same TM, network transfer is unavoidable and execution performance suffers. To optimize this, Flink introduced the operator chain: a chain represents a group of operators that can execute in the same TaskSlot.
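The DataStream API also exposes knobs to influence chaining directly, which is useful when a chain grows too heavy for a single slot (an illustrative sketch; env is assumed to be a StreamExecutionEnvironment as in the earlier snippet):
env.fromElements(1, 2, 3)
        .map(a -> a + 1)
        .startNewChain()       // the map starts a new chain instead of chaining to the source
        .filter(a -> a > 2)
        .disableChaining()     // the filter will not be chained with any other operator
        .print();
// chaining can also be switched off for the whole job:
// env.disableOperatorChaining();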
/*--------------- StreamingJobGraphGenerator ------------------*/
// Recursively create JobVertex objects starting from the source StreamNodes
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
final Map<Integer, OperatorChainInfo> chainEntryPoints =
buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
final Collection<OperatorChainInfo> initialEntryPoints =
chainEntryPoints.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
// Create the operator chains
for (OperatorChainInfo info : initialEntryPoints) {
createChain(
info.getStartNodeId(),
1, // chain index starts at 1; 0 is reserved for chained source inputs
info,
chainEntryPoints);
}
}
// Create an operator chain
private List<StreamEdge> createChain(
final Integer currentNodeId,
final int chainIndex,
final OperatorChainInfo chainInfo,
final Map<Integer, OperatorChainInfo> chainEntryPoints) {
// Get the ID of the chain's start node
Integer startNodeId = chainInfo.getStartNodeId();
// builtVertices stores the IDs of StreamNodes that have already been built, to avoid building them again
if (!builtVertices.contains(startNodeId)) {
// transitiveOutEdges stores the outgoing edges of the whole operator chain
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
// chainableOutputs stores all StreamEdges that can be chained
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
// nonChainableOutputs stores the StreamEdges that cannot be chained
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// Get the StreamNode currently being processed
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
// Split all outgoing StreamEdges into chainable and non-chainable ones
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
// For chainable StreamEdges, call createChain recursively and add the results to transitiveOutEdges.
// The recursion terminates when:
// 1. the current node has no more outgoing edges;
// 2. the current node has already been converted
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(
chainable.getTargetId(),
chainIndex + 1,
chainInfo,
chainEntryPoints));
}
// Non-chainable StreamEdges are added to transitiveOutEdges, and a new chain is started from their target node
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(
nonChainable.getTargetId(),
1, // operators start at position 1 because 0 is for chained source inputs
chainEntryPoints.computeIfAbsent(
nonChainable.getTargetId(),
(k) -> chainInfo.newChain(nonChainable.getTargetId())),
chainEntryPoints);
}
// Set the name of the operator chain
chainedNames.put(
currentNodeId,
createChainedName(
currentNodeId,
chainableOutputs,
Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
// Set the minimum resources required by the operator chain
chainedMinResources.put(
currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
// Set the preferred resources of the operator chain
chainedPreferredResources.put(
currentNodeId,
createChainedPreferredResources(currentNodeId, chainableOutputs));
// Add the current node to the chain and obtain its OperatorID
OperatorID currentOperatorId =
chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId)
.addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId)
.addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
// If currentNodeId equals startNodeId, this node is the head of a new chain, so a JobVertex is created for it
StreamConfig config =
currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
// Write the vertex properties into the config
setVertexConfig(
currentNodeId,
config,
chainableOutputs,
nonChainableOutputs,
chainInfo.getChainedSources());
if (currentNodeId.equals(startNodeId)) {
// Mark the start of a new operator chain
config.setChainStart();
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
// For each chain, connect it to the outgoing edges that lead to the next chains
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
// Record the ordered outgoing edges and the configs of the chained tasks
config.setOutEdgesInOrder(transitiveOutEdges);
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(
startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
// Decide whether an edge can form part of an operator chain
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}
private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled())) {
return false;
}
for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
return false;
}
}
return true;
}
public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null)
|| (slotSharingGroup != null
&& slotSharingGroup.equals(downstreamVertex.slotSharingGroup));
}
private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {
final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();
// Iterate over all source StreamNodes
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
// Look up the StreamNode object by its ID
final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);
if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory
&& sourceNode.getOutEdges().size() == 1) {
final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());
final ChainingStrategy targetChainingStrategy =
target.getOperatorFactory().getChainingStrategy();
if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
&& isChainableInput(sourceOutEdge, streamGraph)) {
final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
final StreamConfig.SourceInputConfig inputConfig =
new StreamConfig.SourceInputConfig(sourceOutEdge);
final StreamConfig operatorConfig = new StreamConfig(new Configuration());
setVertexConfig(
sourceNodeId,
operatorConfig,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyMap());
operatorConfig.setChainIndex(0); // sources are always first
operatorConfig.setOperatorID(opId);
operatorConfig.setOperatorName(sourceNode.getOperatorName());
chainedSources.put(
sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));
final SourceOperatorFactory<?> sourceOpFact =
(SourceOperatorFactory<?>) sourceNode.getOperatorFactory();
final OperatorCoordinator.Provider coord =
sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);
final OperatorChainInfo chainInfo =
chainEntryPoints.computeIfAbsent(
sourceOutEdge.getTargetId(),
(k) ->
new OperatorChainInfo(
sourceOutEdge.getTargetId(),
hashes,
legacyHashes,
chainedSources,
streamGraph));
chainInfo.addCoordinatorProvider(coord);
continue;
}
}
// Put the (source node ID, OperatorChainInfo) pair into the map
chainEntryPoints.put(
sourceNodeId,
new OperatorChainInfo(
sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));
}
return chainEntryPoints;
}
/*---------------------------------------------------------*/
/*--------------- ChainingStrategy ------------------*/
public enum ChainingStrategy {
// Chain with both upstream and downstream operators as much as possible
ALWAYS,
// The operator will not chain with either upstream or downstream operators
NEVER,
// The operator only chains with downstream operators, never with upstream ones
HEAD,
// Head operator that additionally chains with as many source operators as possible
HEAD_WITH_SOURCES;
// The default chaining strategy is ALWAYS
public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}
/*--------------------------------------------------*/
To briefly sum up how the JobGraph is built: the entry point is setChaining(), which builds a Collection<OperatorChainInfo> holding the information of all source nodes. That collection is traversed and createChain() is called for each entry; the method recurses into the downstream nodes to build the operator chains. Inside it, isChainable is evaluated for every operator to split all outgoing edges into two groups, chainableOutputs and nonChainableOutputs, and both groups are traversed recursively to build the chains, while the StreamNode configuration is serialized into a StreamConfig object. There is a branch here: if the current node is the head of a chain, createJobVertex is called to build a JobVertex object, which is then connected to others via JobEdge objects; if the current node is not the head of a chain, only a new StreamConfig object is built.
Whether two operators can be chained is decided by the isChainable and isChainableInput methods; the concrete conditions are as follows (an example of edges that fail these checks is shown after the list):
- The downstream node has exactly one input;
- The partitioner must be a ForwardPartitioner;
- The shuffle mode must not be BATCH (i.e. the exchange is pipelined);
- The upstream and downstream operators must have the same parallelism;
- Operator chaining must be enabled on the StreamGraph;
- The chaining strategy of the upstream operator must be ALWAYS, HEAD or HEAD_WITH_SOURCES;
- The chaining strategy of the downstream operator must be ALWAYS, or HEAD_WITH_SOURCES when the upstream operator is a source;
- The upstream and downstream operators belong to the same SlotSharingGroup (or neither has one assigned).
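For instance, in the pipeline below (an illustrative sketch reusing env from the earlier snippets; the operators and values are made up), the keyBy replaces the ForwardPartitioner with a key-group based partitioner, and the explicit setParallelism can violate the equal-parallelism condition, so those edges are not chainable:
DataStream<String> words = env.fromElements("a", "b", "a");
words.keyBy(s -> s)                 // keyBy inserts a KeyGroupStreamPartitioner, not a ForwardPartitioner,
        .reduce((x, y) -> x + y)    // so the reduce cannot be chained with its upstream
        .map(String::toUpperCase)
        .setParallelism(2)          // if this differs from the reduce's parallelism, this edge is not chainable either
        .print();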
In the createJobVertex method, a JobVertex object is created first; jobVertex.setInvokableClass() is then called to set the class executed at runtime, followed by the resource requirements and the parallelism. Finally, the configuration of the JobVertex object is wrapped in a StreamConfig object, which is returned.
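A heavily simplified sketch of what this boils down to (the method name below is my own; input/output formats, operator IDs and coordinator providers are omitted, and the fields used are those of StreamingJobGraphGenerator in the Flink 1.12-era source this post follows):
// Simplified view of createJobVertex; not the real implementation
private StreamConfig createJobVertexSimplified(Integer startNodeId) {
    StreamNode streamNode = streamGraph.getStreamNode(startNodeId);
    JobVertex jobVertex = new JobVertex(chainedNames.get(startNodeId));
    // the invokable class is the Task entry point, e.g. SourceStreamTask or OneInputStreamTask
    jobVertex.setInvokableClass(streamNode.getJobVertexClass());
    jobVertex.setResources(
            chainedMinResources.get(startNodeId), chainedPreferredResources.get(startNodeId));
    jobVertex.setParallelism(streamNode.getParallelism());
    jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
    jobVertices.put(startNodeId, jobVertex);
    builtVertices.add(startNodeId);
    jobGraph.addVertex(jobVertex);
    // the returned StreamConfig wraps this JobVertex's configuration
    return new StreamConfig(jobVertex.getConfiguration());
}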
transitiveOutEdges is then traversed and connect() is called for each edge. Inside connect, the upstream and downstream JobVertex nodes are obtained from the StreamEdge, and StreamEdge.getPartitioner() yields the StreamPartitioner. If the partitioner's isPointwise() method returns true (both ForwardPartitioner and RescalePartitioner have explicit targets), a JobEdge of type DistributionPattern.POINTWISE is built; for all other partitioners a JobEdge of type DistributionPattern.ALL_TO_ALL is built. JobEdge objects are the connections between the JobVertex nodes, which means the connect method is where the nodes of the JobGraph are wired together.
// The connect method
private void connect(Integer headOfChain, StreamEdge edge) {
physicalEdgesInOrder.add(edge);
Integer downStreamVertexID = edge.getTargetId();
// Get the JobVertex at the head of the chain
JobVertex headVertex = jobVertices.get(headOfChain);
// Get the downstream JobVertex
JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);
StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);
// Get the partitioner
StreamPartitioner<?> partitioner = edge.getPartitioner();
// Determine the result partition type
ResultPartitionType resultPartitionType;
switch (edge.getShuffleMode()) {
case PIPELINED:
resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
break;
case BATCH:
resultPartitionType = ResultPartitionType.BLOCKING;
break;
case UNDEFINED:
resultPartitionType = determineResultPartitionType(partitioner);
break;
default:
throw new UnsupportedOperationException(
"Data exchange mode " + edge.getShuffleMode() + " is not supported yet.");
}
checkAndResetBufferTimeout(resultPartitionType, edge);
// Build the JobEdge according to the partitioner type
JobEdge jobEdge;
if (partitioner.isPointwise()) {
jobEdge =
downStreamVertex.connectNewDataSetAsInput(
headVertex, DistributionPattern.POINTWISE, resultPartitionType);
} else {
jobEdge =
downStreamVertex.connectNewDataSetAsInput(
headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
}
// Set the ship strategy name; this is visible in the Web UI
jobEdge.setShipStrategyName(partitioner.toString());
jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());
// Log: [partitioner name] - [chain head] -> [downstream vertex ID]
if (LOG.isDebugEnabled()) {
LOG.debug(
"CONNECTED: {} - {} -> {}",
partitioner.getClass().getSimpleName(),
headOfChain,
downStreamVertexID);
}
}
Setting the Physical Edges
The setPhysicalEdges method sets the physical edges of the JobVertices. It can be summarized as follows:
- Iterate over physicalEdgesInOrder, which contains all edges that could not be chained, and group each edge under its target node as part of that node's list of incoming edges;
- Iterate over the resulting physicalInEdgesInOrder map, whose structure after the previous step is (downstream node ID of a non-chainable edge) → (list of its incoming edges), and set each node's incoming-edge list as its actual physical edges.
// Set the physical edges
private void setPhysicalEdges() {
Map<Integer, List<StreamEdge>> physicalInEdgesInOrder =
new HashMap<Integer, List<StreamEdge>>();
for (StreamEdge edge : physicalEdgesInOrder) {
int target = edge.getTargetId();
List<StreamEdge> inEdges =
physicalInEdgesInOrder.computeIfAbsent(target, k -> new ArrayList<>());
inEdges.add(edge);
}
for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
int vertex = inEdges.getKey();
List<StreamEdge> edgeList = inEdges.getValue();
vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
}
}
[Figure: the final JobGraph after the physical edges have been determined]
Submitting the JobGraph
The submission of a JobGraph is defined by the ClusterClient interface, which encapsulates submitting a program to a remote cluster. Its key methods are analyzed below:
public interface ClusterClient<T> extends AutoCloseable {
// Return the cluster ID
T getClusterId();
// Return the Flink configuration
Configuration getFlinkConfiguration();
// Shut down the cluster this client is currently connected to
void shutDownCluster();
// List all currently running and finished jobs
CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
// Submit a JobGraph to the cluster
CompletableFuture<JobID> submitJob(JobGraph jobGraph);
// Get the current status of the job with the given ID
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
// Trigger a savepoint for the given job ID
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
// Send a request to an operator coordinator and receive its response
CompletableFuture<CoordinationResponse> sendCoordinationRequest(
JobID jobId, OperatorID operatorId, CoordinationRequest request);
// Stop the job with the given ID (only applicable to streaming jobs).
// After the stop command is sent, essentially only the sources stop emitting data;
// the whole job only stops once all TMs have finished processing the data in flight.
// jobId: unique identifier of the job
// advanceToEndOfEventTime: whether the sources should emit a final maximum watermark
CompletableFuture<String> stopWithSavepoint(
final JobID jobId,
final boolean advanceToEndOfEventTime,
@Nullable final String savepointDirectory);
// Request the job result for the given job ID
CompletableFuture<JobResult> requestJobResult(JobID jobId);
// Get the accumulators of a job by its ID
// jobId: unique identifier of the job
// loader: class loader used to deserialize the accumulator results
CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
// Cancel the job with the given ID
CompletableFuture<Acknowledge> cancel(JobID jobId);
...
}
In other words, the JobGraph object is eventually sent to the cluster by ClusterClient#submitJob() and received by the JobMaster inside the JM; the subsequent call chain is JobMaster#startJobExecution() → JobMaster#startScheduling(), at which point task scheduling begins.
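From the user's point of view all of this happens behind execute(): the StreamGraph is built, converted into the JobGraph described in this post, and handed to a ClusterClient implementation. A minimal sketch (the pipeline and job name are made up):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3)
        .map(a -> a + 1)
        .filter(a -> a > 2)
        .print();
// execute() builds the StreamGraph, turns it into a JobGraph and submits it;
// on a remote cluster the submission eventually goes through ClusterClient#submitJob()
env.execute("jobgraph-submission-demo");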
Previous Posts
- [Flink] StreamGraph Explained
- [Flink] A Brief Look at Flink's Architecture and Scheduling
- [Flink] Flink's Eight Partitioning Strategies Explained
- [Flink] A Brief Look at Backpressure in Flink (1)
- [Distributed Systems] A Brief Look at the CAP and BASE Theorems (1)
There are bound to be some inaccuracies in this post (even though I have checked it repeatedly); feel free to point them out in the comments, and related knowledge is welcome there too. I hope everyone gets something out of it!