Java教程

Flink源码解读(二):JobGraph源码解读

本文主要是介绍Flink源码解读(二):JobGraph源码解读,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录

JobGraph源码解读

JobGraph生成过程

入口函数

createJobGraph函数

参考


JobGraph源码解读

上回说到,StreamGraph的源码其中是在客户端生成,并且是生成Node节点和Edge,主要是通过StreamAPI生成,表示拓扑结构,这次给大家讲讲JobGraph的生成(以Yarn集群模式)。

首先,JobGraph是基于StreamGraph进行优化(包括设置Checkpoint、slot分组策略,内存占比等),最主要是将多个符合条件的StreamNode链接chain在一起作为一个节点,减少数据在节点之间的流动所需要的序列化、反序列化、传输的消耗

简单讲一下JobGraph的过程,将符合条件的Operator算子组合成ChainableOperator,生成对应的JobVertex、InermediateDataSet和JobEdge等,并且通过JobEdge连接上IntermediateDataSet和JobVertex,这里只是生成粗粒度的用户代码逻辑结构(如数据结构),真正的数据是在后续生成Task时构造的ResultSubPartition和InputGate才会交互用户的物理数据。

JobGraph生成过程

JobGraph的生成入口是StreamingJobGraphGenerator.createJobGraph(this, jobID),最终调用StreamingJobGraphGenerator.createJobGraph()。

入口函数

入口函数调用的过程:executeAsync(生成YarnJobClusterExecutorFactory)->execute(生成JobGraph,并向集群发布部署任务)->getJobGraph(根据Pipeline类型生成离线planTranslator或者实时的streamGraphTranslator)->createJobGraph(生成StreamingJobGraphGenerator实例并创建JobGraph)

@Internal
	public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
		checkNotNull(streamGraph, "StreamGraph cannot be null.");
		checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
 
		//调用DefaultExecutorServiceLoader生成YarnJobClusterExecutorFactory
		final PipelineExecutorFactory executorFactory =
			executorServiceLoader.getExecutorFactory(configuration);
 
		checkNotNull(
			executorFactory,
			"Cannot find compatible factory for specified execution.target (=%s)",
			configuration.get(DeploymentOptions.TARGET));
 
		//生成YarnJobClusterExecutor调用生成JobGraph后向集群提交任务资源申请
		CompletableFuture<JobClient> jobClientFuture = executorFactory
			.getExecutor(configuration) //new YarnJobClusterExecutor
			.execute(streamGraph, configuration, userClassloader);
 
            ........
}
 
@Override
	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
		//生成JobGraph
		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
 
		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 
			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
 
			//开始向集群发布部署任务
			final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
					.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
 
			//启动异步可回调线程,返会完成的部署任务
			return CompletableFuture.completedFuture(
					new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
		}
	}
 
 
public static JobGraph getJobGraph(
			Pipeline pipeline,
			Configuration optimizerConfiguration,
			int defaultParallelism) {
 
		//根据Pipeline类型生成离线planTranslator或者实时的streamGraphTranslator
		FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
 
		return pipelineTranslator.translateToJobGraph(pipeline,
				optimizerConfiguration,
				defaultParallelism);
	}
 
//生成StreamingJobGraphGenerator实例并创建JobGraph并
	public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
		
		return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
	}

createJobGraph函数

在StreamingJobGraphGenerator生成器当中,基本上所有的成员变量都是为了辅助生成最终的JobGraph。

其中createJobGraph函数的过程:首先为所有节点都生成一个唯一的hash id,这个哈希函数可以用户进行自己定义,如果节点在多次提交中没有改变(如组、并发度、上下游关系等),那么这个hash id就不会改变,这个主要是用于故障恢复然后在chaining处理、生成JobVetex、JobEdge等,之后就是写入各种配置信息例如缓存、checkpoints等。

public class StreamingJobGraphGenerator {
  private StreamGraph streamGraph;
  private JobGraph jobGraph;
  // id -> JobVertex
  private Map<Integer, JobVertex> jobVertices;
  // 已经构建的JobVertex的id集合
  private Collection<Integer> builtVertices;
  // 物理边集合(排除了chain内部的边), 按创建顺序排序
  private List<StreamEdge> physicalEdgesInOrder;
  // 保存chain信息,部署时用来构建 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig)
  private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
  // 所有节点的配置信息,id -> StreamConfig
  private Map<Integer, StreamConfig> vertexConfigs;
  // 保存每个节点的名字,id -> chainedName
  private Map<Integer, String> chainedNames;
  
