Zookeeper安装
#cd /usr/local
#tar xvfz 文件名.tar.gz
进入zookeeper目录下的conf子目录, 创建zoo.cfg(或者直接重命名zoo-sample.cfg):
修改相关参数:
1、tickTime=2000
2、dataDir=/usr/local/zookeeper/data (目录必须存在,否则可能无法启动)
3、dataLogDir=/usr/local/zookeeper/logs (目录必须存在,否则可能无法启动">)
4、clientPort=2181
#/usr/local/zookeeper/bin/zkServer.sh start & (&是为了后台运行zookeeper)
通过zkServer.sh 这个脚本进行服务的相关操作 1. 启动ZK服务: sh bin/zkServer.sh start 2. 查看ZK服务状态: sh bin/zkServer.sh status 3. 停止ZK服务: sh bin/zkServer.sh stop 4. 重启ZK服务: sh bin/zkServer.sh restart
#cd /usr/local
#tar zxvf ">kafka.tgz
修改配置文件:/usr/local/kafka/config/server.properties
broker.id=0
zookeeper.connect=localhost:2181
这里使用本机的zookeeper,上一步zookeeper已启动。
Ø 创建topic,名字为test:
#/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.201.8.62:2181 --replication-factor 1 --partitions 1 --topic test
查看topic列表:
#/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.201.8.62:2181
Ø 发送消息(启动一个生产者)
# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.201.8.62:9092 --sync --topic test
Ø 消费消息(启动一个消费者)
打开一个新的终端,输入命令:
# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.201.8.62:2181 --topic test --from-beginning
这时候从原终端输入消息,新终端上就可以收到消息:
kafka信息查询语句:
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.201.8.62:9092 --list
Ø 停止kafka
Ø 打开zookeeper客户端删除topics
# /usr/local/zookeeper/bin/zkCli.sh
ls /brokers/topics (查看">topics)
rmr /brokers/topics/topics名字 (删除">topics)
Ø 到kafka的配置文件">server.properties中配置的日志记录路径下(">log.dirs=/tmp/kafka-logs)删除相关">topics的文件夹
Ø 停止zookeeper
1、启动">kafka时提示java.lang.UnsupportedClassVersionError,可能是jdk版本过低。
2、springboot 集成远程服务器的kafka,报错localhost/127.0.0.1:9092) could not be established. Broker may not be availa
解决方法:修改远程服务器的kafka config下的配置文件server.properties.
修改文件下的advertised.listeners=PLAINTEXT://(这里为你的服务器的ip地址):9092 。