图片来源{https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/}
从图中可以看出,flink在处理数据中有三大块:source operator sink
在工作时负责sink这块的数据下层工作,因为下层源有很多,有redis,hbase这些。在实现hbase相关的sink时,我优先考虑使用批量进行sink下层,这样可以提高下层数据时的效率问题。
最开始的编写hbase相关的sink代码( 工作相关业务代码已屏蔽)如下:
public class BatchHbaseSink extends RichSinkFunction<JSONObject> { private static final Logger logger = LoggerFactory.getLogger(BatchHbaseSink.class); private Connection connection; private long lastInvokeTime; private final List<Put> puts = new ArrayList(); /** * 每次处理最多500条数据 */ private final int maxSize = 500; /** * 最长延迟5秒 */ private final long delayTime = 5000L; public BatchHbaseSink() { System.out.println("new batch =========== " + Thread.currentThread().getName()); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = HBaseUtil.getConnection( this.config.getZookeeperQuorum(), this.config.getClientPort() ); lastInvokeTime = System.currentTimeMillis(); } @Override public void invoke(JSONObject value, Context context) throws Exception { try { .....省略 Put put = new Put(keySuffix + keySuffix).getBytes()); put.addColumn(family.getBytes(), qualifier.getBytes(), Bytes.toBytes(val.toString())); puts.add(put); final long currentTime = System.currentTimeMillis(); final long l = currentTime - lastInvokeTime; if (puts.size() >= maxSize || l >= delayTime) { Table table = connection.getTable(TableName.valueOf(tableName)); // 数据提交 table.put(puts); puts.clear(); lastInvokeTime = currentTime; table.close(); } } catch (Exception e) { logger.error("error:", e); } } @Override public void close() throws IOException { if (connection != null) { connection.close(); } } }
我开始的想法很简单,每次等500条数据一次性写入hbase,同时设置延时时间为5秒,防止数据积压
但是当我发送20条需要处理的数据时,发现每次hbase每次写入的数据都不满足20条,运行多次后都有同样的问题,我重新review了代码逻辑,发现逻辑处理方面没有什么问题,我开始把代码改成,来一条数据发送一条数据代码如下
public class BatchHbaseSink extends RichSinkFunction<JSONObject> { private static final Logger logger = LoggerFactory.getLogger(BatchHbaseSink.class); private Connection connection; public BatchHbaseSink() { System.out.println("new batch =========== " + Thread.currentThread().getName()); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = HBaseUtil.getConnection( this.config.getZookeeperQuorum(), this.config.getClientPort() ); } @Override public void invoke(JSONObject value, Context context) throws Exception { try { .....省略 Put put = new Put(keySuffix + keySuffix).getBytes()); put.addColumn(family.getBytes(), qualifier.getBytes(), Bytes.toBytes(val.toString())); Table table = connection.getTable(TableName.valueOf(tableName)); // 数据提交 table.put(puts); puts.clear(); lastInvokeTime = currentTime; table.close(); } } catch (Exception e) { logger.error("error:", e); } } @Override public void close() throws IOException { if (connection != null) { connection.close(); } } }
这样处理下层数据,数据不会发生‘’丢失‘’的情况,所以我第一反应是不是缓存的问题导致的,缓存出问题的场景大多数情况是出现线程安全的问题,所以我怀疑是不是线程安全问题,我在程序中添加了线程打印日志,看看是不是多线程处理数据。
System.out.println("---------" + Thread.currentThread().getName() + "---------");
处理数据过程中打印了如下日志:
---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (2/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (4/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (8/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (7/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (1/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (6/8)--------- ---------Window(TumblingEventTimeWindows(10000), EventTimeTrigger, BaseOperator, PassThroughWindowFunction) -> (Sink: Print to Std. Out, Process -> (Sink: Unnamed, Sink: Unnamed)) (5/8)---------
可以看出,sink过程中有8个线程做处理,所以此时的我觉得已经找到问题的根因了,代码是多线程执行的,存在线程安全问题,所以我给批量处理的数据做了如下处理,将缓存改成线程安全的集合,同时将字段lastInvokeTime修饰为volatile,至此我以为我已经解决了该问题,但是结果仍然是发送的20条数据只落了部分数据到hbase。
这时我有点穷驴技穷了,我重新review了代码,发现一处代码逻辑漏洞
首先我开始设置批量处理数据时,只有当数据超过500条或者超过时间限制5秒钟我就会将数据都刷入hbase中,问题出现在了超时时间限制。我发送的数据是一次性的发送20条,当不满足500条的时候不会发送,但是触发超时5秒也会,但是这个超时只有在下次消费数据时才会触发这个逻辑,所以我需要定时任务去定时的将缓存中积压的数据刷到hbase,防止source一直没有数据导致缓存中的数据无法下层甚至丢失
知道原因,并且有解决办法就是采用定时任务来,我添加了如下代码:
private final List<Put> puts = Collections.synchronizedList(Lists.newArrayList()); private final static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); //定时刷新缓存数据到hbase public void initFlush() { scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> { try { if (CollectionUtils.isNotEmpty(puts) && flag) { Table table = connection.getTable(TableName.valueOf(hbaseStorage.getTable())); table.put(puts); puts.clear(); table.close(); } } catch (Exception e) { logger.error("刷新缓存错误:", e); } }, 30, 5, TimeUnit.SECONDS); }
想法是好的,现实很残酷,运行下来,下层到hbase的数据仍然缺失。我开始怀疑人生了,我debug了代码,发现缓存中的数据为空
如图:
这时候只能去看底层flink的源码了,我通过跟踪堆栈,发现了执行逻辑,sink的invoke方法的执行,在streamSink类中被调用:
@Override public void processElement(StreamRecord<IN> element) throws Exception { sinkContext.element = element; userFunction.invoke(element.getValue(), sinkContext); }
我debug到了这边:
如下:
可以看出我上图中用黑色笔圈出的部分,每个线程执行的hbasesink对象都是不一样,也就是说每个线程都有自己的hbasesink对象。问题迎刃而解了,我设置的全局线程池去刷新缓存,是没法针对每个线程中的hbasesink中的缓存操作的,(线程局部变量对其它线程是不可见的)。所以我的做法很简单,我将全局刷新的线程,变为线程局部处理线程,也就是sink线程的子线程,变化逻辑非常简单:
private final List<Put> puts = Collections.synchronizedList(Lists.newArrayList()); //标志位 private volatile boolean flag = true; //定时刷新缓存数据到hbase public void initFlush() { final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1); scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> { try { if (CollectionUtils.isNotEmpty(puts) && flag) { Table table = connection.getTable(TableName.valueOf(hbaseStorage.getTable())); table.put(puts); puts.clear(); table.close(); } } catch (Exception e) { logger.error("刷新缓存错误:", e); } }, 30, 5, TimeUnit.SECONDS); }
如上,我将创建线程的步骤放到了方法里,而非全局变量。