Flink Source Code Analysis: Starting from an Example to Understand the Cluster Job Execution Flow
This article analyzes the source code of Apache Flink's job submission flow in cluster mode.
For the local-mode submission flow, see the previous article, "Flink Source Code Analysis | Starting from an Example: Understanding the Local Job Execution Flow".
The entry point for cluster-mode job submission is the same as in local mode, so once again we start from SocketWindowWordCount.
Job code entry point
SocketWindowWordCount.run
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```
getExecutionEnvironment()
This method determines and returns the appropriate ExecutionEnvironment; in our case the object obtained is a StreamContextEnvironment.
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
    return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
    return new StreamPlanEnvironment(env);
} else {
    return createLocalEnvironment();
}
```
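Stripped of the Flink types, the branch above is a plain type-based dispatch. Here is a minimal, Flink-free sketch of that shape (every class name below is a hypothetical stand-in, not Flink's actual API):

```java
// Hypothetical stand-ins for Flink's environment types, used only to show the dispatch shape.
interface Env {}
class ContextEnv implements Env {}
class PlanEnv implements Env {}
class LocalEnv implements Env {}

class EnvDispatch {
    // Mirrors getExecutionEnvironment(): choose a wrapper based on the concrete context type.
    static String pick(Env env) {
        if (env instanceof ContextEnv) {
            return "StreamContextEnvironment"; // cluster submission via the CLI
        } else if (env instanceof PlanEnv) {
            return "StreamPlanEnvironment";    // plan preview / optimizer path
        } else {
            return "LocalEnvironment";         // fallback: run locally
        }
    }

    public static void main(String[] args) {
        System.out.println(pick(new ContextEnv()));
        System.out.println(pick(new LocalEnv()));
    }
}
```

When a job is submitted with `flink run`, the context is a ContextEnvironment, which is why the first branch wins and we land in StreamContextEnvironment.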
StreamContextEnvironment.execute()
```java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    StreamGraph streamGraph = this.getStreamGraph();
    streamGraph.setJobName(jobName);

    transformations.clear();

    // execute the programs
    if (ctx instanceof DetachedEnvironment) {
        LOG.warn("Job was executed in detached mode, the results will be available on completion.");
        ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
        return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
    } else {
        return ctx
            .getClient()
            .run(streamGraph, ctx.getJars(), ctx.getClasspaths(),
                 ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
            .getJobExecutionResult();
    }
}
```
If detached mode is not specified, the else branch runs, which:

- obtains the ClusterClient and calls its run() method;
- passes in the jars, classpaths, class loader, and savepoint restore settings;
- obtains the JobExecutionResult.
ClusterClient.java
```java
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
        throws ProgramInvocationException, ProgramMissingJobException {
    ...
    if (prog.isUsingProgramEntryPoint()) {
        ...
    } else if (prog.isUsingInteractiveMode()) {
        ...
    }
}
```
At submission time, run() decides whether to use interactive mode or the program entry point.
```java
public boolean isUsingInteractiveMode() {
    return this.program == null;
}

public boolean isUsingProgramEntryPoint() {
    return this.program != null;
}
```
The deciding condition is simply whether program is null.
Starting from flink run
The flink run script invokes the run method of the CliFrontend class.
The run method starts the program: it calls buildProgram() to initialize program, which is then passed to runProgram() to continue the call chain.
```java
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);

final RunOptions runOptions = new RunOptions(commandLine);

final PackagedProgram program;
try {
    LOG.info("Building program from JAR file");
    program = buildProgram(runOptions);
}
...
runProgram(customCommandLine, commandLine, runOptions, program);
...
```
As the following code shows, buildProgram() is where the JAR file we submitted gets wrapped into a PackagedProgram.
```java
PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
    String[] programArgs = options.getProgramArgs();
    String jarFilePath = options.getJarFilePath();
    List<URL> classpaths = options.getClasspaths();

    if (jarFilePath == null) {
        throw new IllegalArgumentException("The program JAR file was not specified.");
    }

    File jarFile = new File(jarFilePath);

    // Check if JAR file exists
    if (!jarFile.exists()) {
        throw new FileNotFoundException("JAR file does not exist: " + jarFile);
    } else if (!jarFile.isFile()) {
        throw new FileNotFoundException("JAR file is not a file: " + jarFile);
    }

    // Get assembler class
    String entryPointClass = options.getEntryPointClassName();
    PackagedProgram program = entryPointClass == null
            ? new PackagedProgram(jarFile, classpaths, programArgs)
            : new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

    program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());

    return program;
}
```
isUsingProgramEntryPoint
After passing through several layers of run() methods, the job is ultimately submitted via StandaloneClusterClient's submit method, which returns a JobSubmissionResult.
ClusterClient.java
```java
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
        throws CompilerException, ProgramInvocationException {
    ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
    if (classLoader == null) {
        throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
    }

    OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
    return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}

...

public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
        ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
    JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
    return submitJob(job, classLoader);
}
```
The call chain above involves two key run() methods.

The first run() method:

- loads the main program class from the JAR to be executed;
- obtains the OptimizedPlan;
- hands off to the second run() method.

The second run() method:

- builds the JobGraph object;
- submits the job and returns a JobSubmissionResult (ClusterClient's submitJob() is an abstract method; what actually executes here is StandaloneClusterClient's submitJob()).
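Taken together, the two run() overloads form a small compilation pipeline: user program, then OptimizedPlan, then JobGraph, then submission. The chaining can be sketched without any Flink dependencies (all types and names below are placeholders, not Flink's classes):

```java
// Placeholder types standing in for Flink's plan/graph classes.
class OptimizedPlan {
    final String source;
    OptimizedPlan(String source) { this.source = source; }
}

class JobGraph {
    final String id;
    JobGraph(String id) { this.id = id; }
}

class PipelineSketch {
    // First run(): compile the user program into an optimized plan, then delegate.
    static String run(String userProgram) {
        OptimizedPlan opt = optimize(userProgram);
        return run(opt);
    }

    // Second run(): turn the optimized plan into a JobGraph and submit it.
    static String run(OptimizedPlan plan) {
        JobGraph job = new JobGraph("job-for-" + plan.source);
        return submit(job);
    }

    static OptimizedPlan optimize(String program) {
        return new OptimizedPlan(program);
    }

    // Stands in for the abstract submitJob(), implemented by StandaloneClusterClient.
    static String submit(JobGraph job) {
        return "submitted:" + job.id;
    }

    public static void main(String[] args) {
        System.out.println(run("wordcount"));
    }
}
```

The point of the sketch is only the delegation structure: each overload does one transformation step and passes its output to the next, which matches the real code's run → run → submitJob chain.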
StandaloneClusterClient.java
```java
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    if (isDetached()) {
        return super.runDetached(jobGraph, classLoader);
    } else {
        return super.run(jobGraph, classLoader);
    }
}
```
When executing the flink run command, adding the -d flag takes the runDetached() path; otherwise run() is used.
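The effect of the -d flag boils down to a two-way branch on the client side. A toy sketch of that branch (names and the string results are stand-ins; the real methods return JobSubmissionResult and JobExecutionResult objects, not strings):

```java
// Simplified model of StandaloneClusterClient.submitJob()'s detached/attached branch.
class SubmitSketch {
    static String submitJob(String jobId, boolean detached) {
        if (detached) {
            // -d on the command line: fire and forget; only a submission receipt comes back.
            return "JobSubmissionResult(" + jobId + ")";
        } else {
            // attached: block until the job finishes and return the execution result.
            return "JobExecutionResult(" + jobId + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(submitJob("j-1", true));
        System.out.println(submitJob("j-1", false));
    }
}
```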
run()
```java
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    waitForClusterToBeReady();

    final ActorSystem actorSystem;
    try {
        actorSystem = actorSystemLoader.get();
    } catch (FlinkException fe) {
        throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
                "JobManager.", jobGraph.getJobID(), fe);
    }

    try {
        logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
        this.lastJobExecutionResult = JobClient.submitJobAndWait(
                actorSystem,
                flinkConfig,
                highAvailabilityServices,
                jobGraph,
                timeout,
                printStatusDuringExecution,
                classLoader);

        return lastJobExecutionResult;
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
                jobGraph.getJobID(), e);
    }
}
```
This is where Flink's ActorSystem comes into play; we will analyze the ActorSystem in a later article.
The logic above consists of the following steps:

- wait for the cluster to be ready;
- obtain the ActorSystem instance;
- pass the actorSystem, flinkConfig, jobGraph, and other arguments to JobClient.submitJobAndWait(), which runs the job and waits for the JobExecutionResult.
runDetached()
```java
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    waitForClusterToBeReady();

    final ActorGateway jobManagerGateway;
    try {
        jobManagerGateway = getJobManagerGateway();
    } catch (Exception e) {
        throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.",
                jobGraph.getJobID(), e);
    }

    try {
        logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
        JobClient.submitJobDetached(
                new AkkaJobManagerGateway(jobManagerGateway),
                flinkConfig,
                jobGraph,
                Time.milliseconds(timeout.toMillis()),
                classLoader);

        return new JobSubmissionResult(jobGraph.getJobID());
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
                jobGraph.getJobID(), e);
    }
}
```
runDetached() follows the same overall flow as run(); the difference is that runDetached() talks to the JobManager through an ActorGateway, whereas run() uses the ActorSystem.
isUsingInteractiveMode
In this branch, a ContextEnvironmentFactory is created; factory.getLastEnvCreated() returns the DetachedEnvironment, on which finalizeExecute() is invoked. That in turn calls run() on the instantiated ClusterClient, which calls submit to execute the job and returns a JobSubmissionResult.
We will not expand on this path here; interested readers can explore the source code themselves.