Kafka 是最初由 Linkedin 公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于 zookeeper 协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 hadoop 的批处理系统、低延迟的实时系统、storm/Spark 流式处理引擎,web/nginx 日志、访问日志,消息服务等等,用 scala 语言编写,Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。
当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战:
如何收集这些巨大的信息;
如何分析它;
如何及时做到如上两点;
以上几个挑战形成了一个业务需求模型,即 生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统 。从一个微观层面来说,这种需求也可理解为不同的系统之间如何传递消息。
Kafka 一个分布式消息系统应运而生:
Kafka-由 linked-in 开源;
kafka-即是解决上述这类问题的一个框架,它实现了生产者和消费者之间的无缝连接;
kafka-高产出的分布式消息系统(A high-throughput distributed messaging system);
解耦 :
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
冗余 :
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
扩展性 :
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
灵活性 & 峰值处理能力 :
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
可恢复性 :
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
顺序保证 :
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
缓冲 :
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
异步通信 :
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
producer :消息生产者,发布消息到 kafka 集群的终端或服务。
broker :kafka 集群中包含的服务器。
topic :每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
partition :partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
consumer :从 kafka 集群中消费消息的终端或服务。
consumer group :high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
replica :partition 的副本,保障 partition 的高可用。
leader :replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
follower :replica 中的一个角色,从 leader 中复制数据。
controller :kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
zookeeper :kafka 通过 zookeeper 来存储集群的 meta 信息。
高吞吐量、低延迟 :kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
可扩展性 :kafka集群支持热扩展;
持久性、可靠性 :消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
容错性 :允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
高并发 :支持数千个客户端同时读写;
consumergroup :各个 consumer 可以组成一个组,每个消息只能被组中的一个 consumer 消费,如果一个消息可以被多个 consumer 消费的话,那么这些 consumer 必须在不同的组。
消息状态 :在 Kafka 中,消息的状态被保存在 consumer 中,broker 不会关心哪个消息被消费了被谁消费了,只记录一个 offset 值(指向 partition 中下一个要被消费的消息位置),这就意味着如果 consumer 处理不好的话,broker 上的一个消息可能会被消费多次。
消息持久化 :Kafka 中会把消息持久化到本地文件系统中,并且保持极高的效率。
消息有效期 :Kafka 会长久保留其中的消息,以便 consumer 可以多次消费,当然其中很多细节是可配置的。
批量发送 :Kafka 支持以消息集合为单位进行批量发送,以提高 push 效率。
push-and-pull : Kafka 中的 Producer 和 consumer 采用的是 push-and-pull 模式,即 Producer 只管向 broker push 消息,consumer 只管从 broker pull 消息,两者对消息的生产和消费是异步的。Kafka集群中 broker 之间的关系:不是主从关系,各个 broker 在集群中地位一样,我们可以随意的增加或删除任何一个 broker 节点。
负载均衡方面 :Kafka 提供了一个 metadata API 来管理 broker 之间的负载(对 Kafka 0.8.x 而言,对于 0.7.x 主要靠 zookeeper 来实现负载均衡)。
同步异步 :Producer 采用异步 push 方式,极大提高 Kafka 系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
分区机制 partition :Kafka 的 broker 端支持消息分区,Producer 可以决定把消息发到哪个分区,在一个分区中消息的顺序就是 Producer 发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。分区的意义很重大,后面的内容会逐渐体现。
离线数据装载 :Kafka 由于对可拓展的数据持久化的支持,它也非常适合向 Hadoop 或者数据仓库中进行数据装载。
插件支持 :现在不少活跃的社区已经开发出不少插件来拓展 Kafka 的功能,如用来配合 Storm、Hadoop、flume 相关的插件。
日志收集 :一个公司可以用Kafka可以收集各种服务的 log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr 等。
消息系统 :解耦和生产者和消费者、缓存消息等。
用户活动跟踪 :Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
运营指标 :Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理 :比如 spark streaming 和 storm
如上图所示,点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。点对点模型的的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。
如上图所示,发布订阅模式是一个基于消息送的消息传送模型,改模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者(类似微信公众号)。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息!但是 consumer1、consumer2、consumer3 由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是 8M/s、5M/s、2M/s,如果队列推送的速度为5M/s,则 consumer3 无法承受!如果队列推送的速度为 2M/s,则 consumer1、consumer2 会出现资源的极大浪费!
作为一个消息系统, Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息 。一些日志收集系统 (logging-centric system),比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 push 模式。事实上,push 模式和 pull 模式各有优劣。
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull 模式更合适。pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义 。
我们看上面的架构图中,producer 就是生产者,是数据的入口。注意看图中的红色箭头, Producer 在写入数据的时候永远的找 leader,不会直接将数据写入 follower !那 leader 怎么找呢?写入的流程又是什么样的呢?我们看下图:
先从集群获取分区的 leader;
producer 将消息发送给 leader;
Leader 将消息写入本地文件;
followers 从l eader 拉取消息;
followers 将消息写入本地后向 leader 发送 ACK 确认;
leader 收到所有副本的 ACK 后向 producer 发送 ACK 确认;
需要注意的一点是,消息写入 leader 后,follower 是主动的去 leader 进行同步的!producer 采用 push 模式将数据发布到 broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的 !写入示意图如下:
上面说到数据会写入到不同的分区,那 kafka 为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
方便扩展 :因为一个 topic 可以有多个 partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
提高并发 :以 partition 为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在 kafka 中,如果某个 topic 有多个 partition,producer 又怎么知道该将数据发往哪个 partition 呢?kafka 中有几个原则:
partition 在写入的时候可以指定需要写入的 partition,如果有指定,则写入对应的 partition;
如果没有指定 partition,但是设置了数据的 key,则会根据 key 的值 hash 出一个 partition;
如果既没指定 partition,又没有设置 key,则会轮询选出一个 partition;
保证消息不丢失是一个消息队列中间件的基本保证,那 producer 在向 kafka 写入消息的时候, 怎么保证消息不丢失呢 ?其实上面的写入流程图中有描述出来, 那就是通过 ACK 应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认 kafka 接收到数据,这个参数可设置的值为 0、1、all 。
0 代表 producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
1 代表 producer 往集群发送数据只要 leader 应答就可以发送下一条,只确保 leader 发送成功。
all 代表 producer 往集群发送数据需要所有的 follower 都完成从 leader 的同步才会发送下一条,确保 leader 发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
最后要注意的是,如果往不存在的 topic 写数据,能不能写入成功呢?kafka 会自动创建 topic,分区和副本的数量根据默认配置都是 1。
Producer 将数据写入 kafka 后,集群就需要对数据进行保存了!kafka 将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka 初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
前面说过了每个 topic 都可以分为一个或多个 partition,如果你觉得 topic 比较抽象,那 partition 就是比较具体的东西了!Partition 在服务器上的表现形式就是一个一个的文件夹,每个 partition 的文件夹下面会有多组 segment 文件,每组 segment 文件又包含 .index 文件、.log 文件、.timeindex 文件(早期版本中没有)三个文件, log 文件就实际是存储 message 的地方,而 index 和 timeindex 文件为索引文件,用于检索消息。
如上图,这个 partition 有三组 segment 文件,每个 log 文件的大小是一样的,但是存储的 message 数量是不一定相等的(每条的 message 大小不一致)。文件的命名是以该 segment 最小 offset 来命名的,如 000.index 存储 offset 为 0~368795 的消息, kafka 就是利用分段+索引的方式来解决查找效率的问题 。
上面说到 log 文件就实际是存储 message 的地方,我们在 producer 往 kafka 写入的也是一条一条的 message,那存储在 log 中的 message 是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型...我们重点需要知道的是下面三个:
offset :offset 是一个占 8byte 的有序 id 号,它可以唯一确定每条消息在 parition 内的位置;
消息大小 :消息大小占用 4byte,用于描述消息的大小;
消息体 :消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
无论消息是否被消费,kafka 都会保存所有的消息。那对于旧数据有什么删除策略呢?
基于时间 ,默认配置是 168 小时(7天);
基于大小 ,默认配置是 1073741824。
需要注意的是, kafka 读取特定消息的时间复杂度是 O(1) O ( 1 ) ,所以这里删除过期的文件并不会提高 kafka 的性能 !
消息存储在 log 文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka 采用的是发布订阅模式,消费者主动的去 kafka 集群拉取消息,与 producer 相同的是,消费者在拉取消息的时候也是找 leader 去拉取 。
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组 id!同一个消费组者的消费者可以消费同一 topic 下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!我们看下图:
图示是消费者组内的消费者小于 partition 数量的情况,所以会出现某个消费者消费多个 partition 数据的情况,消费的速度也就不及只处理一个 partition 的消费者的处理速度! 如果是消费者组的消费者多于 partition 的数量,那会不会出现多个消费者消费同一个 partition 的数据呢 ?上面已经提到过不会出现这种情况! 多出来的消费者不消费任何 partition 的数据 。 所以在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量一致 !
在保存数据的小节里面,我们聊到了 partition 划分为多组 segment,每个 segment 又包含 .log、.index、.timeindex 文件,存放的每条 message 包含 offset、消息大小、消息体……我们多次提到 segment 和 offset,查找消息的时候是怎么利用 segment+offset 配合查找的呢?假如现在需要查找一个 offset 为 368801 的 message 是什么样的过程呢?我们先看看下面的图:
先找到 offset 的 368801 message 所在的 segment 文件(利用二分法查找),这里找到的就是在第二个 segment 文件。
打开找到的 segment 中的 .index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1,我们要查找的 offset 为 368801 的 message 在该 index 内的偏移量为 368796+5=368801,所以这里要查找的相对 offset 为 5)。由于该文件采用的是稀疏索引的方式存储着相对 offset 及对应 message 物理偏移量的关系,所以直接找相对 offset 为 5 的索引找不到,这里同样利用二分法查找相对 offset 小于或者等于指定的相对 offset 的索引条目中最大的那个相对 offset,所以找到的是相对 offset为 4 的这个索引。
根据找到的相对 offset 为 4 的索引确定 message 存储的物理偏移位置为 256。打开数据文件,从位置为 256 的那个地方开始顺序扫描直到找到 offset 为 368801 的那条 Message。
这套机制是建立在 offset 为有序的基础上,利用 segment+有序offset+稀疏索引+二分查找+顺序查找 等多种手段来高效的查找数据。至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的 offset 维护 zookeeper 中,consumer 每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的 offset 已经直接维护在kafka 集群的 consumer_offsets 这个 topic 中了。