KAFKA-20426: Using both group.id and assign() causes a busy loop in AsyncKafkaConsumer#22018
Open
brandboat wants to merge 1 commit intoapache:trunkfrom
Open
KAFKA-20426: Using both group.id and assign() causes a busy loop in AsyncKafkaConsumer#22018brandboat wants to merge 1 commit intoapache:trunkfrom
brandboat wants to merge 1 commit intoapache:trunkfrom
Conversation
…syncKafkaConsumer
JiayaoS
reviewed
Apr 10, 2026
Comment on lines
245
to
261
| * <p>Similarly, we may have to unblock the application thread to send a {@link AsyncPollEvent} to make sure | ||
| * our poll timer will not expire while we are polling. | ||
| * | ||
| * <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat | ||
| * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive. | ||
| * <p>In the event that heartbeats are currently being skipped (e.g., the member is in | ||
| * {@link MemberState#UNSUBSCRIBED} when using manual assignment), this returns {@code Long.MAX_VALUE} | ||
| * to indicate there is no next heartbeat to wait for, allowing the application thread to block for | ||
| * the full user-specified poll timeout rather than spinning in a busy loop. | ||
| */ | ||
| @Override | ||
| public long maximumTimeToWait(long currentTimeMs) { | ||
| pollTimer.update(currentTimeMs); | ||
| if (membershipManager().shouldSkipHeartbeat()) { | ||
| return Long.MAX_VALUE; | ||
| } | ||
| if (pollTimer.isExpired() || (membershipManager().shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())) { | ||
| return 0L; | ||
| } |
Contributor
There was a problem hiding this comment.
I was wondering if we should handle the pollTimer.isExpired() case here, as shouldSkipHeartbeat() being true would cause the logic to return Long.MAX_VALUE even if the timer has expired. However, I noticed that the callers use Math.min() with timer.remainingMs(), so it won't impact production. Ive also run the unit tests locally and everything looks solid.
Contributor
|
I wrote a local test case for this. @Test
public void testAssignBusyLoop() {
time = Time.SYSTEM;
consumer = newConsumer();
TopicPartition tp = new TopicPartition("foo", 3);
AtomicInteger loopCount = new AtomicInteger(0);
doAnswer(invocation -> {
loopCount.incrementAndGet();
return Fetch.empty();
}).when(fetchCollector).collectFetch(any(FetchBuffer.class));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ofSeconds(1));
assertTrue(loopCount.get() < 300,
"Busy loop detected! loopCount:" + loopCount.get());
}Unit Test Output: Proves that it indeed causes a busy loop. Subsequently, I used |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
When a consumer uses manual assignment (assign()) instead of group
subscription, the member remains in the UNSUBSCRIBED state. In this
state, heartbeats are skipped.
Previously, because the heartbeat interval was initialized to 0, the
maximumTimeToWaitcalculation would return 0 when heartbeats wereskipped. This caused
pollForFetchesto return immediately and enter abusy-loop, consuming excessive CPU.
This patch fixes the issue by ensuring
maximumTimeToWaitreturnsLong.MAX_VALUE whenever
shouldSkipHeartbeat()is true.