消息队列MQ

Kafka运维&&开发扫盲(1)

本文主要是介绍Kafka运维&&开发扫盲(1),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

开发的时候经常使用kafka来解耦自己的代码,运维中的kafka 大多数也是稳定就不管了,大致记录一下一些基本概念和常见的优化方案,以及代码的常规使用方式。

概念扫盲

  • 基本体系架构: producer + broker + consumer + zk集群
  • producer: 生产者, 用于生产消息
  • Broker: 服务代理,可以理解为kafka集群的服务器,通常一台机器部署一个Kafka实例
  • consumer: 消费者, 用于消费消息
  • consumer group: 多个consumer组成一个consumer group,一条消息只能被一个consumer group 中一个consumer消费
  • topic: 主题, 对外暴露服务的主体
  • partition: 每个topic 可以拥有若干个partition,分布在不同broker上,可以理解为topic物理上的分区
  • offset: 每条消息的表示位,在消息记录到partition的时候已经定死
  • replica: 每个partition又多个replica, 用于消息的容灾
  • leader replica: leader 副本, 默认用来处理所有的读写请求
  • follower replica: follower 副本, 负责同步leader副本消息
  • consumer offset: consumer的消费进度,每个consumer 都有自己的consumer offset
  • log: partition在机器磁盘上以log体现, 以顺序追加的方式进行追加,避免了随机读写的压力, 使用顺序读写,减少压力

搭建

以为rhel7 为例

