将kafka添加到环境变量中
vim /etc/profile export KAFKA_HOME=/opt/iDataFusion/kafka export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile
创建topic:
--create: 指定创建topic动作 --topic:指定新建topic的名称 --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样 --config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration --partitions:指定当前创建的kafka分区数量,默认为1个 --replication-factor:指定每个分区的复制因子个数,默认1个 kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic my-kafka-topic 在创建Topic的时候,有两个参数是需要填写的,那就是partions和replication-factor。 partions 主题分区数。kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上,当只有一个broker时,所有的分区就只分配到该Broker上。 消息会通过负载均衡发布到不同的分区上,消费者会监测偏移量来获取哪个分区有新数据,从而从该分区上拉取消息数据。 分区数越多,在一定程度上会提升消息处理的吞吐量,因为kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销。 如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition。 replication-factor 用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
生产者操作:
kafka-console-producer.sh --broker-list localhost:9092 --topic my-kafka-topic
消费者操作:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-topic
查看所有topic列表:
kafka-topics.sh --zookeeper localhost:2181 --list
查看指定topic信息:
kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-kafka-topic
控制台向topic生产数据:
kafka-console-producer.sh --broker-list node86:9092 --topic my-kafka-topic
控制台消费topic的数据:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-kafka-topic --from-beginning 增加topic分区数(Kafka分区数量只允许增加,不允许减少) 为topic my-kafka-topic 增加10个分区 kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-kafka-topic --partitions 10
删除topic(确定后再谨慎使用)
默认情况下Kafka的Topic是没法直接删除的,需要进行相关参数配置 kafka-topics.sh --delete --topic test0 --zookeeper ocalhost:2181 Note: This will have no impact if delete.topic.enable is not set to true.## 默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式: 方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可 方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示允许进行Topic的删除
查看topic消费进度
kafka-consumer-groups.sh --bootstrap-server test1:9092 --list kafka-consumer-groups.sh --bootstrap-server test1:9092 --describe --group groupname