List Kafka topics
$KAFKA_HOME/bin/kafka-topics.sh \
  --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
  --list
Create a topic
$KAFKA_HOME/bin/kafka-topics.sh \
  --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
  --create \
  --replication-factor 3 \
  --partitions 1 \
  --topic TOPIC-01
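Parameters:
--create: the action to perform, here creating a topic
--replication-factor: number of replicas (must not exceed the number of brokers in the cluster)
--partitions: number of partitions
--topic: name of the topic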
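The constraint on --replication-factor can be checked before the create command is run. The following is a minimal sketch, not part of Kafka: count_brokers and check_replication_factor are hypothetical helper names, and the sketch assumes the comma-separated broker list you pass in (for example the host:9092 list used with the console producer) reflects the brokers that are actually running.

```shell
#!/bin/sh
# Hypothetical helpers (not shipped with Kafka).

# Count the entries in a comma-separated host:port list.
count_brokers() {
    # Turn commas into newlines and count the non-empty lines.
    printf '%s\n' "$1" | tr ',' '\n' | grep -c .
}

# Succeed only if the requested replication factor fits the broker count.
# Assumption: the list passed as $2 matches the live brokers in the cluster.
check_replication_factor() {
    rf=$1
    brokers=$(count_brokers "$2")
    if [ "$rf" -le "$brokers" ]; then
        echo "ok: replication factor $rf <= $brokers brokers"
    else
        echo "error: replication factor $rf > $brokers brokers" >&2
        return 1
    fi
}
```

A call such as check_replication_factor 3 centos7-02:9092,centos7-03:9092,centos7-04:9092 could be placed in front of the create command, so that an oversized value is caught before Kafka rejects it.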
Output:
[root@centos7-02 ~]# $KAFKA_HOME/bin/kafka-topics.sh \
> --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
> --create \
> --replication-factor 3 \
> --partitions 1 \
> --topic TOPIC-01
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic "TOPIC-01".
Delete a topic
$KAFKA_HOME/bin/kafka-topics.sh \
  --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
  --delete \
  --topic TOPIC-01
Send messages to a topic and consume them
Window 1: produce
# Send messages (this command occupies the terminal)
$KAFKA_HOME/bin/kafka-console-producer.sh \
  --broker-list centos7-02:9092,centos7-03:9092,centos7-04:9092 \
  --topic TOPIC-01
Window 2: consume
# Consume messages
$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server centos7-02:9092,centos7-03:9092,centos7-04:9092 \
  --topic TOPIC-01
Consume all messages from the beginning
# Consume messages
# --from-beginning: reads all the data already stored in the topic, not only new messages.
$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server centos7-02:9092,centos7-03:9092,centos7-04:9092 \
  --from-beginning \
  --topic TOPIC-01
Show the status of a specific topic:
$KAFKA_HOME/bin/kafka-topics.sh \
  --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
  --describe \
  --topic TOPIC-01
Output:
[root@centos7-02 ~]# $KAFKA_HOME/bin/kafka-topics.sh \
> --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
> --describe \
> --topic TOPIC-01
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:TOPIC-01 PartitionCount:1 ReplicationFactor:3 Configs:
    Topic: TOPIC-01 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
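In the describe output, Leader is the broker id currently serving the partition, Replicas lists the brokers holding a copy, and Isr lists the replicas that are in sync with the leader. The sketch below shows one way to condense that output into a per-partition summary. summarize_partitions is a hypothetical helper name, and the parsing assumes the label layout shown above ("Partition:", "Leader:", "Replicas:", "Isr:" each followed by a value); other Kafka versions may print additional fields.

```shell
#!/bin/sh
# Hypothetical helper (not shipped with Kafka).
# Reads `kafka-topics.sh --describe` output on stdin and prints one line per
# partition, flagging partitions whose ISR is smaller than the replica list.
summarize_partitions() {
    awk '
    {
        part = ""; leader = ""; replicas = ""; isr = ""
        # Each label is followed by its value in the next field.
        for (i = 1; i < NF; i++) {
            if ($i == "Partition:") part = $(i + 1)
            else if ($i == "Leader:") leader = $(i + 1)
            else if ($i == "Replicas:") replicas = $(i + 1)
            else if ($i == "Isr:") isr = $(i + 1)
        }
        if (part == "") next   # skip the summary line and JVM warnings
        n_rep = split(replicas, r, ",")
        n_isr = split(isr, s, ",")
        status = (n_isr < n_rep) ? "UNDER-REPLICATED" : "in-sync"
        printf "partition=%s leader=%s replicas=%d isr=%d %s\n", part, leader, n_rep, n_isr, status
    }'
}
```

To use it, pipe the describe command into the function, for example by appending | summarize_partitions to the command above. For the output shown here it would report partition 0 with leader 1, three replicas, and three in-sync replicas.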
Change the number of partitions of a topic
$KAFKA_HOME/bin/kafka-topics.sh \
  --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
  --alter \
  --partitions 2 \
  --topic TOPIC-01
Output:
[root@centos7-02 ~]# $KAFKA_HOME/bin/kafka-topics.sh \
> --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
> --alter \
> --partitions 2 \
> --topic TOPIC-01
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
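The WARNING in this output arises because a keyed message is assigned to a partition by hashing its key and taking the result modulo the partition count, so changing the count can send the same key to a different partition. The sketch below only illustrates that arithmetic. It is not Kafka's partitioner: Kafka's default partitioner hashes the key bytes with murmur2, whereas this sketch substitutes cksum because it is available on any POSIX system, and partition_for is a hypothetical name.

```shell
#!/bin/sh
# Illustration only. Assumption: cksum (CRC) stands in for Kafka's murmur2
# hash, so the partition numbers printed here will NOT match a real cluster.
partition_for() {
    key=$1
    partitions=$2
    # cksum prints "<crc> <byte count>"; keep the CRC.
    crc=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
    echo $(( crc % partitions ))
}

# With one partition every key maps to 0; with two, a key may map to 1 instead.
for key in user-1 user-2 user-3 user-4; do
    echo "$key: 1 partition -> $(partition_for "$key" 1), 2 partitions -> $(partition_for "$key" 2)"
done
```

Any key whose assignment changes will have its older messages in one partition and its newer messages in another, which is why per-key ordering is no longer guaranteed across the change.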
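Note: the number of partitions can only be increased, never decreased. Attempting to reduce it fails with an error.
Output when reducing the partition count: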
[root@centos7-02 ~]# $KAFKA_HOME/bin/kafka-topics.sh \
> --zookeeper centos7-02:2181,centos7-03:2181,centos7-04:2181 \
> --alter \
> --partitions 1 \
> --topic TOPIC-01
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic TOPIC-01 currently has 2 partitions, 1 would not be an increase.
[2022-01-31 16:14:02,014] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic TOPIC-01 currently has 2 partitions, 1 would not be an increase.
 (kafka.admin.TopicCommand$)
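A script that alters partition counts can avoid this InvalidPartitionsException by comparing the current and requested values first. The sketch below is one possible guard. current_partition_count and check_partition_increase are hypothetical helper names, and the parsing assumes the describe output contains a "PartitionCount:" label followed by a number, as in the summary line shown earlier.

```shell
#!/bin/sh
# Hypothetical helpers (not shipped with Kafka).

# Read `kafka-topics.sh --describe` output on stdin and print the value that
# follows "PartitionCount:" (with or without a space after the colon).
current_partition_count() {
    sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Succeed only if the requested count is strictly larger than the current one.
check_partition_increase() {
    current=$1
    requested=$2
    if [ "$requested" -gt "$current" ]; then
        echo "ok: $current -> $requested"
    else
        echo "refused: topic has $current partitions, $requested would not be an increase" >&2
        return 1
    fi
}
```

In practice the describe command would be piped into current_partition_count, and the --alter command would run only if check_partition_increase succeeds for that value and the requested one.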