由于存在迟到的元素,所以已经计算出的窗口结果是不准确和不完全的。我们可以使用迟到元素更新已经计算完的窗口结果。
如果我们要求一个operator支持重新计算和更新已经发出的结果,就需要在第一次发出结果以后也要保存之前所有的状态。但显然我们不能一直保存所有的状态,肯定会在某一个时间点将状态清空,而一旦状态被清空,结果就再也不能重新计算或者更新了。而迟到的元素只能被抛弃或者发送到侧输出流。
window operator API提供了方法来明确声明我们要等待迟到元素。当使用event-time window,我们可以指定一个时间段叫做allowed lateness。window operator如果设置了allowed lateness,这个window operator在水位线没过窗口结束时间时也将不会删除窗口和窗口中的状态。窗口会在一段时间内(allowed lateness设置的)保留所有的元素。
当迟到元素在allowed lateness时间内到达时,这个迟到元素会被实时处理并发送到触发器(trigger)。当水位线没过了窗口结束时间+allowed lateness时间时,窗口会被删除,并且所有后来的迟到的元素都会被丢弃。
package com.atguigu import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector object AllowedLateTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream = env.socketTextStream("Linux1", 9999, '\n') val s = stream .map(line => { val arr = line.split(" ") (arr(0), arr(1).toLong * 1000) }) // .assignAscendingTimestamps(_._2) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) { override def extractTimestamp(element: (String, Long)): Long = element._2 }) .keyBy(_._1) // [0,5),... .timeWindow(Time.seconds(5)) // 水位线超过 窗口结束时间 窗口闭合计算,但不销毁 // 水位线超过 窗口结束时间 + allowed lateness,窗口更新结果并销毁 .allowedLateness(Time.seconds(5)) .process(new MyAllowedLateProcess) s.print() env.execute() } class MyAllowedLateProcess extends ProcessWindowFunction[(String, Long), String, String,TimeWindow] { override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = { lazy val isUpdate = getRuntimeContext.getState( new ValueStateDescriptor[Boolean]("update", Types.of[Boolean]) ) if (!isUpdate.value()) { out.collect("在水位线超过窗口结束时间的时候,窗口第一次闭合计算") isUpdate.update(true) } else { out.collect("迟到元素来了以后,更新窗口闭合计算的结果") } } } }
如果想对这些迟到数据处理,我们可以使用Flink的侧输出(Side Output)功能,将迟到数据发到某个特定的流上。后续我们可以根据业务逻辑的要求,对迟到的数据流进行处理。
样例:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; DataStream<T> input = ...; SingleOutputStreamOperator<T> result = input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<window function>); DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
// 数据流有三个字段:(key, 时间戳, 数值) val input: DataStream[(String, Long, Int)] = ... val mainStream = input.keyBy(item => item._1) .timeWindow(Time.seconds(5)) // 将输出写到late-elements里 .sideOutputLateData(new OutputTag[(String, Long, Int)]("late-elements")) .aggregate(new CountAggregate) // 接受late-elements,形成一个数据流 val lateStream: DataStream[(String, Long, Int)] = mainStream.getSideOutput(new OutputTag[(String, Long, Int)]("late-elements"))
上面的代码将迟到的内容写进名为“late-elements”的OutputTag
下,之后使用getSideOutput
获取这些迟到的数据。
参考:
https://zhuanlan.zhihu.com/p/103519313
https://blog.csdn.net/sghuu/article/details/103704415