Previously we looked at how tasks run in Flink, and how data flows both within a task thread and between different TaskManagers. Both sending Buffer data over the network and receiving Buffer data on the task side require allocating MemorySegment memory segments, which may live on the heap or off-heap. In this article we analyze Flink's memory management from three angles: NetworkBuffer resource management, Flink's memory model, and the MemorySegment itself.
As introduced earlier, Flink's network stack uses the NetworkBuffer data structure as its byte container. It plays the same role as Netty's ByteBuf: it is a reference-counted wrapper around a pooled MemorySegment instance.
Under the hood, NetworkBuffer stores its bytes in a MemorySegment and exposes a set of buffer operations for efficient data access. Its main collaborators are MemorySegment, ByteBufAllocator, and BufferRecycler. ByteBufAllocator is Netty's memory allocation component; Flink's implementation is NettyBufferPool. For NetworkBuffer to be used inside Netty, the allocator has to be registered via bootstrap.childOption(ChannelOption.ALLOCATOR, nettyBufferPool). BufferRecycler is responsible for recycling the buffer once it is no longer referenced; common implementations include RemoteInputChannel and LocalBufferPool.
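The reference-counting contract described above can be illustrated with a minimal sketch. The names `PooledBuffer` and the plain `Deque` standing in for a MemorySegment pool are hypothetical, not Flink classes; the point is only the retain/recycle lifecycle that NetworkBuffer and BufferRecycler implement.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: PooledBuffer mimics a reference-counted, pooled buffer;
// a byte[] stands in for a MemorySegment, a Deque for the BufferRecycler's pool.
public class PooledBuffer {
    private final byte[] segment;
    private final Deque<byte[]> recycler;
    private int refCount = 1;

    public PooledBuffer(byte[] segment, Deque<byte[]> recycler) {
        this.segment = segment;
        this.recycler = recycler;
    }

    /** Increment the reference count (analogous to retaining a Buffer). */
    public synchronized PooledBuffer retain() {
        if (refCount <= 0) throw new IllegalStateException("buffer already recycled");
        refCount++;
        return this;
    }

    /** Decrement the count; when it hits zero, return the segment to the pool. */
    public synchronized void recycle() {
        if (refCount <= 0) throw new IllegalStateException("buffer already recycled");
        if (--refCount == 0) recycler.addLast(segment);
    }

    public static void main(String[] args) {
        Deque<byte[]> pool = new ArrayDeque<>();
        PooledBuffer buf = new PooledBuffer(new byte[32], pool);
        buf.retain();                    // two logical owners now
        buf.recycle();                   // first owner done; segment not yet returned
        System.out.println(pool.size()); // 0
        buf.recycle();                   // last owner done; segment goes back
        System.out.println(pool.size()); // 1
    }
}
```

The key property is that the segment only returns to the pool when the last owner releases it, which is what lets multiple consumers (e.g. Netty and a downstream reader) safely share one segment.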
Flink requests and caches NetworkBuffers through two components: NetworkBufferPool and LocalBufferPool. The NetworkBufferPool is bound to the whole TaskManager and supplies all the buffers the TaskManager needs; each LocalBufferPool provides buffers for a ResultPartition or an InputGate. (The main purpose of LocalBufferPool is to cache segments and avoid the overhead of repeatedly requesting and releasing them.)
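The two-tier arrangement can be sketched as follows. This is a simplified model, not Flink's implementation: `GlobalPool` and `LocalPool` are illustrative stand-ins for NetworkBufferPool and LocalBufferPool, and the quota logic is reduced to its essence (a local pool borrows from the global pool up to a limit and caches returned segments for reuse).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch of the NetworkBufferPool / LocalBufferPool tiering.
public class PoolDemo {
    // Per-TaskManager segment source (stand-in for NetworkBufferPool).
    static class GlobalPool {
        final Deque<byte[]> segments = new ArrayDeque<>();
        GlobalPool(int count, int size) {
            for (int i = 0; i < count; i++) segments.addLast(new byte[size]);
        }
        byte[] request() { return segments.pollFirst(); }
        void recycle(byte[] s) { segments.addLast(s); }
    }

    // Per-ResultPartition/InputGate cache (stand-in for LocalBufferPool).
    static class LocalPool {
        final GlobalPool global;
        final int quota;                       // max segments this pool may hold
        final Deque<byte[]> cached = new ArrayDeque<>();
        int borrowed = 0;
        LocalPool(GlobalPool global, int quota) { this.global = global; this.quota = quota; }

        byte[] request() {
            if (!cached.isEmpty()) return cached.pollFirst(); // reuse a cached segment
            if (borrowed >= quota) return null;               // quota exhausted
            byte[] s = global.request();
            if (s != null) borrowed++;
            return s;
        }
        void recycle(byte[] s) { cached.addLast(s); }         // keep it for reuse
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(4, 1024);
        LocalPool local = new LocalPool(global, 2);
        byte[] a = local.request();                 // served by the global pool
        local.recycle(a);
        byte[] b = local.request();                 // served from the local cache
        System.out.println(a == b);                 // true: the segment was reused
        System.out.println(global.segments.size()); // 3: only one segment borrowed
    }
}
```

The second request never touches the global pool, which is exactly the overhead the LocalBufferPool caching is meant to avoid.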
As we saw earlier, RecordWriter serializes each StreamRecord into binary data and then requests a BufferBuilder from the ResultPartition; the BufferBuilder is used to create a BufferConsumer, and the binary data is written into the BufferBuilder's memory region. The BufferConsumer is stored in the BufferConsumer queue of a ResultSubPartition and later handed down to the TCP channel through a ViewReader.
Requesting a BufferBuilder amounts to requesting a MemorySegment from the ResultPartition's LocalBufferPool, which wraps the obtained MemorySegment in a BufferBuilder and returns it.
```java
public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {

    private BufferPool bufferPool;

    @Override
    public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
        checkInProduceState();
        return bufferPool.requestBufferBuilderBlocking();
    }
}
```
```java
class LocalBufferPool implements BufferPool {

    @Override
    public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
        return toBufferBuilder(requestMemorySegmentBlocking());
    }
}
```
The InputGate also uses a LocalBufferPool to provide buffers for data read from the network. Each InputChannel holds two queues, floatingBuffers and exclusiveBuffers, that buffer the data read from the network.
After the InputGate is created, the Task calls InputGate.setup() to initialize each of its InputChannels, which includes assigning an exclusive buffer quota to every InputChannel (each InputChannel requests its fixed exclusive buffers directly from the NetworkBufferPool).
```java
public class SingleInputGate extends InputGate {

    @Override
    public void setup() throws IOException, InterruptedException {
        checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
        // Assign exclusive buffers to every InputChannel
        assignExclusiveSegments();
        BufferPool bufferPool = bufferPoolFactory.get();
        setBufferPool(bufferPool);
        // Send a PartitionRequest to the upstream ResultPartition to create the corresponding ViewReader
        requestPartitions();
    }

    @VisibleForTesting
    public void assignExclusiveSegments() throws IOException {
        synchronized (requestLock) {
            for (InputChannel inputChannel : inputChannels.values()) {
                if (inputChannel instanceof RemoteInputChannel) {
                    ((RemoteInputChannel) inputChannel).assignExclusiveSegments();
                }
            }
        }
    }
}
```
A RemoteInputChannel only requests floating buffers from the LocalBufferPool when its task is backpressured (if the LocalBufferPool does not have enough resources, it in turn requests more from the NetworkBufferPool). Once a floating buffer has been used, it is returned to the LocalBufferPool and released for other InputChannels to use; in other words, all InputChannels of an InputGate share the floating buffers.
This shows the design intent of LocalBufferPool: it temporarily holds the extra buffers requested under backpressure, so that when another InputChannel needs a segment because of backpressure, it can first use a segment cached in the LocalBufferPool. Only when the LocalBufferPool is full are segments reclaimed by the NetworkBufferPool.
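The sharing behavior can be shown with a small sketch. `SharedFloatingPool` is an illustrative stand-in for the InputGate's LocalBufferPool, not a Flink class; it only demonstrates that a floating buffer returned by one channel immediately becomes available to another.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of floating-buffer sharing across InputChannels.
public class SharingDemo {
    static class SharedFloatingPool {
        final Deque<byte[]> free = new ArrayDeque<>();
        SharedFloatingPool(int n) { for (int i = 0; i < n; i++) free.addLast(new byte[256]); }
        byte[] borrow() { return free.pollFirst(); }          // null when exhausted
        void giveBack(byte[] s) { free.addLast(s); }          // reusable by any channel
    }

    public static void main(String[] args) {
        SharedFloatingPool pool = new SharedFloatingPool(1);  // one floating buffer total
        byte[] forChannelA = pool.borrow();                   // channel A takes it
        System.out.println(pool.borrow() == null);            // true: channel B must wait
        pool.giveBack(forChannelA);                           // A finishes and returns it
        System.out.println(pool.borrow() != null);            // true: B can now use it
    }
}
```

In the real implementation a waiting channel does not poll: as the code below shows, it registers itself as a BufferListener and is notified when a buffer frees up.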
```java
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

    void onSenderBacklog(int backlog) throws IOException {
        int numRequestedBuffers = 0;

        synchronized (bufferQueue) {
            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
            // after releaseAllResources() released all buffers (see above for details).
            if (isReleased.get()) {
                return;
            }

            // Compute the number of required buffers
            numRequiredBuffers = backlog + initialCredit;
            // Not enough buffers available: request floating buffers
            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
                // Request a floating buffer from the InputGate's LocalBufferPool
                Buffer buffer = inputGate.getBufferPool().requestBuffer();
                if (buffer != null) {
                    // Add the floating buffer to the floating-buffer queue
                    bufferQueue.addFloatingBuffer(buffer);
                    numRequestedBuffers++;
                } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                    // No spare buffer to request: register this InputChannel in the
                    // InputGate's BufferListener queue and wait to be notified
                    isWaitingForFloatingBuffers = true;
                    break;
                }
            }
        }

        // Send this InputChannel's credit to the ResultPartition;
        // the credit equals the value of unannouncedCredit
        if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
            notifyCreditAvailable();
        }
    }
}
```
When floating buffers are requested depends on the backlog metric sent by the upstream task (i.e. the number of buffers piled up in the upstream ResultSubPartition). If the required buffer count (backlog plus initialCredit) exceeds the total number of buffers the InputChannel currently has available (the sum of the buffers in its floating and exclusive queues), it requests additional floating buffers from the LocalBufferPool.
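The arithmetic in onSenderBacklog can be traced with a minimal, hypothetical model. `BacklogDemo` below is not Flink code: `FLOATING_POOL` stands in for the shared LocalBufferPool, `initialCredit` for the channel's exclusive buffers, and the loop mirrors the `required = backlog + initialCredit` comparison from the code above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of RemoteInputChannel's backlog handling.
public class BacklogDemo {
    // Shared floating-buffer source (stand-in for the InputGate's LocalBufferPool).
    static final Deque<byte[]> FLOATING_POOL = new ArrayDeque<>();

    static class Channel {
        final int initialCredit;                   // number of exclusive buffers
        final Deque<byte[]> floating = new ArrayDeque<>();
        Channel(int initialCredit) { this.initialCredit = initialCredit; }

        int available() { return initialCredit + floating.size(); }

        /** Mirror of onSenderBacklog: required = backlog + initialCredit. */
        int onSenderBacklog(int backlog) {
            int required = backlog + initialCredit;
            int requested = 0;
            while (available() < required && !FLOATING_POOL.isEmpty()) {
                floating.addLast(FLOATING_POOL.pollFirst());
                requested++;
            }
            return requested;                      // credit to announce upstream
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 8; i++) FLOATING_POOL.addLast(new byte[1024]);
        Channel ch = new Channel(2);               // 2 exclusive buffers
        int credit = ch.onSenderBacklog(5);        // needs 5 + 2 = 7, has 2
        System.out.println(credit);                // 5: floating buffers requested
        System.out.println(ch.available());        // 7
    }
}
```

With a backlog of 5 and an initial credit of 2, the channel needs 7 buffers in total, already owns 2 exclusive ones, and therefore requests 5 floating buffers, announcing 5 units of credit upstream.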