Skip to content

Commit

Permalink
KAFKA-17402: DefaultStateUpdated should transite task atomically (apa…
Browse files Browse the repository at this point in the history
…che#18607)

Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax committed Jan 27, 2025
1 parent 7a42aa2 commit 3292e35
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,6 @@ private void maybeCompleteRestoration(final StreamTask task,
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
changelogReader.unregister(changelogPartitions);
addToRestoredTasks(task);
updatingTasks.remove(task.id());
log.info("Stateful active task " + task.id() + " completed restoration");
transitToUpdateStandbysIfOnlyStandbysLeft();
}
Expand All @@ -689,6 +688,7 @@ private void addToRestoredTasks(final StreamTask task) {
restoredActiveTasksLock.lock();
try {
restoredActiveTasks.add(task);
updatingTasks.remove(task.id());
log.debug("Active task " + task.id() + " was added to the restored tasks");
restoredActiveTasksCondition.signalAll();
} finally {
Expand Down

0 comments on commit 3292e35

Please sign in to comment.