KAFKA-20426: Using both group.id and assign() causes a busy loop in AsyncKafkaConsumer #22018

Open
brandboat wants to merge 1 commit into apache:trunk from brandboat:KAFKA-20426

@brandboat brandboat commented Apr 10, 2026

When a consumer uses manual assignment (assign()) instead of group
subscription, the member remains in the UNSUBSCRIBED state. In this
state, heartbeats are skipped.

Previously, because the heartbeat interval was initialized to 0, the
maximumTimeToWait calculation would return 0 when heartbeats were
skipped. This caused pollForFetches to return immediately and enter a
busy-loop, consuming excessive CPU.

This patch fixes the issue by ensuring maximumTimeToWait returns
Long.MAX_VALUE whenever shouldSkipHeartbeat() is true.
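The effect described above can be sketched with a small fake-clock simulation. This is an illustration only, not the consumer's actual code: `PollLoopSketch`, `iterations`, and the assumption that a non-blocking pass costs 1 ms of simulated time are all hypothetical.

```java
// Hypothetical, self-contained sketch; none of these names exist in Kafka.
class PollLoopSketch {

    /**
     * Counts how many passes a poll-like loop makes before its timeout elapses,
     * given the wait time reported by the heartbeat logic.
     */
    static int iterations(long maxWaitFromHeartbeatMs, long pollTimeoutMs) {
        long nowMs = 0L; // simulated clock
        int count = 0;
        while (nowMs < pollTimeoutMs) {
            count++;
            long remainingMs = pollTimeoutMs - nowMs;
            // The loop blocks for the smaller of the two bounds.
            long blockMs = Math.min(maxWaitFromHeartbeatMs, remainingMs);
            // Assumption: a pass that does not block still costs 1 ms of simulated time.
            nowMs += Math.max(1L, blockMs);
        }
        return count;
    }

    public static void main(String[] args) {
        // A reported wait of 0 makes the loop spin until the timeout elapses.
        System.out.println(iterations(0L, 1000L));             // prints 1000
        // A reported wait of Long.MAX_VALUE lets the loop block for the full timeout.
        System.out.println(iterations(Long.MAX_VALUE, 1000L)); // prints 1
    }
}
```

In the real consumer a spinning pass costs far less than 1 ms, so the iteration count for a one-second poll would be much higher than in this simulation.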

@JiayaoS JiayaoS left a comment

Thanks! LGTM.

Comment on lines 245 to 261
      * <p>Similarly, we may have to unblock the application thread to send a {@link AsyncPollEvent} to make sure
      * our poll timer will not expire while we are polling.
      *
-     * <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
-     * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
+     * <p>In the event that heartbeats are currently being skipped (e.g., the member is in
+     * {@link MemberState#UNSUBSCRIBED} when using manual assignment), this returns {@code Long.MAX_VALUE}
+     * to indicate there is no next heartbeat to wait for, allowing the application thread to block for
+     * the full user-specified poll timeout rather than spinning in a busy loop.
      */
     @Override
     public long maximumTimeToWait(long currentTimeMs) {
         pollTimer.update(currentTimeMs);
+        if (membershipManager().shouldSkipHeartbeat()) {
+            return Long.MAX_VALUE;
+        }
         if (pollTimer.isExpired() || (membershipManager().shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())) {
             return 0L;
         }

I wondered whether we should still handle the pollTimer.isExpired() case here, since shouldSkipHeartbeat() being true now makes the method return Long.MAX_VALUE even when the timer has expired. However, the callers take Math.min() of this value and timer.remainingMs(), so it won't impact production. I've also run the unit tests locally and everything looks solid.
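A minimal sketch of why that clamp makes the early return safe, assuming the caller combines the two values exactly as described. `ClampSketch` and `effectiveBlockMs` are hypothetical names used only for illustration.

```java
// Hypothetical, self-contained sketch; not the actual caller in the consumer.
class ClampSketch {

    /** The time a caller would actually block: never longer than the timer allows. */
    static long effectiveBlockMs(long maximumTimeToWaitMs, long timerRemainingMs) {
        return Math.min(maximumTimeToWaitMs, timerRemainingMs);
    }

    public static void main(String[] args) {
        // Timer already expired: Long.MAX_VALUE is clamped to 0, so the caller does not block.
        System.out.println(effectiveBlockMs(Long.MAX_VALUE, 0L));   // prints 0
        // Timer still running: the caller blocks only for the time that is left.
        System.out.println(effectiveBlockMs(Long.MAX_VALUE, 250L)); // prints 250
    }
}
```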

@brandboat brandboat changed the title from "[WIP] KAFKA-20426: Using both group.id and assign() causes a busy loop in AsyncKafkaConsumer" to "KAFKA-20426: Using both group.id and assign() causes a busy loop in AsyncKafkaConsumer" on Apr 10, 2026

JiayaoS commented Apr 10, 2026

I wrote a local test case for this.

    @Test
    public void testAssignBusyLoop() {
        // Use the real clock so that poll() waits on wall-clock time.
        time = Time.SYSTEM;
        consumer = newConsumer();
        TopicPartition tp = new TopicPartition("foo", 3);
        // Count how often the fetch collector is invoked during a single poll().
        AtomicInteger loopCount = new AtomicInteger(0);
        doAnswer(invocation -> {
            loopCount.incrementAndGet();
            return Fetch.empty();
        }).when(fetchCollector).collectFetch(any(FetchBuffer.class));
        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
        completeAssignmentChangeEventSuccessfully();
        // Manual assignment only: no subscribe(), so the member stays UNSUBSCRIBED.
        consumer.assign(singleton(tp));
        completeAsyncPollEventSuccessfully();
        consumer.poll(Duration.ofSeconds(1));
        // A poll that blocks properly should stay well under this threshold.
        assertTrue(loopCount.get() < 300,
                "Busy loop detected! loopCount:" + loopCount.get());
    }

Unit test output: Busy loop detected! loopCount:78668

This confirms the busy loop: collectFetch was invoked 78,668 times during a single one-second poll.

I then stubbed maximumTimeToWait() to return Long.MAX_VALUE:
doAnswer(invocation -> Long.MAX_VALUE).when(applicationEventHandler).maximumTimeToWait();
With that stub in place, it worked perfectly.
