Repost: Time Handling in the Flink Source Code
Use the Environment's setStreamTimeCharacteristic method to specify which notion of time the system uses. The method takes a TimeCharacteristic argument.
TimeCharacteristic is an enum, defined as follows.
@PublicEvolving public enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }
Its values correspond one-to-one to the time types described earlier.
The source of StreamExecutionEnvironment's setStreamTimeCharacteristic method is as follows:
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}
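Here we see that when the TimeCharacteristic is EventTime or IngestionTime, a default auto watermark interval of 200 ms is set. Under IngestionTime this parameter is also used to align the watermarks of all machines in the cluster: every watermark sent downstream is an integer multiple of the auto watermark interval. Reading the source shows that this alignment applies only to IngestionTime; under EventTime the interval only controls how often TimestampsAndPeriodicWatermarksOperator polls the assigner, as shown later. The alignment logic is analyzed in the StreamSourceContexts part below.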
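To make the "integer multiple" claim concrete, here is a minimal sketch of the rounding step on its own. The class and method names are ours, not Flink's, and the two clock readings are made-up values standing in for two machines whose clocks differ by 40 ms.

```java
// Hypothetical helper, not part of Flink: isolates the alignment arithmetic.
final class WatermarkAlignmentSketch {

    /** Rounds a clock reading down to a multiple of the interval. */
    static long align(long currentTimeMillis, long intervalMillis) {
        return currentTimeMillis - (currentTimeMillis % intervalMillis);
    }

    public static void main(String[] args) {
        long interval = 200L; // the default auto watermark interval
        // Two assumed clock readings, 40 ms apart.
        System.out.println(align(10_150L, interval)); // prints 10000
        System.out.println(align(10_190L, interval)); // prints 10000
    }
}
```

Both readings collapse to the same watermark, which is the point of the alignment: small clock differences between machines do not produce watermarks that creep along at slightly different values.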
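The StreamSourceContexts class decides which type of SourceContext to create based on the system's TimeCharacteristic. A SourceContext is used inside a SourceFunction (see the post "Flink 使用之数据源", on using Flink data sources), and different SourceContext types handle element timestamps differently.
The SourceContext used in a SourceFunction is determined by the getSourceContext method.
The call chain leading to getSourceContext is as follows: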
LegacySourceFunctionThread.run: headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);
This line passes in a StreamStatusMaintainer. It can be traced back to StreamTask's getStreamStatusMaintainer method, which returns an OperatorChain.
StreamSource.run: this.ctx = StreamSourceContexts.getSourceContext
The source of getSourceContext is as follows:
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext( TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object checkpointLock, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> output, long watermarkInterval, long idleTimeout) { final SourceFunction.SourceContext<OUT> ctx; switch (timeCharacteristic) { case EventTime: ctx = new ManualWatermarkContext<>( output, processingTimeService, checkpointLock, streamStatusMaintainer, idleTimeout); break; case IngestionTime: ctx = new AutomaticWatermarkContext<>( output, watermarkInterval, processingTimeService, checkpointLock, streamStatusMaintainer, idleTimeout); break; case ProcessingTime: ctx = new NonTimestampContext<>(checkpointLock, output); break; default: throw new IllegalArgumentException(String.valueOf(timeCharacteristic)); } return ctx; }
As the source shows, there are three kinds of SourceContext:
ManualWatermarkContext
NonTimestampContext
AutomaticWatermarkContext
ManualWatermarkContext and AutomaticWatermarkContext share the same parent class, WatermarkContext.
The methods of WatermarkContext are analyzed one by one below.
@Override
public void collect(T element) {
    // Hold the checkpoint lock so this cannot run concurrently with a checkpoint
    synchronized (checkpointLock) {
        // Switch the stream status to ACTIVE
        streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
        if (nextCheck != null) {
            // An idleness check is already scheduled and an element has just arrived,
            // so clear the flag; that check will then not mark the source idle
            this.failOnNextCheck = false;
        } else {
            scheduleNextIdleDetectionTask();
        }
        processAndCollect(element);
    }
}
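WatermarkContext's streamStatusMaintainer has only one implementing class, OperatorChain. The value is passed in from StreamTask's operatorChain.
nextCheck is of type ScheduledFuture.
failOnNextCheck: set to true when the next idleness check is scheduled, and reset to false once an element has been collected.
If no idleness check is currently scheduled, scheduleNextIdleDetectionTask is called. Its code is analyzed shortly.
Finally processAndCollect is called, which contains the actual logic for processing and collecting the element. It is an abstract method and is analyzed later.
The code of scheduleNextIdleDetectionTask is as follows: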
private void scheduleNextIdleDetectionTask() {
    if (idleTimeout != -1) {
        // reset flag; if it remains true when task fires, we have detected idleness
        failOnNextCheck = true;
        // Schedule an idleness detection task that fires after idleTimeout.
        // getCurrentProcessingTime() returns the current system time.
        nextCheck = this.timeService.registerTimer(
            this.timeService.getCurrentProcessingTime() + idleTimeout,
            new IdlenessDetectionTask());
    }
}
The source of IdlenessDetectionTask is as follows:
private class IdlenessDetectionTask implements ProcessingTimeCallback {
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        synchronized (checkpointLock) {
            // set this to null now;
            // the next idleness detection will be scheduled again
            // depending on the below failOnNextCheck condition.
            // With nextCheck set to null, the next call to collect
            // schedules a new idleness detection task.
            nextCheck = null;
            if (failOnNextCheck) {
                // Mark the source as idle
                markAsTemporarilyIdle();
            } else {
                // Schedule another idleness detection task
                scheduleNextIdleDetectionTask();
            }
        }
    }
}
The markAsTemporarilyIdle method:
@Override
public void markAsTemporarilyIdle() {
    synchronized (checkpointLock) {
        // Set the status of the operatorChain to IDLE
        streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
    }
}
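From the analysis above it is clear that collect performs automatic idleness detection. When an element is collected, the stream is set to ACTIVE and, if none is pending, an idleness check task is scheduled. The task fires after idleTimeout. If the source has collected no element during that period, it is marked idle. If an element did arrive during that period, failOnNextCheck has been set to false, so when the task runs it does not mark the source idle and schedules another idleness check instead.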
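The interplay of nextCheck and failOnNextCheck is easier to follow with a concrete timeline. The following is a minimal sketch, not Flink code: the class name is ours, a simulated clock passed in as arguments stands in for the ProcessingTimeService, and a rescheduled check is assumed to start from the instant the previous one fired.

```java
// Hypothetical simulation of the idleness detection described above.
final class IdleDetectionSketch {
    private final long idleTimeout;   // -1 disables idleness detection
    private boolean failOnNextCheck;
    private Long nextCheckAt;         // null means no check is scheduled
    private boolean idle;

    IdleDetectionSketch(long idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /** Mirrors collect(): mark ACTIVE, then clear the flag or schedule a check. */
    void onElement(long now) {
        idle = false;
        if (nextCheckAt != null) {
            failOnNextCheck = false;
        } else {
            schedule(now);
        }
    }

    /** Fires every scheduled check that is due at or before the given time. */
    void advanceTo(long now) {
        while (nextCheckAt != null && nextCheckAt <= now) {
            long firedAt = nextCheckAt;
            nextCheckAt = null;
            if (failOnNextCheck) {
                idle = true;          // nothing arrived since the check was scheduled
            } else {
                schedule(firedAt);    // something arrived, so check again later
            }
        }
    }

    private void schedule(long now) {
        if (idleTimeout != -1) {
            failOnNextCheck = true;
            nextCheckAt = now + idleTimeout;
        }
    }

    boolean isIdle() {
        return idle;
    }

    public static void main(String[] args) {
        IdleDetectionSketch source = new IdleDetectionSketch(100L);
        source.onElement(0L);    // schedules a check for t=100
        source.onElement(50L);   // clears failOnNextCheck
        source.advanceTo(100L);  // check fires, reschedules for t=200
        System.out.println(source.isIdle()); // prints false
        source.advanceTo(200L);  // no element since t=100
        System.out.println(source.isIdle()); // prints true
    }
}
```

Note that with this scheme a source is declared idle somewhere between one and two idleTimeout periods after its last element, because the flag is only evaluated when a check fires.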
The collectWithTimestamp method collects an element and attaches a timestamp to it at the same time. The code is as follows:
@Override public void collectWithTimestamp(T element, long timestamp) { synchronized (checkpointLock) { streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); if (nextCheck != null) { this.failOnNextCheck = false; } else { scheduleNextIdleDetectionTask(); } processAndCollectWithTimestamp(element, timestamp); } }
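The logic of this method mirrors that of collect, including the periodic check for whether the source is idle. The only difference is that it ends by calling the subclass's processAndCollectWithTimestamp method.
The emitWatermark method sends a watermark downstream. The code is as follows: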
@Override
public void emitWatermark(Watermark mark) {
    // One extra check here: proceed only if this context allows the watermark
    if (allowWatermark(mark)) {
        synchronized (checkpointLock) {
            streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            if (nextCheck != null) {
                this.failOnNextCheck = false;
            } else {
                scheduleNextIdleDetectionTask();
            }
            processAndEmitWatermark(mark);
        }
    }
}
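The logic of this method is essentially the same as that of collect and is not repeated here.
The close method closes the SourceContext and cancels the pending idleness detection task. The code is as follows: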
@Override public void close() { cancelNextIdleDetectionTask(); }
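The EventTime time type uses ManualWatermarkContext. Compared with its parent class, ManualWatermarkContext adds two member variables, output and reuse, both of which appear in the methods below.
ManualWatermarkContext implements the parent's abstract methods as follows: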
@Override
protected void processAndCollect(T element) {
    output.collect(reuse.replace(element));
}

@Override
protected void processAndCollectWithTimestamp(T element, long timestamp) {
    output.collect(reuse.replace(element, timestamp));
}

@Override
protected void processAndEmitWatermark(Watermark mark) {
    output.emitWatermark(mark);
}

@Override
protected boolean allowWatermark(Watermark mark) {
    // Emitting watermarks is always allowed, so return true
    return true;
}
The IngestionTime time type uses AutomaticWatermarkContext.
Its constructor is as follows:
private AutomaticWatermarkContext(
        final Output<StreamRecord<T>> output,
        final long watermarkInterval,
        final ProcessingTimeService timeService,
        final Object checkpointLock,
        final StreamStatusMaintainer streamStatusMaintainer,
        final long idleTimeout) {
    super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
    this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
    Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
    // Taken from the auto watermark interval configuration
    this.watermarkInterval = watermarkInterval;
    this.reuse = new StreamRecord<>(null);
    this.lastRecordTime = Long.MIN_VALUE;
    // Get the current system time
    long now = this.timeService.getCurrentProcessingTime();
    // Register a watermark emitting timer that fires after watermarkInterval
    this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
        new WatermarkEmittingTask(this.timeService, checkpointLock, output));
}
The main logic of WatermarkEmittingTask is as follows:
@Override
public void onProcessingTime(long timestamp) {
    // Get the current system time
    final long currentTime = timeService.getCurrentProcessingTime();
    // Take the lock so this cannot run concurrently with a checkpoint
    synchronized (lock) {
        // we should continue to automatically emit watermarks if we are active
        // (the OperatorChain status must be ACTIVE)
        if (streamStatusMaintainer.getStreamStatus().isActive()) {
            // idleTimeout != -1 means an idle timeout is configured for the source;
            // the idle time of the source is also checked when emitting watermarks
            if (idleTimeout != -1 && currentTime - lastRecordTime > idleTimeout) {
                // if we are configured to detect idleness, piggy-back the idle detection check on the
                // watermark interval, so that we may possibly discover idle sources faster before waiting
                // for the next idle check to fire
                markAsTemporarilyIdle();
                // no need to finish the next check, as we are now idle.
                cancelNextIdleDetectionTask();
            } else if (currentTime > nextWatermarkTime) {
                // align the watermarks across all machines. this will ensure that we
                // don't have watermarks that creep along at different intervals because
                // the machine clocks are out of sync
                // watermarkTime is currentTime rounded down to a multiple of watermarkInterval.
                // This is the watermark alignment step; it is needed because the clocks
                // of the machines in the cluster are not in sync.
                final long watermarkTime = currentTime - (currentTime % watermarkInterval);
                // Emit the watermark
                output.emitWatermark(new Watermark(watermarkTime));
                // Set the timestamp of the next watermark to emit; note that this is not
                // the time at which the next emitting task runs
                nextWatermarkTime = watermarkTime + watermarkInterval;
            }
        }
    }
    // Schedule the next watermark emitting task
    long nextWatermark = currentTime + watermarkInterval;
    nextWatermarkTimer = this.timeService.registerTimer(
        nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
}
The analysis above shows that AutomaticWatermarkContext sends watermarks downstream automatically on a timer, with watermarkInterval as the period.
The processAndCollect method and its logic are as follows:
@Override
protected void processAndCollect(T element) {
    lastRecordTime = this.timeService.getCurrentProcessingTime();
    output.collect(reuse.replace(element, lastRecordTime));
    // this is to avoid lock contention in the lockingObject by
    // sending the watermark before the firing of the watermark
    // emission task.
    // If lastRecordTime is past nextWatermarkTime, emit a watermark right away.
    // nextWatermarkTime is the timestamp of the next watermark to emit, which is not
    // the time at which the next emitting task runs. An emitted watermark is an
    // aligned multiple of watermarkInterval, so it is never later than the clock
    // reading it was derived from.
    if (lastRecordTime > nextWatermarkTime) {
        // in case we jumped some watermarks, recompute the next watermark time
        final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval);
        // nextWatermarkTime is now greater than lastRecordTime, so the timer task
        // will not emit this same watermark again
        nextWatermarkTime = watermarkTime + watermarkInterval;
        output.emitWatermark(new Watermark(watermarkTime));
        // we do not need to register another timer here
        // because the emitting task will do so.
    }
}
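The record path and the timer path share the nextWatermarkTime field, and it is worth seeing how they take turns. Below is a minimal sketch under stated assumptions: the class name is ours, clock readings are passed in as arguments instead of being read from a ProcessingTimeService, idleness handling is left out, and nextWatermarkTime is assumed to start at 0.

```java
// Hypothetical model of how AutomaticWatermarkContext decides to emit.
final class IngestionWatermarkSketch {
    private final long watermarkInterval;
    private long nextWatermarkTime; // assumed to start at 0

    IngestionWatermarkSketch(long watermarkInterval) {
        this.watermarkInterval = watermarkInterval;
    }

    /** Record path: returns the watermark emitted right after the record, or null. */
    Long onRecord(long now) {
        return maybeEmit(now);
    }

    /** Timer path: returns the watermark emitted by the periodic task, or null. */
    Long onTimer(long now) {
        return maybeEmit(now);
    }

    private Long maybeEmit(long now) {
        if (now > nextWatermarkTime) {
            long watermarkTime = now - (now % watermarkInterval); // aligned multiple
            nextWatermarkTime = watermarkTime + watermarkInterval;
            return watermarkTime;
        }
        return null;
    }

    public static void main(String[] args) {
        IngestionWatermarkSketch ctx = new IngestionWatermarkSketch(200L);
        System.out.println(ctx.onRecord(1_234L)); // prints 1200
        System.out.println(ctx.onRecord(1_300L)); // prints null
        System.out.println(ctx.onTimer(1_390L));  // prints null
        System.out.println(ctx.onTimer(1_450L));  // prints 1400
    }
}
```

Whichever path first observes a clock reading past nextWatermarkTime emits the aligned watermark and pushes nextWatermarkTime forward, so the other path stays silent until the next interval boundary has passed.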
processAndCollectWithTimestamp is shown below. Its second parameter, timestamp, is ignored: under IngestionTime the system time is used as the element's timestamp.
@Override protected void processAndCollectWithTimestamp(T element, long timestamp) { processAndCollect(element); }
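Finally, let's look at allowWatermark and processAndEmitWatermark. AutomaticWatermarkContext does not let callers emit watermarks explicitly; watermarks are sent only by the timer task. The one exception is a watermark whose timestamp is Long.MAX_VALUE, which is allowed as long as nextWatermarkTime is not already Long.MAX_VALUE. After this special watermark has been sent, the periodic watermark timer is cancelled. The code is as follows: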
@Override
protected boolean allowWatermark(Watermark mark) {
    // allow Long.MAX_VALUE since this is the special end-watermark that for example the Kafka source emits
    return mark.getTimestamp() == Long.MAX_VALUE && nextWatermarkTime != Long.MAX_VALUE;
}

/** This will only be called if allowWatermark returned {@code true}. */
@Override
protected void processAndEmitWatermark(Watermark mark) {
    nextWatermarkTime = Long.MAX_VALUE;
    output.emitWatermark(mark);
    // we can shutdown the watermark timer now, no watermarks will be needed any more.
    // Note that this procedure actually doesn't need to be synchronized with the lock,
    // but since it's only a one-time thing, doesn't hurt either
    final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
    if (nextWatermarkTimer != null) {
        nextWatermarkTimer.cancel(true);
    }
}
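NonTimestampContext, the context used for ProcessingTime, is simple: it handles no timestamp-related logic and never sends a watermark, so it is not analyzed further here.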
InternalTimerServiceImpl.registerProcessingTimeTimer
SystemProcessingTimeService.registerTimer
SystemProcessingTimeService.wrapOnTimerCallback
ScheduledTask.run
TimerInvocationContext.invoke
InternalTimerServiceImpl.onProcessingTime(): triggerTarget.onProcessingTime(timer);
The registerProcessingTimeTimer method registers a ProcessingTime timer:
// Mainly called from WindowOperator and SimpleTimerService.
// WindowOperator passes the current window as the namespace;
// SimpleTimerService passes VoidNamespace.INSTANCE.
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
    // This is a priority queue; peek returns the timer with the smallest timestamp
    InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    // add returns true when the newly added timer has the smallest timestamp
    if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
        long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
        // check if we need to re-schedule our timer to earlier
        // If the new timer has the smallest timestamp in the queue (fires first),
        // cancel the currently scheduled timer and register a new one
        // at the new timer's timestamp
        if (time < nextTriggerTime) {
            if (nextTimer != null) {
                nextTimer.cancel(false);
            }
            nextTimer = processingTimeService.registerTimer(time, this);
        }
    }
}
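InternalTimerServiceImpl maintains a processingTimeTimersQueue variable, an ordered queue that stores timer objects.
When InternalTimeServiceManager obtains an InternalTimerServiceImpl, it calls that instance's startTimerService method. This method registers the first timer (the one with the earliest time) on a ScheduledThreadPoolExecutor. When that first timer is due, InternalTimerServiceImpl's onProcessingTime method is called.
The code of InternalTimerServiceImpl's onProcessingTime method is as follows: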
@Override
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;

    InternalTimer<K, N> timer;

    // Loop over every timer whose timestamp is less than or equal to time and
    // call triggerTarget.onProcessingTime on it. For the internalTimerService
    // of a WindowOperator, for example, triggerTarget is the WindowOperator itself.
    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
    }

    // At this point timer, if not null, is the first timer whose timestamp is
    // greater than time; schedule it as the next timer
    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
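The analysis above shows that, among the timers in processingTimeTimersQueue, the one with the smallest timestamp is always the one registered as a scheduled task. Each time that task fires, all timers that are due are processed, and the next timer in processingTimeTimersQueue, the earliest one whose timestamp is later than the firing time, is scheduled in turn.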
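The "only the head of the queue is physically scheduled" idea can be sketched with a plain java.util.PriorityQueue. This is an illustration under assumptions, not Flink code: the class name is ours, timers are reduced to their timestamps, the scheduledAt field stands in for the single timer handed to the executor, and unlike Flink's queue a PriorityQueue does not deduplicate entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model: many logical timers, one physically scheduled timer.
final class TimerQueueSketch {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    private Long scheduledAt; // timestamp of the one timer given to the executor

    void register(long time) {
        Long oldHead = queue.peek();
        queue.add(time);
        long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
        if (time < nextTriggerTime) {
            scheduledAt = time; // replaces the previously scheduled timer
        }
    }

    /** Fires all due timers, then schedules the new head if one remains. */
    List<Long> onProcessingTime(long time) {
        scheduledAt = null;
        List<Long> fired = new ArrayList<>();
        Long head;
        while ((head = queue.peek()) != null && head <= time) {
            fired.add(queue.poll());
        }
        if (head != null) {
            scheduledAt = head;
        }
        return fired;
    }

    Long scheduledAt() {
        return scheduledAt;
    }

    public static void main(String[] args) {
        TimerQueueSketch timers = new TimerQueueSketch();
        timers.register(300L);
        timers.register(100L);
        timers.register(200L);
        System.out.println(timers.scheduledAt());          // prints 100
        System.out.println(timers.onProcessingTime(200L)); // prints [100, 200]
        System.out.println(timers.scheduledAt());          // prints 300
    }
}
```

Keeping a single scheduled task per timer service means the executor never holds more than one entry per service, no matter how many windows or keys have pending timers.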
As seen in the previous part, InternalTimerServiceImpl.registerProcessingTimeTimer calls SystemProcessingTimeService.registerTimer. Its source is as follows:
@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {
    // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
    // T says we won't see elements in the future with a timestamp smaller or equal to T.
    // With processing time, we therefore need to delay firing the timer by one ms.
    // Compute the delay. As the comment above says, the extra 1 ms keeps the
    // firing semantics consistent with those of a watermark.
    long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

    // we directly try to register the timer and only react to the status on exception
    // that way we save unnecessary volatile accesses for each timer
    try {
        // Schedule the timer. wrapOnTimerCallback returns a ScheduledTask that
        // wraps the target timestamp and the callback logic to run.
        return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
    } catch (RejectedExecutionException e) {
        final int status = this.status.get();
        if (status == STATUS_QUIESCED) {
            return new NeverCompleteFuture(delay);
        } else if (status == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down");
        } else {
            // something else happened, so propagate the exception
            throw e;
        }
    }
}
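The delay formula is short but easy to misread, so here it is in isolation with a few sample values. The class and method names are ours, and the clock reading is passed in as an argument rather than read from the system.

```java
// Hypothetical helper isolating the delay arithmetic from registerTimer.
final class TimerDelaySketch {

    /** Milliseconds to wait before firing a timer registered for the given timestamp. */
    static long delayMillis(long timestamp, long now) {
        return Math.max(timestamp - now, 0) + 1;
    }

    public static void main(String[] args) {
        System.out.println(delayMillis(1_000L, 900L));   // prints 101
        System.out.println(delayMillis(1_000L, 1_000L)); // prints 1
        System.out.println(delayMillis(1_000L, 1_200L)); // prints 1
    }
}
```

A timer for timestamp T therefore never fires before T + 1 ms, by which point no further element can be stamped with a processing time of T or earlier. A timer registered for a time that has already passed still waits the minimum 1 ms.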
An operator holds an InternalTimerServiceImpl instance. The call chain is as follows:
registerOrGetTimerService
The call chain from a Task receiving a watermark to responding to it is as follows:
StreamTaskNetworkInput.processElement
StatusWatermarkValve.inputWatermark
StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels
OneInputStreamTask.emitWatermark
AbstractStreamOperator.processWatermark
InternalTimeServiceManager.advanceWatermark
InternalTimerServiceImpl.advanceWatermark: triggerTarget.onEventTime(timer);
Take WindowOperator as an example. If the system's TimeCharacteristic is EventTime, an EventTime timer is registered every time an element arrives, with the window end time as its time.
@Override public void registerEventTimeTimer(N namespace, long time) { eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)); }
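Registering an EventTime timer simply adds a timer to eventTimeTimersQueue. eventTimeTimersQueue has exactly the same structure as processingTimeTimersQueue; it is just dedicated to EventTime timers. The next question is when Flink uses these timers to trigger computation.
The advanceWatermark method is called when a watermark is received. Its main logic is to take from eventTimeTimersQueue, in order, every timer whose trigger time is less than or equal to the time argument and to call triggerTarget.onEventTime on it. triggerTarget.onEventTime contains the operator's actual event-time computation logic.
The code of advanceWatermark is as follows: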
public void advanceWatermark(long time) throws Exception { currentWatermark = time; InternalTimer<K, N> timer; while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { eventTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onEventTime(timer); } }
The method above is called from InternalTimeServiceManager. InternalTimeServiceManager's advanceWatermark method loops over all of its InternalTimerServiceImpl instances and calls advanceWatermark on each of them.
public void advanceWatermark(Watermark watermark) throws Exception { for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) { service.advanceWatermark(watermark.getTimestamp()); } }
That method is called in AbstractStreamOperator's processWatermark. The code is as follows:
public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    // Forward the watermark downstream
    output.emitWatermark(mark);
}
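To see how an arriving watermark drains the event-time timers, here is a minimal sketch of the advanceWatermark loop. It is not Flink code: the class name is ours, timers are reduced to their timestamps, keys and namespaces are left out, and the sample timer values are made up to resemble the end times of one-second windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of event-time timers being fired by a watermark.
final class EventTimeTimersSketch {
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long time) {
        eventTimeTimers.add(time);
    }

    /** Fires, in timestamp order, every timer at or before the watermark. */
    List<Long> advanceWatermark(long time) {
        currentWatermark = time;
        List<Long> fired = new ArrayList<>();
        Long head;
        while ((head = eventTimeTimers.peek()) != null && head <= time) {
            fired.add(eventTimeTimers.poll());
        }
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        EventTimeTimersSketch service = new EventTimeTimersSketch();
        service.registerEventTimeTimer(1_999L);
        service.registerEventTimeTimer(999L);
        service.registerEventTimeTimer(2_999L);
        System.out.println(service.advanceWatermark(1_500L)); // prints [999]
        System.out.println(service.advanceWatermark(2_999L)); // prints [1999, 2999]
    }
}
```

Unlike the processing-time path, nothing is handed to an executor here: event-time timers fire only as a side effect of a watermark arriving, which is why a stalled watermark stalls every event-time window downstream.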
Following the call chain, we continue to OneInputStreamTask's emitWatermark method:
@Override public void emitWatermark(Watermark watermark) throws Exception { synchronized (lock) { watermarkGauge.setCurrentWatermark(watermark.getTimestamp()); operator.processWatermark(watermark); } }
Next is StatusWatermarkValve's findAndOutputNewMinWatermarkAcrossAlignedChannels method:
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws Exception {
    long newMinWatermark = Long.MAX_VALUE;
    boolean hasAlignedChannels = false;

    // determine new overall watermark by considering only watermark-aligned channels across all channels
    for (InputChannelStatus channelStatus : channelStatuses) {
        // As the inputStreamStatus method shows, an input channel that becomes idle
        // has its watermark-aligned flag set to false.
        // Take the minimum watermark over all aligned channels.
        if (channelStatus.isWatermarkAligned) {
            hasAlignedChannels = true;
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }

    // we acknowledge and output the new overall watermark if it really is aggregated
    // from some remaining aligned channel, and is also larger than the last output watermark
    // Emit the watermark
    if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        output.emitWatermark(new Watermark(lastOutputWatermark));
    }
}
Next we analyze the inputWatermark method:
public void inputWatermark(Watermark watermark, int channelIndex) throws Exception {
    // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
    if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
        long watermarkMillis = watermark.getTimestamp();

        // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
        if (watermarkMillis > channelStatuses[channelIndex].watermark) {
            // Update the channel's watermark
            channelStatuses[channelIndex].watermark = watermarkMillis;

            // previously unaligned input channels are now aligned if its watermark has caught up
            // The channel was unaligned (it had been idle) and its watermark has now
            // caught up with the last output watermark, so mark it as aligned again
            if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
                channelStatuses[channelIndex].isWatermarkAligned = true;
            }

            // now, attempt to find a new min watermark across all aligned channels
            // (calls the method from the previous snippet)
            findAndOutputNewMinWatermarkAcrossAlignedChannels();
        }
    }
}
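The two valve methods together implement "output the minimum watermark over all aligned channels, and only when it moves forward". The sketch below models just that rule. It is a simplification under assumptions: the class and method names are ours, stream status handling is reduced to a markIdle call that only clears the aligned flag, and emission is represented by the return value.

```java
import java.util.Arrays;

// Hypothetical model of the minimum-watermark rule in StatusWatermarkValve.
final class ValveSketch {
    private final long[] channelWatermark;
    private final boolean[] aligned;
    private long lastOutputWatermark = Long.MIN_VALUE;

    ValveSketch(int channels) {
        channelWatermark = new long[channels];
        aligned = new boolean[channels];
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
        Arrays.fill(aligned, true);
    }

    /** Simplified: an idle channel no longer takes part in the minimum. */
    void markIdle(int channel) {
        aligned[channel] = false;
    }

    /** Returns the watermark emitted downstream, or null if none is emitted. */
    Long inputWatermark(long watermark, int channel) {
        if (watermark <= channelWatermark[channel]) {
            return null; // not an advance for this channel
        }
        channelWatermark[channel] = watermark;
        if (!aligned[channel] && watermark >= lastOutputWatermark) {
            aligned[channel] = true; // the channel has caught up
        }
        long min = Long.MAX_VALUE;
        boolean hasAligned = false;
        for (int i = 0; i < channelWatermark.length; i++) {
            if (aligned[i]) {
                hasAligned = true;
                min = Math.min(min, channelWatermark[i]);
            }
        }
        if (hasAligned && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        ValveSketch valve = new ValveSketch(2);
        System.out.println(valve.inputWatermark(10L, 0)); // prints null
        System.out.println(valve.inputWatermark(5L, 1));  // prints 5
        System.out.println(valve.inputWatermark(7L, 1));  // prints 7
        valve.markIdle(1);
        System.out.println(valve.inputWatermark(20L, 0)); // prints 20
    }
}
```

The first call emits nothing because channel 1 has not reported yet and holds the minimum at Long.MIN_VALUE. Once channel 1 is marked idle it stops holding the minimum back, which is why the last call can jump straight to 20.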
Finally, we trace the caller of inputWatermark to StreamTaskNetworkInput's processElement method:
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception { if (recordOrMark.isRecord()){ output.emitRecord(recordOrMark.asRecord()); } else if (recordOrMark.isWatermark()) { statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel); } else if (recordOrMark.isLatencyMarker()) { output.emitLatencyMarker(recordOrMark.asLatencyMarker()); } else if (recordOrMark.isStreamStatus()) { statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel); } else { throw new UnsupportedOperationException("Unknown type of StreamElement"); } }
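Clearly, this method dispatches to the matching handling logic based on the type of the received element. Further up the chain is the logic for passing data between Tasks, which will be analyzed in a later post.
The analysis above shows how an operator forwards and responds to the watermarks it receives. One question remains: how watermarks are produced.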
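Watermarks can be produced in two places: in a SourceFunction, through the SourceContext described above, and on a DataStream, by calling assignTimestampsAndWatermarks.
assignTimestampsAndWatermarks has two overloads: one accepts an AssignerWithPeriodicWatermarks and the other an AssignerWithPunctuatedWatermarks. We look at the source first and analyze the differences afterwards.
The AssignerWithPeriodicWatermarks version is as follows: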
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) { // match parallelism to input, otherwise dop=1 sources could lead to some strange // behaviour: the watermark will creep along very slowly because the elements // from the source go to each extraction operator round robin. final int inputParallelism = getTransformation().getParallelism(); final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner); TimestampsAndPeriodicWatermarksOperator<T> operator = new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner); return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator) .setParallelism(inputParallelism); }
The AssignerWithPunctuatedWatermarks version is as follows:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) { // match parallelism to input, otherwise dop=1 sources could lead to some strange // behaviour: the watermark will creep along very slowly because the elements // from the source go to each extraction operator round robin. final int inputParallelism = getTransformation().getParallelism(); final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner); TimestampsAndPunctuatedWatermarksOperator<T> operator = new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner); return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator) .setParallelism(inputParallelism); }
The two versions are almost identical; only the operator they use differs.
TimestampsAndPeriodicWatermarksOperator
First we analyze the source of TimestampsAndPeriodicWatermarksOperator, shown below:
public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    private transient long watermarkInterval;
    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        // Allow this operator to be chained with the operators before and after it
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        currentWatermark = Long.MIN_VALUE;
        // Read the auto watermark interval configured on the environment
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        if (watermarkInterval > 0) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            // Register a processing-time timer that fires after watermarkInterval
            // and calls this class's onProcessingTime method
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // Call extractTimestamp on the user-supplied TimestampAssigner to get the timestamp
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        // Collect the element with its timestamp and send it downstream
        output.collect(element.replace(element.getValue(), newTimestamp));
    }

    // Runs when the timer registered in open() fires
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // register next timer
        // Ask the user-supplied assigner for the current watermark
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }
        // Schedule another processing-time timer
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
     * watermarks from here).
     */
    // All upstream watermarks are ignored, with one exception: a watermark whose
    // timestamp is Long.MAX_VALUE. It means the input stream has ended, so it
    // must be forwarded downstream.
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        // emit a final watermark
        // When the operator closes, try to emit a watermark one more time
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }
    }
}
TimestampsAndPunctuatedWatermarksOperator
The annotated source of this class is as follows:
public class TimestampsAndPunctuatedWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPunctuatedWatermarks<T>>
        implements OneInputStreamOperator<T, T> {

    private static final long serialVersionUID = 1L;

    private long currentWatermark = Long.MIN_VALUE;

    public TimestampsAndPunctuatedWatermarksOperator(AssignerWithPunctuatedWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final T value = element.getValue();
        // Call the user function to get the timestamp
        final long newTimestamp = userFunction.extractTimestamp(value,
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        // Collect the element
        output.collect(element.replace(element.getValue(), newTimestamp));
        // Call the user function to get a watermark and send it downstream if it advances
        final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
        if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = nextWatermark.getTimestamp();
            output.emitWatermark(nextWatermark);
        }
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPunctuatedWatermarks} to emit
     * watermarks from here).
     */
    // Same as the method in TimestampsAndPeriodicWatermarksOperator; not repeated here
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }
}
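The analysis shows the main difference between the two operators. TimestampsAndPeriodicWatermarksOperator asks the assigner for a watermark on a periodic timer, even when no data arrives, although it emits the result only when it is larger than the last watermark it sent, so a watermark with an unchanged timestamp is not sent again. TimestampsAndPunctuatedWatermarksOperator has no timer: it asks for a watermark only when an element arrives, and emits it under the same condition.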
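Both operators apply the same guard before emitting: the candidate must be non-null and strictly greater than the last watermark sent. A minimal sketch of that guard on its own follows; the class and method names are ours, and emission is represented by the return value.

```java
// Hypothetical model of the "only emit if the watermark advances" guard.
final class WatermarkGuardSketch {
    private long currentWatermark = Long.MIN_VALUE;

    /** Returns the candidate if it would be emitted, otherwise null. */
    Long offer(Long candidate) {
        if (candidate != null && candidate > currentWatermark) {
            currentWatermark = candidate;
            return candidate;
        }
        return null;
    }

    public static void main(String[] args) {
        WatermarkGuardSketch guard = new WatermarkGuardSketch();
        System.out.println(guard.offer(100L)); // prints 100
        System.out.println(guard.offer(100L)); // prints null
        System.out.println(guard.offer(90L));  // prints null
        System.out.println(guard.offer(150L)); // prints 150
    }
}
```

The periodic operator calls the guard from its timer callback with the result of getCurrentWatermark, while the punctuated operator calls it once per element with the result of checkAndGetNextWatermark. In both cases an idle or stalled assigner produces no repeated watermarks downstream.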
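AscendingTimestampExtractor is a timestamp extractor for streams whose elements arrive in order with ascending timestamps.
The source of its extractTimestamp method is shown below. The method contains one extra check: if the timestamp extracted from a new element is smaller than currentTimestamp, the timestamps are not ascending, and violationHandler's handleViolation is called. handleViolation is the callback for this case. Users can implement their own callback or use one of the ready-made handlers that Flink provides.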
@Override public final long extractTimestamp(T element, long elementPrevTimestamp) { final long newTimestamp = extractAscendingTimestamp(element); if (newTimestamp >= this.currentTimestamp) { this.currentTimestamp = newTimestamp; return newTimestamp; } else { violationHandler.handleViolation(newTimestamp, this.currentTimestamp); return newTimestamp; } }
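The check in extractTimestamp can be tried out without Flink. The sketch below is an illustration with made-up names: ViolationHandler here is our own single-method interface shaped after the handleViolation call in the code above, elements are reduced to their timestamps, and currentTimestamp is assumed to start at Long.MIN_VALUE.

```java
// Hypothetical stand-alone model of the ascending-timestamp check.
final class AscendingTimestampSketch {

    /** Assumed callback shape, modelled on handleViolation in the source above. */
    interface ViolationHandler {
        void handleViolation(long elementTimestamp, long lastTimestamp);
    }

    private final ViolationHandler violationHandler;
    private long currentTimestamp = Long.MIN_VALUE;

    AscendingTimestampSketch(ViolationHandler violationHandler) {
        this.violationHandler = violationHandler;
    }

    /** Returns the timestamp unchanged; reports it if it went backwards. */
    long extractTimestamp(long newTimestamp) {
        if (newTimestamp >= currentTimestamp) {
            currentTimestamp = newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, currentTimestamp);
        }
        return newTimestamp;
    }

    public static void main(String[] args) {
        AscendingTimestampSketch extractor = new AscendingTimestampSketch(
            (ts, last) -> System.out.println("violation: " + ts + " < " + last));
        extractor.extractTimestamp(10L);
        extractor.extractTimestamp(10L); // equal timestamps are accepted
        extractor.extractTimestamp(5L);  // prints "violation: 5 < 10"
    }
}
```

Two details are visible here: an equal timestamp is not a violation because the comparison is >=, and a violating element still keeps its own timestamp, since the handler only reports the problem.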
BoundedOutOfOrdernessTimestampExtractor
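The most common use of watermarks is to tolerate a bounded amount of out-of-order data: there is a maximum lateness that is tolerated, and data arriving later than that is excluded from the computation and handled through a side output. Flink ships a timestamp extractor for exactly this scenario. It has one important field, maxOutOfOrderness, which is the maximum tolerated lateness just described. The extractor is an abstract class: users subclass it and implement extractTimestamp(T element) to derive a timestamp from an element.
The extractor's extractTimestamp(T element, long previousElementTimestamp) method, with annotations, is as follows: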
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
    // Call the user-implemented method to get the timestamp from the element
    long timestamp = extractTimestamp(element);
    // currentMaxTimestamp holds the largest timestamp seen so far;
    // its initial value is Long.MIN_VALUE + maxOutOfOrderness
    if (timestamp > currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;
    }
    return timestamp;
}
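This method is called by the two operators discussed earlier. Users do not need to implement it themselves; they only implement the extractTimestamp(T element) method that it calls.
The code of getCurrentWatermark, which returns the current watermark, is as follows: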
@Override
public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    // Core logic: the watermark is the largest timestamp seen so far minus
    // maxOutOfOrderness, meaning that all data older than that is assumed to
    // have arrived and only that data is ready to be computed
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    // Prevent the watermark from moving backwards
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}
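A short numeric walk-through makes the relationship between element timestamps and the watermark easier to see. This is a sketch, not the Flink class: the name is ours, elements are reduced to their timestamps, and lastEmittedWatermark is assumed to start at Long.MIN_VALUE.

```java
// Hypothetical stand-alone model of the bounded out-of-orderness logic.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Offset the initial value so the first subtraction cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    long onElement(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch assigner = new BoundedOutOfOrdernessSketch(3_000L);
        assigner.onElement(10_000L);
        System.out.println(assigner.currentWatermark()); // prints 7000
        assigner.onElement(8_000L);                      // late, but within the bound
        System.out.println(assigner.currentWatermark()); // prints 7000
        assigner.onElement(12_000L);
        System.out.println(assigner.currentWatermark()); // prints 9000
    }
}
```

The element with timestamp 8000 arrives after one with timestamp 10000 but is still ahead of the watermark at 7000, so downstream windows have not closed on it yet. An element with a timestamp below 7000 at that point would be the late data described above.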
IngestionTimeExtractor
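Its watermark logic is basically the same as that of AutomaticWatermarkContext, only without the watermark alignment step. Elements are stamped with the current system time, and the watermark sent downstream is that time minus 1 ms.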
public class IngestionTimeExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = -4072216356049069301L;

    private long maxTimestamp;

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // make sure timestamps are monotonously increasing, even when the system clock re-syncs
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // make sure timestamps are monotonously increasing, even when the system clock re-syncs
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }
}
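The Math.max guard is what keeps timestamps from going backwards when the system clock is adjusted. To make that observable, the sketch below takes the clock reading as a parameter instead of calling System.currentTimeMillis(); that substitution and the class name are ours.

```java
// Hypothetical variant of IngestionTimeExtractor with an injectable clock reading.
final class MonotonicIngestionSketch {
    private long maxTimestamp;

    long extractTimestamp(long systemMillis) {
        long now = Math.max(systemMillis, maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    long currentWatermark(long systemMillis) {
        long now = Math.max(systemMillis, maxTimestamp);
        maxTimestamp = now;
        return now - 1;
    }

    public static void main(String[] args) {
        MonotonicIngestionSketch extractor = new MonotonicIngestionSketch();
        System.out.println(extractor.extractTimestamp(100L)); // prints 100
        System.out.println(extractor.extractTimestamp(90L));  // prints 100 (clock stepped back)
        System.out.println(extractor.currentWatermark(120L)); // prints 119
        System.out.println(extractor.extractTimestamp(110L)); // prints 120
    }
}
```

Because getCurrentWatermark also advances maxTimestamp, an element stamped after a watermark is never at or below that watermark: the watermark is now - 1 and the next element gets at least now.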