kafka的rebalance

zz/2023/6/4 15:40:09

rebalance的出现

订阅Topic的分区数发生变化

简单地说,就是之前 topic 有 10 个分区,现在变成了 20 个,那么多出来的 10 个分区的数据就没人消费了。那么此时就需要进行重平衡,将新增的 10 个分区分给消费组内的消费者进行消费。所以在这个情况下,会发生重平衡。

订阅的Topic个数发生变化

简单地说,一个 consumer group 如果之前只订阅了 A topic,那么其组内的 consumer 知会消费 A topic 的消息。而如果现在新增订阅了 B topic,那么 kafka 就需要把 B topic 的 partition 分配给组内的 consumer 进行消费。这个分配的过程,其实也是一个 rebalance 的过程。

消费组内成员个数发生变化

下面我们一起分析一下消费组内成员个数发生变化的3种情况:

新成员加入
组成员主动离开
组成员崩溃

新成员加入

新成员入组是指组处于 Stable 状态后,有新成员加入。如果是全新启动一个消费者组,Kafka 是有一些自己的小优化的,流程上会有些许的不同。我们这里讨论的是,组稳定了之后有新成员加入的情形。

当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的。现在,我用一张时序图来说明协调者一端是如何处理新成员入组的。

img

组成员主动离开

何谓主动离组?就是指消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员,因此我就不再赘述了,还是直接用一张图来说明。

img

组成员崩溃

崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。

也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。当然,后面处理崩溃离组的流程与之前是一样的,我们来看看下面这张图。

img

注意:

正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。还是老办法,我们使用一张图来说明。

img

rebalance问题处理思路

前面我们讲过 rebalance 一般会有 3 种情况。对于新成员加入、组成员主动离开都是我们主动触发的,能比较好地控制。但是组成员崩溃则是我们预料不到的,遇到问题的时候也比较不好排查。但对于组成员崩溃也是有一些通用的排查思路。

新成员加入

组成员主动离开

组成员崩溃

要学会处理 rebalance 问题,我们需要先搞清楚 kafaka 消费者配置的四个参数:

  • session.timeout.ms 设置了超时时间
  • heartbeat.interval.ms 心跳时间间隔
  • max.poll.interval.ms 每次消费的处理时间
  • max.poll.records 每次消费的消息数

session.timeout.ms 表示 consumer 向 broker 发送心跳的超时时间。例如 session.timeout.ms = 180000 表示在最长 180 秒内 broker 没收到 consumer 的心跳,那么 broker 就认为该 consumer 死亡了,会启动 rebalance。

heartbeat.interval.ms 表示 consumer 每次向 broker 发送心跳的时间间隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 发送一次心跳。一般来说,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。

max.poll.interval.ms 表示 consumer 每两次 poll 消息的时间间隔。简单地说,其实就是 consumer 每次消费消息的时长。如果消息处理的逻辑很重,那么市场就要相应延长。否则如果时间到了 consumer 还么消费完,broker 会默认认为 consumer 死了,发起 rebalance。

max.poll.records 表示每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发生 rebalance。

简单来说,会导致崩溃的几个点是:

  • 消费者心跳超时,导致 rebalance。
  • 消费者处理时间过长,导致 rebalance。

消费者心跳超时

我们知道消费者是通过心跳和协调者保持通讯的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发起 rebalance。

而 kafka 的消费者参数设置中,跟心跳相关的两个参数为:

  • session.timeout.ms 设置了超时时间
  • heartbeat.interval.ms 心跳时间间隔

这时候需要调整 session.timeout.ms 和 heartbeat.interval.ms 参数,使得消费者与协调者能保持心跳。一般来说,超时时间应该是心跳间隔的 3 倍时间。即 session.timeout.ms 如果设置为 180 秒,那么 heartbeat.interval.ms 最多设置为 60 秒。为什么要这么设置超时时间应该是心跳间隔的 3 倍时间?因为这样的话,在一个超时周期内就可以有多次心跳,避免网络问题导致偶发失败。

消费者处理时间过长

如果消费者处理时间过长,那么同样会导致协调者认为该 consumer 死亡了,从而发起重平衡。而 kafka 的消费者参数设置中,跟消费处理的两个参数为:

  • max.poll.interval.ms 每次消费的处理时间
  • max.poll.records 每次消费的消息数

对于这种情况,一般来说就是增加消费者处理的时间(即提高 max.poll.interval.ms 的值),减少每次处理的消息数(即减少 max.poll.records 的值)。

除此之外,超时时间参数(session.timeout.ms)与 消费者每次处理的时间(max.poll.interval.ms)也是有关联的。max.poll.interval.ms 时间不能超过 session.timeout.ms 时间。 因为在 kafka 消费者的实现中,其是单线程去消费消息和执行心跳的,如果线程卡在处理消息,那么这时候即使到时间要心跳了,还是没有线程可以去执行心跳操作。很多同学在处理问题的时候,明明设置了很长的 session.timeout.ms 时间,但最终还是心跳超时了,就是因为没有处理好这两个参数的关联。

