Flink 原始碼閱讀筆記(3)- ExecutionGraph 的生成
我們前面已經分析過StreamGraph
,JobGraph
的生成過程,這兩個執行圖都是在 client 端生成的。接下來我們將把目光頭投向 Flink Job 執行時排程層核心的執行圖 -ExecutionGraph
。
和StreamGraph
以及JobGraph
不同的是,ExecutionGraph
是在 JobManager 中生成的。 Client 向 JobManager 提交JobGraph
後, JobManager 就會根據JobGraph
來建立對應的ExecutionGraph
,並以此來排程任務。
本文不會介紹 JobMagage 的啟動及任務排程過程,後面將會在單獨的文章中進行分析。
核心概念
ExecutionJobVertex
在ExecutionGraph
中,節點對應的類是ExecutionJobVertex
,與之對應的就是JobGraph
中的JobVertex
。每一個ExexutionJobVertex
都是由一個JobVertex
生成的。
private final JobVertex jobVertex; private final List<OperatorID> operatorIDs; private final List<OperatorID> userDefinedOperatorIds; //ExecutionVertex 對應一個並行的子任務 private final ExecutionVertex[] taskVertices; private final IntermediateResult[] producedDataSets; private final List<IntermediateResult> inputs; private final int parallelism; private final SlotSharingGroup slotSharingGroup; private final CoLocationGroup coLocationGroup; private final InputSplit[] inputSplits; private int maxParallelism;
ExecutionVertex
ExexutionJobVertex
的成員變數中包含一個ExecutionVertex
陣列。我們知道,Flink Job 是可以指定任務的並行度的,在實際執行時,會有多個並行的任務同時在執行,對應到這裡就是ExecutionVertex
。ExecutionVertex
是並行任務的一個子任務,運算元的並行度是多少,那麼就會有多少個ExecutionVertex
。
private final ExecutionJobVertex jobVertex; private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions; private final ExecutionEdge[][] inputEdges; private final int subTaskIndex; private final EvictingBoundedList<ArchivedExecution> priorExecutions; private volatile CoLocationConstraint locationConstraint; /** The current or latest execution attempt of this vertex's task. */ private volatile Execution currentExecution;// this field must never be null
Execution
Execution
是對ExecutionVertex
的一次執行,通過ExecutionAttemptId
來唯一標識。
IntermediateResult
在JobGraph
中用IntermediateDataSet
表示JobVertex
的對外輸出,一個JobGraph
可能有 n(n >=0) 個輸出。在ExecutionGraph
中,與此對應的就是IntermediateResult
。
//對應的IntermediateDataSet的ID private final IntermediateDataSetID id; //生產者 private final ExecutionJobVertex producer; //對應ExecutionJobVertex的並行度 private final int numParallelProducers; private final IntermediateResultPartition[] partitions = new IntermediateResultPartition[numParallelProducers]; private final ResultPartitionType resultType;
由於ExecutionJobVertex
有 numParallelProducers 個並行的子任務,自然對應的每一個IntermediateResult
就有 numParallelProducers 個生產者,每個生產者的在相應的IntermediateResult
上的輸出對應一個IntermediateResultPartition
。IntermediateResultPartition
表示的是ExecutionVertex
的一個輸出分割槽,即:
ExecutionJobVertex -->IntermediateResult ExecutionVertex -->IntermediateResultPartition
一個ExecutionJobVertex
可能包含多個(n) 個IntermediateResult
, 那實際上每一個並行的子任務ExecutionVertex
可能會會包含(n) 個IntermediateResultPartition
。
IntermediateResultPartition
的生產者是ExecutionVertex
,消費者是一個或若干個ExecutionEdge
。
ExecutionEdge
ExecutionEdge
表示ExecutionVertex
的輸入,通過ExecutionEdge
將ExecutionVertex
和IntermediateResultPartition
連線起來,進而在不同的ExecutionVertex
之間建立聯絡。
private final IntermediateResultPartition source; private final ExecutionVertex target; private final int inputNum;
構建 ExecutionGraph 的流程
建立ExecutionGraph
的入口在ExecutionGraphBuilder#buildGraph()
中。
1. 建立 ExecutionGraph 物件並設定基本屬性
設定 JobInformation, SlotProvider 等資訊,下面羅列了一些比較重要的屬性:
/** Job specific information like the job id, job name, job configuration, etc. */ private final JobInformation jobInformation; /** The slot provider to use for allocating slots for tasks as they are needed. */ private final SlotProvider slotProvider; /** The classloader for the user code. Needed for calls into user code classes. */ private final ClassLoader userClassLoader; /** All job vertices that are part of this graph. */ private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks; /** All vertices, in the order in which they were created. **/ private final List<ExecutionJobVertex> verticesInCreationOrder; /** All intermediate results that are part of this graph. */ private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults; /** Current status of the job execution. */ private volatile JobStatus state = JobStatus.CREATED; /** Listeners that receive messages when the entire job switches it status * (such as from RUNNING to FINISHED). */ private final List<JobStatusListener> jobStatusListeners; /** Listeners that receive messages whenever a single task execution changes its status. */ private final List<ExecutionStatusListener> executionListeners;
2. JobVertex 初始化
JobVertex 在Master 上進行初始化,主要關注OutputFormatVertex
和InputFormatVertex
,其他型別的 vertex 在這裡沒有什麼特殊操作。File output format 在這一步準備好輸出目錄, Input splits 在這一步建立對應的 splits。
for (JobVertex vertex : jobGraph.getVertices()) { .... try { vertex.initializeOnMaster(classLoader); } catch (Throwable t) { throw new JobExecutionException(jobId, "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t); } }
4. 生成 ExecutionGraph 內部的節點和連線
對所有的 Jobvertext 進行拓撲排序,並生成ExecutionGraph
內部的節點和連線
//topologically sort the job vertices and attach the graph to the existing one List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); if (log.isDebugEnabled()) { log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId); } executionGraph.attachJobGraph(sortedTopology);
4.1 對 JobVertex 進行拓撲排序
所謂拓撲排序,即保證如果存在 A -> B 的有向邊,那麼在排序後的列表中 A 節點一定在 B 節點之前。具體的演算法這裡不再詳細分析。
4.2 建立 ExecutionJobVertex
按照拓撲排序的結果依次為每個JobVertex
建立對應的ExecutionJobVertex
。
for (JobVertex jobVertex : topologiallySorted) { if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) { this.isStoppable = false; } // create the execution job vertex and attach it to the graph //建立 ExecutionJobVertex ExecutionJobVertex ejv = new ExecutionJobVertex( this, jobVertex, 1, rpcTimeout, globalModVersion, createTimestamp); //連線上游節點 ejv.connectToPredecessors(this.intermediateResults); ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv); if (previousTask != null) { throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), ejv, previousTask)); } for (IntermediateResult res : ejv.getProducedDataSets()) { IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res); if (previousDataSet != null) { throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", res.getId(), res, previousDataSet)); } } this.verticesInCreationOrder.add(ejv); this.numVerticesTotal += ejv.getParallelism(); newExecJobVertices.add(ejv); }
在建立ExecutionJobVertex
的時候會建立對應的ExecutionVertex
,IntermediateResult
,ExecutionEdge
,IntermediateResultPartition
等物件,這裡涉及到的物件相對較多,概括起來大致是這樣的:
-
每一個
JobVertex
對應一個 ExecutionJobVertex, -
每一個
ExecutionJobVertex
有 parallelism 個ExecutionVertex
-
每一個
JobVertex
可能有 n(n>=0) 個IntermediateDataSet
,在ExecutionJobVertex
中,一個IntermediateDataSet
對應一個IntermediateResult
, 每一個IntermediateResult
都有 parallelism 個生產者, 對應 parallelism 個IntermediateResultPartition
-
每一個
ExecutionJobVertex
都會和前向的IntermediateResult
連線,實際上是ExecutionVertex
和IntermediateResult
建立連線,生成ExecutionEdge
5. 配置 state checkpointing (忽略)
從 ExecutionGraph 到實際執行的任務
ExecutionGraph
是在建立JobMaster
時就構建完成的,之後就可以被排程執行了。下面簡單概括下排程執行的流程,具體分析見後續的文章。
ExecutionGraph.scheduleForExecution
按照拓撲順序為所有的ExecutionJobVertex
分配資源,其中每一個ExecutionVertex
都需要分配一個 slot,ExecutionVertex
的一次執行對應一個Execution
,在分配資源的時候會依照SlotSharingGroup
和CoLocationConstraint
確定,分配的時候會考慮 slot 重用的情況。
在所有的節點資源都獲取成功後,會逐一呼叫Execution.deploy()
來部署Execution
, 使用TaskDeploymentDescriptor
來描述Execution
,並提交到分配給該 Execution 的 slot 對應的 TaskManager, 最終被分配給對應的TaskExecutor
執行。
小結
本文簡單概括了ExecutionGraph
涉及到的概念和其生成過程。
到目前為止,我們瞭解了StreamGraph
,JobGraph
和ExecutionGraph
的生成過程,以及他們內部的節點和連線的對應關係。總的來說,streamGraph
是最原始的,更貼近使用者邏輯的 DAG 執行圖;JobGraph
是對StreamGraph
的進一步優化,將能夠合併的運算元合併為一個節點以降低執行時資料傳輸的開銷;ExecutionGraph
則是作業執行是用來排程的執行圖,可以看作是並行化版本的JobGraph
,將 DAG 拆分到基本的排程單元。
-EOF-