  // 构造函数,入参只有 StreamGraph
  public StreamingJobGraphGenerator(StreamGraph streamGraph) {
    this.streamGraph = streamGraph;
  }
  private JobGraph createJobGraph() {
		preValidate();
 
		// make sure that all vertices start immediately
		// 第一步:
        // 非batch模式默认为EAGER
		jobGraph.setScheduleMode(streamGraph.getScheduleMode());
 
		// Generate deterministic hashes for the nodes in order to identify them across
		// submission if they didn't change.
        // 第二步 确定节点的hash值
		Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
 
		// Generate legacy version hashes for backwards compatibility
        // 第三步 确定节点用户自定义userHash值
		List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
		for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
			legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
		}
        // 第四步 构建链接
		setChaining(hashes, legacyHashes);
 
        // 第五步 JobVertex对应输入InphysicalEdge设为该物理StreamEdge
		//保存产生所有JobEdge时对应的StreamEdge
		setPhysicalEdges();
         
        // 第六步设置是否在一个slot执行task
		//设置slot分组策略
		setSlotSharingAndCoLocation();
 
		//设置缓存容量
		setManagedMemoryFraction(
			Collections.unmodifiableMap(jobVertices),
			Collections.unmodifiableMap(vertexConfigs),
			Collections.unmodifiableMap(chainedConfigs),
			id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
			id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
 
		//配置checkpoint属性
		configureCheckpointing();
 
		//配置savepoint属性
		jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
 
		//配置用户自定义文件
		JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
 
		// set the ExecutionConfig last when it has been finalized
		try {
			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
		}
		catch (IOException e) {
			throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
					"This indicates that non-serializable types (like custom serializers) were registered");
		}
 
		return jobGraph;
	}
 
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
		// we separate out the sources that run as inputs to another operator (chained inputs)
		// from the sources that needs to run as the main (head) operator.
 
		// 遍历并缓存所有的sourceNode以及生成后面用作checkpoint的SourceCoordinatorProvider
		final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
		final Collection<OperatorChainInfo> initialEntryPoints = new ArrayList<>(chainEntryPoints.values());
 
		// iterate over a copy of the values, because this map gets concurrently modified
		for (OperatorChainInfo info : initialEntryPoints) {
			createChain(
					info.getStartNodeId(),
					1,  // operators start at position 1 because 0 is for chained source inputs
					info,
					chainEntryPoints);
		}
	}

解释一下上面的步骤:

第一步:JobGraph启动模式

  • EAGER:所有节点立即启动
  • LAZY_FROM_SOURCES:懒加载模式,默认方式,所有输入条件准备好了再启动

第二步、第三步:确定节点的hash值

若Flink任务失败了,各个算子是能够从checkpoint中恢复到失败前的状态的,恢复的时候的依据就是JobVertexID(hash值)进行状态恢复。相同的任务在恢复的时候要求算子的hash值不变,因此能够获得对应的状态进行恢复。

其中,Set<Integer> visited = new HashSet<>();记录已经生成hash值的节点,Queue<StreamNode> remaining = new ArrayDeque<>();记录剩余没有生成hash值的节点。
下面的while循环就是广度遍历法,构建整个节点的hash值。currentNode = remaining.poll()获取要构建hash值的节点,generateNodeHash(currentNode, hashFunction, hashes,streamGraph.isChainingEnabled(), streamGraph)生产hash值的方法,下面的for循环,获取该节点的所有的输出节点,并加入remaining和visited集合中,再次遍历,直到remaining没有值。

第四步:构建链接

构建之前,先判断是否可以连接。isChainable函数进行判断。

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperator<?> headOperator = upStreamVertex.getOperator();
    StreamOperator<?> outOperator = downStreamVertex.getOperator();

    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}

共有9个条件:

  1. 下游节点只有一个输入
  2. 下游节点的操作符不为null
  3. 上游节点的操作符不为null
  4. 上下游节点在一个槽位共享组内
  5. 下游节点的连接策略是 ALWAYS
  6. 上游节点的连接策略是 HEAD 或者 ALWAYS
  7. edge的分区函数是ForwardPartitioner的实例
  8. 上下游节点的并行度相等
  9. 可以进行节点连接操作

链接的过程:

进入setChain函数之后,再调用createChain函数进行具体的生成。该函数参数sourceNodeId和currentNodeId,如果currentNodeId的输出边是不可合并边,则他们之间所有的StreamNode将构建JobGraph顶点,currentNodeId的边将构建JobGraph的边。

  • transitiveOutEdges:获取不可以合并的边,也就是最开始怎么构建JobGraph的边,即获取filter->keyBy边,也就是上图的问题3。
  • chainableOutputs:需要合并的边,例如source->filter边
  • nonChainableOutputs:不可以合并的边, 例如filter->keyBy边
private List<StreamEdge> createChain(
			final Integer currentNodeId,
			final int chainIndex,
			final OperatorChainInfo chainInfo,
			final Map<Integer, OperatorChainInfo> chainEntryPoints) {
 
		Integer startNodeId = chainInfo.getStartNodeId();
		//遍历未解索过的streamNode(createJobVertex后builtVertices会加入当前nodeId避免后续重复调用)
		if (!builtVertices.contains(startNodeId)) {
 
			//不可chain的edges
			List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
 
			List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
 
			StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
 
			//当前streamNode可能存在多个outEdges
			for (StreamEdge outEdge : currentNode.getOutEdges()) {
				//判断是否能chain
				if (isChainable(outEdge, streamGraph)) {
					chainableOutputs.add(outEdge);
				} else {
					nonChainableOutputs.add(outEdge);
				}
			}
 
			//若上面streamNode可链则继续循环调用createChain
			//此时获取下一个streamNodeId重复上述步骤并addAll所有不可链的edge用作后面connect直至当前的streamNode的outEdge不可链
			for (StreamEdge chainable : chainableOutputs) {
				transitiveOutEdges.addAll(
						createChain(chainable.getTargetId(), chainIndex + 1, chainInfo, chainEntryPoints));
			}
 
			//走到这里 分为两种情况:
			//一:当前streamNode的outEdge不可链 但是下游有streamNode
			//二:当前streamNode的outEdge不可链 下游无streamNode,属于end stream node
			//三:当前streamNode的outEdge不可链 下游已经创建完JobVertex(transitiveOutEdges.addAll(createChain....)循环调用完毕)
			for (StreamEdge nonChainable : nonChainableOutputs) {
				//进入这里属于outEdge不可链 但是下游有streamNode,先缓存起来,使用下个streamNode继续调用createChain,并且初始化chainIndex为1(因为下个streamNode是链头
				transitiveOutEdges.add(nonChainable);
				createChain(
						nonChainable.getTargetId(),
						1, // operators start at position 1 because 0 is for chained source inputs
						chainEntryPoints.computeIfAbsent(
							nonChainable.getTargetId(),
							(k) -> chainInfo.newChain(nonChainable.getTargetId())),
						chainEntryPoints);
			}
 
			//代码第一次走到这里说明createChain遍历到最后一个streamNode,所以从这里开始(反向)创建JobVertex
			//首先遍历出chainableOutputs 并merge当前链中所有StreamNode的资源(比如cpuCores,HeapMemory等)
			chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
			chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
			chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
 
			OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
 
			//缓存所有inputFormat(比如jdbcinputformat,hbaseInputFormat等)
			if (currentNode.getInputFormat() != null) {
				getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
			}
 
			//同上
			if (currentNode.getOutputFormat() != null) {
				getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
			}
 
			//如果属于起始节点则生成JobVertex,主要为以下三步骤
			//1:创建JobVertex(如果是inputFormat或者outputFormat类型则是InputOutputFormatVertex)
			//2:设置并行度,资源等属性
			//3:缓存进StreamingJobGraphGenerator中
			StreamConfig config = currentNodeId.equals(startNodeId)
					? createJobVertex(startNodeId, chainInfo)
					: new StreamConfig(new Configuration());
 
			setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs, chainInfo.getChainedSources());
 
			//跟上面生成JobVertex一样 只有当前operator是head时候 才生成生成IntermediateDataSet 和 JobEdge 并跟下游相连
			if (currentNodeId.equals(startNodeId)) {
 
				config.setChainStart();
				config.setChainIndex(chainIndex);
				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
 
				//遍历出不可链的edge,生成IntermediateDataSet 和 JobEdge 并跟当前currentNodeId相连
				for (StreamEdge edge : transitiveOutEdges) {
					connect(startNodeId, edge);
				}
 
				config.setOutEdgesInOrder(transitiveOutEdges);
				config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
 
			} else {
				chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
 
				config.setChainIndex(chainIndex);
				StreamNode node = streamGraph.getStreamNode(currentNodeId);
				config.setOperatorName(node.getOperatorName());
				chainedConfigs.get(startNodeId).put(currentNodeId, config);
			}
 
			config.setOperatorID(currentOperatorId);
 
			if (chainableOutputs.isEmpty()) {
				config.setChainEnd();
			}
			return transitiveOutEdges;
 
		} else {
			return new ArrayList<>();
		}
	}

