1、Flink 和 Kafka 交互的 connector jar 包分两类(旧版按 Kafka 版本区分,新版统一):
通过maven仓库搜索
https://mvnrepository.com/search?q=org.apache.flink+++flink-connector-kafka-
老的jar包有三种:
0.11版本(只更新到1.11.x): <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.11.2</version> </dependency> 0.10版本(只更新到1.11.x): <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.12</artifactId> <version>1.11.2</version> </dependency> 0.9版本(只更新到1.10.x): <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.12</artifactId> <version>1.10.3</version> </dependency>
新版的 jar 包(universal connector)不再按 Kafka 版本区分,对各 Kafka broker 版本做了统一兼容处理,直接使用即可:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.12.3</version> </dependency>
2、FlinkFixedPartitioner源码:
package org.apache.flink.streaming.connectors.kafka.partitioner; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.util.Preconditions; @PublicEvolving public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> { private static final long serialVersionUID = -3785320239953858777L; private int parallelInstanceId; public FlinkFixedPartitioner() { } public void open(int parallelInstanceId, int parallelInstances) { Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative."); Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0."); this.parallelInstanceId = parallelInstanceId; } public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty."); return partitions[this.parallelInstanceId % partitions.length]; } public boolean equals(Object o) { return this == o || o instanceof FlinkFixedPartitioner; } public int hashCode() { return FlinkFixedPartitioner.class.hashCode(); } }