Flink在实时处理滑动窗口数据时, 由于窗口时间长, 滑动较为频繁, 导致算子计算压力过大, 下游算子计算速度抵不上上游数据产生速度, 会出现背压现象.
需求: 统计最近6小时内, 每个用户当前设备上的共同用户数(即该设备上出现过的去重用户数), 每10min输出一次统计结果
@Data @AllArgsConstructor // flatMap转换对象 private static class UserDevice { private final String userId; private final String deviceId; } @Data // 用户设备统计结果 // 第一个map存放用户最新设备, 直接put覆盖, 取最新设备 // 第二个map存放设备对应用户, 因为要去重, 所以使用set存放 private static class UserDeviceSummary { private final Map<String, String> userDevices = new HashMap<>(60000); // (uid, did) private final Map<String, Set<String>> deviceUsers = new HashMap<>(60000); // (did, Set<uid>) }
dataStreamSource .flatMap((FlatMapFunction<JSONArray, UserDevice>) (array, collector) -> { try { array.forEach(e -> { JSONObject one = (JSONObject) e; // 只处理opay_show事件 app_name in ('opay', '1') if (one.containsKey("uid") && one.containsKey("did")) { collector.collect(new UserDevice(one.getString("uid"), one.getString("did"))); } }); } catch (Exception ignored) { } }).returns(TypeInformation.of(new TypeHint<UserDevice>() { })).name("Stream flat map") .timeWindowAll(Time.hours(6), Time.minutes(10)) // 滑动窗口 .allowedLateness(Time.minutes(1)) .process(new ProcessAllWindowFunction<UserDevice, UserDeviceSummary, TimeWindow>() { @Override public void process(ProcessAllWindowFunction<UserDevice, UserDeviceSummary, TimeWindow>.Context context, Iterable<UserDevice> elements, Collector<UserDeviceSummary> out) throws Exception { UserDeviceSummary uds = new UserDeviceSummary(); for (UserDevice ud : elements) { try { // 不用线程安全集合, 提升效率 由于并行度为1, 应该不会有并发 uds.getUserDevices().put(ud.getUserId(), ud.getDeviceId()); if (!uds.getDeviceUsers().containsKey(ud.getDeviceId())) { uds.getDeviceUsers().put(ud.getDeviceId(), new HashSet<>()); } uds.getDeviceUsers().get(ud.getDeviceId()).add(ud.getUserId()); } catch (Exception ignore) { } } out.collect(uds); } }).name("Process to Map") .process(new ProcessFunction<UserDeviceSummary, Map<String, Integer>>() { @Override public void processElement(UserDeviceSummary uds, ProcessFunction<UserDeviceSummary, Map<String, Integer>>.Context ctx, Collector<Map<String, Integer>> out) throws Exception { Map<String, Integer> result = new HashMap<>(); for (String uid : uds.getUserDevices().keySet()) { try { int count = uds.getDeviceUsers().get(uds.getUserDevices().get(uid)).size(); result.put(uid, count); } catch (Exception e) { System.out.println("Process for sink error: " + e.getMessage()); } } out.collect(result); // 清空数据 协助gc uds.getUserDevices().clear(); uds.getDeviceUsers().clear(); result.clear(); } }).name("User device calc").print();
开始运行正常, 随着时间的推移, 数据堆积越来越大. 滑动窗口下, 每条数据会同时属于 6h / 10min = 36 个相互重叠的窗口, 每次窗口触发都要重新遍历整整6小时的数据, 内存和cpu压力都比较大; 而且 timeWindowAll 是非并行算子, 并行度只能为1
使用滚动窗口替换滑动窗口, 既节省了内存, 也减少了cpu计算. 每10min滚动一次, 在窗口函数外部用一个列表(LinkedList, 当作队列使用)保存每个窗口的统计结果, 最多保存36个元素(36 × 10min = 6h), 每次窗口触发时把列表中的结果合并后输出到下游
/** Maximum number of 10-minute summaries that are merged (36 x 10 min = 6 h). */
private static final int SUMMARY_LIST_CAPACITY = 36;

/**
 * Merges the first {@code SUMMARY_LIST_CAPACITY} summaries of {@code list} into a brand-new
 * summary that is sent downstream.
 *
 * <p>Summaries are applied in list order, so for a user present in several summaries the
 * device from the summary closest to the end of the list wins. The users of each device are
 * the union over all merged summaries.
 *
 * <p>The result never shares a map or a set with the input: neither {@code list} nor any of
 * its summaries is modified, so callers may freely mutate or clear the returned object.
 *
 * <p>Known limitation (from the original author's note): summaries carry no timestamp, so
 * if no data flows in for a long time the buffered summaries may be older than intended.
 *
 * @param list per-window summaries, oldest first; may be {@code null} or empty
 * @return a new merged summary; empty when {@code list} is {@code null} or empty
 */
private static UserDeviceSummary merge(List<UserDeviceSummary> list) {
    UserDeviceSummary result = new UserDeviceSummary();
    if (list == null || list.isEmpty()) {
        return result;
    }
    int length = Math.min(list.size(), SUMMARY_LIST_CAPACITY);
    System.out.println("Merge tumbling summary: " + length);
    // Iterate instead of calling get(i): the caller passes a LinkedList.
    for (UserDeviceSummary current : list.subList(0, length)) {
        result.getUserDevices().putAll(current.getUserDevices());
        // Copy the users into sets owned by the result so buffered summaries stay untouched.
        current.getDeviceUsers().forEach((deviceId, users) ->
                result.getDeviceUsers()
                        .computeIfAbsent(deviceId, unused -> new HashSet<>())
                        .addAll(users));
    }
    return result;
}
// Buffer of the most recent per-window summaries, oldest first, capped at
// SUMMARY_LIST_CAPACITY entries.
// NOTE(review): this buffer is captured by the window function instead of being kept in
// Flink managed state, so it is presumably not covered by checkpoints - confirm that losing
// the history on restart is acceptable.
List<UserDeviceSummary> list = new LinkedList<>();
dataStreamSource
        .flatMap((FlatMapFunction<JSONArray, UserDevice>) (array, collector) -> {
            if (array == null) {
                return;
            }
            array.forEach(e -> {
                // Handle each element on its own so one malformed entry cannot drop the
                // remainder of the batch.
                if (!(e instanceof JSONObject)) {
                    return;
                }
                try {
                    JSONObject one = (JSONObject) e;
                    // The original note says only opay_show events (app_name in ('opay', '1'))
                    // are handled; the visible code only requires both "uid" and "did" keys.
                    if (one.containsKey("uid") && one.containsKey("did")) {
                        collector.collect(new UserDevice(one.getString("uid"), one.getString("did")));
                    }
                } catch (Exception ex) {
                    // Still best effort, but the failure is reported instead of hidden.
                    System.out.println("Stream flat map error: " + ex);
                }
            });
        })
        .returns(TypeInformation.of(new TypeHint<UserDevice>() {
        }))
        .name("Stream flat map")
        .timeWindowAll(Time.minutes(10)) // tumbling window instead of a sliding one, to save resources
        .process(new ProcessAllWindowFunction<UserDevice, UserDeviceSummary, TimeWindow>() {
            /**
             * Summarises the current window, appends the summary to the buffer (dropping the
             * oldest entry once the buffer exceeds {@code SUMMARY_LIST_CAPACITY}) and emits
             * the merge of the buffered summaries.
             */
            @Override
            public void process(ProcessAllWindowFunction<UserDevice, UserDeviceSummary, TimeWindow>.Context context,
                                Iterable<UserDevice> elements,
                                Collector<UserDeviceSummary> out) throws Exception {
                UserDeviceSummary uds = new UserDeviceSummary();
                for (UserDevice ud : elements) {
                    // Plain (non-thread-safe) collections are used for speed.
                    uds.getUserDevices().put(ud.getUserId(), ud.getDeviceId());
                    uds.getDeviceUsers()
                            .computeIfAbsent(ud.getDeviceId(), unused -> new HashSet<>())
                            .add(ud.getUserId());
                }
                list.add(uds);
                if (list.size() > SUMMARY_LIST_CAPACITY) {
                    list.remove(0);
                }
                out.collect(merge(list));
            }
        })
        .name("Process to Map")
        .process(new ProcessFunction<UserDeviceSummary, Map<String, Integer>>() {
            /**
             * Maps every user to the number of distinct users recorded on that user's device.
             *
             * <p>Neither the incoming summary nor the emitted map is modified after
             * {@code collect}: mutating received or emitted records is unsafe when Flink
             * object reuse is enabled, and the objects become garbage on their own.
             */
            @Override
            public void processElement(UserDeviceSummary uds,
                                       ProcessFunction<UserDeviceSummary, Map<String, Integer>>.Context ctx,
                                       Collector<Map<String, Integer>> out) throws Exception {
                Map<String, Integer> result = new HashMap<>();
                for (Map.Entry<String, String> userAndDevice : uds.getUserDevices().entrySet()) {
                    Set<String> users = uds.getDeviceUsers().get(userAndDevice.getValue());
                    if (users == null) {
                        // Inconsistent summary: report it and skip this user.
                        System.out.println("Process for sink error: no users recorded for device "
                                + userAndDevice.getValue());
                        continue;
                    }
                    result.put(userAndDevice.getKey(), users.size());
                }
                out.collect(result);
            }
        })
        .name("User device calc")
        .print();
再次部署, 服务运行正常!