This article is an introduction to the Kafka message queue: its basic concepts, main features, application scenarios, and how it compares with other message queues. It covers installation and configuration, basic producer and consumer usage, and several practical techniques, so that newcomers can quickly get up to speed with Kafka.
Apache Kafka is a distributed, scalable, high-throughput messaging system. Originally developed at LinkedIn and later donated to the Apache Software Foundation, Kafka is designed for processing real-time data streams: it handles very high concurrency and data throughput, and works well as message middleware for real-time data pipelines.
Kafka's application scenarios are rich and varied, including but not limited to:

- Log aggregation: collecting logs from many services into one central place
- Real-time stream processing: feeding systems that analyze data as it arrives
- Website activity tracking: publishing page views, clicks, and searches as event streams
- Metrics and monitoring: aggregating operational data from distributed applications
- Messaging: decoupling the producers of data from the services that consume it
Kafka is mainly composed of the following components:

- Producer: the client that publishes messages to topics
- Consumer: the client that reads messages from topics
- Broker: a Kafka server that stores messages and serves clients; a cluster consists of multiple brokers
- Topic: a named category of messages, divided into partitions
- Partition: an ordered, append-only log; the unit of parallelism and replication
- ZooKeeper: coordinates cluster metadata in classic deployments (newer Kafka versions can instead run in KRaft mode)
Compared with other message queues (such as RabbitMQ or ActiveMQ), Kafka has the following advantages:

- Much higher throughput, thanks to sequential disk I/O and message batching
- Horizontal scalability: topics are partitioned across brokers, so capacity grows by adding machines
- Durable, replayable storage: messages stay on disk for a configurable retention period and can be re-read by consumers
- A rich stream-processing ecosystem (Kafka Streams, Kafka Connect)
Installing and configuring Kafka is relatively straightforward:

1. Download a Kafka release from the official website and extract it.
2. Start ZooKeeper (for classic deployments), e.g. with bin/zookeeper-server-start.sh config/zookeeper.properties.
3. Edit the config/server.properties file to set Kafka's configuration.
4. Start the Kafka service with bin/kafka-server-start.sh.

A sample server.properties:
```properties
# Kafka Server Configuration
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
```
Starting the Kafka service:
```bash
# Start the Kafka server
bin/kafka-server-start.sh config/server.properties
```
Creating and managing topics in Kafka is simple:

Create a topic with kafka-topics.sh:

```bash
bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
```

List the topics that have been created:

```bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

View the details of a topic:

```bash
bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092
```
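Topics can also be created programmatically. Below is a minimal sketch using the Java AdminClient; the class name and topic settings are illustrative:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Topic name, partition count, replication factor
            NewTopic topic = new NewTopic("my_topic", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```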
The producer is responsible for sending messages to a given topic. Here is example code that sends messages with the Java API:
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my_topic", "key_" + i, "value_" + i);
            producer.send(record);
        }
        producer.close();  // flushes any buffered records before closing
    }
}
```
The same producer in Python, using the kafka-python library:

```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    producer.send('my_topic', value={"key": f"key_{i}", "value": f"value_{i}"})

producer.flush()
producer.close()
```
And in C++ with librdkafka:

```cpp
#include <librdkafka/rdkafka.h>
#include <iostream>
#include <string>

int main() {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        std::cerr << errstr << std::endl;
        return 1;
    }

    // On success, rd_kafka_new takes ownership of conf
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        std::cerr << errstr << std::endl;
        return 1;
    }

    rd_kafka_topic_t *topic = rd_kafka_topic_new(rk, "my_topic", NULL);
    for (int i = 0; i < 10; i++) {
        std::string key = "key_" + std::to_string(i);
        std::string value = "value_" + std::to_string(i);
        // RD_KAFKA_MSG_F_COPY: librdkafka copies the payload and key
        rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)value.c_str(), value.size(),
                         key.c_str(), key.size(), NULL);
        rd_kafka_poll(rk, 0);  // serve delivery callbacks
    }

    rd_kafka_flush(rk, 10000);  // wait for outstanding messages to be delivered
    rd_kafka_topic_destroy(topic);
    rd_kafka_destroy(rk);
    return 0;
}
```
The consumer reads messages from a topic. Here is example code that consumes messages with the Java API:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```
The same consumer in Python:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='test',
                         value_deserializer=lambda x: x.decode('utf-8'),
                         key_deserializer=lambda x: x.decode('utf-8'))

for message in consumer:
    print(f"offset = {message.offset}, key = {message.key}, value = {message.value}")
```
And in C++ with librdkafka, using the high-level consumer API:

```cpp
#include <librdkafka/rdkafka.h>
#include <iostream>
#include <string>

int main() {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "group.id", "test", errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));

    // On success, rd_kafka_new takes ownership of conf
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        std::cerr << errstr << std::endl;
        return 1;
    }
    // Route all partition queues to the queue served by rd_kafka_consumer_poll
    rd_kafka_poll_set_consumer(rk);

    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "my_topic", RD_KAFKA_PARTITION_UA);
    rd_kafka_subscribe(rk, topics);
    rd_kafka_topic_partition_list_destroy(topics);

    while (true) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
        if (!msg) continue;  // poll timed out, no message
        if (!msg->err) {
            std::cout << "offset = " << msg->offset
                      << ", key = " << std::string((const char *)msg->key, msg->key_len)
                      << ", value = " << std::string((const char *)msg->payload, msg->len)
                      << std::endl;
        }
        rd_kafka_message_destroy(msg);
    }

    rd_kafka_consumer_close(rk);
    rd_kafka_destroy(rk);
    return 0;
}
```
Sending and receiving messages are Kafka's most basic operations; the producer and consumer examples above already cover them with the client APIs. For a quick end-to-end check without writing code, you can also use the console tools that ship with Kafka, shown below.
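A minimal smoke test with the built-in console clients (run each command in its own terminal; lines typed into the producer appear in the consumer):

```bash
# Terminal 1: consume from my_topic, starting from the beginning
bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092

# Terminal 2: each line typed here is sent to my_topic as a message
bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092
```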
Kafka persists messages to disk, so they survive broker restarts and are not lost. Partitioning distributes messages across brokers and balances the load.
How long messages are retained is controlled by the broker-level log.retention.hours parameter. Example configuration:

```properties
# Kafka Server Configuration
# Delete log segments older than 24 hours
log.retention.hours=24
```
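Retention can also be overridden per topic. A sketch using kafka-configs.sh (flag names as in recent Kafka versions; retention.ms is the per-topic setting, here 24 hours in milliseconds):

```bash
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name my_topic \
  --add-config retention.ms=86400000
```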
The partition count is set when creating a topic with kafka-topics.sh. Example command:

```bash
bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
```
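The partition count of an existing topic can later be increased (it can never be decreased) with the --alter option:

```bash
bin/kafka-topics.sh --alter --topic my_topic --partitions 6 --bootstrap-server localhost:9092
```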
A consumer group distributes a topic's partitions among its members: each partition is assigned to exactly one consumer in the group, so every message is processed by the group while the load is shared. Here is example code:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerGroupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // All consumers sharing this group.id split the topic's partitions among themselves
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```
Kafka provides several ways to inspect and manage consumer groups, for example the kafka-consumer-groups.sh command:

```bash
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
```
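The same tool can reset a group's offsets, for example to replay a topic from the beginning (the group must have no active members while this runs):

```bash
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test \
  --reset-offsets --to-earliest --topic my_topic --execute
```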
Idempotence is a producer-side guarantee: with enable.idempotence=true, retried sends do not create duplicate records in the log. Offset commits are the consumer-side mechanism that records how far a group has read, so a consumer can resume exactly where it left off. Here is example code:
An idempotent producer (note that enable.idempotence is a producer configuration, not a consumer one):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The broker de-duplicates retried sends from this producer
        props.put("enable.idempotence", "true");
        props.put("acks", "all");  // required for idempotence

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("my_topic", "key_" + i, "value_" + i));
        }
        producer.close();
    }
}
```
A consumer that commits offsets explicitly after processing each batch:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OffsetCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");  // commit manually below

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                consumer.commitSync();  // commit only after the batch has been processed
            }
        } finally {
            consumer.close();
        }
    }
}
```
Retries and a dead-letter queue handle the case where message processing fails. Here is example code:
Retrying failed messages by rewinding the partition:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class RetryExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Process the message here
                        System.out.printf("offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    } catch (Exception e) {
                        System.err.println("Error processing message. Retrying...");
                        // Rewind this partition to the failed record so a later poll re-delivers it
                        consumer.seek(new TopicPartition(record.topic(), record.partition()),
                                record.offset());
                    }
                }
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
```
Routing failures to a dead-letter topic (the topic name my_topic_dlq is an example; create it beforehand):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DeadLetterQueueExample {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "test");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(producerProps);
        consumer.subscribe(Collections.singletonList("my_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        System.out.printf("offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    } catch (Exception e) {
                        System.err.println("Error processing message. Sending to DLQ...");
                        // Forward the failed record to the dead-letter topic
                        dlqProducer.send(new ProducerRecord<>("my_topic_dlq",
                                record.key(), record.value()));
                    }
                }
                consumer.commitSync();
            }
        } finally {
            consumer.close();
            dlqProducer.close();
        }
    }
}
```
Kafka supports transactions, which make a set of writes atomic: either all records in the transaction become visible, or none do. Here is example code:
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TransactionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transaction.timeout.ms", "60000");
        // A transactional.id is required to use the transactional API
        props.put("transactional.id", "my-transactional-id");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("my_topic", "key_" + i, "value_" + i));
            }
            producer.commitTransaction();  // all 10 records become visible atomically
        } catch (Exception e) {
            producer.abortTransaction();   // none of the records become visible
        } finally {
            producer.close();
        }
    }
}
```
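On the consuming side, only consumers configured with isolation.level=read_committed skip records from aborted transactions; the default is read_uncommitted. A small sketch of the relevant setting (ReadCommittedConsumer is an illustrative helper, not a library class):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

public class ReadCommittedConsumer {
    public static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Skip records from aborted transactions (default is read_uncommitted)
        props.put("isolation.level", "read_committed");
        return new KafkaConsumer<>(props);
    }
}
```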
The consumer group's offset commit strategy controls how consumers maintain and commit their offsets. Here is example code:
Automatic commits (offsets are committed periodically in the background):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AutoCommitOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");  // offsets committed automatically

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```
Manual commits (the application decides when offsets are committed):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");  // no background commits

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                consumer.commitSync();  // blocking commit after each batch
            }
        } finally {
            consumer.close();
        }
    }
}
```
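A common middle ground is to commit asynchronously inside the loop for throughput and synchronously once on shutdown for safety; a minimal sketch:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AsyncCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                consumer.commitAsync();  // non-blocking; a later commit covers a failed one
            }
        } finally {
            try {
                consumer.commitSync();   // final blocking commit before shutdown
            } finally {
                consumer.close();
            }
        }
    }
}
```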
Kafka can run into some common errors and exceptions in operation. Here are typical problems and how to address them:
- Cannot connect to the cluster: check that the bootstrap.servers and zookeeper.connect parameters are correct and that the brokers are reachable.
- Messages disappear: check whether log.retention.hours is set too short, so that messages are not deleted before they are consumed.
- Consumer group conflicts: make sure the group.id is unique to your application; only consumers that are meant to share the work should use the same group ID.

A producer that handles connection failures gracefully:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class HandleConnectionFailure {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(props);
            ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "value");
            // send() is asynchronous; block on the future so connection errors surface here
            producer.send(record).get();
        } catch (Exception e) {
            System.err.println("Connection failed: " + e.getMessage());
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
```
Enabling compression on the producer reduces network and disk usage (compression.type accepts gzip, snappy, lz4, and zstd):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class EnableCompression {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "gzip");  // enable gzip compression

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "value");
        producer.send(record);
        producer.close();
    }
}
```
Kafka's configuration files expose a great many parameters. Some of the most common:

- broker.id: unique numeric ID of the broker within the cluster
- listeners: the address and port the broker listens on
- log.dirs: the directories where log segments are stored
- num.partitions: default partition count for automatically created topics
- log.retention.hours: how long log segments are retained before deletion
- zookeeper.connect: the ZooKeeper connection string (classic deployments)
Kafka's official documentation is comprehensive, covering everything from getting started to advanced configuration, and the community is active, offering plenty of technical support and shared experience. The documentation is available at https://kafka.apache.org/documentation/.
Kafka integrates with many technology stacks to form larger real-time data processing systems. For example, Kafka Connect moves data between Kafka and external systems such as databases or Elasticsearch, Kafka Streams builds stream processing directly on top of topics, and frameworks such as Apache Spark and Apache Flink provide Kafka connectors for large-scale stream processing.
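As a small taste of such integration, here is a minimal Kafka Streams sketch that reads my_topic, upper-cases each value, and writes the result to an output topic (my_topic_upper is a hypothetical name; create it first):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class StreamsPipeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipe-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read from my_topic, transform each value, write to the output topic
        KStream<String, String> source = builder.stream("my_topic");
        source.mapValues(v -> v.toUpperCase()).to("my_topic_upper");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```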