
Checkpoint Alignment: A Source Code Analysis


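Checkpointing is the key mechanism behind Flink's fault-tolerant state. It also determines the delivery semantics, Exactly-Once or At-Least-Once. The difference comes down to checkpoint alignment: with alignment Flink provides Exactly-Once, and without it At-Least-Once. The official documentation explains:

[Figure: screenshot of the "Exactly Once vs. At Least Once" section of the official documentation (not preserved)]

In short, during alignment an operator stops consuming a channel once that channel's barrier for checkpoint n has arrived. It holds that channel's further records in a buffer until barriers from all channels are in. When alignment is skipped, the operator keeps processing every input. It may therefore process records belonging to checkpoint n+1 before taking the snapshot for checkpoint n. After a restore, those records are replayed and show up as duplicates.

Note: screenshot taken from https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/internals/stream_checkpointing.html#exactly-once-vs-at-least-once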

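Alignment happens in tasks that consume more than one input channel. Examples are the task after a keyBy, which receives data from every upstream parallel instance, and two-input operations such as join. The rest of this article examines how alignment is implemented in the source code.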

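Checkpoint barriers are handled in StreamInputProcessor/StreamTwoInputProcessor. These classes read data from the network and pass it to the StreamOperator. Reading is delegated to a CheckpointBarrierHandler, whose getNextNonBlocked method also implements alignment. CheckpointBarrierHandler has two implementations, BarrierBuffer and BarrierTracker. Which one gets created depends on the checkpointing mode: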

// Called from StreamInputProcessor / StreamTwoInputProcessor to create
// the CheckpointBarrierHandler
public static CheckpointBarrierHandler createCheckpointBarrierHandler(
      StreamTask<?, ?> checkpointedTask,
      CheckpointingMode checkpointMode,
      IOManager ioManager,
      InputGate inputGate,
      Configuration taskManagerConfig) throws IOException {

    CheckpointBarrierHandler barrierHandler;
    if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
      // maximum number of bytes that may be buffered during alignment; -1 means unlimited
      long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
      if (!(maxAlign == -1 || maxAlign > 0)) {
        throw new IllegalConfigurationException(
          TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
          + " must be positive or -1 (infinite)");
      }
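      // credit-based flow control: cache blocked data in memory (CachedBufferBlocker);
      // otherwise spill it to disk through the IOManager (BufferSpiller)
      if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {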
        barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
      } else {
        barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
      }
    } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
      barrierHandler = new BarrierTracker(inputGate);
    } else {
      throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
    }

    if (checkpointedTask != null) {
      barrierHandler.registerCheckpointEventHandler(checkpointedTask);
    }
    return barrierHandler;
  }
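So BarrierBuffer implements alignment (CheckpointingMode.EXACTLY_ONCE). BarrierTracker implements the non-aligned mode (CheckpointingMode.AT_LEAST_ONCE).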


Aligned mode: BarrierBuffer
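BarrierBuffer holds two key members for alignment: bufferBlocker, of type BufferBlocker, and blockedChannels, a boolean array.

- The BufferBlocker caches data that arrives on blocked channels during alignment. CachedBufferBlocker keeps it in an in-memory ArrayDeque, while BufferSpiller spills it to disk.
- blockedChannels records which channels are currently blocked, that is, which have already delivered the barrier and are waiting for the others.

The alignment loop in getNextNonBlocked: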

@Override
  public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {

      //.....
      BufferOrEvent bufferOrEvent = next.get();
      if (isBlocked(bufferOrEvent.getChannelIndex())) {
        // the channel is blocked (aligning): cache the data in the BufferBlocker
        bufferBlocker.add(bufferOrEvent);
        // abort the current checkpoint if the cached bytes exceed maxAlign
        checkSizeLimit();
      }
      else if (bufferOrEvent.isBuffer()) {
        // data from an unblocked channel is returned immediately
        return bufferOrEvent;
      }
      else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
        if (!endOfStream) {
          // handle the CheckpointBarrier
          processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
      }
      //.......
    }
  }
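The Java sketch below illustrates the blocking behavior. It is not Flink code. AlignedInput and its string encoding (`|B<id>` marks a barrier) are hypothetical. Checkpoint IDs, aborts and the maxAlign limit are left out, and every barrier is assumed to belong to the checkpoint being aligned. Elements from a blocked channel go into a queue that stands in for the BufferBlocker. They are replayed only after barriers from all channels have arrived.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model of aligned barrier handling, loosely following BarrierBuffer.
// Not Flink code: "|B<id>" encodes a checkpoint barrier, any other string is a record.
final class AlignedInput {
    private static final class Pending {
        final int channel;
        final String element;
        Pending(int channel, String element) {
            this.channel = channel;
            this.element = element;
        }
    }

    private final boolean[] blockedChannels;                       // like BarrierBuffer.blockedChannels
    private final Deque<Pending> blockedData = new ArrayDeque<>(); // stands in for the BufferBlocker
    private int numBarriersReceived = 0;
    final List<String> output = new ArrayList<>();                 // records + "checkpoint N" markers

    AlignedInput(int numChannels) {
        this.blockedChannels = new boolean[numChannels];
    }

    void receive(int channel, String element) {
        if (blockedChannels[channel]) {
            // this channel already delivered its barrier: hold the element back
            blockedData.addLast(new Pending(channel, element));
        } else if (!element.startsWith("|B")) {
            output.add(element); // record from an unblocked channel: process immediately
        } else {
            blockedChannels[channel] = true; // barrier: block this channel
            numBarriersReceived++;
            if (numBarriersReceived == blockedChannels.length) {
                output.add("checkpoint " + element.substring(2)); // aligned: take the snapshot
                releaseBlocked();
            }
        }
    }

    // Unblock every channel and replay the held-back elements in arrival order.
    private void releaseBlocked() {
        Arrays.fill(blockedChannels, false);
        numBarriersReceived = 0;
        List<Pending> replay = new ArrayList<>(blockedData);
        blockedData.clear();
        for (Pending p : replay) {
            receive(p.channel, p.element);
        }
    }
}
```

Use two channels. Feed a1, |B1, a2 on channel 0, then b1, |B1, b2 on channel 1. The output is a1, b1, checkpoint 1, a2, b2. Because a2 arrived after channel 0's barrier, it is processed only after the snapshot.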
The processBarrier method:
 private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    // barrierId is the checkpoint ID carried by this barrier
    final long barrierId = receivedBarrier.getId();
    // single input channel: nothing to align, trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
      if (barrierId > currentCheckpointId) {
        // new checkpoint
        currentCheckpointId = barrierId;
        notifyCheckpoint(receivedBarrier);
      }
      return;
    }
    // multiple input channels: numBarriersReceived counts the channels that have
    // already delivered the barrier for currentCheckpointId;
    // numBarriersReceived > 0 means an alignment is in progress
    if (numBarriersReceived > 0) {
      // this is only true if some alignment is already progress and was not canceled
      if (barrierId == currentCheckpointId) {
        // regular case
        onBarrier(channelIndex);
      }
      else if (barrierId > currentCheckpointId) {
        // the incoming barrier belongs to a newer checkpoint than the one being aligned
        // (for example, the current one timed out): abort the current checkpoint,
        // unblock all channels, reset numBarriersReceived to 0,
        // and start aligning for checkpoint barrierId
        LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
            "Skipping current checkpoint.",
          inputGate.getOwningTaskName(),
          barrierId,
          currentCheckpointId);

        notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
        releaseBlocksAndResetBarriers();
        beginNewAlignment(barrierId, channelIndex);
      }
      else {
        // ignore trailing barrier from an earlier checkpoint (obsolete now)
        return;
      }
    }
    else if (barrierId > currentCheckpointId) {
      // numBarriersReceived == 0: start a new checkpoint and mark
      // this channel as blocked (blockedChannels[channelIndex] = true)
      beginNewAlignment(barrierId, channelIndex);
    }
    else {
      // either the current checkpoint was canceled (numBarriers == 0) or
      // this barrier is from an old subsumed checkpoint
      return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
      // actually trigger checkpoint
      if (LOG.isDebugEnabled()) {
        LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
          inputGate.getOwningTaskName(),
          receivedBarrier.getId(),
          receivedBarrier.getTimestamp());
      }
      // alignment complete: queue the data cached in the BufferBlocker to be
      // consumed next, unblock all channels, then trigger the checkpoint
      releaseBlocksAndResetBarriers();
      notifyCheckpoint(receivedBarrier);
    }
  }
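The checkpoint-ID bookkeeping in processBarrier can be modeled on its own. The following is a simplified, hypothetical Java sketch. AlignmentStateMachine and its string events are invented for illustration. It ignores closed channels (numClosedChannels) and data buffering. Throwing on a repeated barrier from the same channel is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the checkpoint-ID bookkeeping in BarrierBuffer.processBarrier.
// Not Flink code: data buffering and closed channels (numClosedChannels) are left out.
final class AlignmentStateMachine {
    private final boolean[] blockedChannels;
    private long currentCheckpointId = -1;
    private int numBarriersReceived = 0;
    final List<String> events = new ArrayList<>(); // "abort N" / "trigger N", in order

    AlignmentStateMachine(int totalChannels) {
        this.blockedChannels = new boolean[totalChannels];
    }

    void processBarrier(long barrierId, int channel) {
        if (blockedChannels.length == 1) {
            // single input: nothing to align, trigger directly if the ID is newer
            if (barrierId > currentCheckpointId) {
                currentCheckpointId = barrierId;
                events.add("trigger " + barrierId);
            }
            return;
        }
        if (numBarriersReceived > 0) {                  // an alignment is in progress
            if (barrierId == currentCheckpointId) {
                onBarrier(channel);
            } else if (barrierId > currentCheckpointId) {
                // a newer checkpoint arrived first: abort the current one and restart
                events.add("abort " + currentCheckpointId);
                resetBarriers();
                beginNewAlignment(barrierId, channel);
            } else {
                return;                                 // trailing barrier of an older checkpoint
            }
        } else if (barrierId > currentCheckpointId) {
            beginNewAlignment(barrierId, channel);      // first barrier of a new checkpoint
        } else {
            return;                                     // canceled or subsumed checkpoint
        }
        if (numBarriersReceived == blockedChannels.length) {
            resetBarriers();                            // Flink also releases buffered data here
            events.add("trigger " + barrierId);
        }
    }

    private void beginNewAlignment(long checkpointId, int channel) {
        currentCheckpointId = checkpointId;
        onBarrier(channel);
    }

    private void onBarrier(int channel) {
        if (blockedChannels[channel]) {
            throw new IllegalStateException("repeated barrier on channel " + channel);
        }
        blockedChannels[channel] = true;                // block this channel until alignment ends
        numBarriersReceived++;
    }

    private void resetBarriers() {
        Arrays.fill(blockedChannels, false);
        numBarriersReceived = 0;
    }
}
```

Use three channels and send barriers as (checkpointId, channel) pairs: (1,0), (2,1), (1,2), (2,0), (2,2). This produces "abort 1" followed by "trigger 2". The barrier for checkpoint 2 on channel 1 subsumes checkpoint 1. The late barrier for checkpoint 1 on channel 2 is then ignored.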

 

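Non-aligned mode: BarrierTracker

The non-aligned mechanism is comparatively simple. It never buffers data, and it triggers the checkpoint once the CheckpointBarrier for that checkpoint has arrived on every channel. Its getNextNonBlocked method: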

public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
      Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
      if (!next.isPresent()) {
        // buffer or input exhausted
        return null;
      }

      BufferOrEvent bufferOrEvent = next.get();
      if (bufferOrEvent.isBuffer()) {
        return bufferOrEvent;
      }
      else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
        processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
      }
      else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
        processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
      }
      else {
        // some other event
        return bufferOrEvent;
      }
    }
  }
The processBarrier method:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();
    // single input channel: trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
      notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
      return;
    }

    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
      LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }

    // find the checkpoint barrier in the queue of pending barriers
    CheckpointBarrierCount cbc = null;
    int pos = 0;
    // look for a pending entry with the same checkpoint ID
    for (CheckpointBarrierCount next : pendingCheckpoints) {
      if (next.checkpointId == barrierId) {
        cbc = next;
        break;
      }
      pos++;
    }

    if (cbc != null) {
      // add one to the count to that barrier and check for completion
      int numBarriersNew = cbc.incrementBarrierCount();
      if (numBarriersNew == totalNumberOfInputChannels) {
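        // barriers from all channels have arrived: drop this entry and every
        // older one still pending, then trigger the checkpoint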
        for (int i = 0; i <= pos; i++) {
          pendingCheckpoints.pollFirst();
        }

        // notify the listener
        if (!cbc.isAborted()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Received all barriers for checkpoint {}", barrierId);
          }

          notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
        }
      }
    }
    else {
      // first barrier of a new checkpoint: start tracking it (older IDs are ignored)
      if (barrierId > latestPendingCheckpointID) {
        latestPendingCheckpointID = barrierId;
        pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

        // make sure we do not track too many checkpoints
        if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
          pendingCheckpoints.pollFirst();
        }
      }
    }
  }
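Overall flow of the non-aligned mode: with multiple upstream inputs, no data is buffered for any checkpoint, and everything is passed straight on for processing. Pending checkpoints are kept in a queue of CheckpointBarrierCount objects. Each one records a checkpoint ID and how many channels have delivered its barrier. Once that count equals the number of input channels, the checkpoint is triggered.

Records that arrive after a channel's barrier are processed before the snapshot is taken. Their effects therefore end up in checkpoint n, and they are replayed again after a restore. This is why the mode only guarantees At-Least-Once.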
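For contrast, here is a hypothetical Java sketch of the counting approach. CountingInput and BarrierCount are invented names that loosely mirror BarrierTracker and CheckpointBarrierCount. Cancellation markers are ignored. The tracking limit is an assumed constructor parameter rather than the source's MAX_CHECKPOINTS_TO_TRACK constant. Records are never held back, and a checkpoint fires when its barrier count reaches the number of channels.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of non-aligned barrier counting, loosely following BarrierTracker.
// Not Flink code: "|B<id>" encodes a barrier, any other string is a record.
final class CountingInput {
    // Stands in for CheckpointBarrierCount: a checkpoint ID plus its barrier count.
    private static final class BarrierCount {
        final long checkpointId;
        int barrierCount = 1; // the barrier that creates the entry is the first one
        BarrierCount(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final int totalChannels;
    private final int maxToTrack;                                   // assumption: configurable here
    private final Deque<BarrierCount> pending = new ArrayDeque<>(); // oldest checkpoint first
    private long latestPendingId = -1;
    final List<String> output = new ArrayList<>();                  // records + "checkpoint N" markers

    CountingInput(int totalChannels, int maxToTrack) {
        this.totalChannels = totalChannels;
        this.maxToTrack = maxToTrack;
    }

    // The channel index is not needed: barriers are only counted, never blocked.
    void receive(int channel, String element) {
        if (element.startsWith("|B")) {
            processBarrier(Long.parseLong(element.substring(2)));
        } else {
            output.add(element); // records are processed immediately, never buffered
        }
    }

    private void processBarrier(long barrierId) {
        if (totalChannels == 1) {
            output.add("checkpoint " + barrierId);
            return;
        }
        BarrierCount found = null;
        int pos = 0;
        for (BarrierCount c : pending) {
            if (c.checkpointId == barrierId) {
                found = c;
                break;
            }
            pos++;
        }
        if (found != null) {
            found.barrierCount++;
            if (found.barrierCount == totalChannels) {
                // drop this entry and all older (now subsumed) ones
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();
                }
                output.add("checkpoint " + barrierId);
            }
        } else if (barrierId > latestPendingId) {
            latestPendingId = barrierId;
            pending.addLast(new BarrierCount(barrierId));
            if (pending.size() > maxToTrack) {
                pending.pollFirst(); // bound memory: forget the oldest pending checkpoint
            }
        }
        // otherwise: a barrier of an older, already dropped checkpoint is ignored
    }
}
```

Use two channels. Feed a1, |B1, a2 on channel 0, then b1, |B1 on channel 1. The output is a1, a2, b1, checkpoint 1. Here a2 is processed before checkpoint 1 is taken, even though it arrived after channel 0's barrier. This is where the possible duplicates after a restore come from.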