Heartbeat and session timeout mechanisms in older and newer Kafka versions

Problem:

We are using Kafka 0.10.2.1 with the following configuration:

#kafka.bootstrap.servers=
kafka.bootstrap.servers=
kafka.group.id=
kafka.topic=
kafka.concurrency=5
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.auto.offset.reset=latest
kafka.enable.auto.commit=true
kafka.auto.commit.interval=100
kafka.max.poll.records=20
kafka.session.timeout.ms=30000
kafka.security.protocol=SASL_PLAINTEXT
kafka.sasl.mechanism=PLAIN

However, consumer rebalances (partitions being reassigned among the consumers in the group) happen frequently.
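One way to sanity-check this configuration against the rules discussed later in this article is a small sketch. It makes several assumptions. First, the application strips the "kafka." prefix and passes the remaining keys to KafkaConsumer unchanged. Second, because max.poll.interval.ms is not set, the plain consumer default of 5 minutes applies. Third, since 0.10.1.0 a slow batch is bounded by max.poll.interval.ms rather than by session.timeout.ms. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sanity check: how long may each record take before one batch of
// max.poll.records exceeds max.poll.interval.ms?
// ASSUMPTION: the application maps "kafka.xxx" keys to consumer key "xxx".
class ProblemConfigCheck {
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L; // plain consumer default (5 minutes)
    static final long DEFAULT_MAX_POLL_RECORDS = 500L;         // default since 0.10.1.0

    /** Loads "prefix.key=value" lines and returns them as "key=value" (comment lines are ignored). */
    static Properties stripPrefix(String text, String prefix) {
        Properties raw = new Properties();
        try {
            raw.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader performs no real I/O, so this is not expected to happen.
            throw new UncheckedIOException(e);
        }
        Properties out = new Properties();
        for (String key : raw.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                out.setProperty(key.substring(prefix.length()), raw.getProperty(key));
            }
        }
        return out;
    }

    /** Largest average per-record processing time (ms) that keeps one batch within the poll interval. */
    static long perRecordBudgetMs(Properties consumerProps) {
        long maxPollIntervalMs = Long.parseLong(consumerProps.getProperty(
                "max.poll.interval.ms", Long.toString(DEFAULT_MAX_POLL_INTERVAL_MS)));
        long maxPollRecords = Long.parseLong(consumerProps.getProperty(
                "max.poll.records", Long.toString(DEFAULT_MAX_POLL_RECORDS)));
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "#kafka.bootstrap.servers=",
                "kafka.max.poll.records=20",
                "kafka.session.timeout.ms=30000",
                "kafka.enable.auto.commit=true");
        Properties consumerProps = stripPrefix(config, "kafka.");
        // 300000 ms / 20 records = 15000 ms per record
        System.out.println("per-record budget (ms): " + perRecordBudgetMs(consumerProps));
    }
}
```

If the average processing time per record approaches this budget, rebalances caused by the poll interval become likely.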



Kafka 0.10.0.0 & Kafka 0.10.1.0

Heartbeat and timeout mechanism in 0.10.0.0:
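Heartbeats are coupled with polling. They are sent only from inside poll(). The only timeout parameter is session.timeout.ms, and no separate parameter bounds the interval between poll() calls.
If the consumer needs one minute to process the messages returned by a poll, session.timeout.ms must be set above one minute. Otherwise, the consumer's session times out.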
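The coupling can be made concrete with a minimal sketch under a simplified model. This model is an assumption, not client code. In 0.10.0.0, the gap between two heartbeats is roughly the time spent processing one batch, so the session expires once that time reaches session.timeout.ms. The class and method names are hypothetical.

```java
// Simplified, hypothetical model of 0.10.0.0: heartbeats are sent only inside poll(),
// so the heartbeat gap is (roughly) the time spent processing the previous batch.
class CoupledHeartbeatModel {
    /** True if processing one batch keeps the consumer away from poll() long enough to expire its session. */
    static boolean sessionExpires(long batchProcessingMs, long sessionTimeoutMs) {
        return batchProcessingMs >= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long oneMinuteMs = 60_000L;
        System.out.println(sessionExpires(oneMinuteMs, 30_000L)); // true: a 30 s session is too short
        System.out.println(sessionExpires(oneMinuteMs, 90_000L)); // false: session.timeout.ms > 1 minute
    }
}
```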

session.timeout.ms
The timeout used to detect failures when using Kafka's group management facilities. 
When a consumer's heartbeat is not received within the session timeout, the broker will mark the consumer as failed and rebalance the group. 
Since heartbeats are sent only when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's poll loop at the cost of a longer time to detect hard failures. 
See also max.poll.records for another option to control the processing time in the poll loop. 
Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.
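The official documentation also points to max.poll.records as a second knob. Limiting the number of records returned per poll bounds the processing time of each poll-loop iteration.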
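A rough way to apply this advice is to divide the time limit by the per-record processing time and keep some headroom. The time limit is session.timeout.ms in 0.10.0.0. This assumes the per-record time is roughly constant and known in advance. The helper and its safetyPercent margin are hypothetical and are not Kafka settings.

```java
// Hypothetical helper: choose max.poll.records so that one batch is expected to
// finish within a time limit. ASSUMPTION: per-record processing time is roughly constant.
class MaxPollRecordsBudget {
    static int maxRecordsWithinBudget(long limitMs, long perRecordMs, int safetyPercent) {
        if (perRecordMs <= 0 || safetyPercent <= 0 || safetyPercent > 100) {
            throw new IllegalArgumentException("perRecordMs must be > 0 and safetyPercent in 1..100");
        }
        long usableMs = limitMs * safetyPercent / 100; // leave headroom for poll() itself
        long records = usableMs / perRecordMs;
        // max.poll.records must be at least 1; if even one record exceeds the limit,
        // only a larger timeout helps.
        return (int) Math.max(1L, Math.min(records, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // 30 s limit, about 1 s per record, use 80% of the limit: 24 records per poll
        System.out.println(maxRecordsWithinBudget(30_000L, 1_000L, 80));
    }
}
```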

heartbeat.interval.ms
The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. 
Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. 
The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. 
It can be adjusted even lower to control the expected time for normal rebalances.
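The heartbeat rule above can be expressed as a small check. This is a hypothetical helper, not part of the Kafka client. The one-third bound is the documentation's recommendation rather than a hard limit, so the sketch only flags it.

```java
// Hypothetical check of the documented rule: heartbeat.interval.ms must be lower than
// session.timeout.ms and should typically be no higher than one third of it.
class HeartbeatIntervalCheck {
    static long recommendedMaxHeartbeatMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / 3;
    }

    static String check(long heartbeatIntervalMs, long sessionTimeoutMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            return "invalid: heartbeat.interval.ms must be lower than session.timeout.ms";
        }
        if (heartbeatIntervalMs > recommendedMaxHeartbeatMs(sessionTimeoutMs)) {
            return "allowed, but higher than the recommended 1/3 of session.timeout.ms";
        }
        return "ok";
    }

    public static void main(String[] args) {
        System.out.println(check(3_000L, 30_000L));  // ok
        System.out.println(check(15_000L, 30_000L)); // allowed, but higher than the recommended 1/3 ...
        System.out.println(check(30_000L, 30_000L)); // invalid: ...
    }
}
```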


Heartbeat mechanism in 0.10.1.0:
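Starting with this version, heartbeats are decoupled from poll(), and the consumer sends them from a separate background thread.
This version also adds a separate max.poll.interval.ms parameter, which sets the maximum allowed interval between two poll() calls. The poll interval can therefore be configured independently of, and larger than, the heartbeat settings. As a result, the time spent processing messages may exceed the session timeout (session.timeout.ms).
session.timeout.ms governs the heartbeat thread, and max.poll.interval.ms governs the processing thread that calls poll(). In this version these are two independent threads.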

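Suppose session.timeout.ms = 30000 (30 seconds). The consumer's heartbeat thread must then send a heartbeat to the broker before this timeout expires.
Independently, if processing a single message takes one minute, max.poll.interval.ms can be set above one minute to give the processing thread enough time.
Otherwise, suppose max.poll.interval.ms is less than one minute. By the time the message has been processed and poll() is called again, the gap between the two poll() calls has already exceeded max.poll.interval.ms. The consumer is then treated as failed and the group rebalances, even though the session itself has not timed out.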

If the processing (poll) thread hangs, this is detected through max.poll.interval.ms.
If the entire consumer process dies, this can only be detected through session.timeout.ms.
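The two detection paths described above can be summarised in a simplified, hypothetical model. It ignores network delay and exact heartbeat timing and only reports which timeout would take the consumer out of the group. The values in main mirror the example above: one minute of processing. The 30-second session.timeout.ms does not appear because it only matters once heartbeats stop.

```java
// Simplified, hypothetical model of 0.10.1.0+: a background thread keeps the session
// alive, while max.poll.interval.ms bounds the gap between poll() calls.
class DecoupledTimeoutModel {
    enum Outcome { STAYS_IN_GROUP, LEAVES_AFTER_MAX_POLL_INTERVAL, REMOVED_AFTER_SESSION_TIMEOUT }

    /**
     * @param processAlive      false if the whole consumer process is gone (no heartbeats at all)
     * @param pollGapMs         time between two poll() calls, e.g. time to process one batch
     * @param maxPollIntervalMs the configured max.poll.interval.ms
     */
    static Outcome outcome(boolean processAlive, long pollGapMs, long maxPollIntervalMs) {
        if (!processAlive) {
            // No heartbeat thread either: only the broker's session timeout notices.
            return Outcome.REMOVED_AFTER_SESSION_TIMEOUT;
        }
        if (pollGapMs > maxPollIntervalMs) {
            // Heartbeats still flow, but the poll loop is too slow: the group rebalances.
            return Outcome.LEAVES_AFTER_MAX_POLL_INTERVAL;
        }
        return Outcome.STAYS_IN_GROUP;
    }

    public static void main(String[] args) {
        long processingMs = 60_000L; // one message takes one minute
        System.out.println(outcome(true, processingMs, 50_000L));  // LEAVES_AFTER_MAX_POLL_INTERVAL
        System.out.println(outcome(true, processingMs, 90_000L));  // STAYS_IN_GROUP
        System.out.println(outcome(false, processingMs, 90_000L)); // REMOVED_AFTER_SESSION_TIMEOUT
    }
}
```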


Notable changes in 0.10.1.0:
The new Java Consumer now supports heartbeating from a background thread. 
There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). 
The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. 
Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
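The request.timeout.ms constraint from these notes can be checked on a plain java.util.Properties before the settings are passed to KafkaConsumer. The key names are the real consumer configuration names, but the validate() helper is hypothetical. The notes describe the default request.timeout.ms only as "just above 5 minutes". For that reason, when request.timeout.ms is unset, the sketch merely warns if max.poll.interval.ms is raised above 5 minutes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical validation of the 0.10.1.0 rule: request.timeout.ms must be larger
// than max.poll.interval.ms.
class ConsumerTimeoutValidator {
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L; // 5 minutes

    static List<String> validate(Properties p) {
        List<String> problems = new ArrayList<>();
        long maxPollIntervalMs = Long.parseLong(p.getProperty(
                "max.poll.interval.ms", Long.toString(DEFAULT_MAX_POLL_INTERVAL_MS)));
        String requestTimeout = p.getProperty("request.timeout.ms");
        if (requestTimeout == null) {
            // The default request.timeout.ms is only "just above 5 minutes".
            if (maxPollIntervalMs > DEFAULT_MAX_POLL_INTERVAL_MS) {
                problems.add("max.poll.interval.ms raised above 5 minutes: set request.timeout.ms above it too");
            }
        } else if (Long.parseLong(requestTimeout) <= maxPollIntervalMs) {
            problems.add("request.timeout.ms must be larger than max.poll.interval.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", "600000"); // example: 10 minutes of processing
        props.setProperty("request.timeout.ms", "300000");
        System.out.println(validate(props)); // [request.timeout.ms must be larger than max.poll.interval.ms]
        props.setProperty("request.timeout.ms", "610000");
        System.out.println(validate(props)); // []
    }
}
```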


Official documentation for 0.10.1.0 (http://kafka.apache.org/0101/documentation.html):
max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

session.timeout.ms
The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.
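The broker-side range mentioned in this description can be checked the same way. The broker values in main are illustrative assumptions and not necessarily your broker's settings, so read group.min.session.timeout.ms and group.max.session.timeout.ms from the actual broker configuration. The helper name is hypothetical.

```java
// Hypothetical check: session.timeout.ms must fall within the broker's
// group.min.session.timeout.ms .. group.max.session.timeout.ms range (inclusive here).
class SessionTimeoutRangeCheck {
    static boolean inBrokerRange(long sessionTimeoutMs, long groupMinSessionTimeoutMs, long groupMaxSessionTimeoutMs) {
        return sessionTimeoutMs >= groupMinSessionTimeoutMs && sessionTimeoutMs <= groupMaxSessionTimeoutMs;
    }

    public static void main(String[] args) {
        long brokerMinMs = 6_000L;   // example value for group.min.session.timeout.ms
        long brokerMaxMs = 300_000L; // example value for group.max.session.timeout.ms
        System.out.println(inBrokerRange(30_000L, brokerMinMs, brokerMaxMs));  // true
        System.out.println(inBrokerRange(600_000L, brokerMinMs, brokerMaxMs)); // false
    }
}
```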

request.timeout.ms
The configuration controls the maximum amount of time the client will wait for the response of a request. 
If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
 

Also note:

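In 0.10.2.1, the default value of max.poll.interval.ms was changed to Integer.MAX_VALUE, but only for the consumer used internally by Kafka Streams. The plain Java consumer keeps its default of 5 minutes (300000 ms).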

http://kafka.apache.org/0102/documentation.html

Notable changes in 0.10.2.1
The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. 
The internal Kafka Streams producer retries default value was changed from 0 to 10. 
The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE.
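A short worked calculation with java.time.Duration gives a sense of scale. Integer.MAX_VALUE milliseconds is about 24.8 days. With this default, the Streams internal consumer will in practice not leave the group because of a slow poll loop. The plain consumer's 300000 ms, by contrast, is 5 minutes. The class name is hypothetical.

```java
import java.time.Duration;

// Worked arithmetic only: converts the two max.poll.interval.ms defaults to readable units.
class MaxPollIntervalMagnitude {
    /** Whole days in the given number of milliseconds (truncated). */
    static long days(long millis) {
        return Duration.ofMillis(millis).toDays();
    }

    public static void main(String[] args) {
        System.out.println(days(Integer.MAX_VALUE));                 // 24 (2147483647 ms is about 24.8 days)
        System.out.println(Duration.ofMillis(300_000L).toMinutes()); // 5, the plain consumer default
    }
}
```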


Reposted from blog.csdn.net/qq_32907195/article/details/112801258