对于 rebalance 类问题,简单总结就是:处理好心跳超时问题和消费处理超时问题

  • 对于心跳超时问题。一般是调高心跳超时时间(session.timeout.ms),调整超时时间(session.timeout.ms)和心跳间隔时间(heartbeat.interval.ms)的比例。阿里云官方文档建议超时时间(session.timeout.ms)设置成 25s,最长不超过 30s。那么心跳间隔时间(heartbeat.interval.ms)就不超过 10s。
  • 对于消费处理超时问题。一般是增加消费者处理的时间(max.poll.interval.ms),减少每次处理的消息数(max.poll.records)。阿里云官方文档建议 max.poll.records 参数要远小于当前消费组的消费能力(records < 单个线程每秒消费的条数 x 消费线程的个数 x session.timeout的秒数)。
http://www.ngui.cc/zz/2389885.html

相关文章

hadoop rebalance

之前一直没做过rebalance&#xff0c;以为速度很快&#xff0c;结果大意了&#xff0c;等到磁盘达到90%的时候&#xff0c;才开始做rebalance。 默认的从日志中可以看到总共需要迁移1.89T&#xff0c;但是每次只移动40G大小的量。 然后查看40G的数据量从15:45分到15:48分&#…

【Kafka】Kafka的Rebalance机制可能造成的影响及解决方案

一、kafka的rebalance机制在Kafka中&#xff0c;当有新消费者加入或者订阅的Topic数发生变化时&#xff0c;会触发Rebalance(再均衡&#xff1a;在同一个消费者组当中&#xff0c;分区的所有权从一个消费者转移到另外一个消费者)机制&#xff0c;Rebalance顾名思义就是重新均衡…

第6章 Rebalance详解

rebalance本质是一种协议&#xff0c;规定了一个consumer group下的所有consumer如何达成一致来分配订阅的topic的每个分区。比如某个group下有20个consumer&#xff0c;它订阅了一个具有100个分区的topic。正常情况下&#xff0c;Kafka平均会为每个consumer分配5个分区。这个分…

电话机器人核心技术之NLP

什么是NLPNLP是神经语言程序学 (Neuro-Linguistic Programming) 的英文缩写; NLP译为“身心语法程式学”或“神经语言程序学”。N&#xff08;Neuro&#xff09;指神经系统&#xff0c;意译为身心。指我们比较稳定的身心素质&#xff0c;结构及比较逸动的身心状态。L&#xff0…

产品用户-缩写

tts : text to speech 语音合成 IVR&#xff1a;(Interactive Voice Response)即互动式语音应答&#xff0c;您只须用电话即可进入服务中心&#xff0c;可以根据操作提示收听手机娱乐产品&#xff0c;也可以根据用户输入的内容播放有关的信息。 CTI&#xff1a;一般指计算机电…

java 常用缩写简表_java 常用缩写

java EE -->Java Platform Enterprise Edition //平台企业版此版本中主要包括如下技术&#xff1a;EJB 3.0、Java Persistance Architecture (JPA)、Web 服务、JAX-WS。.包含J2SE 中的类&#xff0c;并且还包含用于开发企业级应用的类。比如&#xff1a;EJB、servlet、JS…

Android--控件的单位(px,pt,dp,sp)

px&#xff1a;代表像素&#xff0c;即在屏幕中可显示的最小单位元素&#xff0c;应用程序中任何控件都是由一个像素点组成的&#xff0c;分辨率越高的手机&#xff0c;屏幕的像素点就越多。因此&#xff0c;如果使用px控制控件的大小&#xff0c;在分辨率不同的手机控件显示的…

web前端常用长度单位(px,em,rem,pt)

px像素&#xff08;Pixel&#xff09;。相对长度单位。像素px是相对于显示器屏幕分辨率而言的。 em是相对长度单位。相对于当前对象内文本的字体尺寸。如当前对行内文本的字体尺寸未被人为设置&#xff0c;则相对于浏览器的默认字体尺寸&#xff0c;最初是指字母M的宽度&…

css font size 单位,css中font-size的单位总结:px、em、pt

px:基于像素的单位.像素是一种有用的单位,因为在任何媒体上都可以保证一个像素的差别确实是可见的.em :一般用来测量长度的通用单位(例如元素周转的页边空白和填充),当用于指定字体大小时,em单位是指父元素的字体大小.比如这里的字是24px如果使用em来指定填充,填充的宽度是相对…

探究px像素与pt磅,mm毫米之间的换算

文章目录DPI(dots per inch)dpcm&#xff08;dots per centimeter&#xff09;dppx&#xff08;dots per px 别名为&#xff1a;x&#xff09;px像素(Pixel)emrem (font size of the root element)in英寸(Inch)pt磅(Point)mm毫米(Millimeter)cm厘米(Centimeter)pt与px换算公式m…