Java教程

Flink 流处理 API

本文主要是介绍Flink 流处理 API,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Flink 流处理 API

  • 1. Environment
    • getExecutionEnvironment
    • createLocalEnvironment
    • createRemoteEnvironment
  • 2. Source
    • 从集合读取数据
    • 从文件读取数据
    • 从 kafka 读取数据
    • 自定义 Source
  • 3. Transform
    • map
    • flatMap
    • Fliter
    • keyBy
      • 滚动聚合算子
    • Reduce
    • split 和 select
    • connect 和 coMap、coFlatMap
    • union
    • 支持的数据类型
    • UDF 函数(更细粒度的控制流)
    • 函数类(Function Classes)
    • 匿名函数(Lambda Functions)
    • 富函数(Rich Functions)
    • 数据重分区
  • 4. Sink
    • kafka
    • Redis
    • JDBC 自定义 Sink

       Flink 流处理 API 的调用过程主要分为以下几个步骤:

在这里插入图片描述
       所以 Flink API 也是围绕以下几个环节进行介绍。

1. Environment

       Environment,即 Flink 程序的执行环境。

getExecutionEnvironment

       创建一个执行环境,表示当前执行程序的上下文。

       如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。即 getExecutionEnvironment 会根据查询执行的方式返回什么样的运行环境,最常用的一种创建执行环境的方式。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

       如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认为 1。

在这里插入图片描述

       可以通过 setParallelism() 方法更改并行度。

env.setParallelism(2);

createLocalEnvironment

       返回本地执行环境,可以在调用时指定默认的并行度。

LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();

// 指定并行度为 2
LocalStreamEnvironment localEnvironment2 = StreamExecutionEnvironment.createLocalEnvironment(2);

createRemoteEnvironment

       返回集群执行环境,将 Jar 提交到远程服务器。

       需要在调用时指定JobManager 的 IP 和 Port,并指定要在集群中运行到 Jar 包。

StreamExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081, "WordCount.jar");

2. Source

       Source,即数据源,通过数据源读入数据至 Flink 程序中执行,Source 支持多种方式读取数据。

        存在如下一个类,用于记录用户日志。

import lombok.Data;

@Data
public class UserLog {

    private String id;

    private Long timeStamp;

    private String log;

    public UserLog() {

    }

    public UserLog(String id, Long timeStamp, String log) {
        this.id = id;
        this.timeStamp = timeStamp;
        this.log = log;
    }
}

从集合读取数据

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<UserLog> userLogs = Arrays.asList(
                new UserLog("user_1", 1547718199L, "aaa"),
                new UserLog("user_2", 1547718199L, "bbb"),
                new UserLog("user_3", 1547718199L, "ccc")
        );


        // 从集合中读取数据
        DataStream<UserLog> dataStream = env.fromCollection(userLogs);

        // 打印输出
        dataStream.print("userLogs");

        // 执行
        env.execute();
    }

在这里插入图片描述

从文件读取数据

# userLog.txt
user_1, 1547718212, aaa
user_1, 1547718213, bbb
user_1, 1547718214, ccc
user_2, 1547718212, ddd
user_2, 1547718213, eee
user_3, 1547718214, fff
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从文件中读取数据
        DataStream<String> dataStream = env.readTextFile("userLog.txt");

        // 打印输出
        dataStream.print();

        // 执行
        env.execute();
    }

在这里插入图片描述

从 kafka 读取数据

       需要引入 kafka 连接器到依赖。

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // 从文件中读取数据
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>(
                "user_log", new SimpleStringSchema(), properties));

        // 打印输出
        dataStream.print();

        // 执行
        env.execute();
    }

在这里插入图片描述
在这里插入图片描述

自定义 Source

       除了以上的source数据来源,我们还可以自定义source。只需要传入一个SourceFunction就可以。具体调用如下:

