我们在上一篇文章的基础上增加一个MapFunction的算子操作,并运行起来
@Test public void testFlinkHelloWorld2() throws Exception { DataStreamSource<String> lines = streamExecutionEnvironment.socketTextStream("localhost", 8080); SingleOutputStreamOperator<Long> map = lines.map(((line -> Long.parseLong(line)))); map.print(); streamExecutionEnvironment.execute(); }
该流式HelloWorld程序主要是从8080网络端口读取数据,并通过MapFunction算子操作把字符串数据转换成Long型数据,最后输出
我们看一下的算子流图长什么样子
可以看出,该算子流图与我们的代码一一对应,通过Source进行了Map的Transformation转换,然后通过Sink输出结果
那么Source、Transformation、Sink到底是什么?
正如上一篇文章说的Flink的应用程序编程无外呼三部曲
图片官网连接: https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/overview
简单来说就是通过Source、Transformation、Sink等算子表示的Transformation之间的转换,最终在API层面的表现就是DataStream之间的转换
那么我们具体看看Source、Transformation、Sink到底是什么
Source: 数据源,用于从集合或外部系统接入数据源,其本质是实现SourceFunction被Operator封装并组成的Transformation(源码)
Transformation: 数据转换的各种操作,可以将数据转换计算成你想要的数据,其本质是一系统被Operator封装的ProcessFunction并组成的Transformation(源码)
Sink: 接收器,将转换计算后的数据发送到控制台或输入到外部系统,其本质是实现SinkFunction被Operaotr封装并组成的Transformation(源码)
虽然在源码层面Source、Tranforamtion和Sink都是Transformation,但为了概念不混淆,对外声明上我们还是按照Souce、Transformation和Sink来加以区别,毕竟Source与Sink与普通的正常的Transformation实现还是有自己的不同之处
那么Flink的Source有哪些类型呢? 总体上可以分为内置的Source与自定义的Source,但本质上都是直接或间隔调用StreamExecutionEnvironment类的addSource方法、实现了SourceFunction并被Operator所封装再组成的Transformation、最后对外提供使用的DataStreamSource
内置的Flink的Source主要有:
1) fromElements(...): 来自元素集合 2) fromCollection(...): 来自集合数据 3) fromParallelCollection(...): 来自并行集合数据 4) readTextFile(...)/readFile(...)/readFileStream(...): 来自文件 5) socketTextStream(...): 来自文本网络流 6) createInput(...)/createFileInput(...): 来自支持InputSplit的InputFormat类型,例如flink-connector-jdbc的JdbcRowDataInputFormat
从并发的角度看,Flink的Source有两种实现方法,一种是不支持并发,例如socketTextStream,另一种是支持并发,例如Kafka的多分区形式,具体源码实现可以参考后文
Flink Source背后的Transformation主要是 LegacySourceTransformation 与 SourceTransformation
Flink的Transformation种类繁多,主要是基于DataStream或KeyedStream(本质也是DataStream但提供更丰富的keyBy后的算子操作类型)
具体有以下这些:
1) Map: 最简单的转换之一,一对一处理,输入是一条数据流数据,输出的也是一条数据流数据 2) FlatMap: 一对多处理,输入一条数据流数据,输出0条或多条数据流数据 3) Filter: 对数据流数据进行过滤操作,符合条件的数据流数据输向下游,不符合条件的则过滤条不给下游流处理 4) KeyBy: 基于 key 对流进行分区,分区操作有多种类型,分区函数也有多种 5) Reduce: 对数据流作聚合操作汇总出一条数据流数据,常用的 average/sum/min/max/count 等方法都可使用 reduce 可实现 6) Window: 允许按横切的时间或数据流条数对 KeyedStream 进行分组聚合统计并输出汇总数据流 7) WindowAll: 在非分区数据流上进行Window操作 8) Union: 将两个或多个数据流组合在一起再进行算子操作 9) Window Join: 根据 key 分区将同一个 window 的两个数据流 Join 起来 ...
具体链接可以参考官网: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview
Flink的Sink主要向外部系统输出数据,内置的Sink种类不多,都是通过DataStream操作并返回DataStreamSink,主要有:
1) print(): 最常用的,用于向控制台输出,一般用于单元测试或调试程序等 2) writeAsText(...): 写出成文本文件 3) writeAsCsv(...): 写出成Csv文件 4) writeToSocket(...): 向网络流输出数据 ...
Flink Sink背后的Transformation主要是 LegacySinkTransformation 与 SinkTransformation
新的HelloWorld的FlatMap流式应用程序如下:
@Test public void testFlinkHelloWorld3() throws Exception { DataStreamSource<String> lines = streamExecutionEnvironment.socketTextStream("localhost", 8080); SingleOutputStreamOperator<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { if (value.contains(",")) { String[] values = value.split(","); for (String temp : values) { out.collect(temp); } } else { out.collect(value); } } }); flatMap.print(); streamExecutionEnvironment.execute(); }
程序很简单,就是控制台中输入的字符串中如果含有逗号的,则拆分成多条字符串并向下游输出多条数据,如果没有则原封不动的向下游输出数据