1. 部署ZK
yum -y install java-1.8.0* # 安装jdk
# 部署ZK
mkdir  -p  /opt/zookeeper 
wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
tar -zcvf zookeeper-3.4.12.tar.gz
mv zookeeper-3.4.12/* /opt/zookeeper 
cd /opt/zookeeper && cp -a zoo_sample.cfg zoo.cfg

修改默认的zk配置:

tickTime=2000 # 心跳发送时间,单位是ms
initLimit=10 # 	follower在启动过程中,会从leader同步所有最新数据,然后确定自己能够对外服务的起始状态。leader允许F在 initLimit 时间内完成这个工作, 时间为initlimit 个 ticktime
syncLimit=5 # flower 和 leader间通信能容忍的最大失链次数
dataDir=/opt/zookeeper/zkdata # 快照日志的存储路径
dataLogDir=/opt/zookeeper/zkdatalog # 事物日志的存储路径
clientPort=12181 # 客户端端口
server.1=192.168.7.100:12888:13888
server.2=192.168.7.101:12888:13888
server.3=192.168.7.107:12888:13888 #server.1 这个1是服务器的标识也可以是其他的数字,表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
maxClientCnxns=20 # 单个客户端与单台服务器之间的连接数的限制

更多配置可以参考这篇文章: https://www.cnblogs.com/xiohao/p/5541093.html

根据之前zoo.cfg 里的server.X的ID 写入节点的myid

 echo '1' > /opt/zookeeper/zkdata/myid

启动服务

cd bin/ && ./zkServer.sh start 
2. 部署Kafka
mkdir -p /opt/kafka
wget  https://mirrors.aliyun.com/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz
tar -zxvf kafka_2.12-1.1.0.tgz 
mv kafka_2.12-1.1.0/* /opt/kafka/
cd /opt/kafka/conf 

修改kafka配置(server.properties):

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

启动服务

./kafka-server-start.sh -daemon ../config/server.properties
3. 常见操作
# 创建topic, 创建一个副本数为2,partitions 为1 ,名字是mytopic001的topic
./kafka-topics.sh --create --zookeeper 192.168.7.100:12181 --replication-factor 2 --partitions 1 --topic mytopic001
# 查看所有topic
./kafka-topics.sh --list --zookeeper localhost:2181
# 发送消息到指定topic
./kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic001
# 删除topic
./kafka-topics.sh --delete --topic mytopic001 --zookeeper 192.168.7.100:12181
# 查看topic细节
./kafka-topics.sh --describe --zookeeper 192.168.7.100:12181  --topic mytopic001

生产部署指北

1. 资源的预估

资源的预估其实主要还是IO的预估,说白了就是要根据业务需求估算出需要的服务器数量以及磁盘大小, 之前看到有个同事总结了一套公式,感觉非常有用,这里简单记录下:

  • 数据存储总量: S GB
  • 集群要求写入IO速率: W MB/s
  • 副本数量: R
  • 消费组数量: C
  • 延时落后的消费者数量: L

所谓的延时落后的消费者数量,比如有个消费者每天下午跑批,需要消费上午的一些消息,但这些消息已经不在内存里,而是在磁盘里的,这部分的开销,会影响磁盘IO。

基于磁盘预估资源:

基于上述的信息,可以得出Kafka集群的磁盘吞吐量为: W * R + W * L

然后根据服务器的型号,假设服务器为12块7200转的6T 磁盘,且是raid 10, 则服务器的磁盘IO 大概是300MB/s, 去掉一些不可抗力的因为,要满足集群的磁盘IO吞吐,起码需要 max((W * R + W * L)/300, S/(6*12/2)/1024) 的服务器数量, 但实际使用最好基于上述公式做一些预留,避免服务器宕机,磁盘rebuild等等问题。

基于网络预估资源:

基于上述的信息,可以得出Kafka集群的网络总写入量为: W * R

总读取量为: (R-1+c)*W
那么可以计算出基于网络的资源预估,起码需要的服务器数量为max((W * R)/100MB/s, ((R-1+c)*W)/100MB/s)

2. os优化
  • 文件描述符越大越好,inode也需要监控起来
  • 内核参数优化主要是内存&&网络这块:
echo "5" > /proc/sys/vm/dirty_background_ratio #百分值,保留过期页缓存(脏页缓存)的最大值。是以MmeFree+Cached-Mapped的值为基准的
echo "2000" > /proc/sys/vm/dirty_expire_centisecs #1/100秒。缓存页里数据的过期时间(旧数据),在下一个周期内被写入硬盘。默认30秒是一个很长的时间
echo "500" > /proc/sys/vm/dirty_writeback_centisecs #控制内核的脏数据刷新进程pdflush的运行间隔
# 调大tcp缓冲池,官方有网络参数的推荐配置
net.core.rmem_default=262144
net.core.rmem_max=2097152
net.core.wmem_default=262144
net.core.wmem_max=2097152
3. 配置优化
  • broker相关配置优化:
# 用于处理网络请求的线程数, 主要处理的是读写缓冲区的数据,可以配置为逻辑cpu数量+1
num.network.threads = xxx
# 用于处理磁盘IO操作,建议为cpu数量的2倍,最大不超过3倍
num.io.threads=xxx
  • log相关配置优化
# producer写10000条数据时候,再批量写入log,功能和脏页回收的内核参数其实冲突了,topic数据量比较小的情况下可以使用
log.flush.interval.message=10000
# 每间隔1s,刷输入到磁盘,功能和脏页回收的内核参数其实冲突了,topic数据量比较小的情况下可以使用
log.flush.interval.ms=1000
# 日志保留天数
log.retention.hours = 72
# 分段文件,kafka启动收是单线程扫描log.dir 下面的所有数据文件,所以分段文件不建议太小,不然数量会很多,可以使用1G
log.segment.bytes = 1073741824
  • replica相关配置
# 创建topic 时候默认创建的replica数量,不建议过少,影响数据的可靠性,太多的话磁盘空间会浪费,建议2-3
default.replication.factir:3
# 控制fetch线程数量,该线程是partition用来同步leader数据到本地
num.replica.fetchers:1
个人公众号, 分享一些日常开发,运维工作中的日常以及一些学习感悟,欢迎大家互相学习,交流

在这里插入图片描述

这篇关于Kafka运维&&开发扫盲(1)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!