DataStream<UserLog> dataStream = env.addSource(new SourceFunction<UserLog>() {
    @Override
    public void run(SourceContext<UserLog> ctx) throws Exception {

    }

    @Override
    public void cancel() {

    }
});

       我们通过重写run方法控制数据的生成,通过cancel方法控制不再读入数据。

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从自定义读取数据
        final UserLogSource userLogSource = new UserLogSource();
        DataStream<UserLog> dataStream = env.addSource(userLogSource);
        // 打印输出
        dataStream.print();
        // 执行
        env.execute();
    }

    // 实现自定义的 SourceFunction
    public static class UserLogSource implements SourceFunction<UserLog> {
        // 定义标识位,用来控制数据的产生
        private volatile boolean running = true;
        @Override
        public void run(SourceContext<UserLog> ctx) throws Exception {
            int num = 0;
            while (running) {
                UserLog userLog = new UserLog();
                userLog.setId(num++ + "");
                userLog.setTimeStamp(System.currentTimeMillis());
                userLog.setLog("log:" + num);
                ctx.collect(userLog);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    }

在这里插入图片描述

       可以看到,自定义source的方式非常灵活,可以实现任意形式的数据读入。如从 kafka、文件、mysql、redis、es 等都可以通过自定义source等的方式进行读入。

3. Transform

       在将数据读入后,我们可以需要进行大量的转换操作,而转换操作就是通过如下 API 实现的。

map

       将数据元素转换为另一种格式的数据元素。

在这里插入图片描述

// 将元素由 string 转换为对应 string 的长度
DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return value.length();
    }
});
test.txt
a
bb
ccc
dddd
public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream =
                env.readTextFile("test.txt");

        // map 把 String 转换成长度输出
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        });

        // 打印输出
        mapStream.print("map");

        env.execute();
    }

在这里插入图片描述

flatMap

       map只支持一对一的转换,如果需要进行一对多转换,则需要通过flatMap实现。
在这里插入图片描述

test.txt
a,1
bb,2
ccc,3
dddd,4
public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");

        // flatMap,按逗号分字段
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> collector) {
                String[] fields = value.split(",");
                for (String field : fields) {
                    collector.collect(field);
                }
            }
        });

        // 打印输出
        flatMapStream.print("flatMap");
        env.execute();
    }

Fliter

       过滤掉数据流中的元素。

在这里插入图片描述

test.txt
a1
b1
c1
d1
public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream =env.readTextFile("test.txt");
                
        // filter,筛选 a 开头的数据
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) {
                return value.startsWith("a");
            }
        });

        // 打印输出
        filterStream.print("filter");

        env.execute();
    }

在这里插入图片描述

keyBy

       逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同的key的元素,在内部以hash的形式实现。

       调用keyBy,数据类型由DataStream 转换为KeyedStream
在这里插入图片描述

滚动聚合算子

       这些算子可以针对KeyedStream的每一个支流做聚合。

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()
test.txt
user_1,100000,log_1
user_1,300000,log_3
user_1,200000,log_2
user_2,200000,log_5
user_2,100000,log_4
user_3,300000,log_6
public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");

        // 转换成UserLog类型
        DataStream<UserLog> dataStream = inputStream.map(new MapFunction<String, UserLog>() {
            public UserLog map(String value) throws Exception {
                String[] array = value.split(",");
                return new UserLog(array[0], Long.parseLong(array[1]), array[2]);
            }
        });

        // 分组
        KeyedStream<UserLog, Tuple> keyedStream = dataStream.keyBy("id");

        // 滚动聚合,取当前最大的时间戳
        DataStream<UserLog> resultStream = keyedStream.max("timeStamp");

        resultStream.print();

        env.execute();
    }

在这里插入图片描述

       通过输出可以看到,随着数据流的到来,成功获得最大的数据。

Reduce

       一个分组数据流的聚合操作,合并当前元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

       有如下一个需求,我们不仅要知道各科目最大的成绩,还要知道最大成绩的所属学生。如果只通过 max 的话,是无法满足我们的需求的,此时,就需要通过 reduce 来实现该需求。

@Data
public class Score {

    private String course;

    private String student;

    private int score;

    public Score() {

    }

    public Score(String course, String student, int score) {
        this.course = course;
        this.student = student;
        this.score = score;
    }
}
test.txt
语文,小明,80
语文,小王,70
语文,小张,90
语文,小李,100
数学,小明,100
数学,小王,90
数学,小张,80
数学,小李,70
英语,小明,70
英语,小王,80
英语,小张,90
英语,小李,100
public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");
        // 转换成Score类型
        DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });
        // 分组
        KeyedStream<Score, Tuple> keyedStream = dataStream.keyBy("course");
        // reduce 聚合,取最大成绩
        DataStream<Score> reduceStream = keyedStream.reduce(new ReduceFunction<Score>() {
            @Override
            public Score reduce(Score value1, Score value2) throws Exception {

                String student = value1.getStudent();
                if (value1.getScore() < value2.getScore()) {
                    student = value2.getStudent();
                }
                return new Score(value1.getCourse(), student, Math.max(value1.getScore(), value2.getScore()));
            }
        });
        reduceStream.print();
        env.execute();
    }

