依赖:
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.14.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.14.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.14.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.12</artifactId> <version>1.10.0</version> <scope>provided</scope> </dependency> </dependencies>
导包:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.FlatMapIterator; import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.Row; import java.util.Arrays; import java.util.Iterator;
java代码:
public class toMySQL { public static void main(String[] args) throws Exception { JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("com.mysql.cj.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/test?user=root&password=123456") .setQuery("insert into words (word,count) values (?,?) ") //设置为每2条数据就提交一次 .setBatchInterval(2) .finish(); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> data = env.readTextFile("datas/1.txt"); //将读取的字符串按照空格分割成单个单词 FlatMapOperator<String, String> data1 = data.flatMap(new FlatMapIterator<String, String>() { @Override public Iterator<String> flatMap(String s) throws Exception { //先把标点符号都去除 String s1 = s.replace("?", ""); String s2 = s1.replace(".", ""); String s3 = s2.replace(",", ""); return Arrays.asList(s3.split(" ")).iterator(); } }); MapOperator<String, Tuple2<String, Integer>> data2 = data1.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<>(s, 1); } }); //将Tuple对象,按第一个元素进行分区,再将第二个元素进行累加 AggregateOperator<Tuple2<String, Integer>> data3 = data2.groupBy(0).sum(1); MapOperator<Tuple2<String, Integer>, Row> data4 = data3.map(new MapFunction<Tuple2<String, Integer>, Row>() { @Override public Row map(Tuple2<String, Integer> ss) throws Exception { Row row = new Row(2); row.setField(0, ss.f0); row.setField(1, ss.f1); return row; } }); data4.print(); data4.output(jdbcOutput); env.execute(); } }
结果:
ps注意:
由于 MySQL 字符串列默认使用不区分大小写的排序规则(如 utf8mb4_general_ci),比较列值时不区分大小写,因此不要把 word 列设置为主键:数据中存在仅大小写不同的单词时,插入会发生主键冲突。(注意:不区分大小写的是列值的比较规则,而不是字段名本身。)