Skip to content

Commit 0868c02

Browse files
committed
Fix race condition in StepContribution.filterCount
Change filterCount field from long to AtomicLong to ensure thread-safety when incrementing filter count in concurrent chunk processing. Resolves #5188 Signed-off-by: mugeon <pos04167@kakao.com>
1 parent 2cc7890 commit 0868c02

File tree

2 files changed

+136
-61
lines changed

2 files changed

+136
-61
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/StepContribution.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.io.Serializable;
1919
import java.util.Objects;
20+
import java.util.concurrent.atomic.AtomicLong;
2021

2122
import org.springframework.batch.core.ExitStatus;
2223

@@ -36,7 +37,7 @@ public class StepContribution implements Serializable {
3637

3738
private long writeCount = 0;
3839

39-
private long filterCount = 0;
40+
private final AtomicLong filterCount = new AtomicLong(0);
4041

4142
private final long parentSkipCount;
4243

@@ -86,7 +87,7 @@ public void incrementFilterCount() {
8687
* @param count The {@code long} amount to increment by.
8788
*/
8889
public void incrementFilterCount(long count) {
89-
filterCount += count;
90+
filterCount.addAndGet(count);
9091
}
9192

9293
/**
@@ -125,7 +126,7 @@ public long getWriteCount() {
125126
* @return the filter counter.
126127
*/
127128
public long getFilterCount() {
128-
return filterCount;
129+
return filterCount.get();
129130
}
130131

131132
/**
@@ -220,7 +221,7 @@ public StepExecution getStepExecution() {
220221

221222
@Override
222223
public String toString() {
223-
return "[StepContribution: read=" + readCount + ", written=" + writeCount + ", filtered=" + filterCount
224+
return "[StepContribution: read=" + readCount + ", written=" + writeCount + ", filtered=" + filterCount.get()
224225
+ ", readSkips=" + readSkipCount + ", writeSkips=" + writeSkipCount + ", processSkips="
225226
+ processSkipCount + ", exitStatus=" + exitStatus.getExitCode() + "]";
226227
}
@@ -229,15 +230,16 @@ public String toString() {
229230
public boolean equals(Object o) {
230231
if (!(o instanceof StepContribution that))
231232
return false;
232-
return readCount == that.readCount && writeCount == that.writeCount && filterCount == that.filterCount
233-
&& parentSkipCount == that.parentSkipCount && readSkipCount == that.readSkipCount
234-
&& writeSkipCount == that.writeSkipCount && processSkipCount == that.processSkipCount
235-
&& Objects.equals(stepExecution, that.stepExecution) && Objects.equals(exitStatus, that.exitStatus);
233+
return readCount == that.readCount && writeCount == that.writeCount
234+
&& filterCount.get() == that.filterCount.get() && parentSkipCount == that.parentSkipCount
235+
&& readSkipCount == that.readSkipCount && writeSkipCount == that.writeSkipCount
236+
&& processSkipCount == that.processSkipCount && Objects.equals(stepExecution, that.stepExecution)
237+
&& Objects.equals(exitStatus, that.exitStatus);
236238
}
237239

238240
@Override
239241
public int hashCode() {
240-
return Objects.hash(stepExecution, readCount, writeCount, filterCount, parentSkipCount, readSkipCount,
242+
return Objects.hash(stepExecution, readCount, writeCount, filterCount.get(), parentSkipCount, readSkipCount,
241243
writeSkipCount, processSkipCount, exitStatus);
242244
}
243245

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java

Lines changed: 125 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.atomic.AtomicInteger;
2020

2121
import org.junit.jupiter.api.Assertions;
22+
import org.junit.jupiter.api.RepeatedTest;
2223
import org.junit.jupiter.api.Test;
2324

2425
import org.springframework.batch.core.ExitStatus;
@@ -58,14 +59,14 @@ public class ChunkOrientedStepTests {
5859
@Test
5960
void testInheritedPropertiesOnBuild() {
6061
ChunkOrientedStep<String, String> step = new StepBuilder("step", new ResourcelessJobRepository())
61-
.<String, String>chunk(5)
62-
.reader(new ListItemReader<>(List.of("foo", "bar")))
63-
.writer(items -> {
64-
})
65-
// inherited properties from StepBuilderHelper
66-
.allowStartIfComplete(true)
67-
.startLimit(5)
68-
.build();
62+
.<String, String>chunk(5)
63+
.reader(new ListItemReader<>(List.of("foo", "bar")))
64+
.writer(items -> {
65+
})
66+
// inherited properties from StepBuilderHelper
67+
.allowStartIfComplete(true)
68+
.startLimit(5)
69+
.build();
6970

7071
Assertions.assertTrue(step.isAllowStartIfComplete());
7172
Assertions.assertEquals(5, step.getStartLimit());
@@ -74,23 +75,23 @@ void testInheritedPropertiesOnBuild() {
7475
@Test
7576
void testFaultTolerantChunkOrientedStepSetupWithDefaultSkipLimit() {
7677
Assertions.assertDoesNotThrow(() -> new StepBuilder(mock()).chunk(5)
77-
.reader(new ListItemReader<>(List.of("item1", "item2")))
78-
.writer(items -> {
79-
})
80-
.faultTolerant()
81-
.skip(Exception.class)
82-
.build());
78+
.reader(new ListItemReader<>(List.of("item1", "item2")))
79+
.writer(items -> {
80+
})
81+
.faultTolerant()
82+
.skip(Exception.class)
83+
.build());
8384
}
8485

8586
@Test
8687
void testFaultTolerantChunkOrientedStepSetupWithDefaultRetryLimit() {
8788
Assertions.assertDoesNotThrow(() -> new StepBuilder(mock()).chunk(5)
88-
.reader(new ListItemReader<>(List.of("item1", "item2")))
89-
.writer(items -> {
90-
})
91-
.faultTolerant()
92-
.retry(Exception.class)
93-
.build());
89+
.reader(new ListItemReader<>(List.of("item1", "item2")))
90+
.writer(items -> {
91+
})
92+
.faultTolerant()
93+
.retry(Exception.class)
94+
.build());
9495
}
9596

9697
@Test
@@ -147,14 +148,14 @@ void testRetryLimitWithoutRetryDoesNotRetryErrors() throws Exception {
147148

148149
ChunkOrientedStep<String, String> step = new ChunkOrientedStepBuilder<String, String>(
149150
new ResourcelessJobRepository(), 2)
150-
.reader(new ListItemReader<>(List.of("item1")))
151-
.processor(processor)
152-
.writer(items -> {
153-
})
154-
.transactionManager(new ResourcelessTransactionManager())
155-
.faultTolerant()
156-
.retryLimit(3)
157-
.build();
151+
.reader(new ListItemReader<>(List.of("item1")))
152+
.processor(processor)
153+
.writer(items -> {
154+
})
155+
.transactionManager(new ResourcelessTransactionManager())
156+
.faultTolerant()
157+
.retryLimit(3)
158+
.build();
158159

159160
JobInstance jobInstance = new JobInstance(1L, "job");
160161
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
@@ -182,13 +183,13 @@ void testRetryLimitWithoutRetryRetriesExceptions() throws Exception {
182183
ListItemWriter<String> listItemWriter = new ListItemWriter<>();
183184
ChunkOrientedStep<String, String> step = new ChunkOrientedStepBuilder<String, String>(
184185
new ResourcelessJobRepository(), 2)
185-
.reader(listItemReader)
186-
.processor(processor)
187-
.writer(listItemWriter)
188-
.transactionManager(new ResourcelessTransactionManager())
189-
.faultTolerant()
190-
.retryLimit(3)
191-
.build();
186+
.reader(listItemReader)
187+
.processor(processor)
188+
.writer(listItemWriter)
189+
.transactionManager(new ResourcelessTransactionManager())
190+
.faultTolerant()
191+
.retryLimit(3)
192+
.build();
192193

193194
JobInstance jobInstance = new JobInstance(1L, "job");
194195
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
@@ -216,14 +217,14 @@ void testExplicitRetryConfigurationTakesPrecedence() throws Exception {
216217

217218
ChunkOrientedStep<String, String> step = new ChunkOrientedStepBuilder<String, String>(
218219
new ResourcelessJobRepository(), 2)
219-
.reader(listItemReader)
220-
.processor(processor)
221-
.writer(listItemWriter)
222-
.transactionManager(new ResourcelessTransactionManager())
223-
.faultTolerant()
224-
.retry(IllegalStateException.class)
225-
.retryLimit(3)
226-
.build();
220+
.reader(listItemReader)
221+
.processor(processor)
222+
.writer(listItemWriter)
223+
.transactionManager(new ResourcelessTransactionManager())
224+
.faultTolerant()
225+
.retry(IllegalStateException.class)
226+
.retryLimit(3)
227+
.build();
227228

228229
JobInstance jobInstance = new JobInstance(1L, "job");
229230
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
@@ -258,7 +259,7 @@ void testDoSkipInProcessShouldThrowNonSkippableProcessExceptionWhenSkipPolicyRet
258259
step.setItemProcessor(processor);
259260
step.setFaultTolerant(true);
260261
step.setRetryPolicy(RetryPolicy.withMaxRetries(1)); // retry once (initial + 1
261-
// retry)
262+
// retry)
262263
step.setSkipPolicy(new NeverSkipItemSkipPolicy()); // never skip
263264
step.afterPropertiesSet();
264265

@@ -297,14 +298,14 @@ class SkippableException extends RuntimeException {
297298

298299
JobRepository jobRepository = new ResourcelessJobRepository();
299300
ChunkOrientedStep<String, String> step = new StepBuilder("step", jobRepository).<String, String>chunk(1)
300-
.reader(reader)
301-
.writer(writer)
302-
.faultTolerant()
303-
.retry(SkippableException.class)
304-
.retryLimit(1)
305-
.skip(SkippableException.class)
306-
.skipLimit(1)
307-
.build();
301+
.reader(reader)
302+
.writer(writer)
303+
.faultTolerant()
304+
.retry(SkippableException.class)
305+
.retryLimit(1)
306+
.skip(SkippableException.class)
307+
.skipLimit(1)
308+
.build();
308309

309310
JobInstance jobInstance = new JobInstance(1L, "job");
310311
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
@@ -319,4 +320,76 @@ class SkippableException extends RuntimeException {
319320
assertEquals(1, stepExecution.getSkipCount());
320321
}
321322

323+
/**
324+
* Test to verify that filterCount is accurate in concurrent mode. This test detects
325+
* race condition in StepContribution.incrementFilterCount().
326+
*/
327+
@RepeatedTest(5)
328+
void testFilterCountAccuracyInConcurrentMode() throws Exception {
329+
// given
330+
int itemCount = 10000;
331+
AtomicInteger readCounter = new AtomicInteger(0);
332+
333+
// Thread-safe reader
334+
ItemReader<Integer> reader = () -> {
335+
int current = readCounter.incrementAndGet();
336+
return current <= itemCount ? current : null;
337+
};
338+
339+
ItemProcessor<Integer, Integer> filteringProcessor = item -> null;
340+
341+
ItemWriter<Integer> writer = chunk -> {
342+
};
343+
344+
JobRepository jobRepository = new ResourcelessJobRepository();
345+
ChunkOrientedStep<Integer, Integer> step = new ChunkOrientedStep<>("step", 100, reader, writer, jobRepository);
346+
step.setItemProcessor(filteringProcessor);
347+
step.setTaskExecutor(new SimpleAsyncTaskExecutor()); // Enable concurrent mode
348+
step.afterPropertiesSet();
349+
350+
JobInstance jobInstance = new JobInstance(1L, "job");
351+
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
352+
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);
353+
354+
// when
355+
step.execute(stepExecution);
356+
357+
// then
358+
assertEquals(itemCount, stepExecution.getFilterCount(), "Race condition detected! Expected " + itemCount
359+
+ " filtered items, but got " + stepExecution.getFilterCount());
360+
}
361+
362+
@Test
363+
void testFilterCountAccuracyInSequentialMode() throws Exception {
364+
// given
365+
int itemCount = 10000;
366+
AtomicInteger readCounter = new AtomicInteger(0);
367+
368+
ItemReader<Integer> reader = () -> {
369+
int current = readCounter.incrementAndGet();
370+
return current <= itemCount ? current : null;
371+
};
372+
373+
ItemProcessor<Integer, Integer> filteringProcessor = item -> null;
374+
ItemWriter<Integer> writer = chunk -> {
375+
};
376+
377+
JobRepository jobRepository = new ResourcelessJobRepository();
378+
ChunkOrientedStep<Integer, Integer> step = new ChunkOrientedStep<>("step", 100, reader, writer, jobRepository);
379+
step.setItemProcessor(filteringProcessor);
380+
// No TaskExecutor - sequential mode
381+
step.afterPropertiesSet();
382+
383+
JobInstance jobInstance = new JobInstance(1L, "job");
384+
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
385+
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);
386+
387+
// when
388+
step.execute(stepExecution);
389+
390+
// then
391+
assertEquals(itemCount, stepExecution.getFilterCount(), "Sequential mode should have accurate filter count");
392+
}
393+
322394
}
395+

0 commit comments

Comments
 (0)