转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型
相似文章:【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型
Flink 1.10 对内部事件处理的线程模型做了一个大的改进,采用了类似 Actor 的信箱模型。这篇文章我们将深入 Flink 内部 Mailbox 线程模型的设计即实现。
在之前的线程模型中,StreamTask
中可能存在多个潜在的线程会修改内部的状态,因此需要通过加锁
的方式来确保线程安全的状态,这个全局的锁就是著名的 checkpointLock
。通过 checkpointLock
控制线程间的并发会让程序代码变得很复杂,并且锁对象还通过一些 API 暴露给了用户(例如 SourceFunction#getCheckpointLock()),如果没有正确加锁很容易引发线程安全问题。
为了解决这个问题,社区提出了基于 Mailbox 的线程模型,见 FLINK-12477
。Mailbox 机制借鉴了 Actor 模型,通过单个 Mailbox 线程配合阻塞队列的方式,将内部状态的修改交由单个线程完成,从而避免多线程的问题。相比于使用 checkpointLock
,Mailbox 模型另一个好处是方便控制事件处理的优先级,通过锁竞争很难达到类似的效果。
在原始的线程模型中,checkpointLock 主要用在三个地方:
事件处理:包括 events, watermarks, barriers, latency markers 的处理和发送
checkpoint
触发:通过 RPC 调用触发 checkpoint
(在 Source 中)、通知 checkpoint
的完成情况,(注:对下游来说,checkpoint 触发和取消是通过 barrier 触发的,归为第一种情况)
Processing Time Timers: 处理时间定时器是通过 ScheduledExecutor
异步执行的(事件事件定时器触发是通过 watermark 触发的,归为第一种情况)
在新的改进方案中,对锁的替换不仅仅要做到排他的效果,对于事件处理还需要保证原子性。
Mailbox 模型的核心思想其实比较简单,其底层就是 FIFO 的队列 + 一个单线程的循环事件处理
。所有需要处理的事件都封装成一个 Mail 投递到 Mailbox 中,然后按先后顺序由单线程加以处理
,从而简化了并发访问问题。
在使用 Mailbox
以前,StreamTask
的核心逻辑是在 StreamTask#run()
中,内部是一个循环的事件处理。除此以外,checkpoint trigger
和 processing time timer
在其它线程中运行。
在改进方案中,StreamTask 的基础逻辑大致如下(伪代码,来自设计文档):
BlockingQueue<Runnable> mailbox = ... void runMailboxProcessing() { //TODO: can become a cancel-event through mailbox eventually Runnable letter; while (isRunning()) { while ((letter = mailbox.poll()) != null) { letter.run(); } defaultAction(); } } void defaultAction() { // e.g. event-processing from an input }
上面只是核心代码的大致逻辑,具体的实现还有一些优化,比如队列的公平性
。之前的抢锁
操作是完全没有任何公平性
而言的。
在这个模型下,事件处理的循环被移到了 Mailbox
处理线程中,因此以往在 StreamTask#run()
中的循环逻辑就不再需要了。但这里会有个问题,因为历史原因,Flink Source Function
的核心逻辑是一个循环,这个循环不能和 Mailbox
的事件循环穿插执行,因此需要进行兼容性处理。在 FLIP-27
提出的新的 Source 接口中,已经可以比较好地和 Mailbox 模型进行兼容了。
对于 checkpoint trigger 和 processing time timer,只需要将对应的操作封装为 Mail 投递到 Mailbox 中,等待 Mailbox 线程进行处理即可
。
下面这张图展示了 Mailbox 线程模型中的核心抽象。
Mail 中封装了需要处理的消息和相应的动作
,checkpoint trigger 和 processing time timer 就是通过 Mail 触发的
;TaskMailbox
用于存储 Mail(需要处理的消息);MailboxProcessor
负责从 TaskMailbox 中取出信件并处理;其它的调用方通过 MailboxExecutor 向 TaskMailbox 中投递信件。
MailboxDefaultAction
则是 MailboxProcessor
的默认动作,如前所述,MailboxDefaultAction
主要负责处理基础的 stream event、barrier、watermark
等。在 Mailbox
主线程的循环中,处理完新的 Mail 后就会执行该动作。MailboxDefaultAction
通过一个 MailboxController
和 Mailbox
进行交互,可以借此获悉所有的事件都处理完毕,或者临时暂停 MailboxDefaultAction
。
TaskMailbox
的内部使用了一个普通的 Deque
存储写入的 Mail
,对 Deque
读写通过一个 ReentrantLock
来加以保护。Mailbox
的一个主要特性是可以做优先级控制
,每一个 Mail
都有其优先级
,从 TaskMailbox
获取 Mail
时可以指定优先级
,实际实现时就是通过遍历队列元素比较优先级
。
为了减少读取队列时的同步开销
,TaskMailbox
支持创建一个 batch
后续消费,相当于把队列中的元素存入一个额外的队列,后续消费时就避免了加锁的操作。
MailboxProcessot
核心就是前面提过的事件循环
,在这个事件循环中,除了处理 TaskMailbox
中的事件外,还有一个 MailboxDefaultAction
用做默认的行为
。
MailboxDefaultAction
和 TaskMailbox
内部的 Mail 的区别在于,Mail 通常用于一些控制类的消息处理
,例如 checkpoint 触发,而 MailboxDefaultAction
则用于数据流上的普通消息处理
(如正常的数据记录,barrier)等。数据流上的消息数据量比较大,通过邮箱内部队列进行处理显然开销比较大。
public class MailboxProcessor implements Closeable { //邮箱 protected final TaskMailbox mailbox; // 默认行为,用于普通的数据流上的消息数据处理 protected final MailboxDefaultAction mailboxDefaultAction; /** * Runs the mailbox processing loop. This is where the main work is done. This loop can be * suspended at any time by calling {@link #suspend()}. For resuming the loop this method should * be called again. * * // 运行邮箱处理循环。 这是完成主要工作的地方。 */ public void runMailboxLoop() throws Exception { suspended = !mailboxLoopRunning; final TaskMailbox localMailbox = mailbox; // 检查当前运行线程是否是 mailbox 线程,只有 mailbox 线程能运行该方法 //确保当前调用必须发生在 Mailbox 的事件处理线程中 checkState( localMailbox.isMailboxThread(), "Method must be executed by declared mailbox thread!"); // mailbox 状态必须是 OPEN assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!"; final MailboxController defaultActionContext = new MailboxController(this); // TODO:邮箱里有邮件,就进行处理. 邮件就是类似map之类的任务... while (isNextLoopPossible()) { // The blocking `processMail` call will not return until default action is available. // 在默认操作可用之前,阻塞的`processMail`调用将不会返回。 // 处理事件,这是一个阻塞方法,如果默认行为不可用,方法不会返回 processMail(localMailbox, false); // 再做一次检查,因为上面的 mail 处理可能会改变运行状态 if (isNextLoopPossible()) { // TODO: 执行一个默认的动作 邮箱默认操作在StreamTask构造器中指定,为 processInput mailboxDefaultAction.runDefaultAction( // 根据需要在默认操作中获取锁 defaultActionContext); // lock is acquired inside default action as needed } } }
MailboxExecutor
的主要作用是向 TaskMailbox
中投递 Mail
,这个接口被设计为类似 java.util.concurrent.Executor
接口。提交 Mail
的行为可以在任意线程中进行,因为 TaskMailbox 内部有基于锁的同步控制
。
除了提交 Mail 外,MailboxExecutor
还有一个比较重要的作用体现在 MailboxExecutor#yield
方法中。yield 这个词在程序设计语言中非常常见,但其含义往往又让人摸不着头脑。从字面解释来看,yield 有“让出”,“屈服”之意,在一些场景下也有“生成”的意思。这里我们不纠结这个,还是来看看这个方法设计的意图的什么。
Mailbox 模型中所有的事件都是在单个事件处理线程中处理的,排除掉优先级的因素,所有的事件按照 FIFO 的顺序加以处理
。正常情况下,这种处理顺序是没有问题的。但是考虑到一种特殊的情况,如果要完成对事件A的处理需要等待一个条件,只有在处理完事件B之后这个条件才能满足,但是事件B在队列里的顺序是在事件A之后的,这样某种程度上来说就造成了一种 “死锁”。
yield 方法就是为了解决上面的问题,yield 会从队列中取出下一个事件进行处理,看上去像是暂时“让出”了对当前事件的处理。
说起来有点抽象,看一个示例:
MailboxExecutor mailboxExecutor = .... mailboxExecutor.executr(() -> { // ... // 当前事件处理的逻辑,要完成,需要依赖后面某个事件的处理 while (resource not available) { // 取出下一个事件处理 mailboxExecutor.yield(); } // 继续处理当前事件 // ... })
注意,为了不破坏 Mailbox
模型单线程执行的特性,这个方法必须在 Mailbox
事件处理线程中调用。这是一个阻塞方法,因此可能会阻塞事件处理线程。有些场景下可能还需要依赖事件处理线程来提交新的事件,因此也提供了非阻塞的 tryYield
方法。
StreamTask 的核心是处理消息流中的 StreamRecord,这个处理逻辑是 MailboxProcessor 的默认行为,即:
class StreamTask { protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { InputStatus status = inputProcessor.processInput(); //处理输入 if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) { return; } if (status == InputStatus.END_OF_INPUT) { // 没有后续的输入了,告知 MailboxDefaultAction.Controller controller.allActionsCompleted(); return; } // 暂时没有输入的情况 TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup(); TimerGauge timer; CompletableFuture<?> resumeFuture; if (!recordWriter.isAvailable()) { timer = ioMetrics.getBackPressuredTimePerSecond(); resumeFuture = recordWriter.getAvailableFuture(); } else { timer = ioMetrics.getIdleTimeMsPerSecond(); resumeFuture = inputProcessor.getAvailableFuture(); } // 一旦有输入了,就告知 controller 要恢复 MailboxDefaultAction 的处理 assertNoException( resumeFuture.thenRun( // 首先会暂停 MailboxDefaultAction 的处理 new ResumeWrapper(controller.suspendDefaultAction(timer), timer))); } private static class ResumeWrapper implements Runnable { private final Suspension suspendedDefaultAction; private final TimerGauge timer; public ResumeWrapper(Suspension suspendedDefaultAction, TimerGauge timer) { this.suspendedDefaultAction = suspendedDefaultAction; timer.markStart(); this.timer = timer; } @Override public void run() { timer.markEnd(); suspendedDefaultAction.resume(); } } }
对于 checkpoint 的触发,是通过 MailboxExecutor 提交一个 Mail 来实现的:
@Override public Future<Boolean> triggerCheckpointAsync( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { CompletableFuture<Boolean> result = new CompletableFuture<>(); mainMailboxExecutor.execute( () -> { try { // 触发Checkpoint操作 这里可以看到,其实现跟方案设计中的是一致,Checkpoint trigger // 这里的操作就是向 MailBox 提交一个 Task,等待 MailBox 去处理。 result.complete( triggerCheckpointAsyncInMailbox( checkpointMetaData, checkpointOptions)); } catch (Exception ex) { // Report the failure both via the Future result but also to the mailbox result.completeExceptionally(ex); throw ex; } }, "checkpoint %s with %s", checkpointMetaData, checkpointOptions); return result; }
checkpoint 完成或者放弃的通知也是提交到 Mailbox 中运行的:
class StreamTask { // checkpoint 完成或者失败的回调通知操作 private Future<Void> notifyCheckpointOperation( RunnableWithException runnable, String description) { CompletableFuture<Void> result = new CompletableFuture<>(); mailboxProcessor .getMailboxExecutor(TaskMailbox.MAX_PRIORITY) .execute( () -> { try { runnable.run(); } catch (Exception ex) { result.completeExceptionally(ex); throw ex; } result.complete(null); }, description); return result; } }
对于 processing time timer 的触发也是类似的:
class streamTask { public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() { return mailboxExecutor -> new ProcessingTimeServiceImpl( timerService, callback -> deferCallbackToMailbox(mailboxExecutor, callback)); } ProcessingTimeCallback deferCallbackToMailbox( MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) { return timestamp -> { // 提交到 mailbox 中运行 mailboxExecutor.execute( () -> invokeProcessingTimeCallback(callback, timestamp), "Timer callback for %s @ %d", callback, timestamp); }; } }
前面提到,因为历史遗留的问题,SourceFunction
被设计成一个无限的循环,这个循环不能和 Mailbox
的事件循环穿插执行,因此需要进行兼容性处理。
SourceStreamTask
被设计为 StreamTask
的子类,会启动另外一个独立的线程 LegacySourceFunctionThread
运行 SourceFunction
中的循环。这样相当于有两个线程在同时运行
SourceFunction
中生成数据流中的数据Mailbox
中的事件处理线程。为了防止这两个线程发生冲突,在 SourceStreamTask 中保留了 checkpoint lock,用于在这两个线程间进行并发控制。
为了达到这样的效果,Flink 提供了一个 StreamTaskActionExecutor
的封装,用来运行 Runnable
。正常情况下,StreamTaskActionExecutor
的实现就是直接去运行 Runnable
;同时也提供了一个 SynchronizedStreamTaskActionExecutor
的实现,在运行 Runnable
的时候会进行加锁控制,这样就把获取锁的操作引入到 Mailbox
处理线程中了:
class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor { private final Object mutex; public SynchronizedStreamTaskActionExecutor(Object mutex) { this.mutex = mutex; } @Override public void run(RunnableWithException runnable) throws Exception { synchronized (mutex) { runnable.run(); } } }
Mailbox 模型是常见的用来控制并发的一种设计,通过引入 Mailbox 的线程模型,Flink 简化了 StreamTask 的代码逻辑,规避了多线程竞争带来的并发问题。
通过对 Mailbox、 MailboxProcessor、MailboxExecutor 这几个接口的设计进行分析,可以看出 Flink 的 Mailbox 模型设计还是比较优雅的,在使用方面也比较简单,很值得我们在开发其它项目的时候参考。