Skip to content

Commit

Permalink
KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607
Browse files Browse the repository at this point in the history
)

Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax committed Jan 21, 2025
1 parent 379049f commit 9f64ef2
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,6 @@ private void maybeCompleteRestoration(final StreamTask task,
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
changelogReader.unregister(changelogPartitions);
addToRestoredTasks(task);
updatingTasks.remove(task.id());
log.info("Stateful active task " + task.id() + " completed restoration");
transitToUpdateStandbysIfOnlyStandbysLeft();
}
Expand All @@ -691,6 +690,7 @@ private void addToRestoredTasks(final StreamTask task) {
restoredActiveTasksLock.lock();
try {
restoredActiveTasks.add(task);
updatingTasks.remove(task.id());
log.debug("Active task " + task.id() + " was added to the restored tasks");
restoredActiveTasksCondition.signalAll();
} finally {
Expand Down

0 comments on commit 9f64ef2

Please sign in to comment.