Refine indexing pressure accounting in semantic bulk inference filter #129320
Conversation
Previously, we conservatively estimated that inference results would double the document _source size. This could lead to unnecessary circuit breaker exceptions, even when the node had sufficient memory to handle the operation. This PR replaces the rough estimate with the actual size of the _source after the update. Since inference calls use a batch size of 1 MB, we rely on the real circuit breaker to ensure that results fit in memory before applying indexing pressure accounting. Additionally, this PR introduces a new counter in InferenceStats to track the number of rejections caused by indexing pressure from inference results.
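As a rough sketch of the accounting change (hypothetical names and a made-up accounting call, not the actual ShardBulkInferenceActionFilter code), the filter now reserves only the measured growth of the source rather than assuming the source doubles:

// Hypothetical sketch, not the PR's actual code.
// Before: reserve the original source size a second time (assume the source doubles).
// After: reserve only the measured growth once inference results have been applied.
long originalSize = indexRequest.source().length();
long updatedSize = newSource.length();
long extraBytes = Math.max(0, updatedSize - originalSize);
coordinatingPressure.increment(1, extraBytes); // hypothetical accounting call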
Pinging @elastic/search-eng (Team:SearchOrg)
Pinging @elastic/search-relevance (Team:Search - Relevance)
Hi @jimczi, I've created a changelog YAML for you.
…' into indexing_pressure_bulk_inference
Before merging this PR, I would like to examine the assumption that doubling document source size for indexing pressure purposes is incorrect. This was done during development of #125517 because the original source is pooled and the memory for it is released only after bulk request handling is complete. We accounted for this by adding indexing memory pressure for the additional copy of source generated to insert embeddings into. What new information do we have now that allows us to change this approach?
Looks like there was an update to this test suite in #129140 that disabled these tests on the new semantic text format. We should probably fix that...
Oops, good catch, thanks. I added the test back in c85718d
We should apply just this change to 9.1 and 8.19 so that we restore test coverage in those branches
The approach looks good overall, very clever! How often do you figure the source will be array-backed? In other words, is this an optimization you expect we can use most of the time in production?
newSource.array(),
newSource.arrayOffset(),
Can we assume that newSource will always be array-backed?
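For illustration only, a defensive guard for the non-array-backed case might look like the sketch below; hasArray() is the accessor the surrounding diff already relies on, while BytesReference.toBytes as the fallback copy is an assumption, not code from this PR.

// Hedged sketch, not part of the PR: fall back to a fresh array copy when newSource is not array-backed.
BytesReference arrayBacked = newSource.hasArray()
    ? newSource
    : new BytesArray(BytesReference.toBytes(newSource)); // assumed to materialize a copy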
private void appendSourceAndInferenceMetadata(
    XContentBuilder builder,
    BytesReference source,
    XContentType xContentType,
    Map<String, Object> inferenceFieldsMap
) throws IOException {
    builder.startObject();

    // append the original source
    try (
        XContentParser parser = XContentHelper.createParserNotCompressed(XContentParserConfiguration.EMPTY, source, xContentType)
    ) {
        // skip start object
        parser.nextToken();
        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
            builder.copyCurrentStructure(parser);
        }
    }

    // add the inference metadata field
    builder.field(InferenceMetadataFieldsMapper.NAME);
    try (XContentParser parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, inferenceFieldsMap)) {
        builder.copyCurrentStructure(parser);
    }

    builder.endObject();
}
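For context, a hypothetical call site for this helper might look like the following sketch; the builder setup mirrors the surrounding code, and BytesReference.bytes(builder) is assumed as the way to materialize the result.

// Hedged sketch of a call site, not verbatim from the PR: replay the original source, append the
// _inference_fields metadata, then measure and apply the updated source.
try (XContentBuilder builder = XContentBuilder.builder(indexRequest.getContentType().xContent())) {
    appendSourceAndInferenceMetadata(builder, indexRequest.source(), indexRequest.getContentType(), inferenceFieldsMap);
    BytesReference newSource = BytesReference.bytes(builder); // assumed materialization step
    // ... apply newSource to the index request, reusing the original array when possible (see below)
}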
Would the operation performed in this method temporarily generate a second copy of the source with _inference_fields added? I think it would, but asking to confirm my understanding.
// Apply the updated source to the index request.
if (originalSource.hasArray()) {
    // If the original source is backed by an array, perform in-place update:
    // - Copy as much of the new source as fits into the original array.
    System.arraycopy(
        newSource.array(),
        newSource.arrayOffset(),
        originalSource.array(),
        originalSource.arrayOffset(),
        originalSource.length()
    );

    int remainingSize = newSource.length() - originalSource.length();
    if (remainingSize > 0) {
        // If there are additional bytes, append them as a new BytesArray segment.
        byte[] remainingBytes = new byte[remainingSize];
        System.arraycopy(
            newSource.array(),
            newSource.arrayOffset() + originalSource.length(),
            remainingBytes,
            0,
            remainingSize
        );
        indexRequest.source(
            CompositeBytesReference.of(originalSource, new BytesArray(remainingBytes)),
            indexRequest.getContentType()
        );
    } else {
        // No additional bytes; just adjust the slice length.
        indexRequest.source(originalSource.slice(0, newSource.length()));
    }
} else {
    // If the original source is not array-backed, replace it entirely.
    indexRequest.source(newSource, indexRequest.getContentType());
}
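To make the memory saving concrete, here is a small worked example; the sizes are illustrative, not taken from the PR.

// Illustrative numbers only: with a 10_240-byte original source and a 12_288-byte updated source,
// the in-place path overwrites the existing 10_240-byte array and allocates just a 2_048-byte tail,
// rather than a full second 12_288-byte copy of the source.
int originalLength = 10_240;
int updatedLength = 12_288;
int freshAllocation = updatedLength - originalLength; // 2_048 bytes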
We don't have to do this in this PR, but it would be good to put this logic in a common place (IndexRequest? BytesReference?) so that we can leverage it in other places as well. I was thinking of adding methods like canUpdateInPlace and updateInPlace.
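A hedged sketch of what those helpers might look like, using the reviewer's suggested names (canUpdateInPlace and updateInPlace are proposals; nothing like this exists in the PR). It mirrors the logic above and additionally assumes the updated source is itself array-backed.

// Hedged sketch of the proposed helpers; names and placement are hypothetical.
static boolean canUpdateInPlace(BytesReference original, BytesReference updated) {
    return original.hasArray() && updated.hasArray();
}

static BytesReference updateInPlace(BytesReference original, BytesReference updated) {
    assert canUpdateInPlace(original, updated);
    // Overwrite the part of the updated content that fits into the reused original array.
    int reused = Math.min(original.length(), updated.length());
    System.arraycopy(updated.array(), updated.arrayOffset(), original.array(), original.arrayOffset(), reused);
    int remaining = updated.length() - original.length();
    if (remaining <= 0) {
        // The updated source fits entirely in the original array; just shorten the view.
        return original.slice(0, updated.length());
    }
    // Allocate only the tail that does not fit and stitch it onto the reused prefix.
    byte[] tail = new byte[remaining];
    System.arraycopy(updated.array(), updated.arrayOffset() + original.length(), tail, 0, remaining);
    return CompositeBytesReference.of(original, new BytesArray(tail));
}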
In #125517, we conservatively estimated that inference results would double the document _source size, since the original source is pooled by the bulk action and only released after the bulk request completes.
This PR reduces the memory needed to perform the update by reusing the original source array when possible. This way, we only need to account for the extra inference fields, which reduces the overall indexing pressure.
Additionally, this PR introduces a new counter in InferenceStats to track the number of rejections caused by indexing pressure from inference results.
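A minimal sketch of the rejection counter idea, assuming hypothetical field and method names; the actual InferenceStats change in this PR may track and expose the metric differently.

// Minimal sketch with hypothetical names, not the PR's actual InferenceStats code.
import java.util.concurrent.atomic.LongAdder;

class InferenceStatsSketch {
    private final LongAdder indexingPressureRejections = new LongAdder();

    void onIndexingPressureRejection() {
        indexingPressureRejections.increment();
    }

    long indexingPressureRejectionCount() {
        return indexingPressureRejections.sum();
    }
}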