开发的时候经常使用kafka来解耦自己的代码,运维中的kafka 大多数也是稳定就不管了,大致记录一下一些基本概念和常见的优化方案,以及代码的常规使用方式。
以为rhel7 为例
yum -y install java-1.8.0* # 安装jdk # 部署ZK mkdir -p /opt/zookeeper wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz tar -zcvf zookeeper-3.4.12.tar.gz mv zookeeper-3.4.12/* /opt/zookeeper cd /opt/zookeeper && cp -a zoo_sample.cfg zoo.cfg
修改默认的zk配置:
tickTime=2000 # 心跳发送时间,单位是ms initLimit=10 # follower在启动过程中,会从leader同步所有最新数据,然后确定自己能够对外服务的起始状态。leader允许F在 initLimit 时间内完成这个工作, 时间为initlimit 个 ticktime syncLimit=5 # flower 和 leader间通信能容忍的最大失链次数 dataDir=/opt/zookeeper/zkdata # 快照日志的存储路径 dataLogDir=/opt/zookeeper/zkdatalog # 事物日志的存储路径 clientPort=12181 # 客户端端口 server.1=192.168.7.100:12888:13888 server.2=192.168.7.101:12888:13888 server.3=192.168.7.107:12888:13888 #server.1 这个1是服务器的标识也可以是其他的数字,表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里 maxClientCnxns=20 # 单个客户端与单台服务器之间的连接数的限制
更多配置可以参考这篇文章: https://www.cnblogs.com/xiohao/p/5541093.html
根据之前zoo.cfg 里的server.X的ID 写入节点的myid
echo '1' > /opt/zookeeper/zkdata/myid
启动服务
cd bin/ && ./zkServer.sh start
mkdir -p /opt/kafka wget https://mirrors.aliyun.com/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz tar -zxvf kafka_2.12-1.1.0.tgz mv kafka_2.12-1.1.0/* /opt/kafka/ cd /opt/kafka/conf
修改kafka配置(server.properties):
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 port=19092 #当前kafka对外提供服务的端口默认是9092 host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。 num.network.threads=3 #这个是borker进行网络处理的线程数 num.io.threads=8 #这个是borker进行I/O处理的线程数 log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 num.partitions=1 #默认的分区数,一个topic默认1个分区数 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务 replica.fetch.max.bytes=5242880 #取消息的最大直接数 log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除 log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能 zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
启动服务
./kafka-server-start.sh -daemon ../config/server.properties
# 创建topic, 创建一个副本数为2,partitions 为1 ,名字是mytopic001的topic ./kafka-topics.sh --create --zookeeper 192.168.7.100:12181 --replication-factor 2 --partitions 1 --topic mytopic001 # 查看所有topic ./kafka-topics.sh --list --zookeeper localhost:2181 # 发送消息到指定topic ./kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic001 # 删除topic ./kafka-topics.sh --delete --topic mytopic001 --zookeeper 192.168.7.100:12181 # 查看topic细节 ./kafka-topics.sh --describe --zookeeper 192.168.7.100:12181 --topic mytopic001
资源的预估其实主要还是IO的预估,说白了就是要根据业务需求估算出需要的服务器数量以及磁盘大小, 之前看到有个同事总结了一套公式,感觉非常有用,这里简单记录下:
所谓的延时落后的消费者数量,比如有个消费者每天下午跑批,需要消费上午的一些消息,但这些消息已经不在内存里,而是在磁盘里的,这部分的开销,会影响磁盘IO。
基于磁盘预估资源:
基于上述的信息,可以得出Kafka集群的磁盘吞吐量为: W * R + W * L
然后根据服务器的型号,假设服务器为12块7200转的6T 磁盘,且是raid 10, 则服务器的磁盘IO 大概是300MB/s, 去掉一些不可抗力的因为,要满足集群的磁盘IO吞吐,起码需要 max((W * R + W * L)/300, S/(6*12/2)/1024) 的服务器数量, 但实际使用最好基于上述公式做一些预留,避免服务器宕机,磁盘rebuild等等问题。
基于网络预估资源:
基于上述的信息,可以得出Kafka集群的网络总写入量为: W * R
总读取量为: (R-1+c)*W
那么可以计算出基于网络的资源预估,起码需要的服务器数量为max((W * R)/100MB/s, ((R-1+c)*W)/100MB/s)
echo "5" > /proc/sys/vm/dirty_background_ratio #百分值,保留过期页缓存(脏页缓存)的最大值。是以MmeFree+Cached-Mapped的值为基准的 echo "2000" > /proc/sys/vm/dirty_expire_centisecs #1/100秒。缓存页里数据的过期时间(旧数据),在下一个周期内被写入硬盘。默认30秒是一个很长的时间 echo "500" > /proc/sys/vm/dirty_writeback_centisecs #控制内核的脏数据刷新进程pdflush的运行间隔 # 调大tcp缓冲池,官方有网络参数的推荐配置 net.core.rmem_default=262144 net.core.rmem_max=2097152 net.core.wmem_default=262144 net.core.wmem_max=2097152
# 用于处理网络请求的线程数, 主要处理的是读写缓冲区的数据,可以配置为逻辑cpu数量+1 num.network.threads = xxx # 用于处理磁盘IO操作,建议为cpu数量的2倍,最大不超过3倍 num.io.threads=xxx
# producer写10000条数据时候,再批量写入log,功能和脏页回收的内核参数其实冲突了,topic数据量比较小的情况下可以使用 log.flush.interval.message=10000 # 每间隔1s,刷输入到磁盘,功能和脏页回收的内核参数其实冲突了,topic数据量比较小的情况下可以使用 log.flush.interval.ms=1000 # 日志保留天数 log.retention.hours = 72 # 分段文件,kafka启动收是单线程扫描log.dir 下面的所有数据文件,所以分段文件不建议太小,不然数量会很多,可以使用1G log.segment.bytes = 1073741824
# 创建topic 时候默认创建的replica数量,不建议过少,影响数据的可靠性,太多的话磁盘空间会浪费,建议2-3 default.replication.factir:3 # 控制fetch线程数量,该线程是partition用来同步leader数据到本地 num.replica.fetchers:1