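About the union operation in Transform:
union merges two or more DataStreams of the same type into a new DataStream that contains every element of the input streams.
For example, merging three streams of the same type: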
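```java
// env is assumed to be an existing StreamExecutionEnvironment
DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<Integer> stream2 = env.fromElements(10, 20, 30, 40, 50);
DataStreamSource<Integer> stream3 = env.fromElements(100, 200, 300, 400, 500);

// Union several streams into one. All of the streams must carry the same
// element type; their elements are blended together into a single stream.
stream1
        .union(stream2)
        .union(stream3)
        .print();
```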
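As noted above, union can only join streams of the same type, which is a significant limitation. Flink therefore provides another joining operator, Connect, which can join streams of different data types. Unlike Union, however, Connect cannot join many streams at once; it joins exactly two streams.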
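The type constraints described above can be modeled in plain Java. The sketch below is not Flink code: `UnionVsConnectSketch`, `union`, and `connectAndMap` are hypothetical names, and lists stand in for streams, so ordering and parallelism are not modeled. It only illustrates that a union takes any number of inputs of one type, while a connect takes exactly two inputs of possibly different types, each with its own mapping function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class UnionVsConnectSketch {

    // Models union: any number of inputs, all sharing the element type T.
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> out = new ArrayList<>(first);
        for (List<T> other : others) {
            out.addAll(other);
        }
        return out;
    }

    // Models connect followed by a co-map: exactly two inputs whose element
    // types A and B may differ; each input has its own function, and both
    // functions produce the common result type R.
    static <A, B, R> List<R> connectAndMap(List<A> left, List<B> right,
                                           Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (A a : left) {
            out.add(map1.apply(a));
        }
        for (B b : right) {
            out.add(map2.apply(b));
        }
        return out;
    }

    public static void main(String[] args) {
        // Three inputs of the same type are merged in one call.
        List<Integer> merged = union(
                Arrays.asList(1, 2), Arrays.asList(10, 20), Arrays.asList(100, 200));
        System.out.println(merged);

        // Two inputs of different types are handled by two separate functions.
        List<String> mixed = connectAndMap(
                Arrays.asList("Tom", "Jack"), Arrays.asList(1, 2),
                s -> "hello: " + s,
                i -> "number: " + i);
        System.out.println(mixed);
    }
}
```

Passing a `List<String>` to `union` together with a `List<Integer>` would not type-check as a list of integers, which mirrors the same-type restriction on union.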
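The biggest difference between the Connect operator and Union is:
In the example below, the values of the integer stream are squared twice (once in a map before connecting and once more in map2 of the CoMapFunction), and every element of the string stream is prefixed with hello.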
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class Flink05_Transform_Connect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3);
        DataStreamSource<String> stringDataStreamSource = env.fromElements("Tom", "Jack", "Marry");

        // Apply a map to each stream separately
        SingleOutputStreamOperator<Integer> intDS = integerDataStreamSource.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value * value;
            }
        });
        SingleOutputStreamOperator<String> strDS = stringDataStreamSource.map(x -> "hello: " + x);

        // Connect the two streams
        ConnectedStreams<String, Integer> connectDS = strDS.connect(intDS);

        SingleOutputStreamOperator<Object> res = connectDS.map(new CoMapFunction<String, Integer, Object>() {
            // map1 handles the first (String) stream, map2 the second (Integer) stream
            @Override
            public Object map1(String value) throws Exception {
                return value;
            }

            @Override
            public Object map2(Integer value) throws Exception {
                // The value was already squared once above, so this squares it a second time
                return "Result after squaring twice: " + value * value;
            }
        });

        res.print();
        env.execute();
    }
}
```
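To check which values the job above should produce, the two transformation chains can be traced in plain Java without Flink. This is only a sketch: `ConnectTrace` and its helper methods are hypothetical names, the output label used in map2 is left out, and the order in which Flink interleaves the two inputs is not modeled (the strings are simply listed first here).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConnectTrace {

    // Stand-in for the map applied to the integer stream before connecting.
    static int squareOnce(int value) {
        return value * value;
    }

    // Stand-in for map2 of the CoMapFunction, which squares the value again.
    static int squareAgain(int value) {
        return value * value;
    }

    // Stand-in for the map on the string stream; map1 passes the result through unchanged.
    static String prefix(String name) {
        return "hello: " + name;
    }

    // Applies both chains to the same inputs the job uses.
    static List<String> trace() {
        List<String> out = new ArrayList<>();
        for (String name : Arrays.asList("Tom", "Jack", "Marry")) {
            out.add(prefix(name));
        }
        for (int value : Arrays.asList(1, 2, 3)) {
            out.add(String.valueOf(squareAgain(squareOnce(value))));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [hello: Tom, hello: Jack, hello: Marry, 1, 16, 81]
        System.out.println(trace());
    }
}
```

Because the integers pass through two squaring steps, the inputs 1, 2, 3 end up as 1, 16, 81 rather than 1, 4, 9.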