已复制
全屏展示
复制代码

Kafka 原理与架构完全总结


· 20 min read

一. 概要介绍

Kafka是一个高性能的消息发送与高性能的消息消费的消息引擎系统,由Scala和Java编写,随着Kafka的不断演进,目前Kafka的标准定为是分布式流试处理平台,而不仅仅是消息系统了,Kafka目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台,基本架构注意包括:

  • 生产者发送消息给Kafka服务器。
  • 消费者从Kafka服务器读取消息。
  • Kafka服务器依托zookeeper进行服务的协调管理。

二. 核心概念

message、broker、topic、partition、offset、replica、leader、follower、ISR

2.1 broker

一台kafka服务器就是一个broker,一个集群由一个或多个broker组成。在kafka的配置文件server.properties中,broker.id=0表示当前服务器的broker ID号,该ID号在集群内是全局唯一的。如果在节点紧缺的时候,一台服务器也可以启动多个broker。

2.2 message

Kafka中的消息格式由很多字段组成,其中的很多字段都是用于管理消息的元数据字段,它使用紧凑的二进制字节数组来保存这些字段。日常使用到的有三个字段:key、value、timestamp。

  • key:表示消息键,对消息做partition时使用,即决定消息被保存到某topic下的哪个partition。
  • value:表示实际的消息内容。
  • timestamp:表示消息发送时间戳,用于依赖时间的处理语义,如果不指定则取当前时间。
  • headers:这个字段用来支持应用级别的扩展,而不需要像v0和v1版本一样不得不将一些应用级别的属性值嵌入在消息体里面。Header的格式包含key和value,一个Record里面可以包含0至多个Header。

2.3 topic 和 partition

从概念上来说,topic只是一个逻辑概念,一个topic代表了一类消息,通常我们使用topic来区分实际业务,比如业务A使用topicA,业务B使用topicB。

Kafka中topic通常都会被多个消费者订阅,因此出于性能的考量,Kafka并不是topic-message的 两级结构,而是采用了 topic-partition-message 的三级结构来分散负载,从本质上来说,每个topic都由若干个partition组成。

topic由多个partition组成,而partition是不可修改的有序消息序列,按顺序写入partition。每个partition有自己的专属partition号,从0开始。用户对partition唯一能做的操作就是在消息序列的尾部追加消息。其实partition实际上并没有太多的业务含义,它的引入就是单纯地为了提升系统的吞吐量,因此在创建topic时根据实际情况设置合理的partition数,实现整体性能的最大化。

2.4 offset

partition上的每一条消息都会被分配一个唯一的序列号,这个序列号叫做offset,该offset的值为从0开始顺序递增的整数,位移信息可以唯一确定到具体的某一条消息(注意:offset每个分区都从0开始递增)。

在 Kafka 消费者端也有位移offset的概念,和服务端offset概念不同,在服务端每条消息的offset是固定的,消费者端的offset即记录消费的进度,如果消费者不幸挂掉,等它重启的时候会知道自己上次消费到哪里了,这个记录可以保存在broker端(即通过group消费),也可以客户端自己维护。另外,消费者可以自己指定从服务端的那个offset处开始消费。

综上总结,Kafka中的一条消息其实就是一个(topic, partition, offset)三元组,通过该元组我们可以在kafka集群中找到唯一对应的那条消息。

2.5 replica

Kafka保证同一个partition的多个replica一定不会分配在同一台broker上,毕竟如果同一个broker上有同一个partition的多个replica,那么将无法实现备份冗余效果。replica的唯一目的就是防止数据的丢失,一个topic中至少有一个partition,而一个partition至少有一个replica,就是partition自己。

2.6 leader 和 follower

其实replica副本是partition的另一种叫法,所有的partition都叫副本,只不过扮演的角色不同而已。副本分为两类:leader replica和follower replica, leader为客户端提供消息的写入和消费服务,follower被动地向leader获取数据,保持和leader的同步,这些follower的唯一价值是充当leader的候补,而一旦leader所在的broker宕机,kafka会从众多follower当中选举出新的leader。

2.6 ISR

ISR的全称是 in-sync replica,就是与leader replica保持同步的replica集合。kafka为partition维护一个replica集合,该集合中的所有replica保存的消息日志都与leader replica 保持同步状态,只有集合中replica才能被选举为leader,也只有该集合中的所有replica都收到了同一条消息,kafka才会将该消息置于“已提交”状态,即认为这条消息发送成功。