在这里插入图片描述

split 和 select

       split:根据某些特征把一个DataStream拆分成两个或者多个DataStream

在这里插入图片描述

       select:从一个 SplitStream 中获取一个或者多个DataStream

在这里插入图片描述

test.txt
语文,小明,80
语文,小王,30
语文,小张,90
语文,小李,100
数学,小明,100
数学,小王,40
数学,小张,80
数学,小李,70
英语,小明,70
英语,小王,80
英语,小张,50
英语,小李,100
public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");

        // 转换成SensorReading类型
        DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 分流,按照分数60分为界,分成两条流
        SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() {
            @Override
            public Iterable<String> select(Score score) {
                return score.getScore() > 60 ?
                        Collections.singletonList("pass") : Collections.singletonList("unPass");
            }
        });

        DataStream<Score> passStream = splitStream.select("pass");
        DataStream<Score> unPassStream = splitStream.select("unPass");
        
        passStream.print("pass");
        unPassStream.print("unPass");
        
        env.execute();
    }

在这里插入图片描述

connect 和 coMap、coFlatMap

       connect:连接两个保持他们类型的数据流,两个数据流被 connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

在这里插入图片描述
       coMapcoFlatMap:作用于ConnectedStreams上,功能与mapflatMap一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。

public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");

        // 转换成SensorReading类型
        DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 1、分流,按照分数60分为界,分成两条流
        SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() {
            @Override
            public Iterable<String> select(Score score) {
                return score.getScore() > 60 ?
                        Collections.singletonList("pass") : Collections.singletonList("unPass");
            }
        });

        DataStream<Score> passStream = splitStream.select("pass");
        DataStream<Score> unPassStream = splitStream.select("unPass");

        // 2、合流,将及格对数据流转换成二元组类型
        DataStream<Tuple2<String, Integer>> warningStream = passStream.map(new MapFunction<Score, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Score score) throws Exception {
                return new Tuple2<String, Integer>(score.getCourse(), score.getScore());
            }
        });

        // 3、与不及格对数据流连接合并之后,输出状态信息
        ConnectedStreams<Tuple2<String, Integer>, Score> connectedStream = warningStream.connect(unPassStream);


        DataStream<Object> resultStream = connectedStream.map(new CoMapFunction<Tuple2<String, Integer>, Score, Object>() {

            @Override
            public Object map1(Tuple2<String, Integer> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "pass");
            }

            @Override
            public Object map2(Score value) throws Exception {
                return new Tuple3<>(value.getCourse(), value.getScore(), "unpass");
            }
        });

        resultStream.print("resultStream");
 }

在这里插入图片描述

union

       union:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新 DataStream

在这里插入图片描述

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("text.txt");

        // 转换成SensorReading类型
        DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 1、分流,按照分数60分为界,分成两条流
        SplitStream<Score> splitStream = dataStream.split(new OutputSelector<Score>() {
            @Override
            public Iterable<String> select(Score score) {
                return score.getScore() > 60 ?
                        Collections.singletonList("pass") : Collections.singletonList("unPass");
            }
        });

        DataStream<Score> passStream = splitStream.select("pass");
        DataStream<Score> unPassStream = splitStream.select("unPass");

        // 2、union 联合多条流

        DataStream<Score> allScoreStream = passStream.union(unPassStream);
        allScoreStream.print("allScoreStream");
        env.execute();
    }

在这里插入图片描述

支持的数据类型

       Flink流应用程序处理的是以数据对象表示的事件流。在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送他们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink 使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

       Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda 函数或范型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

       Flink 支持 Java 和 Scala 中常见的数据类型,使用最广泛的类型有以下几种。

  • 基础数据类型

       Flink支持所有的JavaScala基础数据类型,Int、Double、Long、String。

  • Java 和 Scala 元组(Tuples
  • Scala 样例类(case classes)
  • Java 简单对象(POJOs)
  • 其他(Arrays、List、Maps、Enums)等

UDF 函数(更细粒度的控制流)

函数类(Function Classes)

       Flink 暴露了所有 udf 函数的接口(实现方式为接口或抽象类)。例如MapFunctionFilterFunctionProcessFunction 等等。

       下面例子实现了FilterFunction接口:

	DataStream<String> filterStream = dataStream.filter(new FlinkFilter());

    public static class FlinkFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String value) throws Exception {
            return value.contains("flink");
        }
    }

       也可以将函数实现为匿名类:

   DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
       @Override
       public boolean filter(String value) throws Exception {
           return value.contains("flink");
       }
   });

