转载请注明出处:
kafka进行消息生产发送代码示例:
public class KafkaProducerAnalysis { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static Properties initConfig() ( Properties props = new Properties(); props.put("bootstrap.servers", brokerList); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties. put ("client. id", "producer. client. id. demo"); return props; } public static void main(String[] args) { Properties props = initConfig(); KafkaProducer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String,String> record = new ProducerRecord<> (topic, "hello, Kafka1 "); try { producer.send(record); } catch (Exception e) { e.printStackTrace(); } } }
构建的消息对象ProducerRecord, 它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个value属性,比如"Hello, Kafka!"只是ProducerRecord对象中的一个属性。 ProducerRecord类的定义如下:
public class ProducerRecord<K, V> { private final String topic; //主题 private final Integer partition; //分区号 private final Headers headers; //消息头部 private final K key; //键 private final V value; //值 private final Long timestamp; //消息的时间戳 //省略其他成员方法和构造方法 }
其中topic和 partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部,它大多用来设定 一些与应用相关的信息,如无需要也可以不用设置。key是用来指定消息的键, 它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。
key可以让消息再进行二次归类, 同 一个key的消息会被划分到同 一个分区中, 有key的消息还可以支持日志压缩的功能,value是指消息体,一般不为空,如果为空则表示特定的消息 一墓碑消息;timestamp是指消息的时间戳,它有 CreateTime 和 LogAppendTime 两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间.
KafkaProducer是线程安全的, 可以在多个线程中共享单个KafkaProducer实例,也 可以将KafkaProducer实例进行池化来供其他线程调用。
发送消息主要有三种模式: 发后即忘(fire-and-forget)、同步(sync)及异步Casync)。
发后即忘,它只管往Kafka中发送消息而并不关心消息是否正确到达。 在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。 这种发送方式的性能最高, 可靠性也最差。
KafkaProducer的 send()方法并非是void类型, 而是Future<RecordMetadata>类型, send()方法有2个重载方法,具体定义如下:
public Future<RecordMetadata> send(ProducerRecord<K, V> record) public Future<RecordMetadata> send(ProducerRecord<K, V> record,Callback callback)
实现同步的发送方式, 可以利用返回的 Future 对象实现:
try { producer.send(record) .get(); } catch (ExecutionException I InterruptedException e) { e.printStackTrace(); }
send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。 示例中在执行send()方法之后直接链式调用了get()方法来阻塞等待Kaflca的响应,直到消息发送成功, 或者发生异常。 如果发生异常,那么就需要捕获异常并交由外层逻辑处理。
try { Future<RecordMetadata> future = producer.send{record); RecordMetadata metadata= future.get(); System.out.println(metadata.topic() + "-" +metadata.partition() + ":" + metadata.offset()); } catch (ExecutionException I InterruptedException e) { e.printStackTrace () ; }
这样可以获取一个RecordMetadata对象, 在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、 时间戳等。
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。 而在对侧, 消费者需要用反序列化器(Deserializer)把从Kafka 中收到的字节数组转换成相应的对象。
为 了方便, 消息的key和value都使用了字符串, 对应程序中的序列化器也使用了客户端自带的org.apache.kafka. common. serialization. StringSerializer, 除了用于String 类型的序列化器,还有ByteArray、ByteBuffer、 Bytes、 Double、Integer、 Long这几种类型, 它们都实现了org.apache.kafka. common. serialization. Serializer接口
消息在通过send( )方法发往broker 的过程中,有可能需要经过拦截器(Interceptor)、 序列化器(Serializer)和分区器(Parttitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的, 而序列化器是必需的。 消息 经过 序列化 之后就需要确定它发往的分区 ,如果消息ProducerRecord中指定了 partitition字段, 那么就不需要分区器的作用, 因为partition代表的就是所要发往的分区号。
如果消息ProducerRecord中没有 指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。 分区器的作用 就是为消息 分配分区。
Kafka 中提供的默认分区器是org.apache.kafka.clients.producer.intemals.DefaultPartitioner, 它实现了org.apache.kafka.clients.producer.Partitioner 接口, 这个接口中定义了2个方法, 具体如下所示。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); public void close();
其中 partition()方法用来计算分区号,返回值为 int 类型。partition()方法中的参数分别表示主题 、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。 close()方法在关闭分区器的时候用来回收一些资源 。
默认的分区器会对key 进行哈希(采用MurmurHash2 算法 ,具备高运算性能及低碰撞率),最终根据得到 的 哈希值来计算分区号, 拥有相同 key 的消息会被写入同一个分区 。 如果 key 为 null ,那么消息将会以轮询的方式发往主题内的各个可用分区。
生产者拦截器既可以用 来在消息发送前做一些准备工作 ,比如按照某个规则过滤不符合要求的消 息、修改消 息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器 的 使用 也 很方便,主要是自定义实现org .apache.kafka. clients. producer.Producerlnterceptor 接口。ProducerInterceptor 接 口中包含 3 个方法 :
public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record); public void onAcknowledgement(RecordMetadata metadata, Excepti on exception ); public void close() ;
KafkaProducer 在将消息序列化和计算分区之前会调用 生产者拦截器 的 onSend()方法来对消息进行相应 的定制化操作。KafkaProducer 会在消息被应答( Acknowledgement )之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback 之前执行。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 (发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器( RecordAccumulator,也称为消息收集器〉中。Sender 线程负责从RecordAccumulator 中 获取消息并将其发送到 Kafka 中 。
RecordAccumulator 主要用来缓存消息 以便Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能 。RecordAccumulator 缓存的大 小可以通过生产者客户端参数buffer. memory 配置,默认值为 33554432B ,即32MB 。 如果生产者发送消息的速度超过发送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer 的 send()方法调用要么被阻塞,要么抛出异常,这个取决于参数 max. block . ms 的配置,此参数的默认值为 6 0000,即 60 秒 。
Sender 从RecordAccumulator 中 获取缓存的消息之后,会进一 步将原本<分区,Deque<Producer Batch>>的保存形式转变成<Node , List< ProducerBatch>的形式,其中 Node 表示 Kafka集群的 broker 节点 。对于网络连接来说,生产者客户端是与具体 的 broker 节点建立的连接,也就是 向具体的broker 节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言 ,我们只 关注向哪个分区中发送哪些消息,所 以在这里需要做一个应用逻辑层面到网络 1/0 层面的转换。
元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上,follower 副本分配在哪些节点上,哪些副本在 AR 、ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
1.acks
这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。 acks 参数有 3 种类型的值(都是字符串类型)。
acks =1 : 默认值即为l 。生产者发送消息之后,只要分区的leader 副本成功写入消息,那么它就会收到来自服务端的成功响应 。 如果消息无法写入 leader 副本,比如在leader 副本崩溃、重新选举新的leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息 。如果消息写入 leader 副本并返回成功响应给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息 。 acks 设置为l ,是消息可靠性和吞吐量之间的折中方案。
acks = 0 :生产者发送消 息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。
acks =- l 或 acks =all : 生产者在消 息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all )可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为 ISR 中可能只有 leader 副本,这样就退化成了 acks= l 的情况。
2.max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B ,即lMB 。一般情况下,这个默认值就可以满足大多数的应用场景了。
3.retries 和 retry. backoff.ms
retries 参数用来配置生产者重试的次数,默认值为 0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries大于 0 的值,以此通过 内 部重试来恢复而不是一昧地将异常抛给生产者的应用程序。 如果重试达到设定的次数 ,那么生产者就会放弃重试并返回异常。
不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不可行了。 重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100 ,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试 。
4.compression.type
这个参数用来指定消息的压缩方式,默认值为“ none ”,即默认情况下,消息不会被压缩。该参数还可以配置为“ gzip ”,“ snappy ” 和“ lz4 ”。 对消息进行压缩可以极大地减少网络传输量 、降低网络 IO ,从而提高整体的性能 。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩 。
5. request.timeout.ms
这个参数用来配置 Producer 等待请求响应的最长时间,默认值为 3 0000( ms )。请求超时之后可以选择进行重试。注意这个参数需要 比 broker 端参数 replica.lag.time.max.ms 的值要大 ,这样可以减少因客户端重试而引起的消息重复的概率。