这里补充说明一点:父类 DeduplicateFunctionBase 中的 ValueState 是
进行keyby状态去重的,因此为何upsert-kafka需要在kafka的message中带有key;/** * Base class for deduplicate function. * * @param <T> Type of the value in the state. * @param <K> Type of the key. * @param <IN> Type of the input elements. * @param <OUT> Type of the returned elements. */ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> { private static final long serialVersionUID = 1L; // the TypeInformation of the values in the state. protected final TypeInformation<T> typeInfo; protected final long stateRetentionTime; protected final TypeSerializer<OUT> serializer; // state stores previous message under the key. protected ValueState<T> state; public DeduplicateFunctionBase( TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, long stateRetentionTime) { this.typeInfo = typeInfo; this.stateRetentionTime = stateRetentionTime; this.serializer = serializer; } @Override public void open(Configuration configure) throws Exception { super.open(configure); ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo); StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime); if (ttlConfig.isEnabled()) { stateDesc.enableTimeToLive(ttlConfig); } state = getRuntimeContext().getState(stateDesc); } }
基于 state 进行 deduplicate 的具体实现见:
org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper
/** * Processes element to deduplicate on keys with process time semantic, sends current element as * last row, retracts previous element if needed. * * @param currentRow latest row received by deduplicate function * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates * @param state state of function, null if generateUpdateBefore is false * @param out underlying collector */ static void processLastRowOnProcTime( RowData currentRow, boolean generateUpdateBefore, boolean generateInsert, ValueState<RowData> state, Collector<RowData> out) throws Exception { checkInsertOnly(currentRow); if (generateUpdateBefore || generateInsert) { // use state to keep the previous row content if we need to generate UPDATE_BEFORE // or use to distinguish the first row, if we need to generate INSERT RowData preRow = state.value(); state.update(currentRow); if (preRow == null) { // the first row, send INSERT message currentRow.setRowKind(RowKind.INSERT); out.collect(currentRow); } else { if (generateUpdateBefore) { preRow.setRowKind(RowKind.UPDATE_BEFORE); out.collect(preRow); } currentRow.setRowKind(RowKind.UPDATE_AFTER); out.collect(currentRow); } } else { // always send UPDATE_AFTER if INSERT is not needed currentRow.setRowKind(RowKind.UPDATE_AFTER); out.collect(currentRow); } }