kafka rebalance 总结(更新中)

发布时间 2023-07-26 16:45:08作者: 偶尔发呆

KAFKA 2.3 以后,consumer 分为 dynamic 和 static,以是否设置了 group.instance.id 属性区分。

以默认的 consumer 为例,即 dynamic consumer,以下图描述其正常的生命周期:

依赖 FindCoordinator, JoinGroup, SyncGroup, Heatbeat, LeaveGroup 等接口,kafka consumer  和 broker 联合完成了 group 管理。

协议命令表:

命令 consumer broker
FindCoordinator consumer 启动时,从配置的 bootstrap server 中选择一个负载较小的 broker,向其发送查找 coordinator 的请求  
JoinGroup

consumer 查询到 coordinator 后,开始加入组,join group 时携带了客户端的配置信息,具体内容参见 JoinGroupRequestData

 
SyncGroup

consumer 接收到 join group 响应后,如果当前实例分配到了 leader,则按照分配策略分配分区给各消费者,并将分配结果以 SyncGroup 请求发送给 broker。如果当前实例非 leader,则发送一个不带内容的 SyncGroup 请求。具体内容参见 SyncGroupRequestData

 
Heatbeat  从某种程度上看,一次 poll 算是一次心跳,poll 时会更新心跳计时器的时刻,如果 consumer 一直没有 poll 动作,HeartbeatThread 则自己发起心跳请求  
LeaveGroup    

 

配置信息表:

属性 默认值 别名 consumer broker
group.id   groupId    
group.instance.id   groupInstanceId 如果提供了值, 则为 static consumer,当 consumer close 时,不会主动 leave group。可避免滚动发布时,频繁的 rebalance  
max.poll.interval.ms 300000 rebalanceTimeoutMs HeartbeatThread 检查 consumer 两次 poll 的时间间隔,如果超过了配置值,则主动 leave group  
session.timeout.ms 10000 sessionTimeoutMs HeartbeatThread 检查 session 超过了配置的值,则标记 coordinator 为 unkonwn,并主动断开连接,重新开始下一轮 group 流程,即 rebalance  
heartbeat.interval.ms 3000 heartbeatIntervalMs 以默认值为例,假定 consumer 在没有 poll 行为且还没超过 rebalance 间隔时,HeartbeatThread 每隔 3 秒向 broker 发送一次心跳,成功接收到心跳响应后,则更新 seesion 的时刻  
partition.assignment.strategy RangeAssignor   consumer 在发送 join group 请求时,会携带分配的策略  

梳理完上面的表格,得出结论:在 HeartbeatThread 无限循环中, consumer 利用心跳来维持 session,当 session 过期时触发 rebalance,当 poll 时间过期时,触发 rebalance

分析 2 种常见的异常情况:
1. consumer 发送心跳,由于网络原因,或者 coordinator broker 宕机,consumer 一直没有接收到心跳响应,则 session 随之会过期,会触发重新加入组,即 reblance
2. consumer 和 coordinator 心跳正常,但是 consumer 一直没有 poll 动作,此时 consumer 会主动离开 group