Commit 52ab0ad

C0urante authored and rhauch committed
KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once (#8973)
Add logic to reset the existing `canReadConfigs` in `DistributedHerder` once the herder is able to successfully read the configs again. Added unit test to verify the functionality.

Author: Chris Egerton
Reviewer: Nigel Liang, Randall Hauch
1 parent e3e6e5f commit 52ab0ad

2 files changed, +19 -3 lines changed


connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java

+10 -3
@@ -306,8 +306,13 @@ public void tick() {
         try {
             // if we failed to read to end of log before, we need to make sure the issue was resolved before joining group
             // Joining and immediately leaving for failure to read configs is exceedingly impolite
-            if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
-                return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
+            if (!canReadConfigs) {
+                if (readConfigToEnd(workerSyncTimeoutMs)) {
+                    canReadConfigs = true;
+                } else {
+                    return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
+                }
+            }

             member.ensureActive();
             // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
@@ -1033,7 +1038,9 @@ private boolean handleRebalanceCompleted() {
                 // we timed out. This should only happen if we failed to read configuration for long enough,
                 // in which case giving back control to the main loop will prevent hanging around indefinitely after getting kicked out of the group.
                 // We also indicate to the main loop that we failed to readConfigs so it will check that the issue was resolved before trying to join the group
-                if (!readConfigToEnd(workerSyncTimeoutMs)) {
+                if (readConfigToEnd(workerSyncTimeoutMs)) {
+                    canReadConfigs = true;
+                } else {
                     canReadConfigs = false;
                     needsRejoin = true;
                 }
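
The effect of the two hunks above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the real `DistributedHerder`: the class `CatchUpFlagSketch`, the `markCatchUpFailed` helper, and the `readAttempts` counter are hypothetical names introduced for illustration, and a `BooleanSupplier` stands in for `readConfigToEnd(workerSyncTimeoutMs)`. It shows that once a read to the end of the config topic succeeds, the flag is set back to `true` and later ticks skip the read.

```java
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of the flag handling in tick(); not Kafka code.
class CatchUpFlagSketch {
    // Stand-in for readConfigToEnd(workerSyncTimeoutMs) (assumption for illustration).
    private final BooleanSupplier readConfigToEnd;
    private boolean canReadConfigs = true;
    private int readAttempts = 0; // illustrative counter, not present in the real herder

    public CatchUpFlagSketch(BooleanSupplier readConfigToEnd) {
        this.readConfigToEnd = readConfigToEnd;
    }

    // Models the rebalance path recording that a catch-up read timed out.
    public void markCatchUpFailed() {
        canReadConfigs = false;
    }

    // Returns true if the tick got past the config check and could go on to group work.
    public boolean tick() {
        if (!canReadConfigs) {
            readAttempts++;
            if (readConfigToEnd.getAsBoolean()) {
                canReadConfigs = true; // the reset added by this commit
            } else {
                return false; // retry on the next tick
            }
        }
        return true;
    }

    public int readAttempts() {
        return readAttempts;
    }

    public static void main(String[] args) {
        CatchUpFlagSketch herder = new CatchUpFlagSketch(() -> true);
        herder.markCatchUpFailed();
        herder.tick();
        herder.tick();
        herder.tick();
        System.out.println(herder.readAttempts()); // prints 1
    }
}
```

Without the `canReadConfigs = true` assignment, the counter in this model would grow by one on every tick even though each read succeeds, which is the behavior described in the commit title.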

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java

+9
@@ -1471,6 +1471,11 @@ public void testJoinLeaderCatchUpFails() throws Exception {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();

+        // one more tick, to make sure we don't keep trying to read to the config topic unnecessarily
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
         PowerMock.replayAll();

         long before = time.milliseconds();
@@ -1488,6 +1493,10 @@ public void testJoinLeaderCatchUpFails() throws Exception {
         time.sleep(2000L);
         assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);

+        // tick once more to ensure that the successful read to the end of the config topic was
+        // tracked and no further unnecessary attempts were made
+        herder.tick();
+
         PowerMock.verifyAll();
     }
