Skip to content

Commit 3bc2df7

Browse files
authored
KAFKA-10134 Follow-up: Set the re-join flag in heartbeat failure (#9354)
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Boyang Chen <[email protected]>
1 parent ad17ea1 commit 3bc2df7

File tree

2 files changed

+23
-15
lines changed

2 files changed

+23
-15
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -948,7 +948,7 @@ private synchronized void resetStateAndRejoin() {
948948
synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
949949
log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
950950

951-
resetState();
951+
resetStateAndRejoin();
952952
}
953953

954954
synchronized void resetGenerationOnLeaveGroup() {

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

Lines changed: 22 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -120,6 +120,7 @@
120120
import java.util.concurrent.TimeUnit;
121121
import java.util.concurrent.TimeoutException;
122122
import java.util.concurrent.atomic.AtomicBoolean;
123+
import java.util.concurrent.atomic.AtomicInteger;
123124
import java.util.concurrent.atomic.AtomicReference;
124125
import java.util.regex.Pattern;
125126
import java.util.stream.Collectors;
@@ -1842,7 +1843,7 @@ public void testRebalanceException() {
18421843
}
18431844

18441845
@Test
1845-
public void testReturnRecordsDuringRebalance() {
1846+
public void testReturnRecordsDuringRebalance() throws InterruptedException {
18461847
Time time = new MockTime(1L);
18471848
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
18481849
ConsumerMetadata metadata = createMetadata(subscription);
@@ -1857,15 +1858,13 @@ public void testReturnRecordsDuringRebalance() {
18571858
Node node = metadata.fetch().nodes().get(0);
18581859
Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
18591860

1860-
// a first poll with zero millisecond would not complete the rebalance
1861-
consumer.poll(Duration.ZERO);
1861+
// a poll with non-zero milliseconds would complete three round-trips (discover, join, sync)
1862+
TestUtils.waitForCondition(() -> {
1863+
consumer.poll(Duration.ofMillis(100L));
1864+
return consumer.assignment().equals(Utils.mkSet(tp0, t2p0));
1865+
}, "Does not complete rebalance in time");
18621866

18631867
assertEquals(Utils.mkSet(topic, topic2), consumer.subscription());
1864-
assertEquals(Collections.emptySet(), consumer.assignment());
1865-
1866-
// a second poll with non-zero milliseconds would complete three round-trips (discover, join, sync)
1867-
consumer.poll(Duration.ofMillis(100L));
1868-
18691868
assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment());
18701869

18711870
// prepare a response of the outstanding fetch so that we have data available on the next poll
@@ -1918,7 +1917,6 @@ public void testReturnRecordsDuringRebalance() {
19181917

19191918
// mock rebalance responses
19201919
client.respondFrom(joinGroupFollowerResponse(assignor, 2, "memberId", "leaderId", Errors.NONE), coordinator);
1921-
client.prepareResponseFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator);
19221920

19231921
// we need to poll 1) for getting the join response, and then send the sync request;
19241922
// 2) for getting the sync response
@@ -1934,12 +1932,19 @@ public void testReturnRecordsDuringRebalance() {
19341932
fetches1.put(tp0, new FetchInfo(3, 1));
19351933
client.respondFrom(fetchResponse(fetches1), node);
19361934

1937-
records = consumer.poll(Duration.ZERO);
1935+
// now complete the rebalance
1936+
client.respondFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator);
1937+
1938+
AtomicInteger count = new AtomicInteger(0);
1939+
TestUtils.waitForCondition(() -> {
1940+
ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(100L));
1941+
return consumer.assignment().equals(Utils.mkSet(tp0, t3p0)) && count.addAndGet(recs.count()) == 1;
1942+
1943+
}, "Does not complete rebalance in time");
19381944

19391945
// should have t3 but not sent yet the t3 records
19401946
assertEquals(Utils.mkSet(topic, topic3), consumer.subscription());
19411947
assertEquals(Utils.mkSet(tp0, t3p0), consumer.assignment());
1942-
assertEquals(1, records.count());
19431948
assertEquals(4L, consumer.position(tp0));
19441949
assertEquals(0L, consumer.position(t3p0));
19451950

@@ -1948,10 +1953,13 @@ public void testReturnRecordsDuringRebalance() {
19481953
fetches1.put(t3p0, new FetchInfo(0, 100));
19491954
client.respondFrom(fetchResponse(fetches1), node);
19501955

1951-
records = consumer.poll(Duration.ZERO);
1956+
count.set(0);
1957+
TestUtils.waitForCondition(() -> {
1958+
ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(100L));
1959+
return count.addAndGet(recs.count()) == 101;
1960+
1961+
}, "Does not complete rebalance in time");
19521962

1953-
// should have t3 but not sent yet the t3 records
1954-
assertEquals(101, records.count());
19551963
assertEquals(5L, consumer.position(tp0));
19561964
assertEquals(100L, consumer.position(t3p0));
19571965

0 commit comments

Comments
 (0)