本篇文章以Oracle为例:
public class SinkOracle extends RichSinkFunction<Tuple4<String, Long, String, Double>> { private Connection connection; private PreparedStatement statement; // 1,初始化 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName(""); connection = DriverManager.getConnection("","",""); String sql = "insert into STREAMING.TRANSACTION VALUES(?,?,?,?)"; String sql2 = "select * from STREAMING.TRANSACTION"; statement = connection.prepareStatement(sql); } // 2,执行 @Override public void invoke(Tuple4<String, Long, String, Double> value, Context context) throws Exception { System.out.println("value.toString()-------" + value.toString()); statement.setString(1, value.f0); statement.setLong(2, value.f1); statement.setString(3, value.f2); statement.setDouble(4, value.f3); statement.execute(); } // 3,关闭 @Override public void close() throws Exception { super.close(); if (statement != null) statement.close(); if (connection != null) connection.close(); } }