首页 图片 查字 
所属分类:Flink
浏览:599
内容:

请按照下面的顺序阅读:每一节内的各行是一条自上而下、逐层深入的调用链。


一、
+++shell脚本提交任务入口类+++
org.apache.flink.client.cli.CliFrontend

public static void main(final String[] args)
static int mainInternal(final String[] args)
public int parseAndRun(String[] args)
protected void run(String[] args)
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
      executeProgram(effectiveConfiguration, program);
}

+++构建PackagedProgram+++
getPackagedProgram(programOptions, effectiveConfiguration)
buildProgram(programOptions, effectiveConfiguration)
    .setEntryPointClassName(entryPointClass)   (buildProgram 内部的链式调用,传入入口类名)

+++执行PackagedProgram+++
executeProgram(effectiveConfiguration, program)
ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
program.invokeInteractiveModeForExecution();

+++执行用户类的main方法+++
callMainMethod(mainClass, args);
mainMethod.invoke(null, (Object) args);

二、
+++用户类+++
main(String[] args)
streamExecutionEnvironment.execute(jobName);

+++构建StreamGraph+++
StreamGraph streamGraph = this.getStreamGraph();
this.execute(streamGraph);

+++构建JobGraph+++
JobClient jobClient = this.executeAsync(streamGraph);
PipelineExecutor executor = this.getPipelineExecutor();
CompletableFuture<JobClient> jobClientFuture = executor.execute(streamGraph, this.configuration, this.userClassloader);
PipelineExecutor 的实现类:AbstractJobClusterExecutor
public CompletableFuture<JobClient> execute(@Nonnull Pipeline pipeline, @Nonnull Configuration configuration, @Nonnull ClassLoader userCodeClassloader) throws Exception
JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(userClassloader, pipeline, configuration, executionConfigAccessor.getParallelism());
return pipelineTranslator.translateToJobGraph(pipeline, optimizerConfiguration, defaultParallelism);
pipelineTranslator 的实现类:StreamGraphTranslator
public JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism)
return streamGraph.getJobGraph(this.userClassloader, (JobID)null);
return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
var4 = (new StreamingJobGraphGenerator(userClassLoader, streamGraph, jobID, serializationExecutor)).createJobGraph();
private JobGraph createJobGraph()

+++提交到集群+++
JobClient jobClient = (JobClient)jobClientFuture.get();
实现类 AbstractJobClusterExecutor
ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
var10 = CompletableFuture.completedFuture(new ClusterClientJobClientAdapter(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));