Flink 原始碼閱讀筆記(2)- JobGraph 的生成
前面的文章我們介紹了 StreamGraph 的生成,這個實際上只對應 Flink 作業在邏輯上的執行計劃圖。Flink 會進一步對 StreamGraph 進行轉換,得到另一個執行計劃圖,即 JobGraph。
JobVertex
在 StreamGraph 中,每一個運算元(Operator) 對應了圖中的一個節點(StreamNode)。StreamGraph 會被進一步優化,將多個符合條件的節點串聯(Chain) 在一起形成一個節點,從而減少資料在不同節點之間流動所產生的序列化、反序列化、網路傳輸的開銷。多個運算元被 chain 在一起的形成的節點在JobGraph
中對應的就是JobVertex
。
每個JobVertex
中包含一個或多個 Operators。JobVertex
的主要成員變數包括
/** The ID of the vertex. */ private final JobVertexID id; /** The alternative IDs of the vertex. */ private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>(); /** The IDs of all operators contained in this vertex. */ private final ArrayList<OperatorID> operatorIDs = new ArrayList<>(); /** The alternative IDs of all operators contained in this vertex. */ private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>(); /** List of produced data sets, one per writer */ private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>(); /** List of edges with incoming data. One per Reader. */ private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>(); /** Number of subtasks to split this task into at runtime.*/ private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
其輸入是JobEdge
列表, 輸出是IntermediateDataSet
列表。
JobEdge
在StramGraph
中,StreamNode
之間是通過StreamEdge
建立連線的。在JobEdge
中,對應的是JobEdge
。
和StreamEdge
中同時保留了源節點和目標節點 (sourceId 和 targetId)不同,在JobEdge
中只有源節點的資訊。由於JobVertex
中儲存了所有輸入的JobEdge
的資訊,因而同樣可以在兩個節點之間建立連線。更確切地說,JobEdge
是和節點的輸出結果相關聯的,我們看下JobEdge
主要的成員變數:
/** The vertex connected to this edge. */ private final JobVertex target; /** The distribution pattern that should be used for this job edge. */ // DistributionPattern 決定了在上游節點(生產者)的子任務和下游節點(消費者)之間的連線模式 private final DistributionPattern distributionPattern; /** The data set at the source of the edge, may be null if the edge is not yet connected*/ private IntermediateDataSet source; /** The id of the source intermediate data set */ private IntermediateDataSetID sourceId; /** Optional name for the data shipping strategy (forward, partition hash, rebalance, ...), * to be displayed in the JSON plan */ private String shipStrategyName;
IntermediateDataSet
JobVertex
產生的資料被抽象為IntermediateDataSet
, 字面意思為中間資料集,這個很容易理解。前面提到,JobEdge
是和節點的輸出結果相關聯的,其實就是指可以把JobEdge
看作是IntermediateDataSet
的消費者,那麼JobVertex
自然就是生產者了。
private final IntermediateDataSetID id; // the identifier private final JobVertex producer;// the operation that produced this data set private final List<JobEdge> consumers = new ArrayList<JobEdge>(); // The type of partition to use at runtime private final ResultPartitionType resultType;
其中ResultPartitionType
表示中間結果的型別,說起來有點抽象,我們看下屬性就明白了:
/** Can the partition be consumed while being produced? */ private final boolean isPipelined; /** Does the partition produce back pressure when not consumed? */ private final boolean hasBackPressure; /** Does this partition use a limited number of (network) buffers? */ private final boolean isBounded;
這個要結合 Flink 任務執行時的記憶體管理機制來看,在後面的文章再進行分析。目前在 Stream 模式下使用的型別是PIPELINED_BOUNDED(true, true, true)
,上面的三個屬性都是 true。
StreamConfig
對於每一個StreamOperator
, 也就是StreamGraph
中的每一個StreamGraph
, 在生成JobGraph
的過程中StreamingJobGraphGenerator
都會建立一個對應的StreamConfig
。
StreamConfig
中儲存了這個運算元(operator) 在執行是需要的所有配置資訊,這些資訊都是通過 key/value 的形式儲存在Configuration
中的。例如:
//儲存StreamOperator資訊 public void setStreamOperator(StreamOperator<?> operator) { if (operator != null) { config.setClass(USER_FUNCTION, operator.getClass()); try { InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF); } catch (IOException e) { throw new StreamTaskException("Cannot serialize operator object " + operator.getClass() + ".", e); } } } public void setChainedOutputs(List<StreamEdge> chainedOutputs) { try { InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, CHAINED_OUTPUTS); } catch (IOException e) { throw new StreamTaskException("Cannot serialize chained outputs.", e); } } public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) { try { InstantiationUtil.writeObjectToConfig(outputvertexIDs, this.config, NONCHAINED_OUTPUTS); } catch (IOException e) { throw new StreamTaskException("Cannot serialize non chained outputs.", e); } } public void setInPhysicalEdges(List<StreamEdge> inEdges) { try { InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES); } catch (IOException e) { throw new StreamTaskException("Cannot serialize inward edges.", e); } } //......
從 StreamGraph 到 JobGraph
從StreamGraph
到JobGraph
的轉換入口在StreamingJobGraphGenerator
中。
首先來看下StreamingJobGraphGenerator
的成員變數和入口函式:
//id -> JobVertex 的對應關係 private final Map<Integer, JobVertex> jobVertices; //已經構建的JobVertex的id集合 private final Collection<Integer> builtVertices; //物理邊集合(不包含chain內部的邊), 按建立順序排序 private List<StreamEdge> physicalEdgesInOrder; //儲存 operataor chain 的資訊,部署時用來構建 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig) private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs; //所有節點的配置資訊,id -> StreamConfig private Map<Integer, StreamConfig> vertexConfigs; //儲存每個節點的名字,id -> chainedName private Map<Integer, String> chainedNames; //用於計算hash值的演算法 private final StreamGraphHasher defaultStreamGraphHasher; private final List<StreamGraphHasher> legacyStreamGraphHashers; //..... private JobGraph createJobGraph() { // 排程模式,立即啟動 jobGraph.setScheduleMode(ScheduleMode.EAGER); // 廣度優先遍歷 StreamGraph 並且為每個SteamNode生成hash,hash值將被用於 JobVertexId 中 // 保證如果提交的拓撲沒有改變,則每次生成的hash都是一樣的 Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph); List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size()); for (StreamGraphHasher hasher : legacyStreamGraphHashers) { legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph)); } Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>(); // 主要的轉換邏輯,生成 JobVetex, JobEdge 等 setChaining(hashes, legacyHashes, chainedOperatorHashes); // 將每個JobVertex的輸入邊集合也序列化到該JobVertex的StreamConfig中 // (出邊集合已經在setChaining的時候寫入了) setPhysicalEdges(); // 根據group name,為每個 JobVertex 指定所屬的 SlotSharingGroup // 以及針對 Iteration的頭尾設定CoLocationGroup setSlotSharingAndCoLocation(); // 配置 checkpoint configureCheckpointing(); // 新增使用者提供的自定義的檔案資訊 JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph); // 將 StreamGraph 的 ExecutionConfig 序列化到 JobGraph 的配置中 try { jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); } catch (IOException e) { throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." + "This indicates that non-serializable types (like custom serializers) were registered"); } return jobGraph; }
StreamingJobGraphGenerator#createJobGraph
函式的邏輯也很清晰,首先為所有節點生成一個唯一的hash id,如果節點在多次提交中沒有改變(包括併發度、上下游等),那麼這個id就不會改變,這主要用於故障恢復。這裡我們不能用StreamNode.id
來代替,因為這是一個從 1 開始的靜態計數變數,同樣的 Job 可能會得到不一樣的 id。然後就是最關鍵的 chaining 處理,和生成JobVetex、JobEdge等。之後就是寫入各種配置相關的資訊。
我們先來看一下,Flink 是如何確定兩個 Operator 是否能夠被 chain 到同一個節點的:
//StreamEdge 兩端的節點是否能夠被 chain 到同一個 JobVertex 中 public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { //獲取到上游和下游節點 StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); //獲取到上游和下游節點具體的運算元 StreamOperator StreamOperator<?> headOperator = upStreamVertex.getOperator(); StreamOperator<?> outOperator = downStreamVertex.getOperator(); return downStreamVertex.getInEdges().size() == 1 //下游節點只有一個輸入 && outOperator != null && headOperator != null && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) //在同一個slot共享組中 //上下游運算元的 chainning 策略,要允許chainning //預設的是 ALWAYS //在新增運算元時,也可以強制使用 disableChain 設定為 NEVER && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) //上下游節點之間的資料傳輸方式必須是FORWARD,而不能是REBALANCE等其它模式 && (edge.getPartitioner() instanceof ForwardPartitioner) //上下游節點的並行度要一致 && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled(); }
只要一條邊兩端的節點滿足上面的條件,那麼這兩個節點就可以被串聯在同一個JobVertex
中。接著來就來看最為關鍵的函式 setChaining 的邏輯:
/** * Sets up task chains from the source {@link StreamNode} instances. * * <p>This will recursively create all {@link JobVertex} instances. */ private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { for (Integer sourceNodeId : streamGraph.getSourceIDs()) { createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes); } } //構建 operator chain(可能包含一個或多個StreamNode),返回值是當前的這個 operator chain 實際的輸出邊(不包括內部的邊) //如果 currentNodeId != startNodeId, 說明當前節點在operator chain 的內部 private List<StreamEdge> createChain( Integer startNodeId, Integer currentNodeId, Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, int chainIndex, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { if (!builtVertices.contains(startNodeId)) { //當前 operator chain 最終的輸出邊,不包括內部的邊 List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>(); List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>(); List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>(); //將當前節點的出邊分為兩組,即 chainable 和 nonChainable for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) { if (isChainable(outEdge, streamGraph)) { //判斷當前 StreamEdge 的上下游是否可以串聯在一起 chainableOutputs.add(outEdge); } else { nonChainableOutputs.add(outEdge); } } //對於chainable的輸出邊,遞迴呼叫,找到最終的輸出邊並加入到輸出列表中 for (StreamEdge chainable : chainableOutputs) { transitiveOutEdges.addAll( createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes)); } //對於 nonChainable 的邊 for (StreamEdge nonChainable : nonChainableOutputs) { //這個邊本身就應該加入到當前節點的輸出列表中 transitiveOutEdges.add(nonChainable); //遞迴呼叫,以下游節點為起點建立新的operator chain createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes); } //用於儲存一個operator chain所有 operator 的 hash 資訊 List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>()); byte[] primaryHashBytes = hashes.get(currentNodeId); for (Map<Integer, byte[]> legacyHash : legacyHashes) { operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId))); } //當前節點的名稱,資源要求等資訊 chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs)); //如果當前節點是起始節點, 則直接建立 JobVertex 並返回 StreamConfig, 否則先建立一個空的 StreamConfig //createJobVertex 函式就是根據 StreamNode 建立對應的 JobVertex, 並返回了空的 StreamConfig StreamConfig config = currentNodeId.equals(startNodeId) ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes) : new StreamConfig(new Configuration()); // 設定 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中. // 其中包括 序列化器, StreamOperator, Checkpoint 等相關配置 setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); if (currentNodeId.equals(startNodeId)) { // 如果是chain的起始節點。(不是chain中的節點,也會被標記成 chain start) config.setChainStart(); config.setChainIndex(0); config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName()); //把實際的輸出邊寫入配置, 部署時會用到 config.setOutEdgesInOrder(transitiveOutEdges); //operator chain 的頭部 operator 的輸出邊,包括內部的邊 config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges()); // 將當前節點(headOfChain)與所有出邊相連 for (StreamEdge edge : transitiveOutEdges) { // 通過StreamEdge構建出JobEdge,建立IntermediateDataSet,用來將JobVertex和JobEdge相連 connect(startNodeId, edge); } // 將operator chain中所有子節點的 StreamConfig 寫入到 headOfChain 節點的 CHAINED_TASK_CONFIG 配置中 config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId)); } else { //如果是 operator chain 內部的節點 Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId); if (chainedConfs == null) { chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>()); } config.setChainIndex(chainIndex); StreamNode node = streamGraph.getStreamNode(currentNodeId); config.setOperatorName(node.getOperatorName()); // 將當前節點的 StreamConfig 新增所在的 operator chain 的 config 集合中 chainedConfigs.get(startNodeId).put(currentNodeId, config); } //設定當前 operator 的 OperatorID config.setOperatorID(new OperatorID(primaryHashBytes)); if (chainableOutputs.isEmpty()) { config.setChainEnd(); } return transitiveOutEdges; } else { return new ArrayList<>(); } }
上述過程實際上就是通過 DFS 遍歷所有的StreamNode
, 並按照 chainable 的條件不停地將可以串聯的呃 operator 放在同一個的 operator chain 中。每一個StreamNode
的配置資訊都會被序列化到對應的StreamConfig
中。只有 operator chain 的頭部節點會生成對應的JobVertex
,一個 operator chain 的所有內部節點都會以序列化的形式寫入頭部節點的CHAINED_TASK_CONFIG
配置項中。
每一個 operator chain 都會為所有的實際輸出邊建立對應的JobEdge
,並和JobVertex
連線:
private void connect(Integer headOfChain, StreamEdge edge) { physicalEdgesInOrder.add(edge); Integer downStreamvertexID = edge.getTargetId(); //上下游節點 JobVertex headVertex = jobVertices.get(headOfChain); JobVertex downStreamVertex = jobVertices.get(downStreamvertexID); StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration()); //下游節點增加一個輸入 downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1); StreamPartitioner<?> partitioner = edge.getPartitioner(); JobEdge jobEdge; //建立 JobEdge 和 IntermediateDataSet //根據StreamPartitioner型別決定在上游節點(生產者)的子任務和下游節點(消費者)之間的連線模式 if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED); } else { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED_BOUNDED); } // set strategy name so that web interface can show it. jobEdge.setShipStrategyName(partitioner.toString()); if (LOG.isDebugEnabled()) { LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(), headOfChain, downStreamvertexID); } }
小結
本文分析了從StreamGraph
到JobGraph
之間的轉換過程。JobGraph
的關鍵在於將多個StreamNode
優化為一個JobVertex
, 對應的StreamEdge
則轉化為JobEdge
, 並且JobVertex
和JobEdge
之間通過IntermediateDataSet
形成一個生產者和消費者的連線關係。
-EOF-