From 9f64ef255b4d1f9b3c5bec6fc5718e2ba28bbe1a Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 21 Jan 2025 12:33:48 -0800 Subject: [PATCH] KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607) Reviewers: Bruno Cadonna , Lucas Brutschy --- .../kafka/streams/processor/internals/DefaultStateUpdater.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 3ab94f689f77c..d4d90246ca779 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -675,7 +675,6 @@ private void maybeCompleteRestoration(final StreamTask task, measureCheckpointLatency(() -> task.maybeCheckpoint(true)); changelogReader.unregister(changelogPartitions); addToRestoredTasks(task); - updatingTasks.remove(task.id()); log.info("Stateful active task " + task.id() + " completed restoration"); transitToUpdateStandbysIfOnlyStandbysLeft(); } @@ -691,6 +690,7 @@ private void addToRestoredTasks(final StreamTask task) { restoredActiveTasksLock.lock(); try { restoredActiveTasks.add(task); + updatingTasks.remove(task.id()); log.debug("Active task " + task.id() + " was added to the restored tasks"); restoredActiveTasksCondition.signalAll(); } finally {