需要对大量数据进行处理时,全部读取放入内存会导致内存溢出。针对这种处理大数据出现的常见问题,可以采用流式读取数据的方法。对于不同的关系型数据库,流式读取数据的方式略有不同,下面对常见的关系型数据库流式读取数据的方式做一点个人总结。
前提:使用JDBC方式读取
MySQL中需要特别注意的是数据库连接的url,必须配置useCursorFetch=true。例如:
jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai&autoReconnect=true
具体流式读取大数据的代码:
public static int fetchData(Connection conn, int fetchSize, String sql) throws Exception { logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize); List<Map<String, String>> rows = new ArrayList(); int rowIndex = 0; int batchIndex = 0; Statement stmt = conn.createStatement(1003, 1007); stmt.setFetchSize(fetchSize); stmt.setFetchDirection(1000); ResultSet rs = stmt.executeQuery(sql); while(rs.next()) { Map<String, String> row = new HashMap(); for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) { row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i)); } rows.add(row); ++rowIndex; if (rows.size() % fetchSize == 0) { // TODO 数据处理逻辑 doSomething() rows.clear(); ++batchIndex; } } if (!rows.isEmpty()) { // TODO 数据处理逻辑 doSomething() } return rowIndex; }
参数说明: conn : 数据库连接对象;fetchSize : 流式读取数据窗口大小;sql :查询数据SQL语句。
public static int fetchData(Connection conn, int fetchSize, String sql) throws Exception { logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize); List<Map<String, String>> rows = new ArrayList(); int rowIndex = 0; int batchIndex = 0; Statement stmt = conn.createStatement(1003, 1007); stmt.setFetchSize(fetchSize); stmt.setFetchDirection(1000); ResultSet rs = stmt.executeQuery(sql); while(rs.next()) { Map<String, String> row = new HashMap(); for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) { row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i)); } rows.add(row); ++rowIndex; if (rows.size() % fetchSize == 0) { // TODO 数据处理逻辑 doSomething() rows.clear(); ++batchIndex; } } if (!rows.isEmpty()) { // TODO 数据处理逻辑 doSomething() } return rowIndex; }
参数说明: conn : 数据库连接对象;fetchSize : 流式读取数据窗口大小;sql :查询数据SQL语句。
public int fetchData(Connection conn, int fetchSize, String sql) throws Exception { logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize); List<Map<String, String>> rows = new ArrayList(); int rowIndex = 0; int batchIndex = 0; Statement stmt = conn.createStatement(1003, 1007); conn.setAutoCommit(false); // 关闭自动提交事务 stmt.setFetchSize(fetchSize); ResultSet rs = stmt.executeQuery(sql); while(rs.next()) { Map<String, String> row = new HashMap(); for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) { row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i)); } rows.add(row); ++rowIndex; if (rows.size() % fetchSize == 0) { // TODO 数据处理逻辑 doSomething() rows.clear(); ++batchIndex; } } if (!rows.isEmpty()) { // TODO 数据处理逻辑 doSomething() } return rowIndex; }
参数说明: conn : 数据库连接对象;fetchSize : 流式读取数据窗口大小;sql :查询数据SQL语句。
其他关系型数据库流式读取数据的需求还没有遇到,后期会继续完善。