Installation notes
Recent Kafka releases ship with a bundled ZooKeeper, and the bundled one is entirely sufficient for this purpose; this article records building a Kafka cluster using the bundled ZooKeeper.
1. Downloading Kafka
Kafka download page: http://kafka.apache.org/downloads
2. Modifying the Kafka configuration files
The config folder under the Kafka installation directory holds its configuration files; the two we need to modify are server.properties and zookeeper.properties.
2.1. Changing the data directories
First change Kafka's log directory and ZooKeeper's data directory, because both default to locations under /tmp, whose contents are lost on reboot:
server.properties:
log.dirs=/opt/kafka-logs
change to
log.dirs=/opt/kafka/logs
zookeeper.properties:
dataDir=/opt/zookeeper
change to
dataDir=/opt/zookeeper/data
2.2. Configuring Kafka
To make leader election easier, a Kafka cluster is usually built from an odd number of servers; here we use three, with xxx.xxx.xxx.xx01, xxx.xxx.xxx.xx02 and xxx.xxx.xxx.xx03 standing in for their IP addresses.
(A server's IP can be checked with the ifconfig command.)
Modify server.properties:
1. Set broker.id so it is unique for every broker. The first machine can keep the default 0; the other two must be changed, e.g. to 1 and 2.
2. Set num.partitions; the partition count is generally kept in line with the number of brokers.
3. Set advertised.listeners and listeners, e.g. listeners=PLAINTEXT://xxx.xxx.xxx.xx:9092
4. Set zookeeper.connect to the ZooKeeper addresses of all three servers, e.g. zookeeper.connect=xxx.xxx.xxx.xx01:2181,xxx.xxx.xxx.xx02:2181,xxx.xxx.xxx.xx03:2181
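Putting the four settings together, the server.properties on the second machine (broker.id 1) might look like the following sketch; the IP placeholders follow the convention above:

```properties
# server.properties on xxx.xxx.xxx.xx02 (broker.id 1) -- a sketch
broker.id=1
num.partitions=3
listeners=PLAINTEXT://xxx.xxx.xxx.xx02:9092
advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xx02:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=xxx.xxx.xxx.xx01:2181,xxx.xxx.xxx.xx02:2181,xxx.xxx.xxx.xx03:2181
```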
Modify zookeeper.properties:
1. Set the connection parameters by adding the following:
tickTime=2000
initLimit=10
syncLimit=5
2. List the server address for each broker id:
server.0=xxx.xxx.xxx.xx01:2888:3888
server.1=xxx.xxx.xxx.xx02:2888:3888
server.2=xxx.xxx.xxx.xx03:2888:3888
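Assembled, each server's zookeeper.properties would then look roughly like the sketch below (clientPort=2181 is already present by default in the file Kafka ships):

```properties
# zookeeper.properties -- the same on all three servers (sketch)
dataDir=/opt/zookeeper/data
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
server.0=xxx.xxx.xxx.xx01:2888:3888
server.1=xxx.xxx.xxx.xx02:2888:3888
server.2=xxx.xxx.xxx.xx03:2888:3888
```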
Adding the id file to the ZooKeeper data directory
On each server, create a myid file in the ZooKeeper data directory (here /opt/zookeeper/data) containing that server's broker.id value.
On the first server (broker.id 0), run in that directory: echo 0 > myid
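The step above can be sketched as a small script to run on each machine. DATA_DIR and BROKER_ID are assumptions to adapt per server; in this guide the real directory is /opt/zookeeper/data and the ids are 0, 1 and 2 (the default below is a relative directory so the sketch is safe to try anywhere):

```shell
#!/bin/sh
# Write this machine's broker id into the ZooKeeper data directory.
# DATA_DIR and BROKER_ID are placeholders; set them per machine,
# e.g. DATA_DIR=/opt/zookeeper/data BROKER_ID=1
DATA_DIR="${DATA_DIR:-./zookeeper-data}"
BROKER_ID="${BROKER_ID:-0}"
mkdir -p "$DATA_DIR"
echo "$BROKER_ID" > "$DATA_DIR/myid"
# Show what was written
cat "$DATA_DIR/myid"
```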
3. Starting Kafka
Bring up ZooKeeper first and Kafka second; shut down in the reverse order, stopping Kafka (bin/kafka-server-stop.sh) before ZooKeeper (bin/zookeeper-server-stop.sh).
Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka:
bin/kafka-server-start.sh config/server.properties &
4. Testing the Kafka cluster
4.1. Create a test topic on the first server
bin/kafka-topics.sh --create --topic test --zookeeper xxx.xxx.xxx.xx01:2181 --replication-factor 3 --partitions 3
4.2. List the topics to confirm the topic was created
bin/kafka-topics.sh --list --zookeeper xxx.xxx.xxx.xx01:2181
4.3. Start a producer
bin/kafka-console-producer.sh --broker-list xxx.xxx.xxx.xx01:9092 --topic test
4.4. Start consumers on each of the other two servers
bin/kafka-console-consumer.sh --zookeeper xxx.xxx.xxx.xx02:2181 --topic test --from-beginning
bin/kafka-console-consumer.sh --zookeeper xxx.xxx.xxx.xx03:2181 --topic test --from-beginning
4.5. Test publishing and consuming messages
Type a message into the producer on xxx.xxx.xxx.xx01 and press Enter to send it; check that the consumers on servers 02 and 03 receive it.
5. Miscellaneous
If, after the initial setup, you change the port or other settings and they do not take effect, or startup fails, try clearing the contents of the Kafka (/opt/kafka/logs) and ZooKeeper (/opt/zookeeper/data) data directories (keep the myid file) and then starting again.
BROKER configuration
The three most essential settings are broker.id, log.dirs and zookeeper.connect
##Unique identifier of each broker within the cluster; must be a non-negative integer. Changing the IP address without changing broker.id does not affect consumers
broker.id =1
##Where Kafka stores its data; multiple directories are comma-separated, e.g. /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs
##Port on which the broker serves client requests
port =6667
##Maximum size of a message body, in bytes
message.max.bytes =1000000
num.network.threads =3
num.io.threads =8
background.threads =4
queued.max.requests =500
##Host name of the broker. If set, the broker binds to this address only; if not, it binds to all interfaces and publishes one of them to ZK. Usually left unset
host.name
advertised.host.name
advertised.port
socket.send.buffer.bytes =100*1024
socket.receive.buffer.bytes =100*1024
socket.request.max.bytes =100*1024*1024
------------------------------------------- LOG settings -------------------------------------------
log.segment.bytes =1024*1024*1024
log.roll.hours =24*7
log.cleanup.policy = delete
log.retention.minutes=7 days
##How often the log is checked to see whether segments can be deleted; default 1 minute
log.cleanup.interval.mins=1
log.retention.bytes=-1
log.retention.check.interval.ms=5 minutes
log.cleaner.enable=false
log.cleaner.threads =1
log.cleaner.io.max.bytes.per.second=None
log.cleaner.dedupe.buffer.size=500*1024*1024
log.cleaner.io.buffer.size=512*1024
log.cleaner.io.buffer.load.factor =0.9
log.cleaner.backoff.ms =15000
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.delete.retention.ms =1 day
log.index.size.max.bytes =10*1024*1024
log.index.interval.bytes =4096
log.flush.interval.messages=None
log.flush.scheduler.interval.ms =3000
log.flush.interval.ms = None
log.delete.delay.ms =60000
log.flush.offset.checkpoint.interval.ms =60000
------------------------------------------- TOPIC settings -------------------------------------------
auto.create.topics.enable =true
default.replication.factor =1
num.partitions =1
Example: --replication-factor 3 --partitions 1 --topic replicated-topic : the topic named replicated-topic has one partition, and that partition is replicated to three brokers.
----------------------------------Replication (leader, replicas) settings ----------------------------------
controller.socket.timeout.ms =30000
controller.message.queue.size=10
replica.lag.time.max.ms =10000
replica.lag.max.messages =4000
##Socket timeout between follower and leader
replica.socket.timeout.ms=30*1000
replica.socket.receive.buffer.bytes=64*1024
replica.fetch.max.bytes =1024*1024
replica.fetch.wait.max.ms =500
replica.fetch.min.bytes =1
num.replica.fetchers=1
replica.high.watermark.checkpoint.interval.ms =5000
controlled.shutdown.enable =false
controlled.shutdown.max.retries =3
controlled.shutdown.retry.backoff.ms =5000
auto.leader.rebalance.enable =false
leader.imbalance.per.broker.percentage =10
leader.imbalance.check.interval.seconds =300
offset.metadata.max.bytes
----------------------------------ZooKeeper settings----------------------------------
##Address of the ZooKeeper cluster; multiple hosts are comma-separated: hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect = localhost:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
Modifying the configuration
Some of these settings can be overridden per topic, for example:
Adding a setting:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
Changing a setting:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000
Deleting a setting:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --deleteConfig max.message.bytes
Consumer configuration
The most essential settings are group.id and zookeeper.connect
group.id
consumer.id
client.id = group id value
zookeeper.connect=localhost:2182
zookeeper.session.timeout.ms =6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
auto.offset.reset = largest
socket.timeout.ms=30*1000
socket.receive.buffer.bytes=64*1024
##Limit on the size of messages fetched from each partition
fetch.message.max.bytes =1024*1024
auto.commit.enable =true
auto.commit.interval.ms =60*1000
queued.max.message.chunks =10
##During a rebalance the consumer registers its "Partition Owner registry" node in ZK, but the old consumer may not yet have released that node,
rebalance.max.retries =4
rebalance.backoff.ms =2000
refresh.leader.backoff.ms
fetch.min.bytes =1
fetch.wait.max.ms =100
consumer.timeout.ms = -1
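Tying together the two core settings above, a minimal consumer.properties for the three-server cluster in this guide might be (the group name is a hypothetical example):

```properties
# consumer.properties -- minimal sketch; "test-group" is a made-up name
group.id=test-group
zookeeper.connect=xxx.xxx.xxx.xx01:2181,xxx.xxx.xxx.xx02:2181,xxx.xxx.xxx.xx03:2181
```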
Reposted from: https://blog.csdn.net/weixin_34153893/article/details/94459270