消息队列MQ

flink sink到kafka 默认分区器 FlinkFixedPartitioner相关源码分析

本文主要是介绍flink sink到kafka 默认分区器 FlinkFixedPartitioner相关源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

 

1、flink和kafka交互有两种jar包:

通过maven仓库搜索

https://mvnrepository.com/search?q=org.apache.flink+++flink-connector-kafka-

 

   老的jar包有三种:

0.11版本(只更新到1.11.x):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.11.2</version>
</dependency>


0.10版本(只更新到1.11.x):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.11.2</version>
</dependency>


0.9版本(只更新到1.10.x):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
    <version>1.10.3</version>
</dependency>

 

 新版的jar包,相关服务做了统一处理。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.3</version>
</dependency>

 

2、FlinkFixedPartitioner源码:

package org.apache.flink.streaming.connectors.kafka.partitioner;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;

@PublicEvolving
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private static final long serialVersionUID = -3785320239953858777L;
    private int parallelInstanceId;

    public FlinkFixedPartitioner() {
    }

    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
    }

    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
        return partitions[this.parallelInstanceId % partitions.length];
    }

    public boolean equals(Object o) {
        return this == o || o instanceof FlinkFixedPartitioner;
    }

    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}

 

sdd

这篇关于flink sink到kafka 默认分区器 FlinkFixedPartitioner相关源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!