Java教程

FLINK基础(115): DS算子与窗口(24)窗口 (9) Allowed Lateness

本文主要是介绍FLINK基础(115): DS算子与窗口(24)窗口 (9) Allowed Lateness,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1 使用迟到元素更新窗口计算结果(Updating Results by Including Late Events)

由于存在迟到的元素,所以已经计算出的窗口结果是不准确和不完全的。我们可以使用迟到元素更新已经计算完的窗口结果。

如果我们要求一个operator支持重新计算和更新已经发出的结果,就需要在第一次发出结果以后也要保存之前所有的状态。但显然我们不能一直保存所有的状态,肯定会在某一个时间点将状态清空,而一旦状态被清空,结果就再也不能重新计算或者更新了。而迟到的元素只能被抛弃或者发送到侧输出流。

window operator API提供了方法来明确声明我们要等待迟到元素。当使用event-time window,我们可以指定一个时间段叫做allowed lateness。window operator如果设置了allowed lateness,这个window operator在水位线没过窗口结束时间时也将不会删除窗口和窗口中的状态。窗口会在一段时间内(allowed lateness设置的)保留所有的元素。

当迟到元素在allowed lateness时间内到达时,这个迟到元素会被实时处理并发送到触发器(trigger)。当水位线没过了窗口结束时间+allowed lateness时间时,窗口会被删除,并且所有后来的迟到的元素都会被丢弃。

package com.atguigu

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AllowedLateTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("Linux1", 9999, '\n')
    val s = stream
      .map(line => {
        val arr = line.split(" ")
        (arr(0), arr(1).toLong * 1000)
      })
//      .assignAscendingTimestamps(_._2)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        override def extractTimestamp(element: (String, Long)): Long = element._2
      })
      .keyBy(_._1)
      // [0,5),...
      .timeWindow(Time.seconds(5))
      // 水位线超过 窗口结束时间 窗口闭合计算,但不销毁
      // 水位线超过 窗口结束时间 + allowed lateness,窗口更新结果并销毁
      .allowedLateness(Time.seconds(5))
      .process(new MyAllowedLateProcess)
    s.print()
    env.execute()
  }
  class MyAllowedLateProcess extends ProcessWindowFunction[(String, Long),
    String, String,TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[String]): Unit = {
      lazy val isUpdate = getRuntimeContext.getState(
        new ValueStateDescriptor[Boolean]("update", Types.of[Boolean])
      )
      if (!isUpdate.value()) {
        out.collect("在水位线超过窗口结束时间的时候,窗口第一次闭合计算")
        isUpdate.update(true)
      } else {
        out.collect("迟到元素来了以后,更新窗口闭合计算的结果")
      }
    }
  }
}

2 将迟到数据发送到另外一个流

如果想对这些迟到数据处理,我们可以使用Flink的侧输出(Side Output)功能,将迟到数据发到某个特定的流上。后续我们可以根据业务逻辑的要求,对迟到的数据流进行处理。

样例:

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
// 数据流有三个字段:(key, 时间戳, 数值)
val input: DataStream[(String, Long, Int)] = ...

val mainStream = input.keyBy(item => item._1)
        .timeWindow(Time.seconds(5))
        // 将输出写到late-elements里
        .sideOutputLateData(new OutputTag[(String, Long, Int)]("late-elements"))
        .aggregate(new CountAggregate)

// 接受late-elements,形成一个数据流
val lateStream: DataStream[(String, Long, Int)] = mainStream.getSideOutput(new OutputTag[(String, Long, Int)]("late-elements"))

上面的代码将迟到的内容写进名为“late-elements”的OutputTag下,之后使用getSideOutput获取这些迟到的数据。

参考:

https://zhuanlan.zhihu.com/p/103519313

https://blog.csdn.net/sghuu/article/details/103704415

 

这篇关于FLINK基础(115): DS算子与窗口(24)窗口 (9) Allowed Lateness的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!