在 Flink 的流式操作中, 会涉及不同的时间概念
处理时间是指的执行操作的各个设备的时间。
对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。比如, 一个长度为 1 个小时的窗口将会包含设备时钟表示的 1 个小时内所有的数据. 假设应用程序在 9:15am 分启动, 第 1 个小时窗口将会包含 9:00am 到 10:00am 所有的数据, 然后下个窗口是 10:00am-11:00am, 等等。
处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟. 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序。
在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。
这篇文章之前所使用的全是处理时间
事件时间是指的这个事件发生的时间.
在 event 进入 Flink 之前, 通常被嵌入到了 event 中, 一般作为这个 event 的时间戳存在.
在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关。事件时间程序必须制定如何产生 Event Time Watermarks。在事件时间体系中, Watermarks是表示时间进度的标志(作用就相当于现实时间的时钟)。
在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果。事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。 由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。
假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。
在使用窗口的时候, 如果使用事件时间, 就指定时间分配器为事件时间分配器。
在 1.12 之前默认的时间语义是处理时间, 从 1.12 开始, Flink 内部已经把默认的语义改成了事件时间
考虑这样情况,结合上一篇开窗案例,假设滚动窗口大小 5min,存在一个 9:00-9:05,现在传感器在这个时间段每分钟生成了 5 条数据即 9:00、9:01、9:02、9:03、9:04,但因为网络原因数据在 9:06 才到达 flink 参与计算,但此时 9:00-9:05 窗口已经关闭这批数据就会被丢失。
事件时间可以不依赖处理时间来表示时间的进度。例如,在程序中即使处理时间和事件时间有相同的速度, 事件时间可能会轻微的落后处理时间。另外一方面,使用事件时间可以在几秒内处理已经缓存在 Kafka 中多周的数据, 这些数据可以照样被正确处理,就像实时发生的一样能够进入正确的窗口。因此在 Flink 中测量事件时间的进度机制就是 WaterMark,同时 WaterMark 作为内嵌在数据流中的特殊数据,并且携带一个时间戳,在开窗中使用作为窗口关闭的衡量标准。具备单调递增的特性。如 9:00-9:05 的窗口,只要 WaterMark 携带的时间没有到 9:05 窗口就不会被关闭。一个 Watermark(t)表示在这个流里面事件时间已经到了时间 t, 意味着此时, 流中不应该存在这样的数据: 他的时间戳 t2<= t (时间比较旧或者等于时间戳)
在下面的这个图中, 事件是有序的(按照他们自己的时间戳来看), WaterMark 是流中一个简单的周期性的标记,通常情况下, WaterMark是一种标记, 是流中的一个点, 所有在这个时间戳(WaterMark中的时间戳)前的数据应该已经全部到达. 一旦 WaterMark 到达了算子, 则这个算子会提高他内部的时钟的值为这个 WaterMark 的值。此时可以解决因延迟迟到的数据。
W(6) 表示事件时间已经到 6s,但实际上内部的最大时间戳是需要减 1ms,类比窗口的结束时间,防止存在两条事件时间相同的数据
若数据不仅有延迟,而且各个数据源延迟不一样,这就导致数据延迟且无序,那么上述的 WaterMark 取法就存在问题,此时需要给予 WaterMark 一个合理的延迟,这个延迟取决去系统最大的乱序程度可以再略高一点
其相当于每个窗口往后延迟一定时间再关闭,数据还是根据自身所携带的时间进入对应的窗口,也就是同一时刻会存在若干个窗口当延迟后的 WaterMark 到达窗口的关闭时间,窗口关闭输出数据。
在 Flink 中,WaterMark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解,如果 WaterMark设置的延迟太久,收到结果的速度可能就会很慢,而如果WaterMark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题。
如何在代码中生成 WaterMark 呢?Flink 内置两个 WaterMark 生成器,forMonotonousTimestamps 和 BoundedOutOfOrdernessWatermarks 分别对应有序和无序流,但从本质上看 forMonotonousTimestamps 是延迟为 0 的 BoundedOutOfOrdernessWatermarks
代码如下:
package day06; import bean.WaterSensor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class Flink01_WaterMark_EventTime { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度,不设置涉及 watermark 传递 env.setParallelism(1); // 读取数据,转换成 POJO SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] els = value.split(" "); return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2])); } }).returns(Types.POJO(WaterSensor.class)); // 提取事件时间 WatermarkStrategy<WaterSensor> wm = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }); waterSensorDS // 注册事件时间 .assignTimestampsAndWatermarks(wm) .keyBy(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("vc") .print(); env.execute(); } }
结果如下:
只要输入的时间戳小于 1577844005 第一个窗口就不会关闭,此时已经和处理时间没有关系了。
WaterMark 延迟 3s 解决乱序问题,代码如下:
package day06; import bean.WaterSensor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; public class Flink02_WaterMark_OutOfOrder { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度,不设置涉及 watermark 传递 env.setParallelism(1); // 读取数据,转换成 POJO SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] els = value.split(" "); return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2])); } }).returns(Types.POJO(WaterSensor.class)); // 提取事件时间,设置 WaterMark 延迟 3s WatermarkStrategy<WaterSensor> wm = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }); waterSensorDS // 注册事件时间 .assignTimestampsAndWatermarks(wm) .keyBy(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("vc") .print(); env.execute(); } }
结果如下:
可以看出即使 1577844001、1577844002 迟到也是可以被统计到的,若是这两条数据在 1577844008 之后到达即超过了延迟时间后才到,那此种方式这两条数据就被丢了。
注意到上述的代码都是设置并行度为 1,若注释掉并行度或者并行度设置为 2,这时候就会发现即使数据的时间已经到了 1577844008 第一个窗口都不会被关闭,这是因为:多并行度下 WaterMark 向下游传递采用的是轮播,下游取最小的 WaterMark。假设并行度为 2,如图:
此时 WaterMark 到达 8,因为轮播到下游,且下游取最小的导致 keyBy 算子 WaterMark 依然是 1,除非再来一条让 WaterMark 到达 8 的数据,才会让下游的 WaterMark 发生改变。如图:
总结 WaterMark 规律如下:
那么上述问题如何解决呢,设置并行度为 1 肯定不显示,那么可以将生成 WaterMark 的时机提前,在 Source 后直接提取生成即可,代码如下:
package day06; import bean.WaterSensor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; public class Flink03_WaterMark_OutOfOrder2 { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 提取事件时间 WatermarkStrategy<String> wm = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner(new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String element, long recordTimestamp) { return Long.parseLong(element.split(" ")[1]) * 1000; } }); // 读取数据,转换成 POJO env.socketTextStream("localhost", 1111) // 注册事件时间 .assignTimestampsAndWatermarks(wm) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] els = value.split(" "); return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2])); } }).returns(Types.POJO(WaterSensor.class)) .keyBy(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("vc") .print(); env.execute(); } }
再次测试:
Flink 提供 2 种风格的 WaterMark 生产方式: periodic(周期性) and punctuated(间歇性)。都需要继承 接口: WatermarkGenerator,实际生产中都是 periodic,periodic 默认 200ms 生成一次 WaterMark,punctuated 每条数据生成一次 WaterMark;
优缺点比较:
package day06; import bean.WaterSensor; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.OutputTag; import java.time.Duration; public class Flink05_WaterMark_Periodic { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 配置定时生成时间 env.getConfig().setAutoWatermarkInterval(200); // 设置并行度,不设置涉及 watermark 传递 env.setParallelism(1); // 读取数据,转换成 POJO SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("localhost", 1111) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] els = value.split(" "); return new WaterSensor(els[0], Long.parseLong(els[1]), Double.parseDouble(els[2])); } }).returns(Types.POJO(WaterSensor.class)); // 提取事件时间 WatermarkStrategy<WaterSensor> wm = new WatermarkStrategy<WaterSensor>() { @Override public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new MyPeriodic(2000); } }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000; } }); SingleOutputStreamOperator<WaterSensor> result = waterSensorDS // 注册事件时间 .assignTimestampsAndWatermarks(wm) .keyBy(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 允许数据迟到 2s .allowedLateness(Time.seconds(2)) // 3s + 2s 后的数据放入测输出流 .sideOutputLateData(new OutputTag<WaterSensor>("Side") { }) .sum("vc"); result.print("master"); result.getSideOutput(new OutputTag<WaterSensor>("Side") { }).print("side"); env.execute(); } private static class MyPeriodic implements WatermarkGenerator<WaterSensor> { private long maxTs; // 允许延迟的最大时间 private final long maxDelay; MyPeriodic(long maxDelay) { this.maxDelay = maxDelay; // 因为周期生成,需要在赋值加 maxDelay,否则调动 onPeriodicEmit 直接变成最大值 maxTs = Long.MIN_VALUE + maxDelay - 1; } // 每条数据调用一次 @Override public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) { // 保证 WaterMark 递增 maxTs = Math.max(maxTs, eventTimestamp); } // 周期性调用(默认200ms) @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTs - maxDelay - 1)); } } }
不要问我为什么写的这么漂亮,因为 BoundedOutOfOrdernessWatermarks 就是这么实现的