Skip to content

Commit 23ec0ae

Browse files
authored
AsyncOperator#isFinished must never return true on failure (elastic#104029) (elastic#104070)
Enrich IT tests can return OK with some missing results instead of Failure when the enrich lookup hits circuit breakers. This is due to a race condition in isFinished and onFailure within the AsyncOperator. When an async lookup fails, we set the exception and then discard pages. Unfortunately, in the isFinished method, we perform the checks in the same order: first, we check for failure, and then we check for outstanding pages. If there is a long pause between these steps, isFinished might not detect the failure but see no outstanding pages, leading it to return true despite the presence of a failure. This change swaps the order of the checks.
1 parent 5f8c653 commit 23ec0ae

File tree

3 files changed

+60
-2
lines changed

3 files changed

+60
-2
lines changed

docs/changelog/104029.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 104029
2+
summary: '`AsyncOperator#isFinished` must never return true on failure'
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,12 @@ public void finish() {
181181

182182
@Override
183183
public boolean isFinished() {
184-
checkFailure();
185-
return finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo();
184+
if (finished && checkpoint.getPersistedCheckpoint() == checkpoint.getMaxSeqNo()) {
185+
checkFailure();
186+
return true;
187+
} else {
188+
return false;
189+
}
186190
}
187191

188192
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java

+49
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import java.util.Iterator;
4444
import java.util.List;
4545
import java.util.Map;
46+
import java.util.concurrent.CyclicBarrier;
47+
import java.util.concurrent.TimeUnit;
4648
import java.util.concurrent.atomic.AtomicBoolean;
4749
import java.util.concurrent.atomic.AtomicInteger;
4850
import java.util.stream.LongStream;
@@ -270,6 +272,53 @@ protected void doClose() {
270272
}
271273
}
272274

275+
public void testIsFinished() {
276+
int iters = iterations(10, 10_000);
277+
BlockFactory blockFactory = blockFactory();
278+
for (int i = 0; i < iters; i++) {
279+
DriverContext driverContext = new DriverContext(blockFactory.bigArrays(), blockFactory);
280+
CyclicBarrier barrier = new CyclicBarrier(2);
281+
AsyncOperator asyncOperator = new AsyncOperator(driverContext, between(1, 10)) {
282+
@Override
283+
protected void performAsync(Page inputPage, ActionListener<Page> listener) {
284+
ActionRunnable<Page> command = new ActionRunnable<>(listener) {
285+
@Override
286+
protected void doRun() {
287+
try {
288+
barrier.await(10, TimeUnit.SECONDS);
289+
} catch (Exception e) {
290+
throw new AssertionError(e);
291+
}
292+
listener.onFailure(new ElasticsearchException("simulated"));
293+
}
294+
};
295+
threadPool.executor(ESQL_TEST_EXECUTOR).execute(command);
296+
}
297+
298+
@Override
299+
protected void doClose() {
300+
301+
}
302+
};
303+
asyncOperator.addInput(new Page(blockFactory.newConstantIntBlockWith(randomInt(), between(1, 10))));
304+
asyncOperator.finish();
305+
try {
306+
barrier.await(10, TimeUnit.SECONDS);
307+
} catch (Exception e) {
308+
throw new AssertionError(e);
309+
}
310+
int numChecks = between(10, 100);
311+
while (--numChecks >= 0) {
312+
try {
313+
assertFalse("must not finished or failed", asyncOperator.isFinished());
314+
} catch (ElasticsearchException e) {
315+
assertThat(e.getMessage(), equalTo("simulated"));
316+
break;
317+
}
318+
}
319+
}
320+
}
321+
273322
static class LookupService {
274323
private final ThreadPool threadPool;
275324
private final Map<Long, String> dict;

0 commit comments

Comments
 (0)