在89行左右,有个connect函数,其中链接JobVertex的逻辑。

private void connect(Integer headOfChain, StreamEdge edge) {
 
		....
 
		ResultPartitionType resultPartitionType;
		//根据outEdge的shuffleMode类型生成不同的ResultPartition
		//ResultPartition的类型决定后面的数据交互中的流或批模式以及是否可做反压和基于Credit的通信模式(后面会做分析)
		//默认是UNDEFINED,此时将由partitioner类型和GlobalDataExchangeMode决定
		switch (edge.getShuffleMode()) {
			case PIPELINED:
				resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
				break;
			case BATCH:
				resultPartitionType = ResultPartitionType.BLOCKING;
				break;
			case UNDEFINED:
				resultPartitionType = determineResultPartitionType(partitioner);
				break;
			default:
				throw new UnsupportedOperationException("Data exchange mode " +
					edge.getShuffleMode() + " is not supported yet.");
		}
 
		checkAndResetBufferTimeout(resultPartitionType, edge);
 
		//创建JobEdge,上游为IntermediateDataSet,下游为JobVertex
		//IntermediateDataSet只是维护了它的producer和consumers的队列
		JobEdge jobEdge;
		if (isPointwisePartitioner(partitioner)) {
			jobEdge = downStreamVertex.connectNewDataSetAsInput(
				headVertex,
				DistributionPattern.POINTWISE,
				resultPartitionType);
		} else {
			jobEdge = downStreamVertex.connectNewDataSetAsInput(
					headVertex,
					DistributionPattern.ALL_TO_ALL,
					resultPartitionType);
		}
		// set strategy name so that web interface can show it.
		jobEdge.setShipStrategyName(partitioner.toString());
 
		if (LOG.isDebugEnabled()) {
			LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
					headOfChain, downStreamVertexID);
		}
	}
 
public JobEdge connectNewDataSetAsInput(
			JobVertex input,
			DistributionPattern distPattern,
			ResultPartitionType partitionType) {
 
		IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
 
        //创建JobEdge
		JobEdge edge = new JobEdge(dataSet, this, distPattern);
		this.inputs.add(edge);
		dataSet.addConsumer(edge);
		return edge;
	}
 
public IntermediateDataSet createAndAddResultDataSet(
			IntermediateDataSetID id,
			ResultPartitionType partitionType) {
 
		//只是维护了当前JobVertex所有outEdge和ResultPartitionType
		IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this);
		//后面在创建JobVertex的时候会根据results.size生成相同数量的IntermediateResult
		this.results.add(result);
		return result;
	}

对所有边进行分组的代码

for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
    if (isChainable(outEdge, streamGraph)) {
        chainableOutputs.add(outEdge);
    } else {
        nonChainableOutputs.add(outEdge);
    }
}

递归调用,找到可以合并的边,进行合并。

for (StreamEdge chainable : chainableOutputs) {
    transitiveOutEdges.addAll(
            createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}

如果不可合并边,有输出节点,则继续递归调用,直到整个拓扑结构结束。

for (StreamEdge nonChainable : nonChainableOutputs) {
    transitiveOutEdges.add(nonChainable);
    createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}

具体的流程是:上面的transitiveOutEdges.add(nonChainable);就是找到不可合并边,也就是从startNodeId到currentNodeId,中间的所有的边都是可以合并的,可以构造一个新的JobGraph节点。不可合并边的输出节点,将重新作为startNodeId节点,向下遍历。从上面两个递归可以发现,构建JobGraph其实从后面往前构建的

然后获取JobGraph节点中合并的StreamNode对应hash值,这些节点都联合在一起构建一个顶点

List<Tuple2<byte[], byte[]>> operatorHashes =
    chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

byte[] primaryHashBytes = hashes.get(currentNodeId);

for (Map<Integer, byte[]> legacyHash : legacyHashes) {
    operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}

接着获取链式名称,如果不可合并边或者拓扑尾部节点,则名称中断

chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));

具体是创建节点还是合并节点则根据equals(startNodeId)函数判断

