Flink内置了一些基本数据源和接收器,并且始终可用。该预定义的数据源包括文件,目录和插socket,并从集合和迭代器摄取数据。该预定义的数据接收器支持写入文件和标准输入输出及socket。
连接器提供用于与各种第三方系统连接的代码。目前支持这些系统:
要在应用程序中使用其中一个连接器,通常需要其他第三方组件,例如数据存储或消息队列的服务器。
虽然本节中列出的流连接器是Flink项目的一部分,并且包含在源版本中,但它们不包含在二进制分发版中。
Flink的其他流处理连接器正在通过Apache Bahir发布,包括:
使用连接器不是将数据输入和输出Flink的唯一方法。一种常见的模式是在一个Map或多个FlatMap 中查询外部数据库或Web服务以渲染主数据流。
Flink提供了一个用于异步I / O的API, 以便更有效,更稳健地进行这种渲染。
当Flink应用程序将大量数据推送到外部数据存储时,这可能会成为I / O瓶颈。如果所涉及的数据具有比写入更少的读取,则更好的方法可以是外部应用程序从Flink获取所需的数据。在可查询的状态界面,允许通过Flink被管理的状态,按需要查询支持这个。
2 HDFS连接器此连接器提供一个Sink,可将分区文件写入任一Hadoop文件系统支持的文件系统 。
请注意,流连接器当前不是二进制发布的一部分
可以配置分段行为以及写入,但我们稍后会介绍。这是可以创建一个默认情况下汇总到按时间拆分的滚动文件的存储槽的方法
唯一必需的参数是存储桶的基本路径。可以通过指定自定义bucketer,写入器和批量大小来进一步配置接收器。
默认情况下,当数据元到达时,分段接收器将按当前系统时间拆分,并使用日期时间模式"yyyy-MM-dd–HH"命名存储区。这种模式传递给 DateTimeFormatter使用当前系统时间和JVM的默认时区来形成存储桶路径。用户还可以为bucketer指定时区以格式化存储桶路径。每当遇到新日期时,都会创建一个新存储桶。
例如,如果有一个包含分钟作为最精细粒度的模式,将每分钟获得一个新桶。每个存储桶本身都是一个包含多个部分文件的目录:接收器的每个并行实例将创建自己的部件文件,当部件文件变得太大时,接收器也会在其他文件旁边创建新的部件文件。当存储桶变为非活动状态时,将刷新并关闭打开的部件文件。如果存储桶最近未写入,则视为非活动状态。默认情况下,接收器每分钟检查一次非活动存储桶,并关闭任何超过一分钟未写入的存储桶。setInactiveBucketCheckInterval()并 setInactiveBucketThreshold()在一个BucketingSink。
也可以通过指定自定义bucketer setBucketer()上BucketingSink。如果需要,bucketer可以使用数据元或元组的属性来确定bucket目录。
默认编写器是StringWriter。这将调用toString()传入的数据元并将它们写入部分文件,由换行符分隔。在a setWriter() 上指定自定义编写器使用BucketingSink。如果要编写Hadoop SequenceFiles,可以使用提供的 SequenceFileWriter,也可以配置为使用压缩。
有两个配置选项指定何时应关闭零件文件并启动新零件文件:
Long.MAX_VALUE
)当满足这两个条件中的任何一个时,将启动新的部分文件。看如下例子:
Java
Scala
这将创建一个接收器,该接收器将写入遵循此模式的存储桶文件:
Java
生成结果
date-time是我们从日期/时间格式获取的字符串
parallel-task是并行接收器实例的索引
count是由于批处理大小或批处理翻转间隔而创建的部分文件的运行数
然而这种方式创建了太多小文件,不适合HDFS!仅供娱乐!
3 Apache Kafka连接器此连接器提供对Apache Kafka服务的事件流的访问。
Flink提供特殊的Kafka连接器,用于从/向Kafka主题读取和写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖Kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。
为用例和环境选择一个包(maven artifact id)和类名。对于大多数用户来说,FlinkKafkaConsumer08(部分flink-connector-kafka)是合适的。
然后,导入maven项目中的连接器:
环境配置参考
假设你刚刚开始并且没有现有的Kafka或ZooKeeper数据
由于Kafka控制台脚本对于基于Unix和Windows的平台不同,因此在Windows平台上使用bin \ windows \而不是bin /,并将脚本扩展名更改为.bat。
Kafka使用ZooKeeper,因此如果还没有ZooKeeper服务器,则需要先启动它。
Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。 默认情况下,每行将作为单独的消息发送。
运行生产者,然后在控制台中键入一些消息以发送到服务器。
Kafka还有一个命令行使用者,它会将消息转储到标准输出。
从Flink 1.7开始,有一个新的通用Kafka连接器,它不跟踪特定的Kafka主要版本。 相反,它在Flink发布时跟踪最新版本的Kafka。
如果您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka连接器。 如果使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。
通过Kafka客户端API和代理的兼容性保证,通用Kafka连接器与较旧和较新的Kafka代理兼容。 它与版本0.11.0或更高版本兼容,具体取决于所使用的功能。
要执行迁移,请参阅升级作业和Flink版本指南和
Flink Kafka Consumer参与了检查点,并保证在故障期间没有数据丢失,并且计算处理元素“恰好一次”。(注意:这些保证自然会假设Kafka本身不会丢失任何数据。)
请注意,Flink在内部将偏移量作为其分布式检查点的一部分进行快照。 承诺给Kafka的抵消只是为了使外部的进展观与Flink对进展的看法同步。 这样,监控和其他工作可以了解Flink Kafka消费者在多大程度上消耗了一个主题。
和接收器(FlinkKafkaProducer)。
除了从模块和类名中删除特定的Kafka版本之外,API向后兼容Kafka 0.11连接器。
Flink的Kafka消费者被称为FlinkKafkaConsumer08(或09Kafka 0.9.0.x等)。它提供对一个或多个Kafka主题的访问。
构造函数接受以下参数:
Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。
在 DeserializationSchema允许用户指定这样的一个架构。T deserialize(byte[] message) 为每个Kafka消息调用该方法,从Kafka传递值。
从它开始通常很有帮助AbstractDeserializationSchema,它负责将生成的Java / Scala类型描述为Flink的类型系统。实现vanilla的用户DeserializationSchema需要自己实现该getProducedType(…)方法。
为了访问Kafka消息的键和值,KeyedDeserializationSchema具有以下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)
。
为方便起见,Flink提供以下模式:
要使用此反序列化模式,必须添加以下附加依赖项:
当遇到因任何原因无法反序列化的损坏消息时,有两个选项 - 从deserialize(…)方法中抛出异常将导致作业失败并重新启动,或者返回null以允许Flink Kafka使用者以静默方式跳过损坏的消息。请注意,由于使用者的容错能力(请参阅下面的部分以获取更多详细信息),因此对损坏的消息执行失败将使消费者尝试再次反序列化消息。因此,如果反序列化仍然失败,则消费者将在该损坏的消息上进入不间断重启和失败循环。
Flink的Kafka Producer被称为FlinkKafkaProducer011(或010 对于Kafka 0.10.0.x版本。或者直接就是FlinkKafkaProducer,对于Kafka>=1.0.0的版本来说)。
它允许将记录流写入一个或多个Kafka主题。
上面的示例演示了创建Flink Kafka Producer以将流写入单个Kafka目标主题的基本用法。对于更高级的用法,还有其他构造函数变体允许提供以下内容:
Flink Kafka Consumer允许配置如何确定Kafka分区的起始位置。
Flink Kafka Consumer的所有版本都具有上述明确的起始位置配置方法。
还可以指定消费者应从每个分区开始的确切偏移量:
上面的示例将使用者配置为从主题的分区0,1和2的指定偏移量开始myTopic。偏移值应该是消费者应为每个分区读取的下一条记录。请注意,如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到setStartFromGroupOffsets()该特定分区的默认组偏移行为(即)。
请注意,当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或检查点中的偏移量确定。
在0.9之前,Kafka没有提供任何机制来保证至少一次或恰好一次的语义。
启用Flink的检查点时,FlinkKafkaProducer09和FlinkKafkaProducer010 能提供至少一次传输保证。
除了开启Flink的检查点,还应该配置setter方法:
总之,默认情况下,Kafka生成器对版本0.9和0.10具有至少一次保证,即
setLogFailureOnly设置为false和setFlushOnCheckpoint设置为true。
默认情况下,重试次数设置为“0”。这意味着当setLogFailuresOnly设置为时false,生产者会立即失败,包括Leader更改。
默认情况下,该值设置为“0”,以避免重试导致目标主题中出现重复消息。对于经常更改代理的大多数生产环境,建议将重试次数设置为更高的值。
Kafka目前没有生产者事务,因此Flink在Kafka主题里无法保证恰好一次交付
启用Flink的检查点后,FlinkKafkaProducer011
对于Kafka >= 1.0.0版本是FlinkKafkaProduce
可以提供准确的一次交付保证。
除了启用Flink的检查点,还可以通过将适当的语义参数传递给FlinkKafkaProducer011,选择三种不同的算子操作模式
Semantic.EXACTLY_ONCE 模式依赖于在从所述检查点恢复之后提交在获取检查点之前启动的事务的能力。如果Flink应用程序崩溃和完成重启之间的时间较长,那么Kafka的事务超时将导致数据丢失(Kafka将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置事务超时。
Kafka broker默认 transaction.max.timeout.ms 设置为15分钟。此属性不允许为生产者设置大于其值的事务超时。
FlinkKafkaProducer011默认情况下,将transaction.timeout.msproducer config中的属性设置为1小时,因此transaction.max.timeout.ms在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 该属性。
在read_committed模式中KafkaConsumer,任何未完成的事务(既不中止也不完成)将阻止来自给定Kafka主题的所有读取超过任何未完成的事务。换言之,遵循以下事件顺序:
即使事务2已经提交了记录,在事务1提交或中止之前,消费者也不会看到它们。这有两个含义:
Semantic.EXACTLY_ONCE 模式为每个FlinkKafkaProducer011实例使用固定大小的KafkaProducers池。每个检查点使用其中一个生产者。如果并发检查点的数量超过池大小,FlinkKafkaProducer011 将引发异常并将使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。
Semantic.EXACTLY_ONCE 采取所有可能的措施,不要留下任何阻碍消费者阅读Kafka主题的延迟事务,这是必要的。但是,如果Flink应用程序在第一个检查点之前失败,则在重新启动此类应用程序后,系统中没有关于先前池大小的信息。因此,在第一个检查点完成之前按比例缩小Flink应用程序是不安全的 FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR。
启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式定期检查其所有Kafka偏移以及其他 算子操作的状态。如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用来自Kafka的记录。
因此,绘制检查点的间隔定义了程序在发生故障时最多可以返回多少。
启用流式传输作业的检查点。 将定期快照流式数据流的分布式状态。 如果发生故障,流数据流将从最新完成的检查点重新启动。
该作业在给定的时间间隔内定期绘制检查点。 状态将存储在配置的状态后端。
此刻未正确支持检查点迭代流数据流。 如果“force”参数设置为true,则系统仍将执行作业。
要使用容错的Kafka使用者,需要在运行环境中启用拓扑的检查点:
另请注意,如果有足够的处理插槽可用于重新启动拓扑,则Flink只能重新启动拓扑。因此,如果拓扑由于丢失了TaskManager而失败,那么之后仍然必须有足够的可用插槽。YARN上的Flink支持自动重启丢失的YARN容器。
如果未启用检查点,Kafka使用者将定期向Zookeeper提交偏移量。
参考Streaming Connectors
Kafka官方文档