Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.batch.core.ExitStatus;

Expand All @@ -36,15 +37,15 @@ public class StepContribution implements Serializable {

private long writeCount = 0;

private long filterCount = 0;
private final AtomicLong filterCount = new AtomicLong(0);

private final long parentSkipCount;

private long readSkipCount;

private long writeSkipCount;

private long processSkipCount;
private final AtomicLong processSkipCount = new AtomicLong(0);

private ExitStatus exitStatus = ExitStatus.EXECUTING;

Expand Down Expand Up @@ -86,7 +87,7 @@ public void incrementFilterCount() {
* @param count The {@code long} amount to increment by.
*/
public void incrementFilterCount(long count) {
filterCount += count;
filterCount.addAndGet(count);
}

/**
Expand Down Expand Up @@ -125,23 +126,23 @@ public long getWriteCount() {
* @return the filter counter.
*/
public long getFilterCount() {
return filterCount;
return filterCount.get();
}

/**
* @return the sum of skips accumulated in the parent {@link StepExecution} and this
* <code>StepContribution</code>.
*/
public long getStepSkipCount() {
return readSkipCount + writeSkipCount + processSkipCount + parentSkipCount;
return readSkipCount + writeSkipCount + processSkipCount.get() + parentSkipCount;
}

/**
* @return the number of skips collected in this <code>StepContribution</code> (not
* including skips accumulated in the parent {@link StepExecution}).
*/
public long getSkipCount() {
return readSkipCount + writeSkipCount + processSkipCount;
return readSkipCount + writeSkipCount + processSkipCount.get();
}

/**
Expand Down Expand Up @@ -179,11 +180,11 @@ public void incrementWriteSkipCount(long count) {
*
*/
public void incrementProcessSkipCount() {
processSkipCount++;
processSkipCount.incrementAndGet();
}

public void incrementProcessSkipCount(long count) {
processSkipCount += count;
processSkipCount.addAndGet(count);
}

/**
Expand All @@ -207,7 +208,7 @@ public long getWriteSkipCount() {
* @return the process skip count.
*/
public long getProcessSkipCount() {
return processSkipCount;
return processSkipCount.get();
}

/**
Expand All @@ -220,25 +221,26 @@ public StepExecution getStepExecution() {

@Override
public String toString() {
return "[StepContribution: read=" + readCount + ", written=" + writeCount + ", filtered=" + filterCount
return "[StepContribution: read=" + readCount + ", written=" + writeCount + ", filtered=" + filterCount.get()
+ ", readSkips=" + readSkipCount + ", writeSkips=" + writeSkipCount + ", processSkips="
+ processSkipCount + ", exitStatus=" + exitStatus.getExitCode() + "]";
+ processSkipCount.get() + ", exitStatus=" + exitStatus.getExitCode() + "]";
}

@Override
public boolean equals(Object o) {
if (!(o instanceof StepContribution that))
return false;
return readCount == that.readCount && writeCount == that.writeCount && filterCount == that.filterCount
&& parentSkipCount == that.parentSkipCount && readSkipCount == that.readSkipCount
&& writeSkipCount == that.writeSkipCount && processSkipCount == that.processSkipCount
return readCount == that.readCount && writeCount == that.writeCount
&& filterCount.get() == that.filterCount.get() && parentSkipCount == that.parentSkipCount
&& readSkipCount == that.readSkipCount && writeSkipCount == that.writeSkipCount
&& processSkipCount.get() == that.processSkipCount.get()
&& Objects.equals(stepExecution, that.stepExecution) && Objects.equals(exitStatus, that.exitStatus);
}

@Override
public int hashCode() {
return Objects.hash(stepExecution, readCount, writeCount, filterCount, parentSkipCount, readSkipCount,
writeSkipCount, processSkipCount, exitStatus);
return Objects.hash(stepExecution, readCount, writeCount, filterCount.get(), parentSkipCount, readSkipCount,
writeSkipCount, processSkipCount.get(), exitStatus);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -319,4 +319,147 @@ class SkippableException extends RuntimeException {
assertEquals(1, stepExecution.getSkipCount());
}

@Test
void testFilterCountAccuracyInConcurrentMode() throws Exception {
// given
int itemCount = 10;
AtomicInteger readCounter = new AtomicInteger(0);

ItemReader<Integer> reader = () -> {
int current = readCounter.incrementAndGet();
return current <= itemCount ? current : null;
};

ItemProcessor<Integer, Integer> filteringProcessor = item -> null;

ItemWriter<Integer> writer = chunk -> {
};

JobRepository jobRepository = new ResourcelessJobRepository();
ChunkOrientedStep<Integer, Integer> step = new ChunkOrientedStep<>("step", 100, reader, writer, jobRepository);
step.setItemProcessor(filteringProcessor);
step.setTaskExecutor(new SimpleAsyncTaskExecutor());
step.afterPropertiesSet();

JobInstance jobInstance = new JobInstance(1L, "job");
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);

// when
step.execute(stepExecution);

// then
assertEquals(itemCount, stepExecution.getFilterCount(), "Race condition detected! Expected " + itemCount
+ " filtered items, but got " + stepExecution.getFilterCount());
}

@Test
void testFilterCountAccuracyInSequentialMode() throws Exception {
// given
int itemCount = 10;
AtomicInteger readCounter = new AtomicInteger(0);

ItemReader<Integer> reader = () -> {
int current = readCounter.incrementAndGet();
return current <= itemCount ? current : null;
};

ItemProcessor<Integer, Integer> filteringProcessor = item -> null;
ItemWriter<Integer> writer = chunk -> {
};

JobRepository jobRepository = new ResourcelessJobRepository();
ChunkOrientedStep<Integer, Integer> step = new ChunkOrientedStep<>("step", 100, reader, writer, jobRepository);
step.setItemProcessor(filteringProcessor);
step.afterPropertiesSet();

JobInstance jobInstance = new JobInstance(1L, "job");
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);

// when
step.execute(stepExecution);

// then
assertEquals(itemCount, stepExecution.getFilterCount(), "Sequential mode should have accurate filter count");
}

@Test
void testProcessSkipCountAccuracyInConcurrentMode() throws Exception {
// given
int itemCount = 10;
AtomicInteger readCounter = new AtomicInteger(0);

ItemReader<Integer> reader = () -> {
int current = readCounter.incrementAndGet();
return current <= itemCount ? current : null;
};

ItemProcessor<Integer, Integer> failingProcessor = item -> {
throw new RuntimeException("Simulated processing failure");
};

ItemWriter<Integer> writer = chunk -> {
};

JobRepository jobRepository = new ResourcelessJobRepository();
ChunkOrientedStep<Integer, Integer> step = new ChunkOrientedStep<>("step", 100, reader, writer, jobRepository);
step.setItemProcessor(failingProcessor);
step.setTaskExecutor(new SimpleAsyncTaskExecutor());
step.setFaultTolerant(true);
step.setRetryPolicy(RetryPolicy.withMaxRetries(1));
step.setSkipPolicy((throwable, skipCount) -> throwable instanceof RuntimeException);

step.afterPropertiesSet();

JobInstance jobInstance = new JobInstance(1L, "job");
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);

// when
step.execute(stepExecution);

// then
assertEquals(itemCount, stepExecution.getProcessSkipCount(), "Race condition detected! Expected " + itemCount
+ " process skips, but got " + stepExecution.getProcessSkipCount());
}

@Test
void testProcessSkipCountAccuracyInSequentialMode() throws Exception {
// given
int itemCount = 10;
AtomicInteger readCounter = new AtomicInteger(0);

ItemReader<Integer> reader = () -> {
int current = readCounter.incrementAndGet();
return current <= itemCount ? current : null;
};

ItemProcessor<Integer, Integer> failingProcessor = item -> {
throw new RuntimeException("Simulated processing failure");
};

ItemWriter<Integer> writer = chunk -> {
};

JobRepository jobRepository = new ResourcelessJobRepository();
ChunkOrientedStep<Integer, Integer> step = new ChunkOrientedStep<>("step", 100, reader, writer, jobRepository);
step.setItemProcessor(failingProcessor);
step.setFaultTolerant(true);
step.setRetryPolicy(RetryPolicy.withMaxRetries(1));
step.setSkipPolicy((throwable, skipCount) -> throwable instanceof RuntimeException);
step.afterPropertiesSet();

JobInstance jobInstance = new JobInstance(1L, "job");
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);

// when
step.execute(stepExecution);

// then
assertEquals(itemCount, stepExecution.getProcessSkipCount(),
"Sequential mode should have accurate process skip count");
}

}