diff --git a/docs/changelog/125631.yaml b/docs/changelog/125631.yaml new file mode 100644 index 0000000000000..32917bb1da060 --- /dev/null +++ b/docs/changelog/125631.yaml @@ -0,0 +1,5 @@ +pr: 125631 +summary: Add `documents_found` and `values_loaded` +area: ES|QL +type: enhancement +issues: [] diff --git a/docs/reference/esql/esql-rest.asciidoc b/docs/reference/esql/esql-rest.asciidoc index c4160e1d65954..2c8ea36fb4adb 100644 --- a/docs/reference/esql/esql-rest.asciidoc +++ b/docs/reference/esql/esql-rest.asciidoc @@ -194,6 +194,8 @@ Which returns: { "took": 28, "is_partial": false, + "documents_found": 5, + "values_loaded": 20, "columns": [ {"name": "author", "type": "text"}, {"name": "name", "type": "text"}, diff --git a/docs/reference/esql/functions/description/knn.asciidoc b/docs/reference/esql/functions/description/knn.asciidoc new file mode 100644 index 0000000000000..9420b29dc25d5 --- /dev/null +++ b/docs/reference/esql/functions/description/knn.asciidoc @@ -0,0 +1,5 @@ +// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +*Description* + +Finds the k nearest vectors to a query vector, as measured by a similarity metric. knn function finds nearest vectors through approximate search on indexed dense_vectors. diff --git a/docs/reference/esql/functions/examples/knn.asciidoc b/docs/reference/esql/functions/examples/knn.asciidoc new file mode 100644 index 0000000000000..c9dfe41e8c21d --- /dev/null +++ b/docs/reference/esql/functions/examples/knn.asciidoc @@ -0,0 +1,21 @@ +// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +*Examples* + +[source.merge.styled,esql] +---- +include::{esql-specs}/knn-function.csv-spec[tag=knn-function] +---- +[%header.monospaced.styled,format=dsv,separator=|] +|=== +include::{esql-specs}/knn-function.csv-spec[tag=knn-function-result] +|=== +[source.merge.styled,esql] +---- +include::{esql-specs}/knn-function.csv-spec[tag=knn-function-options] +---- +[%header.monospaced.styled,format=dsv,separator=|] +|=== +include::{esql-specs}/knn-function.csv-spec[tag=knn-function-options-result] +|=== + diff --git a/docs/reference/esql/functions/functionNamedParams/knn.asciidoc b/docs/reference/esql/functions/functionNamedParams/knn.asciidoc new file mode 100644 index 0000000000000..40232c8bb8eb1 --- /dev/null +++ b/docs/reference/esql/functions/functionNamedParams/knn.asciidoc @@ -0,0 +1,13 @@ +// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +*Supported function named parameters* + +[%header.monospaced.styled,format=dsv,separator=|] +|=== +name | types | description +num_candidates | [integer] | The number of nearest neighbor candidates to consider per shard while doing knn search. Cannot exceed 10,000. Increasing num_candidates tends to improve the accuracy of the final results. Defaults to 1.5 * k +boost | [float] | Floating point number used to decrease or increase the relevance scores of the query.Defaults to 1.0. +k | [integer] | The number of nearest neighbors to return from each shard. Elasticsearch collects k results from each shard, then merges them to find the global top results. This value must be less than or equal to num_candidates. Defaults to 10. +rescore_oversample | [double] | Applies the specified oversampling for rescoring quantized vectors. See [oversampling and rescoring quantized vectors](docs-content://solutions/search/vector/knn.md#dense-vector-knn-search-rescoring) for details. +similarity | [double] | The minimum similarity required for a document to be considered a match. The similarity value calculated relates to the raw similarity used, not the document score. +|=== diff --git a/docs/reference/esql/functions/functionNamedParams/to_ip.asciidoc b/docs/reference/esql/functions/functionNamedParams/to_ip.asciidoc new file mode 100644 index 0000000000000..d58085d36dbbb --- /dev/null +++ b/docs/reference/esql/functions/functionNamedParams/to_ip.asciidoc @@ -0,0 +1,9 @@ +// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +*Supported function named parameters* + +[%header.monospaced.styled,format=dsv,separator=|] +|=== +name | types | description +leading_zeros | [keyword] | What to do with leading 0s in IPv4 addresses. +|=== diff --git a/docs/reference/esql/functions/layout/knn.asciidoc b/docs/reference/esql/functions/layout/knn.asciidoc new file mode 100644 index 0000000000000..146080148ff6c --- /dev/null +++ b/docs/reference/esql/functions/layout/knn.asciidoc @@ -0,0 +1,18 @@ +// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +[discrete] +[[esql-knn]] +=== `KNN` + +preview::["Do not use on production environments. This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features."] + +*Syntax* + +[.text-center] +image::esql/functions/signature/knn.svg[Embedded,opts=inline] + +include::../parameters/knn.asciidoc[] +include::../description/knn.asciidoc[] +include::../types/knn.asciidoc[] +include::../functionNamedParams/knn.asciidoc[] +include::../examples/knn.asciidoc[] diff --git a/docs/reference/esql/functions/parameters/knn.asciidoc b/docs/reference/esql/functions/parameters/knn.asciidoc new file mode 100644 index 0000000000000..46e15d7975711 --- /dev/null +++ b/docs/reference/esql/functions/parameters/knn.asciidoc @@ -0,0 +1,12 @@ +// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +*Parameters* + +`field`:: +Field that the query will target. + +`query`:: +Vector value to find top nearest neighbours for. + +`options`:: +(Optional) kNN additional options as <>. See <> for more information. diff --git a/docs/reference/esql/functions/signature/knn.svg b/docs/reference/esql/functions/signature/knn.svg new file mode 100644 index 0000000000000..2d985113fbcd9 --- /dev/null +++ b/docs/reference/esql/functions/signature/knn.svg @@ -0,0 +1 @@ +KNN(field,query,options) \ No newline at end of file diff --git a/docs/reference/esql/functions/types/knn.asciidoc b/docs/reference/esql/functions/types/knn.asciidoc new file mode 100644 index 0000000000000..9bcd043f9d127 --- /dev/null +++ b/docs/reference/esql/functions/types/knn.asciidoc @@ -0,0 +1,9 @@ +// This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it. + +*Supported types* + +[%header.monospaced.styled,format=dsv,separator=|] +|=== +field | query | options | result +boolean +|=== diff --git a/docs/reference/esql/multivalued-fields.asciidoc b/docs/reference/esql/multivalued-fields.asciidoc index 00d9df04a0bc4..4cd9abbea33f8 100644 --- a/docs/reference/esql/multivalued-fields.asciidoc +++ b/docs/reference/esql/multivalued-fields.asciidoc @@ -28,6 +28,8 @@ Multivalued fields come back as a JSON array: { "took": 28, "is_partial": false, + "documents_found": 2, + "values_loaded": 5, "columns": [ { "name": "a", "type": "long"}, { "name": "b", "type": "long"} @@ -80,6 +82,8 @@ And {esql} sees that removal: { "took": 28, "is_partial": false, + "documents_found": 2, + "values_loaded": 5, "columns": [ { "name": "a", "type": "long"}, { "name": "b", "type": "keyword"} @@ -125,6 +129,8 @@ And {esql} also sees that: { "took": 28, "is_partial": false, + "documents_found": 2, + "values_loaded": 7, "columns": [ { "name": "a", "type": "long"}, { "name": "b", "type": "long"} @@ -169,6 +175,8 @@ POST /_query { "took": 28, "is_partial": false, + "documents_found": 2, + "values_loaded": 7, "columns": [ { "name": "a", "type": "long"}, { "name": "b", "type": "keyword"} @@ -203,6 +211,8 @@ POST /_query { "took": 28, "is_partial": false, + "documents_found": 1, + "values_loaded": 2, "columns": [ { "name": "a", "type": "long"}, ], @@ -247,6 +257,8 @@ POST /_query { "took": 28, "is_partial": false, + "documents_found": 2, + "values_loaded": 5, "columns": [ { "name": "a", "type": "long"}, { "name": "b", "type": "long"}, @@ -271,7 +283,7 @@ Work around this limitation by converting the field to single value with one of: * <> * <> -[source,console,esql-multivalued-fields-mv-into-null] +[source,console,esql-multivalued-fields-mv-min] ---- POST /_query { @@ -285,6 +297,8 @@ POST /_query { "took": 28, "is_partial": false, + "documents_found": 2, + "values_loaded": 5, "columns": [ { "name": "a", "type": "long"}, { "name": "b", "type": "long"}, diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index ce1016cd01c74..c69a7b6c7c991 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -253,6 +253,7 @@ static TransportVersion def(int id) { public static final TransportVersion SPARSE_VECTOR_FIELD_PRUNING_OPTIONS_8_19 = def(8_841_0_58); public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59); public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60); + public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/common/Strings.java b/server/src/main/java/org/elasticsearch/common/Strings.java index 54deb0c26a96c..0f0004a43eb37 100644 --- a/server/src/main/java/org/elasticsearch/common/Strings.java +++ b/server/src/main/java/org/elasticsearch/common/Strings.java @@ -818,7 +818,7 @@ public static String toString(ChunkedToXContent chunkedToXContent, boolean prett * Allows to configure the params. * Allows to control whether the outputted json needs to be pretty printed and human readable. */ - private static String toString(ToXContent toXContent, ToXContent.Params params, boolean pretty, boolean human) { + public static String toString(ToXContent toXContent, ToXContent.Params params, boolean pretty, boolean human) { try { XContentBuilder builder = createBuilder(pretty, human); if (toXContent.isFragment()) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 3d67555c7d692..8070661865afc 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2704,8 +2704,13 @@ protected static MapMatcher getProfileMatcher() { .entry("drivers", instanceOf(List.class)); } - protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial) { + protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) { MapMatcher mapMatcher = matchesMap(); + if (includeDocumentsFound) { + // Older versions may not return documents_found and values_loaded. + mapMatcher = mapMatcher.entry("documents_found", greaterThanOrEqualTo(0)); + mapMatcher = mapMatcher.entry("values_loaded", greaterThanOrEqualTo(0)); + } if (includeMetadata) { mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0)); } @@ -2720,7 +2725,7 @@ protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean in * Create empty result matcher from result, taking into account all metadata items. */ protected static MapMatcher getResultMatcher(Map result) { - return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial")); + return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial"), result.containsKey("documents_found")); } /** diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java index c9fb8309a8e09..b6cb024534818 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java @@ -83,7 +83,11 @@ public int getPositionCount() { @Override public int getTotalValueCount() { - throw new UnsupportedOperationException("Composite block"); + int totalValueCount = 0; + for (Block b : blocks) { + totalValueCount += b.getTotalValueCount(); + } + return totalValueCount; } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index 2ab3c52205518..11280746fc592 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -437,6 +437,11 @@ public Map partitioningStrategies return partitioningStrategies; } + @Override + public long documentsFound() { + return rowsEmitted; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index 06ee933c90f00..ca24a86bc3087 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -46,6 +46,8 @@ import java.util.function.IntFunction; import java.util.function.Supplier; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; + /** * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator} * and outputs them to a new column. @@ -112,6 +114,7 @@ public record ShardContext(IndexReader reader, Supplier newSourceL private final BlockFactory blockFactory; private final Map readersBuilt = new TreeMap<>(); + private long valuesLoaded; int lastShard = -1; int lastSegment = -1; @@ -158,6 +161,9 @@ public int get(int i) { } } success = true; + for (Block b : blocks) { + valuesLoaded += b.getTotalValueCount(); + } return page.appendBlocks(blocks); } catch (IOException e) { throw new UncheckedIOException(e); @@ -548,7 +554,7 @@ public String toString() { @Override protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { - return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted); + return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); } /** @@ -593,21 +599,34 @@ public static class Status extends AbstractPageMappingOperator.Status { ); private final Map readersBuilt; - - Status(Map readersBuilt, long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { + private final long valuesLoaded; + + Status( + Map readersBuilt, + long processNanos, + int pagesProcessed, + long rowsReceived, + long rowsEmitted, + long valuesLoaded + ) { super(processNanos, pagesProcessed, rowsReceived, rowsEmitted); this.readersBuilt = readersBuilt; + this.valuesLoaded = valuesLoaded; } Status(StreamInput in) throws IOException { super(in); readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt); + valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeMap(readersBuilt, StreamOutput::writeVInt); + if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { + out.writeVLong(valuesLoaded); + } } @Override @@ -619,6 +638,11 @@ public Map readersBuilt() { return readersBuilt; } + @Override + public long valuesLoaded() { + return valuesLoaded; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -627,6 +651,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(e.getKey(), e.getValue()); } builder.endObject(); + builder.field("values_loaded", valuesLoaded); innerToXContent(builder); return builder.endObject(); } @@ -635,12 +660,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public boolean equals(Object o) { if (super.equals(o) == false) return false; Status status = (Status) o; - return readersBuilt.equals(status.readersBuilt); + return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), readersBuilt); + return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded); } @Override @@ -750,6 +775,4 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int return factory.newAggregateMetricDoubleBlockBuilder(count); } } - - // TODO tests that mix source loaded fields and doc values in the same block } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index ca0ca865ea4ed..03531932f3b19 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -77,7 +77,7 @@ public class Driver implements Releasable, Describable { private final DriverContext driverContext; private final Supplier description; private final List activeOperators; - private final List statusOfCompletedOperators = new ArrayList<>(); + private final List statusOfCompletedOperators = new ArrayList<>(); private final Releasable releasable; private final long statusNanos; @@ -343,7 +343,7 @@ private void closeEarlyFinishedOperators() { Iterator itr = finishedOperators.iterator(); while (itr.hasNext()) { Operator op = itr.next(); - statusOfCompletedOperators.add(new DriverStatus.OperatorStatus(op.toString(), op.status())); + statusOfCompletedOperators.add(new OperatorStatus(op.toString(), op.status())); op.close(); itr.remove(); } @@ -570,7 +570,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus. prev.iterations() + extraIterations, status, statusOfCompletedOperators, - activeOperators.stream().map(op -> new DriverStatus.OperatorStatus(op.toString(), op.status())).toList(), + activeOperators.stream().map(op -> new OperatorStatus(op.toString(), op.status())).toList(), sleeps ); }); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java new file mode 100644 index 0000000000000..538380fe7cd89 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Information returned when one of more {@link Driver}s is completed. + * @param documentsFound The number of documents found by all lucene queries performed by these drivers. + * @param valuesLoaded The number of values loaded from lucene for all drivers. This is + * roughly the number of documents times the number of + * fields per document. Except {@code null} values don't count. + * And multivalued fields count as many times as there are values. + * @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but + * not free so this will be empty if the {@code profile} option was not set in + * the request. + */ +public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List collectedProfiles) implements Writeable { + + /** + * Completion info we use when we didn't properly complete any drivers. + * Usually this is returned with an error, but it's also used when receiving + * responses from very old nodes. + */ + public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of()); + + /** + * Build a {@link DriverCompletionInfo} for many drivers including their profile output. + */ + public static DriverCompletionInfo includingProfiles(List drivers) { + long documentsFound = 0; + long valuesLoaded = 0; + List collectedProfiles = new ArrayList<>(drivers.size()); + for (Driver d : drivers) { + DriverProfile p = d.profile(); + for (OperatorStatus o : p.operators()) { + documentsFound += o.documentsFound(); + valuesLoaded += o.valuesLoaded(); + } + collectedProfiles.add(p); + } + return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); + } + + /** + * Build a {@link DriverCompletionInfo} for many drivers excluding their profile output. + */ + public static DriverCompletionInfo excludingProfiles(List drivers) { + long documentsFound = 0; + long valuesLoaded = 0; + for (Driver d : drivers) { + DriverStatus s = d.status(); + assert s.status() == DriverStatus.Status.DONE; + for (OperatorStatus o : s.completedOperators()) { + documentsFound += o.documentsFound(); + valuesLoaded += o.valuesLoaded(); + } + } + return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of()); + } + + public DriverCompletionInfo(StreamInput in) throws IOException { + this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(documentsFound); + out.writeVLong(valuesLoaded); + out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o)); + } + + public static class Accumulator { + private long documentsFound; + private long valuesLoaded; + private final List collectedProfiles = new ArrayList<>(); + + public void accumulate(DriverCompletionInfo info) { + this.documentsFound += info.documentsFound; + this.valuesLoaded += info.valuesLoaded; + this.collectedProfiles.addAll(info.collectedProfiles); + } + + public DriverCompletionInfo finish() { + return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); + } + } + + public static class AtomicAccumulator { + private final AtomicLong documentsFound = new AtomicLong(); + private final AtomicLong valuesLoaded = new AtomicLong(); + private final List collectedProfiles = Collections.synchronizedList(new ArrayList<>()); + + public void accumulate(DriverCompletionInfo info) { + this.documentsFound.addAndGet(info.documentsFound); + this.valuesLoaded.addAndGet(info.valuesLoaded); + this.collectedProfiles.addAll(info.collectedProfiles); + } + + public DriverCompletionInfo finish() { + return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java index 6ce691aa1369d..9fb4c1ad74f14 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java @@ -63,7 +63,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject { /** * Status of each {@link Operator} in the driver when it finished. */ - private final List operators; + private final List operators; private final DriverSleeps sleeps; @@ -74,7 +74,7 @@ public DriverProfile( long tookNanos, long cpuNanos, long iterations, - List operators, + List operators, DriverSleeps sleeps ) { this.taskDescription = taskDescription; @@ -107,7 +107,7 @@ public DriverProfile(StreamInput in) throws IOException { this.cpuNanos = 0; this.iterations = 0; } - this.operators = in.readCollectionAsImmutableList(DriverStatus.OperatorStatus::new); + this.operators = in.readCollectionAsImmutableList(OperatorStatus::readFrom); this.sleeps = DriverSleeps.read(in); } @@ -176,7 +176,7 @@ public long iterations() { /** * Status of each {@link Operator} in the driver when it finished. */ - public List operators() { + public List operators() { return operators; } @@ -202,6 +202,8 @@ public Iterator toXContentChunked(ToXContent.Params params if (b.humanReadable()) { b.field("cpu_time", TimeValue.timeValueNanos(cpuNanos)); } + b.field("documents_found", operators.stream().mapToLong(OperatorStatus::documentsFound).sum()); + b.field("values_loaded", operators.stream().mapToLong(OperatorStatus::valuesLoaded).sum()); b.field("iterations", iterations); return b; }); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java index 22f32a1bef403..91ab480b843e0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java @@ -12,14 +12,11 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.tasks.Task; import org.elasticsearch.xcontent.ToXContentFragment; -import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -121,11 +118,11 @@ public DriverStatus(StreamInput in) throws IOException { this.iterations = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0; this.status = Status.read(in); if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { - this.completedOperators = in.readCollectionAsImmutableList(OperatorStatus::new); + this.completedOperators = in.readCollectionAsImmutableList(OperatorStatus::readFrom); } else { this.completedOperators = List.of(); } - this.activeOperators = in.readCollectionAsImmutableList(OperatorStatus::new); + this.activeOperators = in.readCollectionAsImmutableList(OperatorStatus::readFrom); this.sleeps = DriverSleeps.read(in); } @@ -241,6 +238,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (builder.humanReadable()) { builder.field("cpu_time", TimeValue.timeValueNanos(cpuNanos)); } + builder.field("documents_found", documentsFound()); + builder.field("values_loaded", valuesLoaded()); builder.field("iterations", iterations); builder.field("status", status, params); builder.startArray("completed_operators"); @@ -296,71 +295,31 @@ public String toString() { } /** - * Status of an {@link Operator}. + * The number of documents found by this driver. */ - public static class OperatorStatus implements Writeable, ToXContentObject { - /** - * String representation of the {@link Operator}. Literally just the - * {@link Object#toString()} of it. - */ - private final String operator; - /** - * Status as reported by the {@link Operator}. - */ - @Nullable - private final Operator.Status status; - - public OperatorStatus(String operator, Operator.Status status) { - this.operator = operator; - this.status = status; + public long documentsFound() { + long documentsFound = 0; + for (OperatorStatus s : completedOperators) { + documentsFound += s.documentsFound(); } - - OperatorStatus(StreamInput in) throws IOException { - operator = in.readString(); - status = in.readOptionalNamedWriteable(Operator.Status.class); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(operator); - out.writeOptionalNamedWriteable(status != null && VersionedNamedWriteable.shouldSerialize(out, status) ? status : null); - } - - public String operator() { - return operator; - } - - public Operator.Status status() { - return status; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field("operator", operator); - if (status != null) { - builder.field("status", status); - } - return builder.endObject(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - OperatorStatus that = (OperatorStatus) o; - return operator.equals(that.operator) && Objects.equals(status, that.status); + for (OperatorStatus s : activeOperators) { + documentsFound += s.documentsFound(); } + return documentsFound; + } - @Override - public int hashCode() { - return Objects.hash(operator, status); + /** + * The number of values loaded by this operator. + */ + public long valuesLoaded() { + long valuesLoaded = 0; + for (OperatorStatus s : completedOperators) { + valuesLoaded += s.valuesLoaded(); } - - @Override - public String toString() { - return Strings.toString(this); + for (OperatorStatus s : activeOperators) { + valuesLoaded += s.valuesLoaded(); } + return valuesLoaded; } public enum Status implements Writeable, ToXContentFragment { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java index 46e85bec693e8..0a382a40c809c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java @@ -105,5 +105,21 @@ interface OperatorFactory extends Describable { /** * Status of an {@link Operator} to be returned by the tasks API. */ - interface Status extends ToXContentObject, VersionedNamedWriteable {} + interface Status extends ToXContentObject, VersionedNamedWriteable { + /** + * The number of documents found by this operator. Most operators + * don't find documents and will return {@code 0} here. + */ + default long documentsFound() { + return 0; + } + + /** + * The number of values loaded by this operator. Most operators + * don't load values and will return {@code 0} here. + */ + default long valuesLoaded() { + return 0; + } + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java new file mode 100644 index 0000000000000..dbabc4fb5e972 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OperatorStatus.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Status of an {@link Operator}. + * + * @param operator String representation of the {@link Operator}. + * @param status Status as reported by the {@link Operator}. + */ +public record OperatorStatus(String operator, @Nullable Operator.Status status) implements Writeable, ToXContentObject { + + public static OperatorStatus readFrom(StreamInput in) throws IOException { + return new OperatorStatus(in.readString(), in.readOptionalNamedWriteable(Operator.Status.class)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(operator); + out.writeOptionalNamedWriteable( + status != null && out.getTransportVersion().onOrAfter(status.getMinimalSupportedVersion()) ? status : null + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("operator", operator); + if (status != null) { + builder.field("status", status); + } + return builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + /** + * The number of documents found by this operator. Most operators + * don't find documents and will return {@code 0} here. + */ + public long documentsFound() { + if (status == null) { + return 0; + } + return status.documentsFound(); + } + + /** + * The number of values loaded by this operator. Most operators + * don't load values and will return {@code 0} here. + */ + public long valuesLoaded() { + if (status == null) { + return 0; + } + return status.valuesLoaded(); + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java index f3773e0bb9803..964e61d69e0db 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/CompositeBlockTests.java @@ -12,6 +12,7 @@ import java.util.Arrays; import java.util.List; +import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; @@ -23,19 +24,29 @@ public class CompositeBlockTests extends ComputeTestCase { ) .toList(); - public static CompositeBlock randomCompositeBlock(BlockFactory blockFactory, int numBlocks, int positionCount) { + public static CompositeBlock randomCompositeBlock( + BlockFactory blockFactory, + Supplier randomElementType, + boolean nullAllowed, + int numBlocks, + int positionCount, + int minValuesPerPosition, + int maxValuesPerPosition, + int minDupsPerPosition, + int maxDupsPerPosition + ) { Block[] blocks = new Block[numBlocks]; for (int b = 0; b < numBlocks; b++) { - ElementType elementType = randomFrom(supportedSubElementTypes); + ElementType elementType = randomElementType.get(); blocks[b] = RandomBlock.randomBlock( blockFactory, elementType, positionCount, - elementType == ElementType.NULL || randomBoolean(), - 0, - between(1, 2), - 0, - between(1, 2) + nullAllowed && (elementType == ElementType.NULL || randomBoolean()), + minValuesPerPosition, + maxValuesPerPosition, + minDupsPerPosition, + maxDupsPerPosition ).block(); } return new CompositeBlock(blocks); @@ -45,7 +56,19 @@ public void testFilter() { final BlockFactory blockFactory = blockFactory(); int numBlocks = randomIntBetween(1, 1000); int positionCount = randomIntBetween(1, 1000); - try (CompositeBlock origComposite = randomCompositeBlock(blockFactory, numBlocks, positionCount)) { + try ( + CompositeBlock origComposite = randomCompositeBlock( + blockFactory, + () -> randomFrom(supportedSubElementTypes), + true, + numBlocks, + positionCount, + 0, + between(1, 2), + 0, + between(1, 2) + ) + ) { int[] selected = new int[randomIntBetween(0, positionCount * 3)]; for (int i = 0; i < selected.length; i++) { selected[i] = randomIntBetween(0, positionCount - 1); @@ -61,4 +84,25 @@ public void testFilter() { } } } + + public void testTotalValueCount() { + final BlockFactory blockFactory = blockFactory(); + int numBlocks = randomIntBetween(1, 1000); + int positionCount = randomIntBetween(1, 1000); + try ( + CompositeBlock composite = randomCompositeBlock( + blockFactory, + () -> randomValueOtherThan(ElementType.NULL, () -> randomFrom(supportedSubElementTypes)), + false, + numBlocks, + positionCount, + 1, + 1, + 0, + 0 + ) + ) { + assertThat(composite.getTotalValueCount(), equalTo(numBlocks * positionCount)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java index 4303137f74bb3..af1463b88c62c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java @@ -20,7 +20,7 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase { public static ValuesSourceReaderOperator.Status simple() { - return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222); + return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222, 1000); } public static String simpleToJson() { @@ -29,6 +29,7 @@ public static String simpleToJson() { "readers_built" : { "ReaderType" : 3 }, + "values_loaded" : 1000, "process_nanos" : 1022323, "process_time" : "1ms", "pages_processed" : 123, @@ -53,6 +54,7 @@ public ValuesSourceReaderOperator.Status createTestInstance() { randomNonNegativeLong(), randomNonNegativeInt(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong() ); } @@ -73,14 +75,16 @@ protected ValuesSourceReaderOperator.Status mutateInstance(ValuesSourceReaderOpe int pagesProcessed = instance.pagesProcessed(); long rowsReceived = instance.rowsReceived(); long rowsEmitted = instance.rowsEmitted(); - switch (between(0, 4)) { + long valuesLoaded = instance.valuesLoaded(); + switch (between(0, 5)) { case 0 -> readersBuilt = randomValueOtherThan(readersBuilt, this::randomReadersBuilt); case 1 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + case 5 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted); + return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java index a39aa10af5f31..3a0db28562ebe 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java @@ -34,8 +34,8 @@ public void testToXContent() { 10000, 12, List.of( - new DriverStatus.OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()), - new DriverStatus.OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple()) + new OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()), + new OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple()) ), new DriverSleeps( Map.of("driver time", 1L), @@ -54,6 +54,8 @@ public void testToXContent() { "took_time" : "10micros", "cpu_nanos" : 10000, "cpu_time" : "10micros", + "documents_found" : 222, + "values_loaded" : 1000, "iterations" : 12, "operators" : [ { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java index 83deb57a3ba7c..f8e4dc8d86abb 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java @@ -39,10 +39,10 @@ public void testToXContent() { 55L, DriverStatus.Status.RUNNING, List.of( - new DriverStatus.OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()), - new DriverStatus.OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple()) + new OperatorStatus("LuceneSource", LuceneSourceOperatorStatusTests.simple()), + new OperatorStatus("ValuesSourceReader", ValuesSourceReaderOperatorStatusTests.simple()) ), - List.of(new DriverStatus.OperatorStatus("ExchangeSink", ExchangeSinkOperatorStatusTests.simple())), + List.of(new OperatorStatus("ExchangeSink", ExchangeSinkOperatorStatusTests.simple())), new DriverSleeps( Map.of("driver time", 1L), List.of(new DriverSleeps.Sleep("driver time", 1, 1)), @@ -57,6 +57,8 @@ public void testToXContent() { "last_updated" : "1973-11-29T09:27:23.214Z", "cpu_nanos" : 123213, "cpu_time" : "123.2micros", + "documents_found" : 222, + "values_loaded" : 1000, "iterations" : 55, "status" : "running", "completed_operators" : [ @@ -140,18 +142,18 @@ private DriverStatus.Status randomStatus() { return randomFrom(DriverStatus.Status.values()); } - static List randomOperatorStatuses() { + static List randomOperatorStatuses() { return randomList(0, 5, DriverStatusTests::randomOperatorStatus); } - private static DriverStatus.OperatorStatus randomOperatorStatus() { + private static OperatorStatus randomOperatorStatus() { Supplier status = randomFrom( new LuceneSourceOperatorStatusTests()::createTestInstance, new ValuesSourceReaderOperatorStatusTests()::createTestInstance, new ExchangeSinkOperatorStatusTests()::createTestInstance, () -> null ); - return new DriverStatus.OperatorStatus(randomAlphaOfLength(3), status.get()); + return new OperatorStatus(randomAlphaOfLength(3), status.get()); } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java index 791f5dacdce64..4ea413e4fcd3b 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java @@ -159,7 +159,11 @@ private Map runEsql(RestEsqlTestCase.RequestObjectBuilder reques } private void assertResultMap(boolean includeCCSMetadata, Map result, C columns, V values, boolean remoteOnly) { - MapMatcher mapMatcher = getResultMatcher(ccsMetadataAvailable(), result.containsKey("is_partial")); + MapMatcher mapMatcher = getResultMatcher( + ccsMetadataAvailable(), + result.containsKey("is_partial"), + result.containsKey("documents_found") + ); if (includeCCSMetadata) { mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index f8d3109182092..69d746e7abbe9 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -668,7 +668,9 @@ public static MapMatcher commonProfile() { .entry("cpu_nanos", greaterThan(0L)) .entry("took_nanos", greaterThan(0L)) .entry("operators", instanceOf(List.class)) - .entry("sleeps", matchesMap().extraOk()); + .entry("sleeps", matchesMap().extraOk()) + .entry("documents_found", greaterThanOrEqualTo(0)) + .entry("values_loaded", greaterThanOrEqualTo(0)); } /** @@ -698,7 +700,8 @@ private String checkOperatorProfile(Map o) { .entry("processing_nanos", greaterThan(0)) .entry("processed_queries", List.of("*:*")) .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD")); - case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk()); + case "ValuesSourceReaderOperator" -> basicProfile().entry("values_loaded", greaterThanOrEqualTo(0)) + .entry("readers_built", matchesMap().extraOk()); case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("rows_received", greaterThan(0)) .entry("rows_emitted", greaterThan(0)) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java index c289d7b7cdd05..46b692b04ffe5 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java @@ -108,8 +108,7 @@ private void testQuery(Double percent, String query, int documentsFound, boolean Map result = runEsql(builder, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC); assertMap( result, - matchesMap() - // .entry("documents_found", documentsFound) Backport incoming maybe + matchesMap().entry("documents_found", documentsFound) .entry( "profile", matchesMap().entry("drivers", instanceOf(List.class)) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 28e4e81e1ab9e..e21594e0c729b 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -59,6 +59,7 @@ import static java.util.Map.entry; import static org.elasticsearch.common.logging.LoggerMessageFormat.format; import static org.elasticsearch.test.ListMatcher.matchesList; +import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC; @@ -270,12 +271,19 @@ public static RequestObjectBuilder jsonBuilder() throws IOException { public void testGetAnswer() throws IOException { Map answer = runEsql(requestObjectBuilder().query("row a = 1, b = 2")); - assertEquals(4, answer.size()); + assertEquals(6, answer.size()); assertThat(((Integer) answer.get("took")).intValue(), greaterThanOrEqualTo(0)); Map colA = Map.of("name", "a", "type", "integer"); Map colB = Map.of("name", "b", "type", "integer"); - assertEquals(List.of(colA, colB), answer.get("columns")); - assertEquals(List.of(List.of(1, 2)), answer.get("values")); + assertMap( + answer, + matchesMap().entry("took", greaterThanOrEqualTo(0)) + .entry("is_partial", any(Boolean.class)) + .entry("documents_found", 0) + .entry("values_loaded", 0) + .entry("columns", List.of(colA, colB)) + .entry("values", List.of(List.of(1, 2))) + ); } public void testUseUnknownIndex() throws IOException { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java index c4da0bf32ef96..19157b636dffc 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java @@ -20,7 +20,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.operator.DriverProfile; -import org.elasticsearch.compute.operator.DriverStatus; +import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; @@ -350,7 +350,7 @@ public void testProfile() { assertNotNull(profile); List drivers = profile.drivers(); assertThat(drivers.size(), greaterThanOrEqualTo(2)); - List enrichOperators = drivers.stream() + List enrichOperators = drivers.stream() .flatMap(d -> d.operators().stream()) .filter(status -> status.operator().startsWith("EnrichOperator")) .toList(); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index 595c251231e5b..75c89c17a657e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -22,6 +22,7 @@ import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.DriverTaskRunner; +import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; @@ -104,7 +105,7 @@ public void testTaskContents() throws Exception { DriverStatus status = (DriverStatus) task.status(); assertThat(status.sessionId(), not(emptyOrNullString())); String taskDescription = status.taskDescription(); - for (DriverStatus.OperatorStatus o : status.activeOperators()) { + for (OperatorStatus o : status.activeOperators()) { logger.info("status {}", o); if (o.operator().startsWith("LuceneSourceOperator[maxPageSize = " + pageSize())) { assertThat(taskDescription, equalTo("data")); @@ -134,6 +135,7 @@ public void testTaskContents() throws Exception { matchesMap().entry("pause_me:column_at_a_time:ScriptLongs", greaterThanOrEqualTo(1)) ); assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1)); + assertThat(oStatus.valuesLoaded(), greaterThanOrEqualTo(1L)); valuesSourceReaders++; continue; } @@ -180,6 +182,19 @@ public void testTaskContents() throws Exception { \\_ProjectOperator[projection = [0]] \\_LimitOperator[limit = 1000] \\_OutputOperator[columns = [sum(pause_me)]]""")); + + for (TaskInfo task : dataTasks(foundTasks)) { + assertThat(((DriverStatus) task.status()).documentsFound(), greaterThan(0L)); + assertThat(((DriverStatus) task.status()).valuesLoaded(), greaterThan(0L)); + } + for (TaskInfo task : nodeReduceTasks(foundTasks)) { + assertThat(((DriverStatus) task.status()).documentsFound(), equalTo(0L)); + assertThat(((DriverStatus) task.status()).valuesLoaded(), equalTo(0L)); + } + for (TaskInfo task : coordinatorTasks(foundTasks)) { + assertThat(((DriverStatus) task.status()).documentsFound(), equalTo(0L)); + assertThat(((DriverStatus) task.status()).valuesLoaded(), equalTo(0L)); + } } finally { scriptPermits.release(numberOfDocs()); try (EsqlQueryResponse esqlResponse = response.get()) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index a7752df411c19..6d418df3c86cc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -864,6 +864,12 @@ public enum Cap { */ REPORT_ORIGINAL_TYPES, + /** + * Are the {@code documents_found} and {@code values_loaded} fields available + * in the response and profile? + */ + DOCUMENTS_FOUND_AND_VALUES_LOADED, + /** * When creating constant null blocks in {@link org.elasticsearch.compute.lucene.ValuesSourceReaderOperator}, we also handed off * the ownership of that block - but didn't account for the fact that the caller might close it, leading to double releases diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index fedabe0dd2756..e11b16ebc5335 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -34,6 +34,8 @@ import java.util.Objects; import java.util.Optional; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; + public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse implements ChunkedToXContentObject, @@ -46,6 +48,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action. private final List columns; private final List pages; + private final long documentsFound; + private final long valuesLoaded; private final Profile profile; private final boolean columnar; private final String asyncExecutionId; @@ -57,6 +61,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action. public EsqlQueryResponse( List columns, List pages, + long documentsFound, + long valuesLoaded, @Nullable Profile profile, boolean columnar, @Nullable String asyncExecutionId, @@ -66,6 +72,8 @@ public EsqlQueryResponse( ) { this.columns = columns; this.pages = pages; + this.valuesLoaded = valuesLoaded; + this.documentsFound = documentsFound; this.profile = profile; this.columnar = columnar; this.asyncExecutionId = asyncExecutionId; @@ -77,12 +85,14 @@ public EsqlQueryResponse( public EsqlQueryResponse( List columns, List pages, + long documentsFound, + long valuesLoaded, @Nullable Profile profile, boolean columnar, boolean isAsync, EsqlExecutionInfo executionInfo ) { - this(columns, pages, profile, columnar, null, false, isAsync, executionInfo); + this(columns, pages, documentsFound, valuesLoaded, profile, columnar, null, false, isAsync, executionInfo); } /** @@ -108,6 +118,8 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException { } List columns = in.readCollectionAsList(ColumnInfoImpl::new); List pages = in.readCollectionAsList(Page::new); + long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0; + long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0; if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { profile = in.readOptionalWriteable(Profile::new); } @@ -116,7 +128,18 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { executionInfo = in.readOptionalWriteable(EsqlExecutionInfo::new); } - return new EsqlQueryResponse(columns, pages, profile, columnar, asyncExecutionId, isRunning, isAsync, executionInfo); + return new EsqlQueryResponse( + columns, + pages, + documentsFound, + valuesLoaded, + profile, + columnar, + asyncExecutionId, + isRunning, + isAsync, + executionInfo + ); } @Override @@ -128,6 +151,10 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeCollection(columns); out.writeCollection(pages); + if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { + out.writeVLong(documentsFound); + out.writeVLong(valuesLoaded); + } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { out.writeOptionalWriteable(profile); } @@ -160,6 +187,14 @@ public Iterator column(int columnIndex) { return ResponseValueUtils.valuesForColumn(columnIndex, columns.get(columnIndex).type(), pages); } + public long documentsFound() { + return documentsFound; + } + + public long valuesLoaded() { + return valuesLoaded; + } + public Profile profile() { return profile; } @@ -200,6 +235,8 @@ public Iterator toXContentChunked(ToXContent.Params params } b.field("is_running", isRunning); } + b.field("documents_found", documentsFound); + b.field("values_loaded", valuesLoaded); if (executionInfo != null) { long tookInMillis = executionInfo.overallTook() == null ? executionInfo.tookSoFar().millis() @@ -261,6 +298,8 @@ public boolean equals(Object o) { && Objects.equals(isRunning, that.isRunning) && columnar == that.columnar && Iterators.equals(values(), that.values(), (row1, row2) -> Iterators.equals(row1, row2, Objects::equals)) + && documentsFound == that.documentsFound + && valuesLoaded == that.valuesLoaded && Objects.equals(profile, that.profile) && Objects.equals(executionInfo, that.executionInfo); } @@ -271,8 +310,11 @@ public int hashCode() { asyncExecutionId, isRunning, columns, - Iterators.hashCode(values(), row -> Iterators.hashCode(row, Objects::hashCode)), columnar, + Iterators.hashCode(values(), row -> Iterators.hashCode(row, Objects::hashCode)), + documentsFound, + valuesLoaded, + profile, executionInfo ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java index f896a25317102..4d7565a5d7863 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java @@ -44,6 +44,7 @@ public EsqlExecutionInfo executionInfo() { @Override public EsqlQueryResponse getCurrentResult() { - return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, executionInfo); + // TODO it'd be nice to have the number of documents we've read from completed drivers here + return new EsqlQueryResponse(List.of(), List.of(), 0, 0, null, false, getExecutionId().getEncoded(), true, true, executionInfo); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 192bebe3d8551..cd4ff13700515 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -11,7 +11,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ChannelActionListener; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasable; @@ -75,7 +75,7 @@ void startComputeOnRemoteCluster( RemoteCluster cluster, Runnable cancelQueryOnFailure, EsqlExecutionInfo executionInfo, - ActionListener> listener + ActionListener listener ) { var queryPragmas = configuration.pragmas(); listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); @@ -93,7 +93,7 @@ void startComputeOnRemoteCluster( receivedResults ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SKIPPED, e ); - l.onResponse(List.of()); + l.onResponse(DriverCompletionInfo.EMPTY); } else { l.onFailure(e); } @@ -121,15 +121,15 @@ void startComputeOnRemoteCluster( onGroupFailure = computeService.cancelQueryOnFailure(groupTask); l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); } - try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> { + try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(completionInfo -> { updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get()); - return profiles; + return completionInfo; }))) { var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan); final ActionListener clusterListener = computeListener.acquireCompute().map(r -> { finalResponse.set(r); - return r.getProfiles(); + return r.getCompletionInfo(); }); transportService.sendChildRequest( cluster.connection, @@ -290,7 +290,7 @@ void runComputeOnRemoteCluster( cancelQueryOnFailure, computeListener.acquireCompute().map(r -> { finalResponse.set(r); - return r.getProfiles(); + return r.getCompletionInfo(); }) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index c8b8e84fd2478..856f131cb5645 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -10,15 +10,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.compute.EsqlRefCountingListener; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.ResponseHeadersCollector; import org.elasticsearch.core.Releasable; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - /** * A variant of {@link RefCountingListener} with the following differences: * 1. Automatically cancels sub tasks on failure (via runOnTaskFailure) @@ -27,19 +23,18 @@ * 4. Collects failures and returns the most appropriate exception to the caller. */ final class ComputeListener implements Releasable { + private final DriverCompletionInfo.AtomicAccumulator completionInfoAccumulator = new DriverCompletionInfo.AtomicAccumulator(); private final EsqlRefCountingListener refs; - private final List collectedProfiles; private final ResponseHeadersCollector responseHeaders; private final Runnable runOnFailure; - ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener> delegate) { + ComputeListener(ThreadPool threadPool, Runnable runOnFailure, ActionListener delegate) { this.runOnFailure = runOnFailure; this.responseHeaders = new ResponseHeadersCollector(threadPool.getThreadContext()); - this.collectedProfiles = Collections.synchronizedList(new ArrayList<>()); // listener that executes after all the sub-listeners refs (created via acquireCompute) have completed this.refs = new EsqlRefCountingListener(delegate.delegateFailure((l, ignored) -> { responseHeaders.finish(); - delegate.onResponse(collectedProfiles.stream().toList()); + delegate.onResponse(completionInfoAccumulator.finish()); })); } @@ -60,13 +55,11 @@ ActionListener acquireAvoid() { /** * Acquires a new listener that collects compute result. This listener will also collect warnings emitted during compute */ - ActionListener> acquireCompute() { + ActionListener acquireCompute() { final ActionListener delegate = acquireAvoid(); - return ActionListener.wrap(profiles -> { + return ActionListener.wrap(info -> { responseHeaders.collect(); - if (profiles != null && profiles.isEmpty() == false) { - collectedProfiles.addAll(profiles); - } + completionInfoAccumulator.accumulate(info); delegate.onResponse(null); }, e -> { responseHeaders.collect(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java index 86008c5a6b235..7a236869dfcbf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.TimeValue; import org.elasticsearch.transport.TransportResponse; @@ -18,11 +19,13 @@ import java.io.IOException; import java.util.List; +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; + /** * The compute result of {@link DataNodeRequest} or {@link ClusterComputeRequest} */ final class ComputeResponse extends TransportResponse { - private final List profiles; + private final DriverCompletionInfo completionInfo; // for use with ClusterComputeRequests (cross-cluster searches) private final TimeValue took; // overall took time for a specific cluster in a cross-cluster search @@ -32,12 +35,12 @@ final class ComputeResponse extends TransportResponse { public final int failedShards; public final List failures; - ComputeResponse(List profiles) { - this(profiles, null, null, null, null, null, List.of()); + ComputeResponse(DriverCompletionInfo completionInfo) { + this(completionInfo, null, null, null, null, null, List.of()); } ComputeResponse( - List profiles, + DriverCompletionInfo completionInfo, TimeValue took, Integer totalShards, Integer successfulShards, @@ -45,7 +48,7 @@ final class ComputeResponse extends TransportResponse { Integer failedShards, List failures ) { - this.profiles = profiles; + this.completionInfo = completionInfo; this.took = took; this.totalShards = totalShards == null ? 0 : totalShards.intValue(); this.successfulShards = successfulShards == null ? 0 : successfulShards.intValue(); @@ -56,14 +59,16 @@ final class ComputeResponse extends TransportResponse { ComputeResponse(StreamInput in) throws IOException { super(in); - if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { + if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { + completionInfo = new DriverCompletionInfo(in); + } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { if (in.readBoolean()) { - profiles = in.readCollectionAsImmutableList(DriverProfile::new); + completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new)); } else { - profiles = null; + completionInfo = DriverCompletionInfo.EMPTY; } } else { - profiles = null; + completionInfo = DriverCompletionInfo.EMPTY; } if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { this.took = in.readOptionalTimeValue(); @@ -87,13 +92,11 @@ final class ComputeResponse extends TransportResponse { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { - if (profiles == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeCollection(profiles); - } + if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { + completionInfo.writeTo(out); + } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { + out.writeBoolean(true); + out.writeCollection(completionInfo.collectedProfiles()); } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeOptionalTimeValue(took); @@ -107,8 +110,8 @@ public void writeTo(StreamOutput out) throws IOException { } } - public List getProfiles() { - return profiles; + public DriverCompletionInfo getCompletionInfo() { + return completionInfo; } public TimeValue getTook() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 5d5c6fc3f060d..a05258ebde153 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -19,7 +19,7 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.Driver; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.compute.operator.exchange.ExchangeService; @@ -184,10 +184,14 @@ public void execute( ); updateShardCountForCoordinatorOnlyQuery(execInfo); try ( - var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { - updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo); - return new Result(physicalPlan.output(), collectedPages, profiles, execInfo); - })) + var computeListener = new ComputeListener( + transportService.getThreadPool(), + cancelQueryOnFailure, + listener.map(completionInfo -> { + updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo); + return new Result(physicalPlan.output(), collectedPages, completionInfo, execInfo); + }) + ) ) { runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute()); return; @@ -216,10 +220,16 @@ public void execute( ); listener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId)); exchangeService.addExchangeSourceHandler(sessionId, exchangeSource); - try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { - execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements - return new Result(outputAttributes, collectedPages, profiles, execInfo); - }))) { + try ( + var computeListener = new ComputeListener( + transportService.getThreadPool(), + cancelQueryOnFailure, + listener.map(completionInfo -> { + execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements + return new Result(outputAttributes, collectedPages, completionInfo, execInfo); + }) + ) + ) { try (Releasable ignored = exchangeSource.addEmptySink()) { // run compute on the coordinator final AtomicBoolean localClusterWasInterrupted = new AtomicBoolean(); @@ -227,7 +237,7 @@ public void execute( var localListener = new ComputeListener( transportService.getThreadPool(), cancelQueryOnFailure, - computeListener.acquireCompute().delegateFailure((l, profiles) -> { + computeListener.acquireCompute().delegateFailure((l, completionInfo) -> { if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) { execInfo.swapCluster(LOCAL_CLUSTER, (k, v) -> { var tookTime = execInfo.tookSoFar(); @@ -242,7 +252,7 @@ public void execute( return builder.build(); }); } - l.onResponse(profiles); + l.onResponse(completionInfo); }) ) ) { @@ -285,7 +295,7 @@ public void execute( .setFailures(r.failures) .build() ); - dataNodesListener.onResponse(r.getProfiles()); + dataNodesListener.onResponse(r.getCompletionInfo()); }, e -> { if (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e)) { execInfo.swapCluster( @@ -294,7 +304,7 @@ public void execute( EsqlExecutionInfo.Cluster.Status.PARTIAL ).setFailures(List.of(new ShardSearchFailure(e))).build() ); - dataNodesListener.onResponse(List.of()); + dataNodesListener.onResponse(DriverCompletionInfo.EMPTY); } else { dataNodesListener.onFailure(e); } @@ -373,7 +383,7 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn } } - void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener> listener) { + void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener listener) { listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts())); List contexts = new ArrayList<>(context.searchContexts().size()); for (int i = 0; i < context.searchContexts().size(); i++) { @@ -437,9 +447,9 @@ public SourceProvider createSourceProvider() { } ActionListener listenerCollectingStatus = listener.map(ignored -> { if (context.configuration().profile()) { - return drivers.stream().map(Driver::profile).toList(); + return DriverCompletionInfo.includingProfiles(drivers); } else { - return List.of(); + return DriverCompletionInfo.excludingProfiles(drivers); } }); listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index a31a22ea3e327..f6216684a971d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -17,7 +17,7 @@ import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; @@ -191,7 +191,7 @@ protected void sendRequest( TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(computeListener.acquireCompute().map(r -> { nodeResponseRef.set(r); - return r.profiles(); + return r.completionInfo(); }), DataNodeComputeResponse::new, esqlExecutor) ); final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection); @@ -256,15 +256,15 @@ private void runBatch(int startBatchIndex) { final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shardIds().size()); final AtomicInteger pagesProduced = new AtomicInteger(); List shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex); - ActionListener> batchListener = new ActionListener<>() { - final ActionListener> ref = computeListener.acquireCompute(); + ActionListener batchListener = new ActionListener<>() { + final ActionListener ref = computeListener.acquireCompute(); @Override - public void onResponse(List result) { + public void onResponse(DriverCompletionInfo info) { try { onBatchCompleted(endBatchIndex); } finally { - ref.onResponse(result); + ref.onResponse(info); } } @@ -274,7 +274,7 @@ public void onFailure(Exception e) { for (ShardId shardId : shardIds) { addShardLevelFailure(shardId, e); } - onResponse(List.of()); + onResponse(DriverCompletionInfo.EMPTY); } else { // TODO: add these to fatal failures so we can continue processing other shards. try { @@ -288,7 +288,7 @@ public void onFailure(Exception e) { acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ESQL_WORKER_THREAD_POOL_NAME); if (searchContexts.isEmpty()) { - batchListener.onResponse(List.of()); + batchListener.onResponse(DriverCompletionInfo.EMPTY); return; } var computeContext = new ComputeContext( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java index b1eb41ffc99d8..2477b515f66b2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java @@ -9,52 +9,63 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.TransportResponse; import java.io.IOException; -import java.util.List; import java.util.Map; -import java.util.Objects; + +import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; /** * The compute result of {@link DataNodeRequest} */ final class DataNodeComputeResponse extends TransportResponse { - private final List profiles; + private final DriverCompletionInfo completionInfo; private final Map shardLevelFailures; - DataNodeComputeResponse(List profiles, Map shardLevelFailures) { - this.profiles = profiles; + DataNodeComputeResponse(DriverCompletionInfo completionInfo, Map shardLevelFailures) { + this.completionInfo = completionInfo; this.shardLevelFailures = shardLevelFailures; } DataNodeComputeResponse(StreamInput in) throws IOException { + if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { + this.completionInfo = new DriverCompletionInfo(in); + this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException); + return; + } if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) { - this.profiles = in.readCollectionAsImmutableList(DriverProfile::new); + this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new)); this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException); - } else { - this.profiles = Objects.requireNonNullElse(new ComputeResponse(in).getProfiles(), List.of()); - this.shardLevelFailures = Map.of(); + return; } + this.completionInfo = new ComputeResponse(in).getCompletionInfo(); + this.shardLevelFailures = Map.of(); } @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { + completionInfo.writeTo(out); + out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException); + return; + } if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) { - out.writeCollection(profiles, (o, v) -> v.writeTo(o)); + out.writeCollection(completionInfo.collectedProfiles(), (o, v) -> v.writeTo(o)); out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException); - } else { - if (shardLevelFailures.isEmpty() == false) { - throw new IllegalStateException("shard level failures are not supported in old versions"); - } - new ComputeResponse(profiles).writeTo(out); + return; + } + if (shardLevelFailures.isEmpty() == false) { + throw new IllegalStateException("shard level failures are not supported in old versions"); } + new ComputeResponse(completionInfo).writeTo(out); } - List profiles() { - return profiles; + public DriverCompletionInfo completionInfo() { + return completionInfo; } Map shardLevelFailures() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index d118221ec026a..ea100ba76c51d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; @@ -130,8 +130,8 @@ final void startComputeOnDataNodes(Set concreteIndices, Runnable runOnTa transportService.getThreadPool(), runOnTaskFailure, listener.map( - profiles -> new ComputeResponse( - profiles, + completionInfo -> new ComputeResponse( + completionInfo, timeValueNanos(System.nanoTime() - startTimeInNanos), targetShards.totalShards(), targetShards.totalShards() - shardFailures.size() - skippedShards.get(), @@ -263,15 +263,15 @@ private List selectFailures() { } private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) { - final ActionListener> listener = computeListener.acquireCompute(); + final ActionListener listener = computeListener.acquireCompute(); sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() { - void onAfter(List profiles) { + void onAfter(DriverCompletionInfo info) { nodePermits.get(request.node).release(); if (concurrentRequests != null) { concurrentRequests.release(); } trySendingRequestsForPendingShards(targetShards, computeListener); - listener.onResponse(profiles); + listener.onResponse(info); } @Override @@ -287,7 +287,7 @@ public void onResponse(DataNodeComputeResponse response) { trackShardLevelFailure(shardId, false, e.getValue()); pendingShardIds.add(shardId); } - onAfter(response.profiles()); + onAfter(response.completionInfo()); } @Override @@ -296,7 +296,7 @@ public void onFailure(Exception e, boolean receivedData) { trackShardLevelFailure(shardId, receivedData, e); pendingShardIds.add(shardId); } - onAfter(List.of()); + onAfter(DriverCompletionInfo.EMPTY); } @Override @@ -305,7 +305,7 @@ public void onSkip() { if (rootTask.isCancelled()) { onFailure(new TaskCancelledException("null"), true); } else { - onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } } }); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 5a5e858fb7c3f..12bc18e1153c6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -333,7 +333,9 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config } return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes); }).toList(); - EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null; + EsqlQueryResponse.Profile profile = configuration.profile() + ? new EsqlQueryResponse.Profile(result.completionInfo().collectedProfiles()) + : null; threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0"); if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { String asyncExecutionId = asyncTask.getExecutionId().getEncoded(); @@ -341,6 +343,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config return new EsqlQueryResponse( columns, result.pages(), + result.completionInfo().documentsFound(), + result.completionInfo().valuesLoaded(), profile, request.columnar(), asyncExecutionId, @@ -349,7 +353,16 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config result.executionInfo() ); } - return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async(), result.executionInfo()); + return new EsqlQueryResponse( + columns, + result.pages(), + result.completionInfo().documentsFound(), + result.completionInfo().valuesLoaded(), + profile, + request.columnar(), + request.async(), + result.executionInfo() + ); } /** @@ -401,6 +414,8 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) { return new EsqlQueryResponse( List.of(), List.of(), + 0, + 0, null, false, asyncExecutionId, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 17b27d7616c16..5f7066369524a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; @@ -78,7 +79,7 @@ abstract static class CssPartialErrorsActionListener implements ActionListener 0) { // code-path to execute subplans - executeSubPlan(new ArrayList<>(), physicalPlan, iterator, executionInfo, runner, listener); + executeSubPlan(new DriverCompletionInfo.Accumulator(), physicalPlan, iterator, executionInfo, runner, listener); } else { // execute main plan runner.run(physicalPlan, listener); @@ -245,7 +245,7 @@ private void executeSubPlans( } private void executeSubPlan( - List profileAccumulator, + DriverCompletionInfo.Accumulator completionInfoAccumulator, PhysicalPlan plan, Iterator subPlanIterator, EsqlExecutionInfo executionInfo, @@ -256,7 +256,7 @@ private void executeSubPlan( runner.run(tuple.physical, listener.delegateFailureAndWrap((next, result) -> { try { - profileAccumulator.addAll(result.profiles()); + completionInfoAccumulator.accumulate(result.completionInfo()); LocalRelation resultWrapper = resultToPlan(tuple.logical, result); // replace the original logical plan with the backing result @@ -271,12 +271,14 @@ private void executeSubPlan( }); if (subPlanIterator.hasNext() == false) { runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> { - profileAccumulator.addAll(finalResult.profiles()); - finalListener.onResponse(new Result(finalResult.schema(), finalResult.pages(), profileAccumulator, executionInfo)); + completionInfoAccumulator.accumulate(finalResult.completionInfo()); + finalListener.onResponse( + new Result(finalResult.schema(), finalResult.pages(), completionInfoAccumulator.finish(), executionInfo) + ); })); } else { // continue executing the subplans - executeSubPlan(profileAccumulator, newPlan, subPlanIterator, executionInfo, runner, next); + executeSubPlan(completionInfoAccumulator, newPlan, subPlanIterator, executionInfo, runner, next); } } finally { Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks))); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java index 4f90893c759b8..5da8a53e53f15 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java @@ -9,7 +9,7 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.Nullable; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.core.expression.Attribute; @@ -23,10 +23,12 @@ * that was run. Each {@link Page} contains a {@link Block} of values for each * attribute in this list. * @param pages Actual values produced by running the ESQL. - * @param profiles {@link DriverProfile}s from all drivers that ran to produce the output. These - * are quite cheap to build, so we build them for all ESQL runs, regardless of if - * users have asked for them. But we only include them in the results if users ask - * for them. + * @param completionInfo Information collected from drivers after they've been completed. * @param executionInfo Metadata about the execution of this query. Used for cross cluster queries. */ -public record Result(List schema, List pages, List profiles, @Nullable EsqlExecutionInfo executionInfo) {} +public record Result( + List schema, + List pages, + DriverCompletionInfo completionInfo, + @Nullable EsqlExecutionInfo executionInfo +) {} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index d4893194345f2..3c73f95f1ec83 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Driver; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverRunner; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; @@ -705,6 +706,9 @@ protected void start(Driver driver, ActionListener driverListener) { } }; listener = ActionListener.releaseAfter(listener, () -> Releasables.close(drivers)); - runner.runToCompletion(drivers, listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, List.of(), null))); + runner.runToCompletion( + drivers, + listener.map(ignore -> new Result(physicalPlan.output(), collectedPages, DriverCompletionInfo.EMPTY, null)) + ); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java index 286aa8fad4bbe..091784210b47c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java @@ -14,8 +14,8 @@ import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; -import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.test.AbstractWireSerializingTestCase; import java.util.List; @@ -58,7 +58,7 @@ private DriverProfile randomDriverProfile() { ); } - private DriverStatus.OperatorStatus randomOperatorStatus() { + private OperatorStatus randomOperatorStatus() { String name = randomAlphaOfLength(4); Operator.Status status = randomBoolean() ? null @@ -68,6 +68,6 @@ private DriverStatus.OperatorStatus randomOperatorStatus() { randomNonNegativeLong(), randomNonNegativeLong() ); - return new DriverStatus.OperatorStatus(name, status); + return new OperatorStatus(name, status); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index d790b6e465cb1..0499b901b6e13 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -32,7 +32,7 @@ import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; -import org.elasticsearch.compute.operator.DriverStatus; +import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.compute.test.TestBlockFactory; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; @@ -42,6 +42,7 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.InstantiatingObjectParser; import org.elasticsearch.xcontent.ObjectParser; @@ -130,7 +131,18 @@ EsqlQueryResponse randomResponseAsync(boolean columnar, EsqlQueryResponse.Profil id = randomAlphaOfLengthBetween(1, 16); isRunning = randomBoolean(); } - return new EsqlQueryResponse(columns, values, profile, columnar, id, isRunning, async, createExecutionInfo()); + return new EsqlQueryResponse( + columns, + values, + randomNonNegativeLong(), + randomNonNegativeLong(), + profile, + columnar, + id, + isRunning, + async, + createExecutionInfo() + ); } EsqlExecutionInfo createExecutionInfo() { @@ -265,58 +277,41 @@ protected EsqlQueryResponse mutateInstance(EsqlQueryResponse instance) { allNull = false; } } - return switch (allNull ? between(0, 2) : between(0, 3)) { + List columns = instance.columns(); + List pages = deepCopyOfPages(instance); + long documentsFound = instance.documentsFound(); + long valuesLoaded = instance.valuesLoaded(); + EsqlQueryResponse.Profile profile = instance.profile(); + boolean columnar = instance.columnar(); + boolean isAsync = instance.isAsync(); + EsqlExecutionInfo executionInfo = instance.getExecutionInfo(); + switch (allNull ? between(0, 4) : between(0, 5)) { case 0 -> { int mutCol = between(0, instance.columns().size() - 1); - List cols = new ArrayList<>(instance.columns()); + columns = new ArrayList<>(instance.columns()); // keep the type the same so the values are still valid but change the name - cols.set( - mutCol, - new ColumnInfoImpl(cols.get(mutCol).name() + "mut", cols.get(mutCol).type(), cols.get(mutCol).originalTypes()) - ); - yield new EsqlQueryResponse( - cols, - deepCopyOfPages(instance), - instance.profile(), - instance.columnar(), - instance.isAsync(), - instance.getExecutionInfo() - ); + ColumnInfoImpl mut = columns.get(mutCol); + columns.set(mutCol, new ColumnInfoImpl(mut.name() + "mut", mut.type(), mut.originalTypes())); } - case 1 -> new EsqlQueryResponse( - instance.columns(), - deepCopyOfPages(instance), - instance.profile(), - false == instance.columnar(), - instance.isAsync(), - instance.getExecutionInfo() - ); - case 2 -> new EsqlQueryResponse( - instance.columns(), - deepCopyOfPages(instance), - randomValueOtherThan(instance.profile(), this::randomProfile), - instance.columnar(), - instance.isAsync(), - instance.getExecutionInfo() - ); - case 3 -> { + case 1 -> documentsFound = randomValueOtherThan(documentsFound, ESTestCase::randomNonNegativeLong); + case 2 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); + case 3 -> columnar = false == columnar; + case 4 -> profile = randomValueOtherThan(profile, this::randomProfile); + case 5 -> { + assert allNull == false + : "can't replace values while preserving types if all pages are null - the only valid values are null"; int noPages = instance.pages().size(); List differentPages = List.of(); do { differentPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks)); differentPages = randomList(noPages, noPages, () -> randomPage(instance.columns())); } while (differentPages.equals(instance.pages())); - yield new EsqlQueryResponse( - instance.columns(), - differentPages, - instance.profile(), - instance.columnar(), - instance.isAsync(), - instance.getExecutionInfo() - ); + pages.forEach(Page::releaseBlocks); + pages = differentPages; } default -> throw new IllegalArgumentException(); - }; + } + return new EsqlQueryResponse(columns, pages, documentsFound, valuesLoaded, profile, columnar, isAsync, executionInfo); } private List deepCopyOfPages(EsqlQueryResponse response) { @@ -368,6 +363,8 @@ public static class ResponseBuilder { ObjectParser.ValueType.BOOLEAN_OR_NULL ); parser.declareInt(constructorArg(), new ParseField("took")); + parser.declareLong(constructorArg(), new ParseField("documents_found")); + parser.declareLong(constructorArg(), new ParseField("values_loaded")); parser.declareObjectArray(constructorArg(), (p, c) -> ColumnInfoImpl.fromXContent(p), new ParseField("columns")); parser.declareField(constructorArg(), (p, c) -> p.list(), new ParseField("values"), ObjectParser.ValueType.OBJECT_ARRAY); parser.declareObject(optionalConstructorArg(), (p, c) -> parseClusters(p), new ParseField("_clusters")); @@ -382,6 +379,8 @@ public ResponseBuilder( @Nullable String asyncExecutionId, Boolean isRunning, Integer took, + long documentsFound, + long valuesLoaded, List columns, List> values, EsqlExecutionInfo executionInfo @@ -390,6 +389,8 @@ public ResponseBuilder( this.response = new EsqlQueryResponse( columns, List.of(valuesToPage(TestBlockFactory.getNonBreakingInstance(), columns, values)), + documentsFound, + valuesLoaded, null, false, asyncExecutionId, @@ -584,62 +585,154 @@ public void testChunkResponseSizeColumnar() { try (EsqlQueryResponse resp = randomResponse(true, null)) { int columnCount = resp.pages().get(0).getBlockCount(); int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2; - assertChunkCount(resp, r -> 6 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); + assertChunkCount(resp, r -> 8 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); } try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) { int columnCount = resp.pages().get(0).getBlockCount(); int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2; - assertChunkCount(resp, r -> 8 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); // is_running + assertChunkCount(resp, r -> 10 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); // is_running } } public void testChunkResponseSizeRows() { try (EsqlQueryResponse resp = randomResponse(false, null)) { int bodySize = resp.pages().stream().mapToInt(Page::getPositionCount).sum(); - assertChunkCount(resp, r -> 6 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); + assertChunkCount(resp, r -> 8 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); } try (EsqlQueryResponse resp = randomResponseAsync(false, null, true)) { int bodySize = resp.pages().stream().mapToInt(Page::getPositionCount).sum(); - assertChunkCount(resp, r -> 8 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); + assertChunkCount(resp, r -> 10 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); } } public void testSimpleXContentColumnar() { try (EsqlQueryResponse response = simple(true)) { - assertThat(Strings.toString(wrapAsToXContent(response)), equalTo(""" - {"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "documents_found" : 3, + "values_loaded" : 100, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40, + 80 + ] + ] + }""")); } } public void testSimpleXContentColumnarDropNulls() { try (EsqlQueryResponse response = simple(true)) { assertThat( - Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))), + Strings.toString( + wrapAsToXContent(response), + new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")), + true, + false + ), equalTo(""" - {"all_columns":[{"name":"foo","type":"integer"}],"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""") + { + "documents_found" : 3, + "values_loaded" : 100, + "all_columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40, + 80 + ] + ] + }""") ); } } public void testSimpleXContentColumnarAsync() { try (EsqlQueryResponse response = simple(true, true)) { - assertThat(Strings.toString(wrapAsToXContent(response)), equalTo(""" - {"is_running":false,"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "is_running" : false, + "documents_found" : 3, + "values_loaded" : 100, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40, + 80 + ] + ] + }""")); } } public void testSimpleXContentRows() { try (EsqlQueryResponse response = simple(false)) { - assertThat(Strings.toString(wrapAsToXContent(response)), equalTo(""" - {"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "documents_found" : 3, + "values_loaded" : 100, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""")); } } public void testSimpleXContentRowsAsync() { try (EsqlQueryResponse response = simple(false, true)) { - assertThat(Strings.toString(wrapAsToXContent(response)), equalTo(""" - {"is_running":false,"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "is_running" : false, + "documents_found" : 3, + "values_loaded" : 100, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""")); } } @@ -648,6 +741,8 @@ public void testBasicXContentIdAndRunning() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())), + 10, + 99, null, false, "id-123", @@ -656,8 +751,27 @@ public void testBasicXContentIdAndRunning() { null ) ) { - assertThat(Strings.toString(response), equalTo(""" - {"id":"id-123","is_running":true,"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "id" : "id-123", + "is_running" : true, + "documents_found" : 10, + "values_loaded" : 99, + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""")); } } @@ -666,6 +780,8 @@ public void testXContentOriginalTypes() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "unsupported", List.of("foo", "bar"))), List.of(new Page(blockFactory.newConstantNullBlock(2))), + 1, + 1, null, false, null, @@ -674,8 +790,29 @@ public void testXContentOriginalTypes() { null ) ) { - assertThat(Strings.toString(response), equalTo(""" - {"columns":[{"name":"foo","type":"unsupported","original_types":["foo","bar"]}],"values":[[null],[null]]}""")); + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" + { + "documents_found" : 1, + "values_loaded" : 1, + "columns" : [ + { + "name" : "foo", + "type" : "unsupported", + "original_types" : [ + "foo", + "bar" + ] + } + ], + "values" : [ + [ + null + ], + [ + null + ] + ] + }""")); } } @@ -684,6 +821,8 @@ public void testNullColumnsXContentDropNulls() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("all_null", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock(), blockFactory.newConstantNullBlock(2))), + 1, + 3, null, false, null, @@ -693,11 +832,41 @@ public void testNullColumnsXContentDropNulls() { ) ) { assertThat( - Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))), - equalTo("{" + """ - "all_columns":[{"name":"foo","type":"integer"},{"name":"all_null","type":"integer"}],""" + """ - "columns":[{"name":"foo","type":"integer"}],""" + """ - "values":[[40],[80]]}""") + Strings.toString( + wrapAsToXContent(response), + new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")), + true, + false + ), + equalTo(""" + { + "documents_found" : 1, + "values_loaded" : 3, + "all_columns" : [ + { + "name" : "foo", + "type" : "integer" + }, + { + "name" : "all_null", + "type" : "integer" + } + ], + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""") ); } } @@ -714,6 +883,8 @@ public void testNullColumnsFromBuilderXContentDropNulls() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("all_null", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock(), b.build())), + 1, + 3, null, false, null, @@ -723,11 +894,41 @@ public void testNullColumnsFromBuilderXContentDropNulls() { ) ) { assertThat( - Strings.toString(wrapAsToXContent(response), new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true"))), - equalTo("{" + """ - "all_columns":[{"name":"foo","type":"integer"},{"name":"all_null","type":"integer"}],""" + """ - "columns":[{"name":"foo","type":"integer"}],""" + """ - "values":[[40],[80]]}""") + Strings.toString( + wrapAsToXContent(response), + new ToXContent.MapParams(Map.of(DROP_NULL_COLUMNS_OPTION, "true")), + true, + false + ), + equalTo(""" + { + "documents_found" : 1, + "values_loaded" : 3, + "all_columns" : [ + { + "name" : "foo", + "type" : "integer" + }, + { + "name" : "all_null", + "type" : "integer" + } + ], + "columns" : [ + { + "name" : "foo", + "type" : "integer" + } + ], + "values" : [ + [ + 40 + ], + [ + 80 + ] + ] + }""") ); } } @@ -741,6 +942,8 @@ private EsqlQueryResponse simple(boolean columnar, boolean async) { return new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())), + 3, + 100, null, columnar, async, @@ -753,6 +956,8 @@ public void testProfileXContent() { EsqlQueryResponse response = new EsqlQueryResponse( List.of(new ColumnInfoImpl("foo", "integer", null)), List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())), + 10, + 100, new EsqlQueryResponse.Profile( List.of( new DriverProfile( @@ -762,7 +967,7 @@ public void testProfileXContent() { 20021, 20000, 12, - List.of(new DriverStatus.OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10021, 10, 111, 222))), + List.of(new OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10021, 10, 111, 222))), DriverSleeps.empty() ) ) @@ -772,8 +977,10 @@ public void testProfileXContent() { null ); ) { - assertThat(Strings.toString(response, true, false), equalTo(""" + assertThat(Strings.toString(wrapAsToXContent(response), true, false), equalTo(""" { + "documents_found" : 10, + "values_loaded" : 100, "columns" : [ { "name" : "foo", @@ -796,6 +1003,8 @@ public void testProfileXContent() { "stop_millis" : 1723489819929, "took_nanos" : 20021, "cpu_nanos" : 20000, + "documents_found" : 0, + "values_loaded" : 0, "iterations" : 12, "operators" : [ { @@ -833,7 +1042,7 @@ public void testColumns() { var longBlk2 = blockFactory.newLongArrayVector(new long[] { 300L, 400L, 500L }, 3).asBlock(); var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null), new ColumnInfoImpl("bar", "long", null)); var pages = List.of(new Page(intBlk1, longBlk1), new Page(intBlk2, longBlk2)); - try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) { + try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) { assertThat(columnValues(response.column(0)), contains(10, 20, 30, 40, 50)); assertThat(columnValues(response.column(1)), contains(100L, 200L, 300L, 400L, 500L)); expectThrows(IllegalArgumentException.class, () -> response.column(-1)); @@ -845,7 +1054,7 @@ public void testColumnsIllegalArg() { var intBlk1 = blockFactory.newIntArrayVector(new int[] { 10 }, 1).asBlock(); var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null)); var pages = List.of(new Page(intBlk1)); - try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) { + try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) { expectThrows(IllegalArgumentException.class, () -> response.column(-1)); expectThrows(IllegalArgumentException.class, () -> response.column(1)); } @@ -864,7 +1073,7 @@ public void testColumnsWithNull() { } var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null)); var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3)); - try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) { + try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) { assertThat(columnValues(response.column(0)), contains(10, null, 30, null, null, 60, null, 80, 90, null)); expectThrows(IllegalArgumentException.class, () -> response.column(-1)); expectThrows(IllegalArgumentException.class, () -> response.column(2)); @@ -884,7 +1093,7 @@ public void testColumnsWithMultiValue() { } var columnInfo = List.of(new ColumnInfoImpl("foo", "integer", null)); var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3)); - try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false, null)) { + try (var response = new EsqlQueryResponse(columnInfo, pages, 0, 0, null, false, null, false, false, null)) { assertThat(columnValues(response.column(0)), contains(List.of(10, 20), null, List.of(40, 50), null, 70, 80, null)); expectThrows(IllegalArgumentException.class, () -> response.column(-1)); expectThrows(IllegalArgumentException.class, () -> response.column(2)); @@ -897,7 +1106,7 @@ public void testRowValues() { List columns = randomList(numColumns, numColumns, this::randomColumnInfo); int noPages = randomIntBetween(1, 20); List pages = randomList(noPages, noPages, () -> randomPage(columns)); - try (var resp = new EsqlQueryResponse(columns, pages, null, false, "", false, false, null)) { + try (var resp = new EsqlQueryResponse(columns, pages, 0, 0, null, false, "", false, false, null)) { var rowValues = getValuesList(resp.rows()); var valValues = getValuesList(resp.values()); for (int i = 0; i < rowValues.size(); i++) { @@ -1011,5 +1220,4 @@ static Page valuesToPage(BlockFactory blockFactory, List columns } return new Page(results.stream().map(Block.Builder::build).toArray(Block[]::new)); } - } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java index 1134c82dad9f9..2f11d4ae2fc1a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java @@ -246,7 +246,7 @@ public void testPlainTextEmptyCursorWithColumns() { public void testPlainTextEmptyCursorWithoutColumns() { assertEquals( StringUtils.EMPTY, - getTextBodyContent(PLAIN_TEXT.format(req(), new EsqlQueryResponse(emptyList(), emptyList(), null, false, false, null))) + getTextBodyContent(PLAIN_TEXT.format(req(), new EsqlQueryResponse(emptyList(), emptyList(), 0, 0, null, false, false, null))) ); } @@ -269,7 +269,16 @@ public void testTsvFormatWithDropNullColumns() { } private static EsqlQueryResponse emptyData() { - return new EsqlQueryResponse(singletonList(new ColumnInfoImpl("name", "keyword", null)), emptyList(), null, false, false, null); + return new EsqlQueryResponse( + singletonList(new ColumnInfoImpl("name", "keyword", null)), + emptyList(), + 0, + 0, + null, + false, + false, + null + ); } private static EsqlQueryResponse regularData() { @@ -303,7 +312,7 @@ private static EsqlQueryResponse regularData() { ) ); - return new EsqlQueryResponse(headers, values, null, false, false, null); + return new EsqlQueryResponse(headers, values, 0, 0, null, false, false, null); } private static EsqlQueryResponse escapedData() { @@ -327,7 +336,7 @@ private static EsqlQueryResponse escapedData() { ) ); - return new EsqlQueryResponse(headers, values, null, false, false, null); + return new EsqlQueryResponse(headers, values, 0, 0, null, false, false, null); } private static RestRequest req() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java index ec9bb14d2a265..91456c4f44893 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java @@ -79,6 +79,8 @@ public class TextFormatterTests extends ESTestCase { blockFactory.newConstantNullBlock(2) ) ), + 0, + 0, null, randomBoolean(), randomBoolean(), @@ -181,6 +183,8 @@ public void testFormatWithoutHeader() { blockFactory.newConstantNullBlock(2) ) ), + 0, + 0, null, randomBoolean(), randomBoolean(), @@ -222,6 +226,8 @@ public void testVeryLongPadding() { .build() ) ), + 0, + 0, null, randomBoolean(), randomBoolean(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index f4deaa45f1f87..b5e8547a981ab 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; import org.elasticsearch.core.TimeValue; @@ -36,6 +37,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -58,7 +60,7 @@ public void shutdownTransportService() { terminate(threadPool); } - private List randomProfiles() { + private DriverCompletionInfo randomCompletionInfo() { int numProfiles = randomIntBetween(0, 2); List profiles = new ArrayList<>(numProfiles); for (int i = 0; i < numProfiles; i++) { @@ -75,20 +77,22 @@ private List randomProfiles() { ) ); } - return profiles; + return new DriverCompletionInfo(randomNonNegativeLong(), randomNonNegativeLong(), profiles); } public void testEmpty() { - PlainActionFuture> results = new PlainActionFuture<>(); + PlainActionFuture results = new PlainActionFuture<>(); try (var ignored = new ComputeListener(threadPool, () -> {}, results)) { assertFalse(results.isDone()); } assertTrue(results.isDone()); - assertThat(results.actionGet(10, TimeUnit.SECONDS), empty()); + assertThat(results.actionGet(10, TimeUnit.SECONDS).collectedProfiles(), empty()); } public void testCollectComputeResults() { - PlainActionFuture> future = new PlainActionFuture<>(); + PlainActionFuture future = new PlainActionFuture<>(); + long documentsFound = 0; + long valuesLoaded = 0; List allProfiles = new ArrayList<>(); AtomicInteger onFailure = new AtomicInteger(); try (var computeListener = new ComputeListener(threadPool, onFailure::incrementAndGet, future)) { @@ -102,20 +106,24 @@ public void testCollectComputeResults() { threadPool.generic() ); } else { - var profiles = randomProfiles(); - allProfiles.addAll(profiles); - ActionListener> subListener = computeListener.acquireCompute(); + var info = randomCompletionInfo(); + documentsFound += info.documentsFound(); + valuesLoaded += info.valuesLoaded(); + allProfiles.addAll(info.collectedProfiles()); + ActionListener subListener = computeListener.acquireCompute(); threadPool.schedule( - ActionRunnable.wrap(subListener, l -> l.onResponse(profiles)), + ActionRunnable.wrap(subListener, l -> l.onResponse(info)), TimeValue.timeValueNanos(between(0, 100)), threadPool.generic() ); } } } - List profiles = future.actionGet(10, TimeUnit.SECONDS); + DriverCompletionInfo actual = future.actionGet(10, TimeUnit.SECONDS); + assertThat(actual.documentsFound(), equalTo(documentsFound)); + assertThat(actual.valuesLoaded(), equalTo(valuesLoaded)); assertThat( - profiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), + actual.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum))) ); assertThat(onFailure.get(), equalTo(0)); @@ -129,13 +137,13 @@ public void testCancelOnFailure() throws Exception { ); int successTasks = between(1, 50); int failedTasks = between(1, 100); - PlainActionFuture> rootListener = new PlainActionFuture<>(); + PlainActionFuture rootListener = new PlainActionFuture<>(); final AtomicInteger onFailure = new AtomicInteger(); try (var computeListener = new ComputeListener(threadPool, onFailure::incrementAndGet, rootListener)) { for (int i = 0; i < successTasks; i++) { - ActionListener> subListener = computeListener.acquireCompute(); + ActionListener subListener = computeListener.acquireCompute(); threadPool.schedule( - ActionRunnable.wrap(subListener, l -> l.onResponse(randomProfiles())), + ActionRunnable.wrap(subListener, l -> l.onResponse(randomCompletionInfo())), TimeValue.timeValueNanos(between(0, 100)), threadPool.generic() ); @@ -160,13 +168,17 @@ public void testCancelOnFailure() throws Exception { } public void testCollectWarnings() throws Exception { + AtomicLong documentsFound = new AtomicLong(); + AtomicLong valuesLoaded = new AtomicLong(); List allProfiles = new ArrayList<>(); Map> allWarnings = new HashMap<>(); - ActionListener> rootListener = new ActionListener<>() { + ActionListener rootListener = new ActionListener<>() { @Override - public void onResponse(List result) { + public void onResponse(DriverCompletionInfo result) { + assertThat(result.documentsFound(), equalTo(documentsFound.get())); + assertThat(result.valuesLoaded(), equalTo(valuesLoaded.get())); assertThat( - result.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), + result.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum))) ); Map> responseHeaders = threadPool.getThreadContext() @@ -201,8 +213,10 @@ public void onFailure(Exception e) { threadPool.generic() ); } else { - var resp = randomProfiles(); - allProfiles.addAll(resp); + var resp = randomCompletionInfo(); + documentsFound.addAndGet(resp.documentsFound()); + valuesLoaded.addAndGet(resp.valuesLoaded()); + allProfiles.addAll(resp.collectedProfiles()); int numWarnings = randomIntBetween(1, 5); Map warnings = new HashMap<>(); for (int i = 0; i < numWarnings; i++) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index bf68d91874f91..8e88f9cc4ebbc 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.test.ComputeTestCase; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -123,7 +124,7 @@ public void testOnePass() { Queue sent = ConcurrentCollections.newQueue(); var future = sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); }); safeGet(future); assertThat(sent.size(), equalTo(2)); @@ -142,7 +143,7 @@ public void testMissingShards() { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); var future = sendRequests(true, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { assertThat(shard3, not(in(shardIds))); - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); }); ComputeResponse resp = safeGet(future); assertThat(resp.totalShards, equalTo(3)); @@ -173,7 +174,7 @@ public void testRetryThenSuccess() { if (node.equals(node4) && shardIds.contains(shard2)) { failures.put(shard2, new IOException("test")); } - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), failures))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, failures))); }); try { future.actionGet(1, TimeUnit.MINUTES); @@ -202,7 +203,7 @@ public void testRetryButFail() { if (shardIds.contains(shard5)) { failures.put(shard5, new IOException("test failure for shard5")); } - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), failures))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, failures))); }); var error = expectThrows(Exception.class, future::actionGet); assertNotNull(ExceptionsHelper.unwrap(error, IOException.class)); @@ -227,7 +228,7 @@ public void testDoNotRetryOnRequestLevelFailure() { if (node1.equals(node) && failed.compareAndSet(false, true)) { runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); } else { - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); } }); Exception exception = expectThrows(Exception.class, future::actionGet); @@ -247,7 +248,7 @@ public void testAllowPartialResults() { if (node1.equals(node) && failed.compareAndSet(false, true)) { runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); } else { - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); } }); ComputeResponse resp = safeGet(future); @@ -268,7 +269,7 @@ public void testNonFatalErrorIsRetriedOnAnotherShard() { if (Objects.equals(node1, node)) { runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false)); } else { - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); } })); assertThat(response.totalShards, equalTo(1)); @@ -326,7 +327,7 @@ public void testLimitConcurrentNodes() { sent.add(new NodeRequest(node, shardIds, aliasFilters)); runWithDelay(() -> { concurrentRequests.decrementAndGet(); - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); }); })); assertThat(sent.size(), equalTo(5)); @@ -349,7 +350,7 @@ public void testSkipNodes() { var response = safeGet(sendRequests(randomBoolean(), 1, targetShards, (node, shardIds, aliasFilters, listener) -> { runWithDelay(() -> { if (processed.incrementAndGet() == 1) { - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } else { listener.onSkip(); } @@ -371,7 +372,7 @@ public void testSkipRemovesPriorNonFatalErrors() { if (Objects.equals(node.getId(), node1.getId()) && shardIds.equals(List.of(shard1))) { listener.onFailure(new RuntimeException("test request level non fatal failure"), false); } else if (Objects.equals(node.getId(), node3.getId()) && shardIds.equals(List.of(shard2))) { - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } else if (Objects.equals(node.getId(), node2.getId()) && shardIds.equals(List.of(shard1))) { listener.onSkip(); } @@ -396,7 +397,7 @@ public void testQueryHotShardsFirst() { var sent = Collections.synchronizedList(new ArrayList()); safeGet(sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(node.getId()); - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); })); assertThat(sent, equalTo(List.of("node-1", "node-2", "node-3", "node-4"))); } @@ -409,7 +410,7 @@ public void testQueryHotShardsFirstWhenIlmMovesShard() { var sent = ConcurrentCollections.newQueue(); safeGet(sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); - runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()))); })); assertThat(groupRequests(sent, 1), equalTo(Map.of(node1, List.of(shard1)))); assertThat(groupRequests(sent, 1), anyOf(equalTo(Map.of(node2, List.of(shard2))), equalTo(Map.of(warmNode2, List.of(shard2))))); @@ -426,8 +427,8 @@ public void testRetryMovedShard() { (node, shardIds, aliasFilters, listener) -> runWithDelay( () -> listener.onResponse( Objects.equals(node, node4) - ? new DataNodeComputeResponse(List.of(), Map.of()) - : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))) + ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) + : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) ) ) ) @@ -451,10 +452,10 @@ public void testRetryMultipleMovedShards() { () -> listener.onResponse( attempt.incrementAndGet() <= 6 ? new DataNodeComputeResponse( - List.of(), + DriverCompletionInfo.EMPTY, shardIds.stream().collect(toMap(Function.identity(), ShardNotFoundException::new)) ) - : new DataNodeComputeResponse(List.of(), Map.of()) + : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) ) ) ) @@ -472,7 +473,9 @@ public void testDoesNotRetryMovedShardIndefinitely() { return Map.of(shard1, List.of(node2)); }, (node, shardIds, aliasFilters, listener) -> runWithDelay( - () -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1)))) + () -> listener.onResponse( + new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + ) ) )); assertThat(response.totalShards, equalTo(1)); @@ -493,12 +496,16 @@ public void testRetryOnlyMovedShards() { }, (node, shardIds, aliasFilters, listener) -> runWithDelay(() -> { if (Objects.equals(node, node1)) { // search is going to be retried from replica on node3 without shard resolution - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1)))); + listener.onResponse( + new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) + ); } else if (Objects.equals(node, node2)) { // search is going to be retried after resolving new shard node since there are no replicas - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard2, new ShardNotFoundException(shard2)))); + listener.onResponse( + new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard2, new ShardNotFoundException(shard2))) + ); } else { - listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())); } })) ); @@ -519,8 +526,8 @@ public void testRetryUnassignedShardWithoutPartialResults() { (node, shardIds, aliasFilters, listener) -> runWithDelay( () -> listener.onResponse( Objects.equals(shardIds, List.of(shard2)) - ? new DataNodeComputeResponse(List.of(), Map.of()) - : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))) + ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) + : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) ) ) @@ -538,8 +545,8 @@ public void testRetryUnassignedShardWithPartialResults() { (node, shardIds, aliasFilters, listener) -> runWithDelay( () -> listener.onResponse( Objects.equals(shardIds, List.of(shard2)) - ? new DataNodeComputeResponse(List.of(), Map.of()) - : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))) + ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()) + : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1))) ) ) )); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java index add3bf77efb00..b23517dd14088 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querylog/EsqlQueryLogTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.SlowLogFieldProvider; @@ -105,7 +106,7 @@ public void testPrioritiesOnSuccess() { for (int i = 0; i < actualTook.length; i++) { EsqlExecutionInfo warnQuery = getEsqlExecutionInfo(actualTook[i], actualPlanningTook[i]); - queryLog.onQueryPhase(new Result(List.of(), List.of(), List.of(), warnQuery), query); + queryLog.onQueryPhase(new Result(List.of(), List.of(), DriverCompletionInfo.EMPTY, warnQuery), query); if (expectedLevel[i] != null) { assertThat(appender.lastEvent(), is(not(nullValue()))); var msg = (ESLogMessage) appender.lastMessage(); diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml index 56befed48ce31..f2899dbf75e5e 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/10_basic.yml @@ -169,7 +169,7 @@ setup: - match: {values.0.1: 40} --- -"Basic ESQL query": +basic: - do: esql.query: body: @@ -181,12 +181,66 @@ setup: - match: {values.0: [1, 1]} --- -"Test From Eval Sort Limit": +basic with documents_found: + - requires: + test_runner_features: [capabilities, contains] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [documents_found_and_values_loaded] + reason: "checks for documents_found and values_loaded" + + - do: + esql.query: + body: + query: 'from test | keep data | sort data | limit 2' + columnar: true + + - match: {documents_found: 10} # two documents per shard + - match: {values_loaded: 10} # one per document + - match: {columns.0.name: "data"} + - match: {columns.0.type: "long"} + - match: {values.0: [1, 1]} + +--- +FROM EVAL SORT LIMIT: + - do: + esql.query: + body: + query: 'from test | eval x = count + 7 | sort x | limit 1' + + - match: {columns.0.name: "color"} + - match: {columns.1.name: "count"} + - match: {columns.2.name: "count_d"} + - match: {columns.3.name: "data"} + - match: {columns.4.name: "data_d"} + - match: {columns.5.name: "time"} + - match: {columns.6.name: "x"} + - match: {values.0.6: 47} + - length: {values: 1} + +--- +FROM EVAL SORT LIMIT with documents_found: + - requires: + test_runner_features: [capabilities, contains] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [documents_found_and_values_loaded] + reason: "checks for documents_found and values_loaded" + - do: esql.query: body: query: 'from test | eval x = count + 7 | sort x | limit 1' + - match: {documents_found: 40} + # We can't be sure quite how many values we'll load. It's at least + # one per document in the index. And one per top document. But we + # might load more values because we run in more threads. + - gte: {values_loaded: 45} - match: {columns.0.name: "color"} - match: {columns.1.name: "count"} - match: {columns.2.name: "count_d"} diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml index 17034de677b8d..a7b0c48b93ebe 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/120_profile.yml @@ -1,9 +1,5 @@ --- setup: - - requires: - cluster_features: ["gte_v8.12.0"] - reason: "profile option added in 8.12" - test_runner_features: warnings - do: indices.create: index: test @@ -140,3 +136,29 @@ avg 8.14 or after: - gte: {profile.drivers.1.took_nanos: 0} - gte: {profile.drivers.1.cpu_nanos: 0} # It's hard to assert much about these because they don't come back in any particular order. + +--- +documents found: + - requires: + test_runner_features: [capabilities, contains] + capabilities: + - method: POST + path: /_query + parameters: [] + capabilities: [documents_found_and_values_loaded] + reason: "checks for documents_found and values_loaded" + + - do: + esql.query: + body: + query: 'FROM test | LIMIT 1' + profile: true + + - length: {profile.drivers: 3} + - match: {profile.drivers.0.operators.0.operator: /ExchangeSourceOperator|LuceneSourceOperator.+/} + - gte: {profile.drivers.0.documents_found: 0} + - gte: {profile.drivers.0.values_loaded: 0} + - gte: {profile.drivers.1.documents_found: 0} + - gte: {profile.drivers.1.values_loaded: 0} + - gte: {profile.drivers.2.documents_found: 0} + - gte: {profile.drivers.2.values_loaded: 0}