启动入口
CliFrontend.main -> cli.parseParameters -> ACTION_RUN run(params); -> executeProgram -> invokeInteractiveModeForExecution -> callMainMethod(){ mainMethod = entryClass.getMethod("main", String[].class); mainMethod.invoke(null, (Object) args); } --> SocketWindowWordCount.main(){ /************************************************* * TODO * 注释: 解析 host 和 port */ // the host and the port to connect to final String hostname; final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port"); } catch(Exception e) { return; } /************************************************* * TODO * 注释: 获取 StreamExecutionEnvironment * 它呢,还是 跟 Spark 中的 SparkContext 还是有区别的! */ // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /************************************************* * TODO * 注释: 加载数据源得到数据抽象:DataStream * 其实最终,只是创建了一个 DataStreamSource 对象,然后把 SourceFunction(StreamOperator)和 StreamExecutionEnvironment * 设置到了 DataStreamSource 中, DataStreamSource 是 DataStream 的子类 * - * DataStream 的主要分类: * DataStreamSource 流数据源 * DataStreamSink 流数据目的地 * KeyedStream 按key分组的数据流 * DataStream 普通数据流 * - * 关于函数理解: * Function 传参 * Operator Graph 中抽象概念 * Transformation 一种针对流的逻辑操作 * 最终: Function ---> Operator ---> Transformation */ // get input data by connecting to the socket DataStream<String> text = env.socketTextStream(hostname, port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text // TODO 注释: 讲算子生成 Transformation 加入到 Env 中的 transformations 集合中 .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for(String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) // TODO 注释: 依然创建一个 DataStream(KeyedStream) .keyBy(value -> value.word) .timeWindow(Time.seconds(5)) // TODO 注释: .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); /************************************************* * TODO * 注释: 提交执行 */ env.execute("Socket Window WordCount"); } --> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制: 1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源 2、提供了 setParallelism() 设置程序的并行度 3、StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责Job执行的一些行为配置管理。 还管理了 Configuration 管理一些其他的配置 4、StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些Transformation 按照逻辑拼接起来,就能得到 StreamGragh(Transformation ->StreamOperator -> StreamNode) 5、StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph --> env.socketTextStream -> addSource(){ /************************************************* * TODO * 注释: 获取输出数据类型 */ TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo); // TODO 注释: 判断是否是并行 boolean isParallel = function instanceof ParallelSourceFunction; clean(function); /************************************************* * TODO * 注释: 构建 SourceOperator * 它是 SourceFunction 的子类,也是 StreamOperator 的子类 */ final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); /************************************************* * TODO * 注释: 返回 DataStreamSource * 关于这个东西的抽象有四种: * 1、DataStream * 2、KeyedDataStream * 3、DataStreamSource * 4、DataStreamSink */ return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName); } --> text.flatMap(讲算子生成 Transformation 加入到 Env 中的 transformations 集合中){ /************************************************* * TODO * 注释: 通过反射拿到 算子的类型 */ TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(); /************************************************* * TODO * 注释: 算子执行的真正操作逻辑是: 将算子构建成 Transformation 加入 Env 中的 transformation 中的 * transformations 集合中。将来执行 StreamGraph 生成的时候,会将 Transformation 变成 Operator * - * flatMap 到最后,还是构建一个 DataStream (SingleOutputStreamOperator)对象返回,然后将 Transformation 加入到 * transformations 集合中,等待将来提交的之后,构建成 StreamGraph */ return flatMap(flatMapper, outType); --> flatMap(){ /************************************************* * TODO * 注释: flink把每一个算子transform成一个对流的转换 * 并且注册到执行环境中,用于生成StreamGraph * - * 第一步:用户代码里定义的UDF会被当作其基类对待,然后交给 StreamFlatMap 这个 operator 做进一步包装。 * 事实上,每一个Transformation都对应了一个StreamOperator。 * - * flink流式计算的核心概念,就是将数据从输入流一个个传递给Operator进行链式处理,最后交给输出流的过程 * - * StreamFlatMap 是一个 Function 也是一个 StreamOperator * - * StreamFlatMap = StreamOperator * flatMapper = Function * -最终调用 transform 方法来把 StreamFlatMap 这种StreamOperator 转换成 Transformation * 最终加入到 StreamExectiionEnvironment 的 List<Transformation<?>> transformations */ return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper))); } ---> doTransform(){ // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); /************************************************* * TODO * 注释: 构建: OneInputTransformation * 由于 flatMap 这个操作只接受一个输入,所以再被进一步包装为 OneInputTransformation */ OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation, operatorName, operatorFactory, outTypeInfo,environment.getParallelism()); /************************************************* * TODO * 注释: 构建: SingleOutputStreamOperator */ @SuppressWarnings({"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); /************************************************* * TODO 重点 * 注释: 把 Operator 注册到执行环境中,用于生成 StreamGraph * 最后,将该 transformation 注册到执行环境中,当执行 generate 方法时,生成 StreamGraph 图结构。 */ getExecutionEnvironment().addOperator(resultTransform); /************************************************* * TODO * 注释: * SingleOutputStreamOperator 也是 DataStream 的子类,也就是返回了一个新的 DataStream * 然后调用新的 DataStream 的某一个算子,又生成新的 StreamTransformation, * 继续加入到 StreamExecutionEnvironment 的 transformations */ return returnStream; } } -> env.execute(提交执行)
StreamGraph
env.execute() -> StreamGraph sg = getStreamGraph(jobName); -> getStreamGraphGenerator().setJobName(jobName).generate(){ /************************************************* * TODO * 注释: 执行各种算子的 transformation: 由 算子 生成 Transformation 来构建 StreamGraph * 当时在执行各种算子的时候,就已经把算子转换成对应的 Transformation 放入 transformations 集合中了 * 自底向上(先遍历 input transformations) 对转换树的每个 transformation 进行转换 */ for(Transformation<?> transformation : transformations) { // TODO 注释: 从 Env 对象中,把 Transformation 拿出来,然后转换成 StreamNode // TODO 注释: Function --> Operator --> Transformation --> StreamNode transform(transformation); } --> transformOneInputTransform(){ // 1. 生成 streamNode /************************************************* * TODO * 注释: 添加一个 Operator(实际上 StreamGraph 端会添加一个 StreamNode) */ streamGraph .addOperator(transform.getId(), slotSharingGroup, transform.getCoLocationGroupKey(), transform.getOperatorFactory(), transform.getInputType(),transform.getOutputType(), transform.getName()); } -->addOperator -> addNode(){ /************************************************* * TODO * 注释: 生成一个 StreamNode */ StreamNode vertex = new StreamNode(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, operatorName, new ArrayList<OutputSelector<?>>(),vertexClass); /************************************************* * TODO * 注释: 添加一个 StreamNode * * 这个 vertexID 就是 Transformation id transform.getId() * Transformation id 在创建 Transformation 时生成 * 即一个Transformation 对应一个 StreamNode */ streamNodes.put(vertexID, vertex); } // 2. 生成 StreamEdge /************************************************* * TODO * 注释: 定义该 SreamNode 的 入边 */ for(Integer inputId : inputIds) { /************************************************* * TODO * 注释: 设置当前 StreamNode 和 上游所有 StreamNode 之间的 StreamEdge * inputId 上游 * transform.getId() 下游 */ streamGraph.addEdge(inputId, transform.getId(), 0); } -> addEdge -> addEdgeInternal(){ /************************************************* * TODO * 注释: 构建 StreamNode 之间的 边(StreamEdge) 对象 */ StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode); // TODO 注释: 给 上游 StreamNode 设置 出边 getStreamNode(edge.getSourceId()).addOutEdge(edge); // TODO 注释: 给 下游 StreamNode 设置 入边 getStreamNode(edge.getTargetId()).addInEdge(edge); } } 总结: 1、生成上游顶点和下游顶点 StreamNode upstreamNode | StreamNode downstreamNode 2、根据上下游顶点生成 StreamEdge StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode...) 3、将成的StreamEdge 加入上游StreamNode 的 出边 getStreamNode(edge.getSourceId()).addOutEdge(edge); 为啥不直接用 upstreamNode.addOutEdge(edge); 4、将成的StreamEdge 加入下游StreamNode 的 入边 getStreamNode(edge.getTargetId()).addInEdge(edge);
JobGraph
execute(sg); -> executeAsync -> AbstractSessionClusterExecutor.execute(){ // 1. 将streamgraph优化得到jobgraph final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); // 2. 调用RestClient中的netty 客户端进行提交 到 服务端执行 // 通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理 clusterClient.submitJob(jobGraph) } // 1. 将streamgraph优化得到jobgraph -> PipelineExecutorUtils.getJobGraph(pipeline, configuration) -> FlinkPipelineTranslationUtil.getJobGraph -> pipelineTranslator.translateToJobGraph -> streamGraph.getJobGraph -> StreamingJobGraphGenerator.createJobGraph -> new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(){ /************************************************* * TODO * 注释: 重点: 设置 Chaining, 将可以 Chain 到一起的 StreamNode Chain 在一起, * 这里会生成相应的 JobVertex 、JobEdge 、 IntermediateDataSet 对象 * 把能 chain 在一起的 Operator 都合并了,变成了 OperatorChain * - * 大致逻辑: * 这里的逻辑大致可以理解为,挨个遍历节点: * 1、如果该节点是一个 chain 的头节点,就生成一个 JobVertex, * 2、如果不是头节点,就要把自身配置并入头节点,然后把头节点和自己的出边相连; * 对于不能chain的节点,当作只有头节点处理即可 * - * 作用: * 能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。 */ setChaining(hashes, legacyHashes); // TODO 注释: 设置 PhysicalEdges // TODO 注释: 将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中 // TODO 注释: 出边集合,已经在 上面的代码中,已经搞定了。 setPhysicalEdges(); // TODO 注释: 设置 SlotSharingAndCoLocation setSlotSharingAndCoLocation(); } --> setChaining(){ /************************************************* * TODO * 注释: * 1、一个 StreamNode 也可以认为是 做了 chain 动作 StreamNode -> JobVertex * 2、两个 StreamNode 做了 chain 动作 StreamNode + StreamNode -> JobVertex */ // TODO 注释: 处理每个 StreamNode for(Integer sourceNodeId : streamGraph.getSourceIDs()) { /************************************************* * TODO * 注释: 把能 chain 在一起的 StreamNode 都 chain 在一起 */ createChain(sourceNodeId, 0, new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph)); } } ---> createChain(){ /************************************************* * TODO -> * 注释: 判断是否可以 chain 在一起! * 当前这个地方做的事情,只是当前这个 StreamNode 和它的直接下游 StreamNode */ for(StreamEdge outEdge : currentNode.getOutEdges()) { /************************************************* * TODO 重点 1 isChainable * 注释: 判断一个 StreamGraph 中的一个 StreamEdge 链接的上下游 Operator(StreamNode) 是否可以 chain 在一起 * */ if(isChainable(outEdge, streamGraph)) { // TODO 注释: 加入可 chain 集合 chainableOutputs.add(outEdge); } else { // TODO 注释: 加入不可 chain 集合 nonChainableOutputs.add(outEdge); } } // TODO 注释: 把可以 chain 在一起的 StreamEdge 两边的 Operator chain 在一个形成一个 OperatorChain for(StreamEdge chainable : chainableOutputs) { // TODO 注释: 递归 chain // TODO 注释: 如果可以 chain 在一起的话,这里的 chainIndex 会加 1 transitiveOutEdges.addAll(createChain(chainable.getTargetId(), chainIndex + 1, chainInfo)); } // 不能chain在一起的 for(StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); // TODO 注释: 不能 chain 一起的话,这里的 chainIndex 是从 0 开始算的,后面也肯定会走到 createJobVertex 的逻辑 createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId())); } /************************************************* * TODO -> 重点 2 createJobVertex * 注释: 把chain在一起的多个 Operator 创建成一个 JobVertex * 如果当前节点是 chain 的起始节点, 则直接创建 JobVertex 并返回 StreamConfig, 否则先创建一个空的 StreamConfig * createJobVertex 函数就是根据 StreamNode 创建对应的 JobVertex, 并返回了空的 StreamConfig * * --总结: * StreamGraph -> JobGrahph * 判断哪些StreamEge可以执行优化(chain),将 多个StreanNode 并成一个 JobVertex * * StreamNode_A -> (StreamNode_B -> StreamNode_C) * B,C chain在一起, startNodeId = B // Integer startNodeId = chainInfo.getStartNodeId(); * 当 currentNodeId = B 则 B 创建 JobVertex * 当 currentNodeId = C 则 C 不创建 JobVertex */ StreamConfig config = currentNodeId.equals(startNodeId) ? // TODO -> createJobVertex(startNodeId, chainInfo) : new StreamConfig(new Configuration()); // TODO 注释: chain 在一起的多条边 connect 在一起 for(StreamEdge edge : transitiveOutEdges) { /** * 重点 3 根据 StreamNode和 StreamEdge 生成 JobEge 和 IntermediateDataSet 用来将JobVertex和JobEdge相连 * TODO -> */ connect(startNodeId, edge); } } // 重点 1 isChainable isChainable(){ // TODO 注释: 获取上游 SourceVertex StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); // TODO 注释: 获取下游 TargetVertex StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); /************************************************* * TODO * 注释: 判断是否能 chain 在一起 */ // TODO 条件1. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入) A -> B B A 一一对应 如果shuffle类,那么B的入度就 >= 2 return downStreamVertex.getInEdges().size() == 1 // TODO 注释: 条件2. 上下游算子实例处于同一个SlotSharingGroup中 && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // TODO -> 注释: 这里面有 3 个条件 条件 345 && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph){ // TODO 注释: 获取 上游 Operator StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory(); // TODO 注释: 获取 下游 Operator StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory(); // TODO 注释:条件3、前后算子不为空 如果上下游有一个为空,则不能进行 chain if(downStreamOperator == null || upStreamOperator == null) { return false; } /************************************************* * 条件4、上游算子的链接策略是 always 或者 head * 条件5、下游算子的链接策略必须是 always */ if(upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER || downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) { return false; } } // TODO 注释:条件6 两个算子间的物理分区逻辑是ForwardPartitioner // (无shuffle,当前节点的计算数据,只会发给自己 one to one 如上游50个task 计算完直接发送给下游50个task) && (edge.getPartitioner() instanceof ForwardPartitioner) // TODO 注释:条件7 两个算子间的shuffle方式不等于批处理模式 && edge.getShuffleMode() != ShuffleMode.BATCH // TODO 注释:条件8 上下游算子实例的并行度相同 && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // TODO 注释:条件9 启动了 chain && streamGraph.isChainingEnabled(); } // 重点 2 createJobVertex createJobVertex(){ // TODO 注释: 获取 startStreamNode StreamNode streamNode = streamGraph.getStreamNode(streamNodeId); // TODO 注释: 生成一个 JobVertexID JobVertexID jobVertexId = new JobVertexID(hash); // JobVertex 初始化 if(chainedInputOutputFormats.containsKey(streamNodeId)) { jobVertex = new InputOutputFormatVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs); chainedInputOutputFormats.get(streamNodeId).write(new TaskConfig(jobVertex.getConfiguration())); } else { // TODO 注释: 创建一个 JobVertex jobVertex = new JobVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs); } // 将生成好的 JobVertex 加入到: JobGraph jobGraph.addVertex(jobVertex); } // 重点 3 根据 StreamNode和 StreamEdge 生成 JobEge 和 IntermediateDataSet 用来将JobVertex和JobEdge相连 connect(startNodeId, edge){ //生成JobEdge JobEdge jobEdge; if(isPointwisePartitioner(partitioner)) { jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, resultPartitionType); } else { // TODO -> 创建 IntermediateDataSet jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType); } } ----> connectNewDataSetAsInput(){ // TODO -> input是JobVertex 即 JobVertex 创建 IntermediateDataSet IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); // TODO 创建 JobEdge JobEdge edge = new JobEdge(dataSet, this, distPattern); this.inputs.add(edge); // TODO IntermediateDataSet -> JobEdge dataSet.addConsumer(edge); return edge; // 至此形成流图 JobVertex -> IntermediateDataSet -> JobEdge } ================================================================================================================================== // 2. 调用RestClient中的netty 客户端进行提交 到 服务端执行 -> clusterClient.submitJob(jobGraph){ /************************************************* * TODO * 注释: 先持久化: 把 JobGragh 持久化到磁盘文件形成 jobGraphFile * 1、持久化 JobGragh 的前缀:flink-jobgraph * 2、持久化 JobGragh 的后缀:.bin * 当我们把 JobGraph 持久化了之后,变成了一个文件: jobGraphFile * 然后其实,在提交 JobGraph 到 Flink 集群运行的时候,其实提交的就是: 这个文件! * 将来,最终是有 FLink 集群的 WebMonitor(JobSubmitHandler) 去接收请求来执行处理 * JobSubmitHandler 在执行处理的第一件事情: 把传送过来的这个文件反序列化得到 JobGraph 这个对象 */ CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> { try { final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); try(ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) { objectOut.writeObject(jobGraph); /************************************************* * TODO * 注释: 等待持久化完成之后,然后加入带上传文件系列 * 补充: thenApply 接收一个函数作为参数,使用该函数处理上一个 CompletableFuture 调用的结果,并返回一个具有处理结果的 Future 对象。 */ CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> { List<String> jarFileNames = new ArrayList<>(8); List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8); Collection<FileUpload> filesToUpload = new ArrayList<>(8); // TODO 注释: 加入待上传的文件系列 filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); for(Path jar : jobGraph.getUserJars()) { jarFileNames.add(jar.getName()); // 上传 filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR)); } // -> TODO -> 注释:sendRetriableRequest() 提交 真正提交 requestAndFileUploads -> sendRetriableRequest(JobSubmitHeaders.getInstance(), EmptyMessageParameters.getInstance(), requestAndFileUploads.f0, requestAndFileUploads.f1, isConnectionProblemOrServiceUnavailable())); // TODO 注释: 等 sendRetriableRequest 提交完成之后,删除生成的 jobGraghFile Files.delete(jobGraphFile); } --> sendRetriableRequest -> restClient.sendRequest(){ /************************************************* * TODO * 注释: 通过 Netty 客户端发送请求给 Netty 服务端 */ final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort); /************************************************* * TODO -> * 注释: 发送请求 到 WebMonitorEndpoint 的 Netty 服务端 * 最终由: JobSubmitHandler 来执行处理 */ httpRequest.writeTo(channel); }