File tree Expand file tree Collapse file tree 1 file changed +3
-2
lines changed
beam/core/src/main/java/cz/o2/proxima/beam/io Expand file tree Collapse file tree 1 file changed +3
-2
lines changed Original file line number Diff line number Diff line change @@ -122,7 +122,8 @@ public void startBundle() {
122122 public void finishBundle () {
123123 long startTime = System .currentTimeMillis ();
124124 while (missingResponses .get () > 0 ) {
125- if (System .currentTimeMillis () - startTime > bundleFinalizeTimeoutMs ) {
125+ long elapsed = System .currentTimeMillis () - startTime ;
126+ if (elapsed >= bundleFinalizeTimeoutMs ) {
126127 throw new IllegalStateException ("Failed to flush bundle within timeout of 5s" );
127128 }
128129 // clone to avoid ConcurrentModificationException
@@ -136,7 +137,7 @@ public void finishBundle() {
136137 .map (
137138 f ->
138139 ExceptionUtils .uncheckedFactory (
139- () -> f .get (bundleFinalizeTimeoutMs , TimeUnit .MILLISECONDS )))
140+ () -> f .get (bundleFinalizeTimeoutMs - elapsed , TimeUnit .MILLISECONDS )))
140141 .filter (p -> !p .getFirst ())
141142 // this will be retried
142143 .filter (p -> !(p .getSecond () instanceof TransactionRejectedException ))
You can’t perform that action at this time.
0 commit comments