匿名函数(Lambda Functions)

DataStream<String> filterStream = dataStream.filter(value -> value.contains("flink"));

富函数(Rich Functions)

       “富函数”是DataStream API提供的一个函数类的接口,所有 Flink 函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction

       Rich Function有一个生命周期的概念。典型的生命周期方法有:

  • open() 方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前 open 会被调用。
  • close() 方法是生命周期中的最后一个调用的方法,做一些清理工作。
  • getRuntimeContext() 方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。
public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");

        // 转换成 Score 类型
        DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());

        resultStream.print();

        env.execute();
    }

    public static class MyMapper extends RichMapFunction<Score, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(Score score) throws Exception {
            return new Tuple2<>(score.getCourse(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // 初始化工作,一般是定义状态,或者建立数据库连接
            System.out.println("open");
        }

        @Override
        public void close() throws Exception {
            // 一般是关闭连接
            System.out.println("close");
        }
    }

在这里插入图片描述

数据重分区

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");

        inputStream.print("input");

        // 1. shuffle
        DataStream<String> shuffleStream = inputStream.shuffle();
        shuffleStream.print("shuffleStream");

        // 2. keyBy
        DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });
        dataStream.keyBy("course").print("keyBy");

        // 3、global
        inputStream.global().print("global");

        env.execute();
    }

在这里插入图片描述

4. Sink

       Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。所有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。

stream.addSink(new MySink(xxx))

       官方提供了一部分的框架的 sink。除此以外,需要用户自定义 sink。

在这里插入图片描述

在这里插入图片描述

kafka

 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
     <version>1.10.1</version>
 </dependency>
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // 从kafka中读取数据
        DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<String>(
                "score_consume", new SimpleStringSchema(), properties));

        // 转换成SensorReading类型
        DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] array = value.split(",");
                System.out.println(new Score(array[0], array[1], Integer.parseInt(array[2])).toString());
                return new Score(array[0], array[1], Integer.parseInt(array[2])).toString();
            }
        });

        // 输出到 kafka
        dataStream.addSink(new FlinkKafkaProducer011<String>(
                "localhost:9092", "score_produce", new SimpleStringSchema()
        ));

        env.execute();
    }

在这里插入图片描述
在这里插入图片描述

Redis

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");

        // 转换成 Score 类型
        DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        // 定义 Jedis 连接配置
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();

        dataStream.addSink(new RedisSink<Score>(config, new MyRedisMapper()));

        env.execute();
    }

    // 自定义 RedisMapper
    public static class MyRedisMapper implements RedisMapper<Score> {
        // 定义保存数据到 Redis 的命令,存成Hash表,hset scores course student-score
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(
                    RedisCommand.HSET, "scores"
            );
        }

        @Override
        public String getKeyFromData(Score score) {
            return score.getCourse();
        }

        @Override
        public String getValueFromData(Score score) {
            return score.getStudent() + "-" + score.getScore();
        }
    }

在这里插入图片描述

JDBC 自定义 Sink

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.29</version>
</dependency>
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件中读取数据
        DataStream<String> inputStream = env.readTextFile("test.txt");

        // 转换成 Score 类型
        DataStream<Score> dataStream = inputStream.map(new MapFunction<String, Score>() {
            @Override
            public Score map(String value) throws Exception {
                String[] array = value.split(",");
                return new Score(array[0], array[1], Integer.parseInt(array[2]));
            }
        });

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // 实现自定义的SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<Score> {
        Connection connection = null;
        PreparedStatement insertStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test", "root", "111111");
            insertStmt = connection.prepareStatement("insert into t_score (course, student, score) value (?, ?, ?)");
        }

        // 每来一条数据,调用连接,执行sql
        @Override
        public void invoke(Score score, Context context) throws Exception {
            insertStmt.setString(1, score.getCourse());
            insertStmt.setString(2, score.getStudent());
            insertStmt.setInt(3, score.getScore());
            insertStmt.execute();
        }

        @Override
        public void close() throws Exception {
            insertStmt = null;
            connection.close();
        }
    }
这篇关于Flink 流处理 API的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!