Skip to content

Commit

Permalink
KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607
Browse files Browse the repository at this point in the history
)

Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax authored Jan 21, 2025
1 parent 967a197 commit f2f9e84
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,6 @@ private void maybeCompleteRestoration(final StreamTask task,
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
changelogReader.unregister(changelogPartitions);
addToRestoredTasks(task);
updatingTasks.remove(task.id());
log.info("Stateful active task " + task.id() + " completed restoration");
transitToUpdateStandbysIfOnlyStandbysLeft();
}
Expand All @@ -689,6 +688,7 @@ private void addToRestoredTasks(final StreamTask task) {
restoredActiveTasksLock.lock();
try {
restoredActiveTasks.add(task);
updatingTasks.remove(task.id());
log.debug("Active task " + task.id() + " was added to the restored tasks");
restoredActiveTasksCondition.signalAll();
} finally {
Expand Down

0 comments on commit f2f9e84

Please sign in to comment.