Kafka承诺只要这个集合中至少存在一个replica,那些“已提交”状态的消息就不会丢失,这句话有两个关键点:1. ISR中至少存在一个“活着的”replica;2.消息是“已提交”状态,所有必须要满足这两个条件,才能保证消息不会丢失。

ISR是动态调整的,正常情况下,partition的所有follower replica都应该与leader replica保持同步,即所有replica都在ISR中,因为各种各样的原因,一小部分replica开始落后于leader replica的进度。当滞后到一定程度时,Kafka会将这些replica踢出ISR,当这些replica重新追上leader的进度时,Kafka会将他们加回到ISR中。这一切都是自动维护的,不需要 用户进行人工干预,因而在保证了消息交付语义的同时还简化了用户的操作成本。

三. 工作流程及文件存储机制

Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件目录(topic名称-分区序号),该log文件目录中存储的就是producer生产的数据。

Producer生产的数据会被不断追加到该log文件目录中的文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

Kafka采取分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——".index"文件和".log"文件。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。每个分区的文件名如下。

# ls -l first-0
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
leader-epoch-checkpoint
partition.metadata
文件命名规则
  • partition第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值+1,数值大小为64位,20位数字字符长度,没有数字用0填充。
稀疏索引
  • Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,使得能够把 index 映射到内存,降低了查询时的磁盘 IO 开销。
根据索引查找消息
  • ".index"文件存储大量的索引信息,".log"文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。

四. Kafka生产者

4.1 分区方法

  • 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
  • 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
  • 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

4.2 数据可靠性保证

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

leader写入成功后,同步给ISR集合中的所有follower之后,才会向Producer发送ack消息,因为这样才能确保如果leader挂掉后其他的follower才能被选举成为leader。

如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由 replica.lag.time.max.ms 参数设定

4.3 ACKS应答机制

在客户端生产消息的时候,可以指定broker的应答方式,对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功,acks的取值有三种。

  • 0:producer不等待broker的ack,broker一接收到还没有写入磁盘就会返回,当broker故障时有可能丢失数据;
  • 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
  • -1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。解决这个有两个方法
  • a、从消费者角度解决:消费者使用幂等的消费模式(消息本身的消费使用幂等方式处理)。
  • b、Broker端解决:在0.11版本之后Kafka引入了幂等性机制(idempotent),将enable.idempotence属性设置为true,kafka自动将acks属性设为-1,解决幂等性。

五. 高水位High Watermark

5.1 定义

  • leader副本:响应clients端读写请求的副本,leader副本保存了所有follower副本的LEO值。
  • follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
  • ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本。

每个Kafka副本对象都有两个重要的属性:LEO和HW

  • LEO:日志末端位移(log end offset),记录了该副本日志(log)中下一条消息的位移值。如果LEO=10,表示该副本保存了10条消息,offset为[0, 9]。
  • HW:表示位置信息即位移(offset)high watermak。对于同一个副本而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是 "已备份"的。Kafka 使用 Leader 副本的高水位来定义所在Partition的高水位。换句话说,Partition的高水位就是其 Leader 副本的高水位。

5.2 高水位作用

1)定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。

2)帮助 Kafka 完成副本同步。

5.3 Follower 端拉取数据逻辑

从 Leader 拉取消息的处理逻辑如下:

  • 请求 Leader,告诉Leader当前Follower的LEO,从Leader拉取offset为LEO的消息(包含Leader的HW、消息本身),然后写入消息到本地磁盘。
  • 更新 Follower 副本 LEO 值。
  • 获取 Leader 发送的高水位值为 HW。
  • 获取 Follower 副本更新后的 LEO 值。
  • 更新 Follower 高水位为 min(HW, LEO)。

5.4 Leader 接收 Producer 消息逻辑

从 Producer 接收消息的处理逻辑如下:

  • 写入消息到本地磁盘。
  • 更新 Leader 副本的 LEO。
  • 获取 Leader 副本所在 Broker 端保存的所有远程副本 LEO 值(LEO-1,LEO-2,LEO-n)。
  • 获取 Leader 副本当前高水位值 HW。
  • 更新 HW = max{HW, min(LEO-1, LEO-2,LEO-n)}。

5.5 Leader 处理 Follower 拉取消息逻辑

  • 读取 Leader 磁盘(或页缓存)中的消息数据。
  • 使用 Follower 副本发送请求中的位移值更新本地broker的远程副本 LEO 值。
  • 更新Leader高水位值,即分区高水位值(具体步骤与处理生产者请求的步骤相同)。

六. Leader Epoch

