StepContribution counters are not thread-safe during parallel chunk processing #5188

@KMGeon

Bug Description

When ChunkOrientedStep runs in concurrent mode (with a TaskExecutor configured), StepContribution.incrementFilterCount() and StepContribution.incrementProcessSkipCount() are subject to race conditions that cause filterCount and processSkipCount to be under-reported. Multiple worker threads call these methods concurrently on the same StepContribution instance, and both fields are plain long values updated with non-atomic += or ++ operations.

Environment

  • Spring Batch version: 6.0.1
  • Java version: 22
  • OS: macOS (also reproducible on Linux/Windows)

Steps to Reproduce

  1. Configure a ChunkOrientedStep with a TaskExecutor to enable concurrent mode
  2. Use an ItemProcessor that filters all items (returns null)
  3. Process a large number of items
  4. Check StepExecution.getFilterCount() and StepExecution.getProcessSkipCount() after execution

Expected Behavior

filterCount and processSkipCount should accurately reflect the number of filtered/skipped items.

Root Cause Analysis

The issue is in StepContribution.java:

// Current implementation - NOT thread-safe
private long filterCount = 0;
private long processSkipCount = 0;

public void incrementFilterCount(long count) {
    filterCount += count; // Race condition!
}

public void incrementProcessSkipCount() {
    processSkipCount++; // Race condition!
}

In ChunkOrientedStep.processChunkConcurrently(), multiple tasks are submitted to the TaskExecutor, all sharing the same contribution object:

Future itemProcessingFuture = this.taskExecutor.submit(
    () -> processItem(item, contribution) // Same contribution shared
);

Each worker thread calls contribution.incrementFilterCount() or contribution.incrementProcessSkipCount() when processing items, causing the race condition.

The Problem with long += and ++ operations

These operations are not atomic and consist of multiple steps:

  1. Read current value
  2. Increment by count
  3. Write back to memory

Multiple threads performing these steps simultaneously can cause lost updates:

Thread 1: Read (100) → Increment → Write (101)
Thread 2: Read (100) → Increment → Write (101)  ← Lost update!
Expected: 102, Actual: 101
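
The lost update can be reproduced outside Spring Batch. The following is a minimal sketch, not the framework's code: LostUpdateDemo and its run method are hypothetical names, and the plain long field only mimics how filterCount and processSkipCount are declared in the snippet above.

```java
// Hypothetical demo class; not part of Spring Batch.
class LostUpdateDemo {

    // Plain field, declared the same way as filterCount in the issue.
    private long count = 0;

    void increment() {
        count++; // read, add, write back: three separate steps
    }

    /** Starts `threads` workers that each increment one shared counter `perThread` times. */
    static long run(int threads, int perThread) {
        LostUpdateDemo counter = new LostUpdateDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            });
        }
        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join() also makes the workers' writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return counter.count;
    }

    public static void main(String[] args) {
        int threads = 8;
        int perThread = 100_000;
        System.out.println("expected = " + (long) threads * perThread);
        // Typically lower than expected; the exact value varies from run to run.
        System.out.println("actual   = " + run(threads, perThread));
    }
}
```

With a single thread the result is always exact. With several threads the result can fall short by an amount that differs per run, which matches the under-counting described in this issue.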

Proposed Solution

Change filterCount and processSkipCount to AtomicLong:

private final AtomicLong filterCount = new AtomicLong(0);
private final AtomicLong processSkipCount = new AtomicLong(0);

public void incrementFilterCount(long count) {
    filterCount.addAndGet(count);
}

public void incrementProcessSkipCount() {
    processSkipCount.incrementAndGet();
}

public long getFilterCount() {
    return filterCount.get();
}

public long getProcessSkipCount() {
    return processSkipCount.get();
}
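
As a rough check of the proposal, the sketch below runs the same four methods against a shared instance from many tasks. AtomicCounters and simulate are hypothetical stand-ins written for this issue, not the real StepContribution, and the 1-in-4 split between skips and filters is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the two counters; not the real StepContribution.
class AtomicCounters {

    private final AtomicLong filterCount = new AtomicLong();
    private final AtomicLong processSkipCount = new AtomicLong();

    void incrementFilterCount(long count) { filterCount.addAndGet(count); }
    void incrementProcessSkipCount() { processSkipCount.incrementAndGet(); }
    long getFilterCount() { return filterCount.get(); }
    long getProcessSkipCount() { return processSkipCount.get(); }

    /** Submits one task per item; every task updates the same shared instance. */
    static AtomicCounters simulate(int items, int threads) {
        AtomicCounters shared = new AtomicCounters();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < items; i++) {
                final boolean skip = (i % 4 == 0); // arbitrary split for the demo
                tasks.add(() -> {
                    if (skip) {
                        shared.incrementProcessSkipCount();
                    } else {
                        shared.incrementFilterCount(1);
                    }
                    return null;
                });
            }
            // invokeAll blocks until every task has completed.
            for (Future<Void> future : pool.invokeAll(tasks)) {
                future.get(); // surfaces any task failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return shared;
    }

    public static void main(String[] args) {
        AtomicCounters result = simulate(100_000, 8);
        System.out.println("filterCount      = " + result.getFilterCount());      // 75000
        System.out.println("processSkipCount = " + result.getProcessSkipCount()); // 25000
    }
}
```

Because addAndGet and incrementAndGet are atomic read-modify-write operations, the totals are exact regardless of how the tasks interleave.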

Alternative Approaches Considered

Approach 1: Partial AtomicLong Application (Current Implementation)

This approach changes only filterCount and processSkipCount to AtomicLong in the StepContribution class. It keeps the counters lock-free and preserves API compatibility with minimal code changes. The drawback is that mixing AtomicLong fields with plain long fields reduces code consistency and may confuse other developers.

Approach 2: Aggregate Results in Main Thread

Modify processChunkConcurrently() to collect processing results and update counts in the main thread only.

record ProcessingResult<O>(@Nullable O item, boolean filtered, boolean skipped) {}

private void processChunkConcurrently(...) {
    List<Future<ProcessingResult<O>>> tasks = new LinkedList<>();

    for (...) {
        Future<ProcessingResult<O>> future = this.taskExecutor.submit(
            () -> processItemWithResult(item)  // Don't pass contribution
        );
        tasks.add(future);
    }

    // Aggregate results in main thread - no race condition
    for (Future<ProcessingResult<O>> future : tasks) {
        ProcessingResult<O> result = future.get();
        if (result.filtered()) ...
        if (result.skipped()) ...
        if (result.item() != null) ...
    }
}
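
To make Approach 2 concrete, here is a framework-free sketch of the same idea. MainThreadAggregation, ChunkOutcome, processItem, processChunk, and demo are hypothetical names, a java.util.function.Function stands in for the ItemProcessor, and treating every RuntimeException as a skip is a simplifying assumption rather than Spring Batch's skip policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of Approach 2; not Spring Batch code.
class MainThreadAggregation {

    record ProcessingResult<O>(O item, boolean filtered, boolean skipped) {}

    record ChunkOutcome<O>(List<O> items, long filterCount, long processSkipCount) {}

    /** Runs on a worker thread and touches no shared state. */
    static <I, O> ProcessingResult<O> processItem(I item, Function<I, O> processor) {
        try {
            O output = processor.apply(item);
            return new ProcessingResult<>(output, output == null, false); // null means filtered
        } catch (RuntimeException e) {
            return new ProcessingResult<>(null, false, true); // assumption: any failure is a skip
        }
    }

    static <I, O> ChunkOutcome<O> processChunk(List<I> chunk, Function<I, O> processor,
                                               ExecutorService executor) {
        List<Future<ProcessingResult<O>>> futures = new ArrayList<>();
        for (I item : chunk) {
            Callable<ProcessingResult<O>> task = () -> processItem(item, processor);
            futures.add(executor.submit(task));
        }
        // Only the calling thread updates the counters, so plain longs are safe here.
        List<O> outputs = new ArrayList<>();
        long filtered = 0;
        long skipped = 0;
        for (Future<ProcessingResult<O>> future : futures) {
            ProcessingResult<O> result;
            try {
                result = future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for results", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("item processing failed", e.getCause());
            }
            if (result.filtered()) filtered++;
            if (result.skipped()) skipped++;
            if (result.item() != null) outputs.add(result.item());
        }
        return new ChunkOutcome<>(outputs, filtered, skipped);
    }

    /** Items 1..1000: multiples of 3 are filtered, other multiples of 5 fail and are skipped. */
    static ChunkOutcome<String> demo() {
        List<Integer> chunk = new ArrayList<>();
        for (int i = 1; i <= 1000; i++) chunk.add(i);
        Function<Integer, String> processor = n -> {
            if (n % 3 == 0) return null;
            if (n % 5 == 0) throw new IllegalStateException("cannot process " + n);
            return "item-" + n;
        };
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            return processChunk(chunk, processor, executor);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        ChunkOutcome<String> outcome = demo();
        System.out.println("filtered=" + outcome.filterCount()
                + " skipped=" + outcome.processSkipCount()
                + " written=" + outcome.items().size());
    }
}
```

The trade-off against Approach 1 is visible here: no atomic fields are needed, but the worker's return type and the aggregation loop both have to change.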

I believe Approach 1 (AtomicLong) is the simplest and most practical solution, but I'm open to feedback. If you prefer Approach 2 or have other suggestions, please let me know and I'll revise accordingly.
