MySQL教程

Flink实现单词计数并写入MySQL

本文主要是介绍Flink实现单词计数并写入MySQL,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Flink实现单词计数并写入MySQL

依赖:

    <dependencies>
        <!-- Flink batch (DataSet) API: provides the org.apache.flink.api.java.* classes the job imports. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.0</version>
        </dependency>
        <!-- NOTE(review): "provided" scope keeps this jar off the runtime classpath unless the
             environment (cluster or IDE run configuration) supplies it; confirm for local runs. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- Needed to execute the job locally from a main() method. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
        <!-- Legacy JDBC connector; the job imports org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
             from it. NOTE(review): same "provided" caveat as above applies. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- MySQL Connector/J: supplies com.mysql.cj.jdbc.Driver, which the job names as its JDBC driver.
             Without it the driver class cannot be loaded at runtime. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
    </dependencies>

导包:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import java.util.Arrays;
import java.util.Iterator;

java代码:

/**
 * Batch word-count job: reads a text file, counts how often each word occurs and
 * inserts the resulting (word, count) pairs into the MySQL table {@code words} via JDBC.
 */
public class toMySQL {

    /**
     * Splits one line of text into words.
     *
     * <p>The characters {@code ?}, {@code .} and {@code ,} are removed first, then the line is
     * split on runs of whitespace. Empty tokens (produced by blank lines or by leading/repeated
     * whitespace) are dropped so that they are never counted as a word.
     *
     * @param line one line read from the input file
     * @return an iterator over the non-empty words of the line, in order of appearance
     */
    private static Iterator<String> splitIntoWords(String line) {
        // Strip the punctuation marks before splitting.
        String cleaned = line.replace("?", "").replace(".", "").replace(",", "");
        return Arrays.stream(cleaned.trim().split("\\s+"))
                .filter(word -> !word.isEmpty())
                .iterator();
    }

    /**
     * Builds and runs the word-count pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // JDBC sink. The connection URL below carries demo credentials inline.
        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/test?user=root&password=123456")
                .setQuery("insert into  words (word,count) values (?,?) ")
                // Submit a batch after every 2 records.
                .setBatchInterval(2)
                .finish();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> lines = env.readTextFile("datas/1.txt");

        // Break every line into its individual words.
        FlatMapOperator<String, String> words = lines.flatMap(new FlatMapIterator<String, String>() {
            @Override
            public Iterator<String> flatMap(String line) throws Exception {
                return splitIntoWords(line);
            }
        });

        // Pair every word with an initial count of 1.
        MapOperator<String, Tuple2<String, Integer>> pairs = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        // Group the tuples by their first field (the word) and sum the second field (the count).
        AggregateOperator<Tuple2<String, Integer>> counts = pairs.groupBy(0).sum(1);

        // Convert each tuple into a two-field Row matching the placeholders of the insert statement.
        MapOperator<Tuple2<String, Integer>, Row> rows = counts.map(new MapFunction<Tuple2<String, Integer>, Row>() {
            @Override
            public Row map(Tuple2<String, Integer> wordCount) throws Exception {
                Row row = new Row(2);
                row.setField(0, wordCount.f0);
                row.setField(1, wordCount.f1);
                return row;
            }
        });

        // print() on a DataSet triggers an execution of its own, so the pipeline runs once here for
        // the console output and once more in env.execute() for the JDBC sink. The sink is registered
        // only after print() so that rows are written to MySQL exactly once.
        rows.print();
        rows.output(jdbcOutput);
        env.execute();
    }
}

结果:
在这里插入图片描述
在这里插入图片描述
ps注意:
由于MySQL默认的排序规则(collation)在比较字符串值时不区分大小写(例如"The"和"the"会被视为相同的值),而上面的程序在统计单词时是区分大小写的,因此不要把word列设置为主键,否则大小写不同的同一个单词在写入时会发生主键冲突。

这篇关于Flink实现单词计数并写入MySQL的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!