问题:Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。当leader故障恢复是,由于Follower的HW和挂掉的Leader的HW不一致,导致数据不一致。

6.1 定义

所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。

  • Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
  • 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

七. 故障处理

7.1 follower故障

follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

7.2 leader故障

leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

八. 分区分配策略

首先kafka设定了默认的消费逻辑:一个分区只能被同一个消费组(ConsumerGroup)内的一个消费者消费。

进行分区分配的条件,只有满足了这三个条件的任意一个,才会进行分区分配。

  • 同一个 Consumer Group 内新增消费者;
  • topic 新增分区;
  • 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes。

kafka提供了消费者客户端参数 partition.assignment.strategy 用来设置消费者与topic之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用range分配策略。除此之外,Kafka中还提供了roundrobin分配策略和sticky分区分配策略。partition.asssignment.strategy可以配置多个分配策略,把它们以逗号分隔就可以了。

8.1 Range分配策略

Range分配策略是面向单个主题的,首先对同一个主题里面的分区按照序号进行排序,然后把消费者线程按照字母顺序进行排序。然后用分区数除以消费者线程数量来判断每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

比如现有一个topic,包含7个分区(p0~p6),使用3个线程(c0~c2)来消费,7除以3等于2余1,所以每个线程首先分配2个,多余的一个给第一个线程消费,所以最终结果是:c0(p0~p2)  c1(p3~p4)  c2(p5~P6)

缺点:一般在实际生产环境下,会有多个主题,在这种情况下,第一个线程就会普遍有更多的分区,导致线程1负载更高。

8.2 RoundRobin分配策略

RoundRobin策略是面向多个主题的,原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(比如t0p0、t0p1、t0p2、t1p0、t1p1、t1p2),然后通过轮询将分区以此分配给排好序的每个消费者。

  • 情况1:如果同一消费组内,所有的消费者订阅的消息都是相同的,那么 RoundRobin 策略的分区分配会是均匀的。
  • 情况2:如果同一消费者组内,所订阅的消息是不相同的,那么在执行分区分配的时候,就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 topic,那么在分配分区的时候,此消费者将不会分配到这个 topic 的任何分区。

因此在使用RoundRobin分配策略时,为了保证得均匀的分区分配结果,需要满足两个条件,如果无法满足,那最好不要使用RoundRobin分配策略。

  • 同一个消费组里的每个消费者订阅的主题必须相同;
  • 同一个消费者组里面的所有消费者的 num.streams 必须相等;

8.3 Sticky分配策略

在kafka的0.11.X版本才开始引入,是目前最复杂也是最优秀的分配策略。Sticky分配策略的原理比较复杂,它的设计主要实现了两个目的:

  • 分区的分配要尽可能的均匀;
  • 分区的分配尽可能的与上次分配的保持相同。

如果这两个目的发生了冲突,优先实现第一个目的。第一个目的和RoundRobin的目的相似,不同点在于重新分配的时候,消费过程中某一个消费者down掉了,这时需要重新分配,如果使用RoundRobin重新分配,分区重分配后,对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次处理一遍,这时就会浪费系统资源。而使用Sticky策略就可以让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而可以减少系统资源的损耗以及其它异常情况的发生。

九. Kafka读写性能

9.1 页缓存+顺序写

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。而顺序(追加文件)写磁盘,则是为了减少机械盘磁臂的移动寻找消耗。

  • 写的时候写 操作系统的页缓存,而不是直接写磁盘。
  • 刷写磁盘时采用顺序写入提高性能。

9.2 零拷贝

零拷贝技术指的是消费在读取消息时,采用零拷贝技术,能提升消费性能,正常情况下的消费数据流向如下所示,经过了4次拷贝。

  • 操作系统 将数据从 磁盘文件 中读取到 内核空间;
  • 应用程序 将数据从 内核空间 读入 用户空间缓冲区;
  • 应用程序 将读到数据写回 内核空间并放入网络缓冲区;
  • 操作系统 将数据从 网络缓冲区 复制到 网卡接口发送出去。

Kafka的零拷贝技术只是减少了拷贝的次数,并不是真的没有拷贝。零拷贝技术存在两次拷贝

  • 操作系统 将数据从 磁盘文件 中读取到 内核空间;
  • 操作系统 将数据从 内核空间的页面缓存 复制到 网卡接口发送出去。

通常情况下,Kafka的消息会有多个订阅者,生产者发布的消息会被不同的消费者多次消费,如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到内核空间的页面缓存,10次表示10个消费者各自读取一次内核空间的页面缓存。

参考文档

🔗

文章推荐