Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.job.JobInterruptedException;
import org.springframework.batch.core.listener.ChunkListener;
import org.springframework.batch.core.listener.CompositeChunkListener;
Expand Down Expand Up @@ -89,6 +88,7 @@
* @param <O> type of output items
* @author Mahmoud Ben Hassine
* @author Andrey Litvitski
* @author xeounxzxu
* @since 6.0
*/
public class ChunkOrientedStep<I, O> extends AbstractStep {
Expand Down Expand Up @@ -377,6 +377,8 @@ protected void doExecute(StepExecution stepExecution) throws Exception {
});

getJobRepository().update(stepExecution);
this.compositeItemStream.update(stepExecution.getExecutionContext());
getJobRepository().updateExecutionContext(stepExecution);
}
}

Expand Down Expand Up @@ -427,10 +429,8 @@ private void processChunkConcurrently(TransactionStatus status, StepContribution
throw new FatalStepExecutionException("Unable to process chunk", e);
}
finally {
// apply contribution and update streams
// apply contribution
stepExecution.apply(contribution);
this.compositeItemStream.update(stepExecution.getExecutionContext());
getJobRepository().updateExecutionContext(stepExecution);
}

}
Expand Down Expand Up @@ -458,10 +458,8 @@ private void processChunkSequentially(TransactionStatus status, StepContribution
throw new FatalStepExecutionException("Unable to process chunk", e);
}
finally {
// apply contribution and update streams
// apply contribution
stepExecution.apply(contribution);
compositeItemStream.update(stepExecution.getExecutionContext());
getJobRepository().updateExecutionContext(stepExecution);
}
}

Expand Down Expand Up @@ -779,4 +777,4 @@ boolean moreItems() {

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
import org.springframework.batch.infrastructure.item.ItemProcessor;
import org.springframework.batch.infrastructure.item.ItemReader;
import org.springframework.batch.infrastructure.item.ItemWriter;
import org.springframework.batch.infrastructure.item.*;
import org.springframework.batch.infrastructure.item.support.ListItemReader;
import org.springframework.batch.infrastructure.item.support.ListItemWriter;
import org.springframework.batch.infrastructure.support.transaction.ResourcelessTransactionManager;
Expand All @@ -52,6 +50,7 @@
/**
* @author Mahmoud Ben Hassine
* @author Andrey Litvitski
* @author xeounxzxu
*/
public class ChunkOrientedStepTests {

Expand Down Expand Up @@ -319,4 +318,52 @@ class SkippableException extends RuntimeException {
assertEquals(1, stepExecution.getSkipCount());
}

@Test
void testItemStreamUpdateStillOccursWhenChunkRollsBack_bugReproduction() throws Exception {
// given: tracking stream to capture update invocations
TrackingItemStream trackingItemStream = new TrackingItemStream();
ItemReader<String> reader = new ListItemReader<>(List.of("item1"));
ItemWriter<String> writer = chunk -> {
throw new RuntimeException("Simulated failure");
};
JobRepository jobRepository = new ResourcelessJobRepository();
ChunkOrientedStep<String, String> step = new ChunkOrientedStep<>("step", 1, reader, writer, jobRepository);
step.registerItemStream(trackingItemStream);
step.afterPropertiesSet();
JobInstance jobInstance = new JobInstance(1L, "job");
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);

// when: execute step (writer causes chunk rollback)
step.execute(stepExecution);

// then: due to current bug the stream update count becomes 1 although chunk
// rolled back
assertEquals(0, trackingItemStream.getUpdateCount(),
"ItemStream should not be updated when chunk transaction fails (bug reproduction)");
}

private static final class TrackingItemStream implements ItemStream {

private int updateCount;

@Override
public void open(ExecutionContext executionContext) {
}

@Override
public void update(ExecutionContext executionContext) {
this.updateCount++;
}

@Override
public void close() {
}

int getUpdateCount() {
return this.updateCount;
}

}

}