Apache Flink是一个在无界和有界数据流上进行有状态计算的框架。Flink提供了不同抽象级别的多个API,并为常见用例提供了专用库。
在这里,我们介绍Flink易于使用且富有表现力的API和库。
流处理框架可以构建和执行的应用程序类型取决于该框架对流、状态和时间的控制程度。在下面,我们将描述流处理应用程序的这些构建块,并解释Flink处理它们的方法。
显然,流是流处理的一个基本方面。然而,流可以具有不同的特性,这些特性会影响流的处理方式。Flink是一个通用的处理框架,可以处理任何类型的流。
每个非平凡的流应用程序都是有状态的,也就是说,只有对单个事件应用转换的应用程序不需要状态。任何运行基本业务逻辑的应用程序都需要记住事件或中间结果,以便在稍后的时间点访问它们,例如,当接收到下一个事件时或在特定的持续时间之后。
应用状态是Flink中最重要的一个特征。通过查看Flink在状态处理上下文中提供的所有特性,可以看出这一点。
时间是流媒体应用程序的另一个重要组成部分。大多数事件流都有内在的时间语义,因为每个事件都是在特定的时间点生成的。此外,许多常见的流计算都是基于时间的,例如windows聚合、会话、模式检测和基于时间的连接。流处理的一个重要方面是应用程序如何测量时间,即事件时间和处理时间的差异。
Flink提供了一系列丰富的与时间相关的功能。
事件时间模式:使用事件时间语义处理流的应用程序根据事件的时间戳计算结果。因此,无论是处理记录的还是实时的事件,事件时间处理都允许获得准确且一致的结果。
水印支持:Flink在事件时间应用程序中使用水印来推理时间。水印也是一种灵活的机制,可以权衡结果的延迟和完整性。
延迟数据处理:在使用水印以事件时间模式处理流时,可能会发生在所有相关事件到达之前计算已经完成的情况。这种事件称为迟发事件。Flink提供了多个选项来处理延迟事件,例如通过侧输出重新路由事件,以及更新之前完成的结果。
处理时间模式:除了事件时间模式外,Flink还支持处理时间语义,该语义执行由处理器的挂钟时间触发的计算。处理时间模式可以适用于某些具有严格低延迟要求的应用程序,这些应用程序可以容忍近似的结果。
Flink 根据抽象程度分层,提供了三种不同的 API。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。
下文中,我们将简要描述每一种 API 及其应用,并提供相关的代码示例。
/** * 将相邻的 keyed START 和 END 事件相匹配并计算两者的时间间隔 * 输入数据为 Tuple2<String, String> 类型,第一个字段为 key 值, * 第二个字段标记 START 和 END 事件。 */public static class StartEndDuration extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> { private ValueState<Long> startTime; @Override public void open(Configuration conf) { // obtain state handle startTime = getRuntimeContext() .getState(new ValueStateDescriptor<Long>("startTime", Long.class)); } /** Called for each processed event. */ @Override public void processElement( Tuple2<String, String> in, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception { switch (in.f1) { case "START": // set the start time if we receive a start event. startTime.update(ctx.timestamp()); // register a timer in four hours from the start event. ctx.timerService() .registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000); break; case "END": // emit the duration between start and end event Long sTime = startTime.value(); if (sTime != null) { out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime)); // clear the state startTime.clear(); } default: // do nothing } } /** Called when a timer fires. */ @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) { // Timeout interval exceeded. Cleaning up the state. startTime.clear(); }}
这个例子充分展现了 KeyedProcessFunction 强大的表达力,也因此是一个实现相当复杂的接口。
DataStream API 为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和 Scala 语言,预先定义了例如map()、reduce()、aggregate() 等函数。你可以通过扩展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。
下面的代码示例展示了如何捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。
// 网站点击 Click 的数据流DataStream<Click> clicks = DataStream<Tuple2<String, Long>> result = clicks // 将网站点击映射为 (userId, 1) 以便计数 .map( // 实现 MapFunction 接口定义函数 new MapFunction<Click, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(Click click) { return Tuple2.of(click.userId, 1L); } }) // 以 userId (field 0) 作为 key .keyBy(0) // 定义 30 分钟超时的会话窗口 .window(EventTimeSessionWindows.withGap(Time.minutes(30L))) // 对每个会话窗口的点击进行计数,使用 lambda 表达式定义 reduce 函数 .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。
Flink 的关系型 API 旨在简化数据分析、数据流水线和 ETL 应用的定义。
下面的代码示例展示了如何使用 SQL 语句查询捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。此示例与上述 DataStream API 中的示例有着相同的逻辑。
SELECT userId, COUNT(*)FROM clicksGROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
Flink 具有数个适用于常见数据处理应用场景的扩展库。这些库通常嵌入在 API 中,且并不完全独立于其它 API。它们也因此可以受益于 API 的所有特性,并与其他库集成。