Kafka 是一款分布式、支持分区、多副本,基于 Zookeeper 协调的发布/订阅消息系统,也就是常说的消息队列。目前市面上绝大多数业务系统使用消息队列的主要原因如下:
- 解耦:当下游系统需要当前系统数据时,无需通过 RPC 或 HTTP 接口的方式传递,直接发送 MQ,下游需要时直接从队列消费即可
- 削峰:对于突发性的流量,通过消息队列将请求先保存在队列,之后从队列根据系统处理速度消费,防止流量一下子把系统打挂
- 健壮:消息队列可以堆积请求,即使下游系统挂掉也不会影响消息的正常处理
- 异步:如缓冲中提到的那样,消息异步处理可以减轻服务端压力
- 冗余:同一份数据可能多个下游系统都需要,广播消息,下游直接消费即可
Kafka 目前常被用在以下场景:
- 日志收集:使用 kafka 收集各种服务的 log,再以统一接口的方式开放给各种 consumer,例如 HBase、Hadoop,Solr 等
- 消息系统:业务逻辑使用,生产者,消费者
- 用户活动跟踪:记录 web 用户或者 app 用户的各种活动,将这些活动发送到统一 topic 中,后序接入到 hadoop、数据仓库中做离线分析和数据挖掘
- 运营指标:收集各种分布式应用的数据,比如报警或者报告
目前市面上常见的消息队列有以下几种:
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
| 吞吐量 | 万级 | 万级 | 十万级 | 十万级 |
| 时效性 | 毫秒(ms) | 微秒(μs) | 毫秒(ms) | 毫秒(ms) |
| 可用性 | 高,主从架构 | 高,主从架构 | 非常高,分布式架构 | 非常高,分布式架构 |
| 可靠性 | 有较低概率丢失 | 基本不丢 | 优化参数,不丢 | 优化参数,不丢 |
由于吞吐量较差的原因,ActiveMQ 和 RabbitMQ 目前市面上用的较少。其中 RabbitMQ 集群的架构如下:
从架构图可以看出,RabbitMQ 中每个队列 queue 包含完整消息数据,无法实现水平扩展,这也是 RabbitMQ 吞吐量低的主要原因,ActiveMQ 吞吐量低和它同理
RocketMQ 诞生的背景:2011年初,Linkedin 开源了 Kafka 这个优秀的消息中间件,淘宝中间件团队在对 Kafka 做过充分 Review 之后,Kafka 无限消息堆积,高效的持久化速度吸引了我们,但同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输,目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
就目前来说,Kafka 和 RocketMQ 主要包含以下区别:
- 开发语言:Kafka 采用 Scala 编写,RocketMQ 采用 Java 编写
- 消息处理模式:Kafka 采用 pull 的方式处理消息,RocketMQ 采用 push 的方式
- 性能:Kafka 单机写入 TPS 约在百万条/秒,RocketMQ 约在 7万条/秒
- 单机支持队列数,Kafka 单机超过64个队列性能明显下降,RocketMQ 单机最多支持5w个,性能也不回发生明显变化
- 消息投递实时性:Kafka 基于消费端拉取频率间隔,RocketMQ 采用长轮训,几乎没有延时
- 消费失败重试:Kafka 不支持,RocketMQ 支持
- 定时消息:Kafka 不支持,RocketMQ 支持
- 消息查询:Kafka 不支持消息查询,RocketMQ 根据消息ID或者消息内容查询
- 消息回溯:Kafka 通过 offset 实现回溯,RocketMQ 根据时间回溯
- 消费并行度:Kafka 基于 topic 分区数,RocketMQ 顺序消费时和 Kafka 一致,乱序消费时基于 Consumer 线程数
- 消息堆积能力:理论上 Kafka 更强,不过 RocketMQ 也满足目前市面上绝大多数业务场景
总的来说,Kafka 在性能上综合表现确实比 RocketMQ 更加优秀,但是从功能性上来说,RocketMQ 提供了丰富的消息处理功能,如:消息检索、事务消息、消费重试、定时消息等
综上所述:对于大数据、追求高吞吐量,如日志采集加工等场景,建议使用 Kafka。对于业务复杂,需要削峰,可靠性、稳定性要求很高,如支付等场景,建议使用 RocketMQ
Kafka 原理总结
Kafka 集群架构图
- Producer:生产者,向 Kafka broker 发送消息的客户端
- Customer:消费者,从 Kafka broker 拉取消息的客户端
- Customer Group:消费者组,每个消费者都属于某个消费者组、组内每个消费者消费不同分区,同一个分区同时只能由组内一个消费者消费,不同组之间没有限制
- Broker:一个 Kafka 实例就是一个 broker,一个 broker 可以包含多个 topic。broker 主要负责消息存储和转发
- Topic:对消息逻辑分类,生产者和消费者根据 topic 生产或消费同一类数据
- Partition:对同一类消息物理分组,一个 topic 可以分为多个 partition 分布在不同 broker 上,每个 partition 是一个有序消息队列
- Replica:副本,保证某个 broker 挂掉后数据不会丢失。每个 topic 的每个 partition 都包含多个副本,一个 leader,若干个 follower
- Leader:主分区,生产者发送和消费者拉取的都在主分区进行
- Follower:从 leader 同步数据,保持和 leader 数据同步。leader 出问题后,从 follower 中选出新的 leader
- offset:消费者消费的位置,防止消费者重启后二次消费或漏消费
- ZooKeeper:帮助 Kafka 存储和管理集群信息
从架构图可以看出,kafka 通过将消息 topic 划分为不同的 partition 并分配到不同的 broker 上实现水平扩展,提高吞吐量
Kafka 消息存储优化
Kafka 中消息按照 topic 进行分类,topic 是一个逻辑概念,一个 topic 对应多个 partition ,partition 是一个物理概念。这里分区主要有以下两个好处:
- 提高并发量:生产和消费都是基于 partition 来实现,线程可以在多个 partition 上并发执行
- 备份:每个分区还包含 follower 节点,当 leader 分区对应 broker 挂掉后,可以从其它 follower 中选出新的 leader,数据不会完全丢失
每个 partition 实际对应一个 log 文件,生产者每次将消息内容追加到文件末尾,消费者从指定 offset 消费。随着业务的进行,log 文件越来越大,严重影响检索速率。为了解决该问题,Kafka 将 partition 分为多个 segment,每个 segment 对应两个文件
同一个 partition 下所有 segment 所对应的文件处于同一个文件夹下,文件夹命名规则为 topic - 分区号,比如 topic king 含有三个分区,那文件夹名分别为 king-0,king-1,king-2。每个 segment 包含两个文件:
- log 文件:存储消息
- index 文件:存储索引信息
每个 .index 索引文件对应一个 .log 消息文件,两者都以 .log 文件中第一条消息的 offset 命名。索引文件记录消息的 offset 和物理地址偏移量,log 文件记录具体消息内容
当我们通过 offset 查找消息时,首先根据索引文件名确定要检索的文件,然后在对应文件中通过二分查找找到对应消息的偏移量,根据偏移量去 .log 文件取得具体消息内容
举个例子:假设现在 topic king 第一个 partition 包含两个 segment,segment 文件名如下:
00000000000000000001.index
00000000000000000001.log
00000000000000000021.index
00000000000000000021.log
当查询 offset 为 20 的消息时,由于第二个索引文件从 offset 21 开始,只能去第一个索引文件查找,在 00000000000000000001.index 索引文件中根据二分查找找到对应偏移量,在去 00000000000000000001.log 根据偏移量获取消息内容
Kafka 消息应答机制
Kafka 生产者将消息封装为 ProducerRecord 对象,该对象核心属性如下:
topic:消息类型
partition(可选):消息分区
offset(可选):每条消息的唯一 id
key(可选):string 类型
value(可选):string 类型
每次发出消息时,具体将消息发送到哪个 partition 满足以下规则:
- 如果指定 partition,直接发送到对应 partition
- 没有指定 partition 但包含 key,将 key 的哈希值和 partition 个数取余得到
- partition 和 key 都没有,第一次随机生成整数(以后每次基于该整数自增)将这个整数和 partition 个数取余得到
为了保证 producer 发送的数据能到达指定 partition,partition 在收到 producer 发送的数据后,需要向 producer 返回 ack(acknowledgement)。producer 只有收到 ack 后才会进行下一轮发送,否则重新发送
由于每个 partition 分 leader 节点和 follower 节点,这里返回 ack 的时机常分以下三种:
- leader 收到消息就返回 ack
- 半数以上的 follower 同步完成才返回 ack
- 所有 follower 同步完成后返回 ack
对于收到消息就返回 ack,优点是效率极高,但存在极大可能丢数据:如果 leader 节点还没有将新增的数据同步到 follower 节点发生宕机,这部分数据直接丢失。
对于所有 follower 同步完成后返回 ack,优点是安全性高,一般不存在丢数据的可能性,缺点是效率较低,producer 每次发数据需要等到所有 follower 节点同步完成。
半数以上的 follower 同步完成返回 ack 属于折中方案,优点是延时低,缺点在于选举新的 leader 节点时,容忍 n 台节点的故障需要 2 * n + 1 个 follower 节点(选举 leader 需要半数以上同意)
Kafka 选择方案三,这种方案容器利用率更高,虽然可能带来些许网络延迟,但网络延迟对于 Kafka 性能影响较小
Kafka 同步副本集机制
因为返回 ack 需要所有 follower 节点同步完数据,假如某个 follower 同步数据失败怎么办?是不是永远不会返回 ack 了?答案是否定的,为了解决该问题,Kakfa 引入 ISR(同步副本集)机制:
Kafka 在每个 leader 节点中维护动态的 ISR 保存目前与 leader 保持同步的 follower 节点集合,如果 ISR 中所有 follower 节点同步完数据就返回 ack,如果某个 follower 节点长期没有同步完成,就把它踢出 ISR 集合,这里根据参数 replica.lag.time.max.ms 控制最大同步时间。当 leader 节点发生故障后,从 ISR 的 follower 节点中选举出新的 leader
由于不同消息的重要性不同,对于可靠性要求不高的数据,完全没必要等到 ISR 中所有 follower 节点同步完成。为了支持不同纬度的消息优先级,Kafka 为用户提供了三种可靠性级别,用户可以根据重要性和延时选择不同的配置,其中 ack 参数配置如下:
- -1:leader 和 ISR 中所有 follower 全部落盘返回 ack,延时最高,可能造成数据重复(follower 同步完成,leader 返回 ack 前故障,producer 认为没收到重发消息)
- 0:producer 不等待 broker 的 ack,broker 接收到还未写入磁盘就返回,延时最低,可靠性最差
- 1:leader 节点成功落盘就返回 ack,如果在 follower 同步完成之前 leader 故障,丢失数据
为了保证 leader 和 ISR 中 follower 节点数据一致,Kafka 提出如下解决方案:
如果 follower 节点出现故障,被临时踢出 ISR,等待 follower 节点恢复后,follower 读取本地磁盘记录的 HW 值,并将日志文件中高于HW 的部分裁掉,从 HW 开始从 leader 同步数据。当自身的 LEO 大于整个 partition 的 HW 值时,表示该 follower 追上 leader 节点,重新加入 ISR
如果 leader 节点出现故障,从 ISR 中重新选举新的 leader 节点。为了保证数据一致:其余 follower 将各自截取掉高于自身 HW 的数据,从新的 leader 节点重新同步
然而上述机制只能保证各节点间数据同步,并不能解决数据丢失或数据重复等问题。ack 设置 -1,每条数据至少发送一次,可能造成数据重复,只能保证 At Least Once、ack 设置 0,每条数据至多发送一次,可能造成数据丢失,可以保证 At Most Once,都无法保证 Exactly Once
Kafka 0.11 之前,对于 At Least Once,只能在下游消费者端实现去重逻辑。在 Kafka 0.11 版本引入幂等性的概念:producer 无论向 server 发送了多少次重复数据,Server 端只会持久化一条。幂等性再结合 At Least Once 就可以保证数据不丢失也不重复
通过在 producer 端配置 enable.idempotence=true 开启幂等性,开启幂等性的 producer 在初始化时分配 PID,所有发送同一个 partition 的消息会附带 Sequence Number,而 broker 端会记录 <PID,partition,SeqNumber> 缓存,对于具有相同主键的消息,broker 只会持久化一条
kafka 消费策略
Kafka 消费者采用 pull 的方式主动从 partition 拉取消息,而不是 partition 主动 push 消息。这样做的好处在于让流量在 customer 端控制,防止 broker 主动推将 customer 推挂
当然 pull 的方式也是有缺陷的:当 Kafka 中没有消息时,customer 需要无限循环从 Kafka 拉取消息。为了解决 customer 一直死循环的情况,Kafka 引入 timeout 机:如果没有消息可消费,customer 会等待一段时间再返回。这样又会导致消息没法及时处理
kafka customer 消息分发策略有以下两种:
- Round-Robin
- Range
Round-Robin 采用轮询的思想:根据字典顺序对 topic 不同的 partition 排序,同时对 customer 排序、之后对 partition 进行循环,遇到订阅自己的消费者就消费,否则向下轮询下一个消费者。也就是根据 partition 轮询 customer,找到就向下轮询新的 partition。
举个例子:假设现在存在三个 topic a、b,c,a 包含一个分区 a-0、b 包含两个分区 b-0,b-1、c 包含三个分区 c-0、c-1、c-2。存在三个消费者 x1,x2,x3,x1 订阅 a、b,x2 订阅 b、c,x3 订阅 a、b、c。根据 Round-Robin 规则,a-0 首先发现 x1 可以消费,b-0 和 b-1 发现 x1 可以消费,c-0、c-1、c-2 发送 x2 可以消费。最终结果:x1 消费 a-0、b-0、b-1,x2 消费 c-0、c-1、c-2,x3 啥都不消费
Round-Robin 会导致各个 customer 承受的压力不同,位于上方的 customer 订阅的 topic 越多,承担的压力越大。位于下方的消费者即使订阅了很多 topic,然而各个分区可能再上面的 customer 已分配完毕,承担的压力相对较少
Range 采用重分配策略,首先计算每个 Customer 将会承担的 partition 的数量,然后将指定数量的 partition 分配给对应 customer
Range 虽然也可能造成前面的 customer 压力更大(除不尽时),但是相比 Round-Robin 而言还算相对均匀
Zookeeper 在 Kafka 中的作用
在 Kafka 0.9 版本前,customer 默认将 offset 信息保存在 Zookeeper。0.9 版本之后,为了削弱 zookeeper 在 kafka 整个集群中的作用,customer 默认将 offset 保存在 Kafka 内置的 topic 中。默认情况下消息不采用广播,同一条消息,只会被组内某一个消费者消费一次
目前 Zookeeper 主要负责 Kafka 集群中 Controller 节点的选举工作,也就是说 Kafka 在削弱 Zookeeper 的作用,但目前仍离不开 Zookeeper
Controller 节点:所有的 Kafka Broker 节点启动时会一起去 Zookeeper 上注册一个临时节点,因为只有一个 Kafka Broker会注册成功,其他的都会失败,所以这个成功在 Zookeeper 上注册临时节点的这个 Kafka Broke r会成为 Kafka Broker Controller,其他的 Kafka broker 叫 Kafka Broker follower。
Controller 监视期间,一旦某一个 broker 宕机了,这个 controller 会读取该宕机 broker 上所有的 partition 在 zookeeper 上的状态,并选取 ISR 列表中的一个 replica 作为 partition leader
- 如果 ISR 列表中的 replica 全挂,选一个幸存的 replica 作为 leader
- 如果该 partition 的所有的 replica 都宕机了,则将新的 leader 设置为-1,等待任意一个 replica 恢复,并将它置为 leader
Kafka 常见问题及解决思路
kafka 吞吐量为什么高?
- Kafka 采用分布式架构,可以通过多 partition 的方式提高并发度,吞吐量自然提高
- 生产者每次写消息时,总是顺序的写入 partition 文件末尾,由于顺序读写的原因,省略掉磁头寻址损耗,效率更高
- Kafka 使用操作系统自身页缓存(Page Cache)优化读写能力
- Kafka 消费者拉取数据时使用零拷贝技术,极大的提高了拉取消息的性能
- kafka partition 有序队列天然分块(segment),并且 segment 使用索引记录每条消息的偏移量,进一步提高查询效率
- Kafka 支持消息批处理,避免频繁单次读取或写入所带来的带宽开销
- Kafka 生产者消费者都只操作主分区,无需考虑主从延时所带来的损耗
kafka 客户端如何找到 leader 分区?
既然 Kafka 生产者、消费者都只操作 leader 分区,所以在每次写消息或者从 Kafka 读消息的时候,必须先找到对应 Leader 及其所在 Broker 地址,这样才可以进行后续的操作。
对于此,Kafka 内部实现了一套基于 TCP 的协议,这里我们主要看 Metadata 协议,该协议主要解决以下问题:
- Kafka 中存在哪些主题?
- 每个主题有几个分区?
- Leader 分区所在的 broker 地址及端口?
- 每个 broker 的地址及端口是多少?
客户端只需要构造相应的请求,并发送到 Broker 端,即可拿到每个 topic 及 partition leader 节点所在地址及端口信息。为了支持不同的编程语言使用 Kafka,Kafka 客户端支持多种语言,包括:C/C++、PHP、Go 和 Ruby 等
目前 Metadata 协议是唯一一个可以向任何 Broker 发送的协议,因为每个 Broker 在启动之后都会存储 Metadata 信息。客户端在拿到这些数据后,会缓存在内存中,无需客户端每次发送或者拉取消息前都获取最新 leader 分区地址
为了防止 broker 宕机重新选主导致 Metadata 信息刷新,当出现以下两种情况时客户端重新发送请求:
- 在
meta‐data.max.age.ms参数配置的时间过期之后; - 在往Kafka发送请求是收到 Not a Leader 异常。
这样就可以保证 Kafka 在不需要协调节点的情况下,客户端每次操作总在 leader 节点,提高性能
kafka 如何保证消息不被重复消费,重复消费时又如何保证幂等性?
Kafka 在每条消息写入队列时会给这条数据分配一个序号 offset,consumer 在消费数据时,定期会把自己消费过的 offset 提交一下,告诉消息队列:我已经消费过序号为 X 的 MQ 了,下次从序号大于 X 的最小数据开始消费,Kafka 内部通过这种方式保证 MQ 不被重复消费
然而在这个大前提下仍存在以下两种情况导致重复消费:
- 生产者重复发送,kafka 本身保存了两条一模一样的消息
- 消费者处理完毕后还没来得及上报 offset 就宕机,下次重启时重复消费
对于这两种场景,我们都可以采用在消费端业务逻辑中增加额外保存逻辑解决重复消费问题:
- 如果消费 MQ 写数据库,可以在每次插入前判断一下是否已经插入,如果是就执行 UPDATE 操作
- 如果消费 MQ 写 Redis,不用处理,redis set 操作天然覆盖原数据
- 如果场景比较复杂,可以让生产者对每通消息增加唯一 ID,每次消费 MQ 后将该 ID 保存在缓存中,每次消费 MQ 前判断该 ID 是否被消费
kafka 如何保证消息的可靠性传输?
所谓可靠性传输是指:不能丢数据。常见的丢数据场景有:
- 生产者生产时丢数据
- 消息队列本身丢数据
- 消费者消费时丢数据
对于生产者生产时丢数据,Kafka 可以通过以下两个参数确保生产者传送时不丢失数据:
- acks=all:每条数据写入所有 leader 以及 follower 才会返回 ack
- retries=MAX:确保每次写入失败后,无限重试
对于消息队列本身丢数据,Kafka 自身丢数据主要集中在某个 broker 突然宕机,此时该 broker 上的 leader partition 还没有完全同步到 follower,后续重新选主出的 leader 就会丢失一部分数据。对于这种常见可以通过以下两个配置解决:
- topic 设置 replication.factor,值大于1,要求每个 partition 至少有两个副本
- 给 kafka 设置 min.insync.replicas,值大于1,保证一个 leader 至少感知到一个 follower 跟自己保持一致
对于消费者消费时丢失,关闭 kafka 消费端自动提交 offset,采用手动提交即可。
采用手动提交 offset 时,对于批量处理消息场景,需要保证每次提交 offset 前这一批消息全部处理成功。在 Java 代码中通常使用 CountDownLatch 保证所有消息都处理成功后再提交 offset,一旦有消息处理失败,提交当前处理成功最小的 offset,并且在代码中添加逻辑保证幂等性即可
kafka 如何保证消息的有序性
大多数场景下,消息队列是不要求有序的。只有部分后续 MQ 处理依赖前驱 MQ 数据的场景才需要有序,这里保证有序主要从两个方面展开:
- 消息队列层面
- 消费者处理逻辑层面
对于消息队列层面,Kafka 只需生产者在生产消息时增加关键字 key 即可。通过该关键字保证相同类型的消息分配到同一个 partition 上,每个 partition 可以保证同一组消费者最多只有一个 consumer 连接,保证消息在消费时有序
通常情况下我们使用线程池批量消费消息,由于线程池中的线程执行状态无法预知,我们无法保证消息能够按照提交给线程池的顺序依次完成,对于这种情况有以下几种解决方案:
- 消费端统一单线程消费
- 消费者处理时可以定义多个队列,每个队列对应一种 key 类型的消息。借助 kafka 生产者分配的思路,想同一组消息依次交给同一个线程池
kafka 线上消息积压如何解决
对于线上消息挤压,通常可以采用以下几种思路进行优化:
- 排查消息处理逻辑,优化代码,从处理速度层面解决问题
- 增加消费者线程池配置,提高消费者工作线程
- 临时增加消费者,但是得保证目前消费者数小于 partition 分区数量
- 临时放弃消息,首先保证线上最新消息能够及时处理