本文主要研究一下debezium的BlockingReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java
public interface Reader { public static enum State { /** * The reader is stopped and static. */ STOPPED, /** * The reader is running and generated records. */ RUNNING, /** * The reader has completed its work or been explicitly stopped, but not all of the generated records have been * consumed via {@link Reader#poll() polling}. */ STOPPING; } public String name(); public State state(); public void uponCompletion(Runnable handler); public default void initialize() { // do nothing } public default void destroy() { // do nothing } public void start(); public void stop(); public List<SourceRecord> poll() throws InterruptedException; } 复制代码
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java
public class BlockingReader implements Reader { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>(); private final AtomicReference<State> state = new AtomicReference<>(); private final Metronome metronome; private final String name; private final String runningLogMessage; public BlockingReader(String name, String runningLogMessage) { this.name = name; this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM); this.runningLogMessage = runningLogMessage; } /** * Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested. */ @Override public List<SourceRecord> poll() throws InterruptedException { if (state.get() == State.STOPPED) { return null; } metronome.pause(); state.compareAndSet(State.RUNNING, State.STOPPING); return null; } @Override public State state() { return state.get(); } @Override public void uponCompletion(Runnable handler) { assert this.uponCompletion.get() == null; this.uponCompletion.set(handler); } @Override public void start() { state.set(State.RUNNING); logger.info(runningLogMessage); } @Override public void stop() { try { state.set(State.STOPPED); // Cleanup Resources Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once if (completionHandler != null) { completionHandler.run(); } } finally { logger.info("Blocking Reader has completed."); } } @Override public String name() { return name; } } 复制代码
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TimedBlockingReader.java
public class TimedBlockingReader extends BlockingReader { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final Duration timeout; private volatile Timer timer; /** * @param name Name of the reader * @param timeout Duration of time until this TimedBlockingReader should stop */ public TimedBlockingReader(String name, Duration timeout) { super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding"); this.timeout = timeout; } @Override public void start() { super.start(); this.timer = Threads.timer(Clock.SYSTEM, timeout); } @Override public List<SourceRecord> poll() throws InterruptedException { super.poll(); // Stop when we've reached the timeout threshold if (timer != null && timer.expired()) { stop(); } return null; } } 复制代码
BlockingReader实现了Reader接口,其start方法设置state为State.RUNNING,其stop方法设置state为State.STOPPED,同时执行completionHandler.run();其poll方法在state为State.STOPPED直接返回null,否则执行metronome.pause(),然后设置state为State.STOPPED,最后返回null