Join
/** * * 将两个数据流,进行join * * 如果让两个流能够join上,必须满足以下两个条件 * 1.由于数据是分散在多台机器上,必须将join条件相同的数据通过网络传输到同一台机器的同一个分区中(按照条件进行KeyBy) * 2.让每个流中的数据都放慢,等等对方(划分相同类型,长度一样的窗口) * */ public class EventTumblingWindowJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //1000,o001,c001 DataStreamSource<String> lines1 = env.socketTextStream("linux01", 7777); //1200,c001,图书 DataStreamSource<String> lines2 = env.socketTextStream("linux01", 8888); //按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API //提取两个流的Watermark SingleOutputStreamOperator<String> lines1WithWatermark = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String element, long recordTimestamp) { return Long.parseLong(element.split(",")[0]); } })); SingleOutputStreamOperator<String> lines2WithWatermark = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String element, long recordTimestamp) { return Long.parseLong(element.split(",")[0]); } })); //对两个流进行处理 SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream1 = lines1WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() { @Override public Tuple3<Long, String, String> map(String input) throws Exception { String[] fields = input.split(","); return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]); } }); SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream2 = lines2WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() { @Override public Tuple3<Long, String, String> map(String input) throws Exception { String[] fields = input.split(","); return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]); } }); //将两个流join DataStream<Tuple5<Long, String, String, Long, String>> result = tpStream1.join(tpStream2) .where(tp1 -> tp1.f2) //第一个流keyBY的字段 .equalTo(tp2 -> tp2.f1) //第二个流keyBy的字段 .window(TumblingEventTimeWindows.of(Time.seconds(5))) //划分窗口 //全量聚合的处理逻辑 .apply(new JoinFunction<Tuple3<Long, String, String>, Tuple3<Long, String, String>, Tuple5<Long, String, String, Long, String>>() { //窗口触发后,条件相同的,并且在同一个窗口内的数据,会传入到join方法中 @Override public Tuple5<Long, String, String, Long, String> join(Tuple3<Long, String, String> first, Tuple3<Long, String, String> second) throws Exception { return Tuple5.of(first.f0,first.f1,first.f2,second.f0,second.f2); } }); result.print(); env.execute(); } }
LeftOuterJoin
/** * 将两个数据流,实现LeftOuterJoin * * 如果让两个流能够join上,必须满足以下两个条件 * 1.由于数据是分散在多台机器上,必须将join条件相同的数据通过网络传输到同一台机器的同一个分区中(按照条件进行KeyBy) * 2.让每个流中的数据都放慢,等等对方(划分相同类型,长度一样的窗口) * */ public class EventTumblingWindowLeftOuterJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //1000,o001,c001 DataStreamSource<String> lines1 = env.socketTextStream("linux01", 7777); //1200,c001,图书 DataStreamSource<String> lines2 = env.socketTextStream("linux01", 8888); //按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API //提取两个流的Watermark SingleOutputStreamOperator<String> lines1WithWatermark = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String element, long recordTimestamp) { return Long.parseLong(element.split(",")[0]); } })); SingleOutputStreamOperator<String> lines2WithWatermark = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String element, long recordTimestamp) { return Long.parseLong(element.split(",")[0]); } })); //对两个流进行处理 SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream1 = lines1WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() { @Override public Tuple3<Long, String, String> map(String input) throws Exception { String[] fields = input.split(","); return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]); } }); SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream2 = lines2WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() { @Override public Tuple3<Long, String, String> map(String input) throws Exception { String[] fields = input.split(","); return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]); } }); //将两个流leftOuterJoin DataStream<Tuple5<Long, String, String, Long, String>> result = tpStream1.coGroup(tpStream2) .where(tp1 -> tp1.f2) //第一个流keyBy的字段 .equalTo(tp2 -> tp2.f1)//第二个流keyBy的字段 .window(TumblingEventTimeWindows.of(Time.seconds(5)))//划分窗口 .apply(new CoGroupFunction<Tuple3<Long, String, String>, Tuple3<Long, String, String>, Tuple5<Long, String, String, Long, String>>() { /** * coGroup当窗口触发后,每个key会调用一次coGroup * 三种情况会调用coGroup方法 * 1.第一个流和第二个流中,都有key相同的数据数据,并且在同一个窗口呢,那么coGroup方法中的两个Iterable都不为empty * 2.第一个流中出现了同一个key的数据,.第二个流中没有出现相同key的数据,那么coGroup方法中的第一个Iterable不为empty,第二个为empty * 3.第二个流中出现了同一个key的数据,.第一个流中没有出现相同key的数据,那么coGroup方法中的第二个Iterable不为empty,第一个为empty * @param first * @param second * @param out * @throws Exception */ @Override public void coGroup(Iterable<Tuple3<Long, String, String>> first, Iterable<Tuple3<Long, String, String>> second, Collector<Tuple5<Long, String, String, Long, String>> out) throws Exception { for (Tuple3<Long, String, String> left : first) { //实现左外连接 //先循环左流的数据 boolean isEmpty = false; for (Tuple3<Long, String, String> right : second) { isEmpty = true; out.collect(Tuple5.of(left.f0, left.f1, left.f2, right.f0, right.f2)); } if (!isEmpty) { out.collect(Tuple5.of(left.f0, left.f1, left.f2, null, null)); } } } }); result.print(); env.execute(); } }
intervalJoin
/** * 将两个数据流不划分窗口,按照时间范围进行join,即intervalJoin * * 以第一个流中的数据为标准进行比较时间 * * 实现步骤: * 1.分别将两个流按照相同的条件进行KeyBy(可以保证key等值的数据一定进入到同一台机器的同一个分区中) * 2.将两个数据流的数据缓存到KeyedState,然后将两个流Connected到一起(可以共享状态) * */ public class EventTumblingWindowIntervalJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //1000,o001,c001 DataStreamSource<String> lines1 = env.socketTextStream("linux01", 7777); //1200,c001,图书 DataStreamSource<String> lines2 = env.socketTextStream("linux01", 8888); //按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API //提取两个流的Watermark SingleOutputStreamOperator<String> lines1WithWatermark = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String element, long recordTimestamp) { return Long.parseLong(element.split(",")[0]); } })); SingleOutputStreamOperator<String> lines2WithWatermark = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String element, long recordTimestamp) { return Long.parseLong(element.split(",")[0]); } })); //对两个流进行处理 SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream1 = lines1WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() { @Override public Tuple3<Long, String, String> map(String input) throws Exception { String[] fields = input.split(","); return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]); } }); SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream2 = lines2WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() { @Override public Tuple3<Long, String, String> map(String input) throws Exception { String[] fields = input.split(","); return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]); } }); //将两个流join KeyedStream<Tuple3<Long, String, String>, String> keyedStream1 = tpStream1.keyBy(tp -> tp.f2); KeyedStream<Tuple3<Long, String, String>, String> keyedStream2 = tpStream2.keyBy(tp -> tp.f1); SingleOutputStreamOperator<Tuple5<Long, String, String, Long, String>> result = keyedStream1.intervalJoin(keyedStream2) .between(Time.seconds(-1), Time.seconds(1)) //指定的时间范围 .upperBoundExclusive() //不包括上界 .process(new ProcessJoinFunction<Tuple3<Long, String, String>, Tuple3<Long, String, String>, Tuple5<Long, String, String, Long, String>>() { @Override public void processElement(Tuple3<Long, String, String> left, Tuple3<Long, String, String> right, Context ctx, Collector<Tuple5<Long, String, String, Long, String>> out) throws Exception { out.collect(Tuple5.of(left.f0,left.f1,left.f2,right.f0,right.f2)); } }); result.print(); env.execute(); } }