如果当前节点就是开始节点,说明递归已经回来了,下游所有要连接的节点已经获取,则这个连接上的所有的StreamNode节点要联合构建一个新的JobGraph节点,所以JobGraph的节点需要保存这些StreamNode的hash值。如果不是,则说明当前节点是一个内部节点,直接初始化配置,不需要构建JobGraph节点

 StreamConfig config = currentNodeId.equals(startNodeId)? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes): new StreamConfig(new Configuration());

然后把StreamNode节点的信息转移到JobGraph中

setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

如果是JobGraph的节点,说明遇到了不可合并边,则要构建JobGraph的边。构建边的不同方式,并把边作为下游JobGraph的输入边this.inputs.add(edge);

for (StreamEdge edge : transitiveOutEdges) {
    connect(startNodeId, edge);
}
if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
			jobEdge = downStreamVertex.connectNewDataSetAsInput(
				headVertex,
				DistributionPattern.POINTWISE,
				ResultPartitionType.PIPELINED_BOUNDED);
		} else {
			jobEdge = downStreamVertex.connectNewDataSetAsInput(
					headVertex,
					DistributionPattern.ALL_TO_ALL,
					ResultPartitionType.PIPELINED_BOUNDED);
		}

如果不是JobGraph的节点,则把该SteamNode的信息,放入startNodeId集合中,最终放入JobGraph节点的config中,并且每一个节点的hash值是唯一的,放入每个节点的配置中。

顶点的构造的具体过程

通过构建顶点就是把StreamNode(source,filter)组合成JobVertex(source->filter)的过程。其中构建JobVertex的主要是新生的节点要获取对应的StreamNode配置信息,组合成一个新节点。而除了StartNode,其他的StreamNode的任务在chainedConfigs中

//获取StreamNode
StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
//获取对应hash值
byte[] hash = hashes.get(streamNodeId);

if (hash == null) {
    throw new IllegalStateException("Cannot find node hash. " +
            "Did you generate them before calling this method?");
}
//初始化
JobVertexID jobVertexId = new JobVertexID(hash);
//初始化用户定义hash
List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
    hash = legacyHash.get(streamNodeId);
    if (null != hash) {
        legacyJobVertexIds.add(new JobVertexID(hash));
    }
}
//获取合并的StreamNode对应的hash值和对应的用户定义的hash值
List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
if (chainedOperators != null) {
    for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
        chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
        userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
    }
}
构建JobVertex
if (streamNode.getInputFormat() != null) {
			jobVertex = new InputFormatVertex(
					chainedNames.get(streamNodeId),
					jobVertexId,
					legacyJobVertexIds,
					chainedOperatorVertexIds,
					userDefinedChainedOperatorVertexIds);
			TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
			taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
		} else {
			jobVertex = new JobVertex(
					chainedNames.get(streamNodeId),
					jobVertexId,
					legacyJobVertexIds,
					chainedOperatorVertexIds,
					userDefinedChainedOperatorVertexIds);
		}
        
...
设置该节点的工作内容
jobVertex.setInvokableClass(streamNode.getJobVertexClass());
//加入JobGraph
jobVertices.put(streamNodeId, jobVertex);
builtVertices.add(streamNodeId);
jobGraph.addVertex(jobVertex);

其他StreamNode的任务。

Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

if (chainedConfs == null) {
    chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
}
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);

构建JobGraph的边的过程

构建的边放入下游顶点inputs中,边的目的节点是JobVertex target,但是source却是IntermediateDataSet,而它是headVertex产生的,所以过程是:headVertex->IntermediateDataSet->target

public JobEdge connectNewDataSetAsInput(
			JobVertex input,
			DistributionPattern distPattern,
			ResultPartitionType partitionType) {

		IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);

		JobEdge edge = new JobEdge(dataSet, this, distPattern);
		this.inputs.add(edge);
		dataSet.addConsumer(edge);
		return edge;
	}

边的构造函数。

public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern) {
if (source == null || target == null || distributionPattern == null) {
    throw new NullPointerException();
}
this.target = target;
this.distributionPattern = distributionPattern;
this.source = source;
this.sourceId = source.getId();
}

最后,总结就是:StreamNode 转成 JobVertex,StreamEdge 转成 JobEdge,JobEdge 和 JobVertex 之间创建 IntermediateDataSet 来连接。关键点在于将多个 SteamNode chain 成一个 JobVertex的过程。

参考

Flink1.12源码解读——JobGraph执行图构建过程_ws0owws0ow的博客-CSDN博客

flink的JobGraph生成源码分析_陪你一起捡蛋壳的博客-CSDN博客

这篇关于Flink源码解读(二):JobGraph源码解读的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!