Skip to content

Commit

Permalink
KAFKA-10199: Add missing catch for lock exception (apache#14403)
Browse files Browse the repository at this point in the history
The state directory throws a lock exception during initialization if a task state directory is still locked by the stream thread that previously owned the task. When this happens, Streams catches the lock exception, ignores the exception, and tries to initialize the task in the next exception.

In the state updater code path, we missed catching the lock exception when Streams recycles a task. That leads to the lock exception thrown to the exception handler, which is unexpected and leads to test failures.

Reviewer: Lucas Brutschy <[email protected]>
  • Loading branch information
cadonna authored Sep 26, 2023
1 parent 9c2e5da commit a46da90
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,7 @@ private void recycleTaskFromStateUpdater(final Task task,
newTask = task.isActive() ?
convertActiveToStandby((StreamTask) task, inputPartitions) :
convertStandbyToActive((StandbyTask) task, inputPartitions);
newTask.initializeIfNeeded();
stateUpdater.add(newTask);
addTaskToStateUpdater(newTask);
} catch (final RuntimeException e) {
final TaskId taskId = task.id();
final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
Expand Down Expand Up @@ -843,13 +842,7 @@ private void addTasksToStateUpdater() {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
for (final Task task : tasks.drainPendingTasksToInit()) {
try {
task.initializeIfNeeded();
stateUpdater.add(task);
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
tasks.addPendingTasksToInit(Collections.singleton(task));
addTaskToStateUpdater(task);
} catch (final RuntimeException e) {
// need to add task back to the bookkeeping to be handled by the stream thread
tasks.addTask(task);
Expand All @@ -860,6 +853,18 @@ private void addTasksToStateUpdater() {
maybeThrowTaskExceptions(taskExceptions);
}

private void addTaskToStateUpdater(final Task task) {
try {
task.initializeIfNeeded();
stateUpdater.add(task);
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
tasks.addPendingTasksToInit(Collections.singleton(task));
}
}

public void handleExceptionsFromStateUpdater() {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,18 +778,57 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, task01));
final LockException lockException = new LockException("Where are my keys??");
doThrow(lockException)
.when(task00).initializeIfNeeded();
doThrow(lockException).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

Mockito.verify(task00).initializeIfNeeded();
Mockito.verify(task01).initializeIfNeeded();
Mockito.verify(tasks).addPendingTasksToInit(Collections.singleton(task00));
Mockito.verify(tasks).addPendingTasksToInit(
Mockito.argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
);
Mockito.verify(stateUpdater, never()).add(task00);
Mockito.verify(stateUpdater).add(task01);
}

@Test
public void shouldRetryInitializationWhenLockExceptionAfterRecyclingInStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final StandbyTask task00Converted = standbyTask(taskId00, taskId00Partitions)
.withInputPartitions(taskId00Partitions).build();
final StreamTask task01Converted = statefulTask(taskId01, taskId01Partitions)
.withInputPartitions(taskId01Partitions).build();
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(activeTaskCreator.createActiveTaskFromStandby(task01, taskId01Partitions,
consumer)).thenReturn(task01Converted);
when(standbyTaskCreator.createStandbyTaskFromActive(task00, taskId00Partitions))
.thenReturn(task00Converted);
final LockException lockException = new LockException("Where are my keys??");
doThrow(lockException).when(task00Converted).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

Mockito.verify(task00Converted).initializeIfNeeded();
Mockito.verify(task01Converted).initializeIfNeeded();
Mockito.verify(tasks).addPendingTasksToInit(
Mockito.argThat(tasksToInit -> tasksToInit.contains(task00Converted) && !tasksToInit.contains(task01Converted))
);
Mockito.verify(stateUpdater, never()).add(task00Converted);
Mockito.verify(stateUpdater).add(task01Converted);
}

@Test
public void shouldRecycleTasksRemovedFromStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
Expand Down

0 comments on commit a46da90

Please sign in to comment.