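Flink provides three kinds of windows:
1. Time Window: time-based windows
2. Session Window: session windows (computation starts once no data has arrived for a configured gap)
3. Count Window: count-based windows (computed once every n records)
Time windows come in four variants: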
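TumblingProcessingTimeWindows: tumbling processing-time window
TumblingEventTimeWindows: tumbling event-time window (requires a timestamp field and watermarks)
SlidingProcessingTimeWindows: sliding processing-time window (requires a window size and a slide interval)
SlidingEventTimeWindows: sliding event-time window (requires a window size and a slide interval, plus a timestamp field and watermarks)
Tumbling: consecutive windows do not overlap.
Sliding: consecutive windows overlap (whenever the slide interval is smaller than the window size).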
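The difference between tumbling and sliding assignment can be sketched in plain Scala, without any Flink dependency. This is only an illustration of the arithmetic, assuming a window offset of 0 and non-negative timestamps in milliseconds; the names `WindowAssignSketch`, `tumblingStart` and `slidingStarts` are made up for this example and are not part of the Flink API.

```scala
object WindowAssignSketch {
  // Start of the single tumbling window [start, start + size) that contains ts.
  // Assumes offset 0 and ts >= 0.
  def tumblingStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Starts of every sliding window [start, start + size) that contains ts.
  // Walks backwards from the latest window start in steps of `slide`.
  def slidingStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - (ts % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  def main(args: Array[String]): Unit = {
    // A record at 12 s with 5 s tumbling windows belongs to one window
    println(tumblingStart(12000L, 5000L))
    // The same record with 15 s windows sliding every 5 s belongs to three windows
    println(slidingStarts(12000L, 15000L, 5000L))
  }
}
```

With a slide equal to the window size, `slidingStarts` returns a single start, which is the tumbling case: the windows no longer overlap.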
package com.shujia.flink.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read lines from a socket
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Split each line into words and convert to (word, 1) pairs
    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    /**
     * Tumbling processing-time window:
     *   .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
     * Shorthand (deprecated in newer Flink releases):
     *   .timeWindow(Time.seconds(5))
     */
    // Group by word, apply the window, sum the counts, and print
    kvDS.keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }
}
Tumbling event-time window: a timestamp field and watermarks must be assigned.
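package com.shujia.flink.window

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // With a small amount of test data, use parallelism 1 so the watermark can advance
    env.setParallelism(1)

    // Use event time as the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read lines from a socket; each line is expected to be "word,timestamp"
    // with the timestamp in epoch milliseconds
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Parse each line into a (word, timestamp) pair
    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    // Assign the timestamp field and watermarks. The watermark is the largest
    // timestamp seen so far minus the allowed delay; it only increases, never decreases.
    val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
      // How far the watermark lags behind the largest timestamp
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        // The timestamp field; the return type must be Long
        override def extractTimestamp(element: (String, Long)): Long = element._2
      }
    )

    // Window the stream that carries timestamps (assDS, not the raw stream):
    // group by word, apply the window, sum the counts, and print
    assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }
}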
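How the watermark relates to the incoming timestamps can be sketched in plain Scala. This is a simplified model of a bounded-out-of-orderness watermark, not Flink's implementation: it recomputes the watermark after every record, whereas Flink emits watermarks periodically. The names `WatermarkSketch`, `watermarks` and `canFire` are hypothetical.

```scala
object WatermarkSketch {
  // Watermark after each record: largest timestamp seen so far minus the allowed delay.
  // Because it is based on a running maximum, it never decreases.
  def watermarks(timestamps: Seq[Long], delay: Long): Seq[Long] =
    timestamps
      .scanLeft(Long.MinValue)((maxTs, ts) => math.max(maxTs, ts))
      .tail // drop the initial Long.MinValue seed
      .map(_ - delay)

  // An event-time window [start, end) may fire once the watermark reaches end - 1.
  def canFire(windowEnd: Long, watermark: Long): Boolean = watermark >= windowEnd - 1

  def main(args: Array[String]): Unit = {
    // The out-of-order record at 3000 does not pull the watermark back
    println(watermarks(Seq(1000L, 4000L, 3000L, 11000L), 5000L))
  }
}
```

In this model, with a 5-second delay, the window [0, 5000) can only fire after a record with a timestamp of at least 9999 has arrived, which is why a late record such as 3000 is still counted.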
A sliding window requires both a window size and a slide interval.
package com.shujia.flink.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read lines from a socket
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Split each line into words and convert to (word, 1) pairs
    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    // Group by word, apply a 15-second window that slides every 5 seconds,
    // sum the counts, and print
    kvDS.keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }
}
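A session window starts computing once no data has arrived for the configured gap: all records received before the gap are placed in one window and computed together. The gap is timed independently for each key.
Session windows come in two variants:
ProcessingTimeSessionWindows: processing-time session window
EventTimeSessionWindows: event-time session window (requires a timestamp field and watermarks)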
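The gap-based grouping can be illustrated with a small plain-Scala sketch for the records of a single key. It is a model of the idea, not Flink's merging logic: the names `SessionSketch` and `sessions` are invented here, and treating a distance of exactly `gap` as still belonging to the same session is an assumption of this sketch.

```scala
object SessionSketch {
  // Splits the timestamps of ONE key into sessions. A new session starts when the
  // distance to the previous record exceeds `gap` (boundary handling is an assumption).
  def sessions(timestamps: Seq[Long], gap: Long): List[List[Long]] =
    timestamps.sorted
      .foldLeft(List.empty[List[Long]]) {
        // Close enough to the most recent record: extend the current session
        case (current :: done, ts) if ts - current.head <= gap => (ts :: current) :: done
        // Otherwise open a new session
        case (acc, ts) => List(ts) :: acc
      }
      .map(_.reverse) // restore ascending order inside each session
      .reverse        // restore ascending order of the sessions

  def main(args: Array[String]): Unit = {
    // With a 5 s gap, the records fall into three sessions
    println(sessions(Seq(1000L, 2000L, 9000L, 10000L, 30000L), 5000L))
  }
}
```

Because each key has its own timer, a real job would apply this grouping separately to every key's records.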
package com.shujia.flink.window

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo1TimeWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read lines from a socket
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Split each line into words and convert to (word, 1) pairs
    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    // Group by word, apply the session window, sum the counts, and print
    kvDS.keyBy(_._1)
      // The window fires once no data has arrived for the key for 5 seconds
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }
}
The event-time session window requires a timestamp field and watermarks:
package com.shujia.flink.window

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Demo2SessionWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // With a small amount of data, set the parallelism to 1
    env.setParallelism(1)

    // Use event time as the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    // Parse each "word,timestamp" line into a (word, timestamp) pair
    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    // Assign the timestamp field and watermarks
    val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
      // How far the watermark lags behind the largest timestamp seen so far
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        // The timestamp field
        override def extractTimestamp(element: (String, Long)): Long = element._2
      }
    )

    assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }
}
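Count windows also come in a tumbling and a sliding variant:
package com.shujia.flink.window

import org.apache.flink.streaming.api.scala._

object Demo3CountWindow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    // Only one countWindow call can be applied to a keyed stream;
    // the two variants cannot be chained.
    kvDS
      .keyBy(_._1)
      // Tumbling count window: computes once every 10 records per key
      .countWindow(10)
      // Sliding count window (use instead of the line above): every 2 records,
      // computes over the most recent 10 records of the key
      // .countWindow(10, 2)
      .sum(1)
      .print()

    env.execute()
  }
}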
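What the two count-window variants compute for the records of one key can be sketched in plain Scala. This models the behaviour described in the comments above and is not Flink code; `CountWindowSketch`, `tumbling` and `sliding` are hypothetical names. The sketch assumes that a sliding count window already fires after the first `slide` records, on however many records have arrived so far.

```scala
object CountWindowSketch {
  // Tumbling count window: one result per full group of `size` records;
  // an incomplete trailing group never fires.
  def tumbling[A](elems: Seq[A], size: Int): List[Seq[A]] =
    elems.grouped(size).filter(_.size == size).toList

  // Sliding count window: after every `slide` records, evaluate the
  // most recent `size` records (fewer at the start of the stream).
  def sliding[A](elems: Seq[A], size: Int, slide: Int): List[Seq[A]] =
    (slide to elems.size by slide)
      .map(n => elems.slice(math.max(0, n - size), n))
      .toList

  def main(args: Array[String]): Unit = {
    val records = List(1, 2, 3, 4, 5, 6, 7)
    println(tumbling(records, 3))   // the 7th record waits for a full group
    println(sliding(records, 4, 2)) // fires after records 2, 4 and 6
  }
}
```

Applied to `.countWindow(10, 2)`, this means a key produces its first result after 2 records, and every later result covers at most the last 10 records of that key.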