从图中可以看出，Flink 处理数据的流程分为三大块：source、operator 和 sink。
最初编写的 HBase sink 代码（与工作相关的业务代码已屏蔽）如下：
public class BatchHbaseSink extends RichSinkFunction<JSONObject> { private static final Logger logger = LoggerFactory.getLogger(BatchHbaseSink.class); private Connection connection; private long lastInvokeTime; private final List<Put> puts = new ArrayList(); /** * 每次处理最多500条数据 */ private final int maxSize = 500; /** * 最长延迟5秒 */ private final long delayTime = 5000L; public BatchHbaseSink() { System.out.println("new batch =========== " + Thread.currentThread().getName()); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = HBaseUtil.getConnection( this.config.getZookeeperQuorum(), this.config.getClientPort() ); lastInvokeTime = System.currentTimeMillis(); } @Override public void invoke(JSONObject value, Context context) throws Exception { try { .....省略 Put put = new Put(keySuffix + keySuffix).getBytes()); put.addColumn(family.getBytes(), qualifier.getBytes(), Bytes.toBytes(val.toString())); puts.add(put); final long currentTime = System.currentTimeMillis(); final long l = currentTime - lastInvokeTime; if (puts.size() >= maxSize || l >= delayTime) { Table table = connection.getTable(TableName.valueOf(tableName)); // 数据提交 table.put(puts); puts.clear(); lastInvokeTime = currentTime; table.close(); } } catch (Exception e) { logger.error("error:", e); } } @Override public void close() throws IOException { if (connection != null) { connection.close(); } } }
public class BatchHbaseSink extends RichSinkFunction<JSONObject> { private static final Logger logger = LoggerFactory.getLogger(BatchHbaseSink.class); private Connection connection; public BatchHbaseSink() { System.out.println("new batch =========== " + Thread.currentThread().getName()); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = HBaseUtil.getConnection( this.config.getZookeeperQuorum(), this.config.getClientPort() ); } @Override public void invoke(JSONObject value, Context context) throws Exception { try { .....省略 Put put = new Put(keySuffix + keySuffix).getBytes()); put.addColumn(family.getBytes(), qualifier.getBytes(), Bytes.toBytes(val.toString())); Table table = connection.getTable(TableName.valueOf(tableName)); // 数据提交 table.put(puts); puts.clear(); lastInvokeTime = currentTime; table.close(); } } catch (Exception e) { logger.error("error:", e); } } @Override public void close() throws IOException { if (connection != null) { connection.close(); } } }
System.out.println("---------" + Thread.currentThread().getName() + "---------");
---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (2/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (4/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (8/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (7/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (1/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (6/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (5/8)---------
// Buffered puts, shared between the record-processing thread and the flush thread.
private final List<Put> puts = Collections.synchronizedList(Lists.newArrayList());

// Single-threaded scheduler for the periodic flush. It is static, so every sink instance in
// this JVM shares the one thread.
private static final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

/**
 * Schedules a periodic flush of the buffered puts to HBase. The first run is after 30 s, then
 * it runs every 5 s, and only while {@code flag} is true.
 *
 * <p>The buffer is drained atomically under its own lock before writing. In the original,
 * {@code table.put(puts)} iterated the synchronized list without holding its lock, and any put
 * added between that write and {@code puts.clear()} was silently discarded.
 */
public void initFlush() {
    scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
        if (!flag) {
            return;
        }
        // Snapshot and clear in one critical section so no concurrently added put is lost.
        final List<Put> batch;
        synchronized (puts) {
            if (puts.isEmpty()) {
                return;
            }
            batch = Lists.newArrayList(puts);
            puts.clear();
        }
        boolean written = false;
        try (Table table = connection.getTable(TableName.valueOf(hbaseStorage.getTable()))) {
            table.put(batch);
            written = true;
        } catch (Exception e) {
            // Every exception is caught: one escaping here would cancel all later runs.
            if (!written) {
                // Re-queue for the next run, like the original, which only cleared after a successful write.
                puts.addAll(0, batch);
            }
            logger.error("刷新缓存错误:", e);
        }
    }, 30, 5, TimeUnit.SECONDS);
}
/**
 * Hands one stream record to the user-supplied sink function.
 *
 * <p>The record is stored on {@code sinkContext} first, so the context passed to
 * {@code invoke} refers to the current element.
 */
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // Publish the current record on the shared context before delegating.
    sinkContext.element = element;
    final IN payload = element.getValue();
    userFunction.invoke(payload, sinkContext);
}
// Buffered puts, shared between the record-processing thread and the flush thread.
private final List<Put> puts = Collections.synchronizedList(Lists.newArrayList());

// Flag gating the periodic flush. It is volatile so a change made on another thread is seen by
// the flush thread. NOTE(review): who sets it is not visible here; confirm.
private volatile boolean flag = true;

// Executor started by initFlush(). It is kept in a field so it can be shut down; the original
// discarded its only reference, leaking the thread. It is transient because Flink serializes
// the sink.
private transient ScheduledThreadPoolExecutor flushExecutor;

/**
 * Schedules a periodic flush of the buffered puts to HBase. The first run is after 30 s, then
 * it runs every 5 s, and only while {@code flag} is true. Call {@link #stopFlush()} when the
 * sink closes.
 *
 * <p>The buffer is drained atomically under its own lock before writing. In the original,
 * {@code table.put(puts)} iterated the synchronized list without holding its lock, and any put
 * added between that write and {@code puts.clear()} was silently discarded.
 */
public void initFlush() {
    final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
    flushExecutor = scheduledThreadPoolExecutor;
    scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
        if (!flag) {
            return;
        }
        // Snapshot and clear in one critical section so no concurrently added put is lost.
        final List<Put> batch;
        synchronized (puts) {
            if (puts.isEmpty()) {
                return;
            }
            batch = Lists.newArrayList(puts);
            puts.clear();
        }
        boolean written = false;
        try (Table table = connection.getTable(TableName.valueOf(hbaseStorage.getTable()))) {
            table.put(batch);
            written = true;
        } catch (Exception e) {
            // Every exception is caught: one escaping here would cancel all later runs.
            if (!written) {
                // Re-queue for the next run, like the original, which only cleared after a successful write.
                puts.addAll(0, batch);
            }
            logger.error("刷新缓存错误:", e);
        }
    }, 30, 5, TimeUnit.SECONDS);
}

/**
 * Stops the periodic flush started by {@link #initFlush()}; a no-op if none is running.
 * Intended to be called from the sink's close(). Shutting down the executor cancels the
 * periodic task.
 */
public void stopFlush() {
    if (flushExecutor != null) {
        flushExecutor.shutdown();
        flushExecutor = null;
    }
}