Skip to content

Commit 648ca66

Browse files
authored
Fix potential dataloss (#56)
* Fix dataloss, do not ignore interrupt exceptions while commit thread is running * Disable vertica integration tests due to missing image
1 parent da020ba commit 648ca66

File tree

2 files changed

+2
-3
lines changed

2 files changed

+2
-3
lines changed

stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class RecordBatchingSinker[B <: RecordBatch](
7373

7474
log.info(s"Committing batch $batch to storage")
7575
Metrics.commitDuration.recordCallable(() =>
76-
retryOnFailureIf(retryPolicy)(!batchCommittedAfterFailure(batch)) {
76+
retryOnFailureIf(retryPolicy)(isRunning.get() && !batchCommittedAfterFailure(batch)) {
7777
batchStorage.commitBatch(batch)
7878
}
7979
)
@@ -86,7 +86,7 @@ class RecordBatchingSinker[B <: RecordBatch](
8686
log.warn("Failed discarding batch")
8787
}
8888
} catch {
89-
case e if isInterruptionException(e) =>
89+
case e if isInterruptionException(e) && !isRunning.get() =>
9090
log.debug("Batch commit thread interrupted")
9191
}
9292
},

stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ object Retry extends Logging {
4646
def isInterruptionException(e: Throwable): Boolean = e match {
4747
case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException | _: InterruptException =>
4848
true
49-
case e: Throwable if e.getCause != null && isInterruptionException(e.getCause) => true
5049
case _ => false
5150
}
5251

0 commit comments

Comments
 (0)