diff --git a/server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java index f70667b91aec8..c966a7354482c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java @@ -11,10 +11,10 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; @@ -108,9 +108,7 @@ public IndexResult index(Index index) throws IOException { config().getMapperService(), DocumentSizeAccumulator.EMPTY_INSTANCE ); - ParsedDocument parsedDocument = index.parsedDoc(); - documentParsingReporter.onIndexingCompleted(parsedDocument); - + documentParsingReporter.onIndexingCompleted(index); return result; } }); @@ -125,7 +123,7 @@ public TestDocumentParsingProviderPlugin() {} public DocumentParsingProvider getDocumentParsingProvider() { return new DocumentParsingProvider() { @Override - public XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) { + public XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) { return new TestXContentMeteringParserDecorator(0L); } @@ -150,8 +148,8 @@ public TestDocumentSizeReporter(String indexName) { } @Override - public void onIndexingCompleted(ParsedDocument parsedDocument) { - long delta = parsedDocument.getNormalizedSize(); + public void onIndexingCompleted(Engine.Index index) { + long delta = index.parsedDoc().getNormalizedSize(); if (delta > XContentMeteringParserDecorator.UNKNOWN_SIZE) { COUNTER.addAndGet(delta); } diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index 64182b000827d..922144c103d32 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -58,6 +58,7 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.IndexStorePlugin; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.threadpool.ThreadPool; @@ -175,6 +176,8 @@ public interface DirectoryWrapper { private final Map recoveryStateFactories; private final SetOnce indexCommitListener = new SetOnce<>(); private final MapperMetrics mapperMetrics; + private final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier; + /** * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins @@ -195,8 +198,9 @@ public IndexModule( final Map recoveryStateFactories, final SlowLogFieldProvider slowLogFieldProvider, final MapperMetrics mapperMetrics, - final List searchOperationListeners - ) { + final List searchOperationListeners, + final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier + ) { this.indexSettings = indexSettings; this.analysisRegistry = analysisRegistry; this.engineFactory = Objects.requireNonNull(engineFactory); @@ -209,6 +213,7 @@ public IndexModule( this.expressionResolver = expressionResolver; this.recoveryStateFactories = recoveryStateFactories; this.mapperMetrics = mapperMetrics; + this.parserDecoratorSupplier = parserDecoratorSupplier; } /** @@ -528,6 +533,7 @@ public IndexService newIndexService( eventListener, readerWrapperFactory, mapperRegistry, + parserDecoratorSupplier, indicesFieldDataCache, searchOperationListeners, indexOperationListeners, diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 571bbd76a49dd..b7ef7b0e7d80d 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -85,6 +85,7 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.IndexStorePlugin; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.threadpool.ThreadPool; @@ -130,6 +131,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final Engine.IndexCommitListener indexCommitListener; private final IndexCache indexCache; private final MapperService mapperService; + private final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier; private final XContentParserConfiguration parserConfiguration; private final NamedWriteableRegistry namedWriteableRegistry; private final SimilarityService similarityService; @@ -184,6 +186,7 @@ public IndexService( IndexEventListener eventListener, Function> wrapperFactory, MapperRegistry mapperRegistry, + XContentMeteringParserDecoratorSupplier parserDecoratorSupplier, IndicesFieldDataCache indicesFieldDataCache, List searchOperationListeners, List indexingOperationListeners, @@ -211,6 +214,7 @@ public IndexService( this.valuesSourceRegistry = valuesSourceRegistry; this.snapshotCommitSupplier = snapshotCommitSupplier; this.indexAnalyzers = indexAnalyzers; + this.parserDecoratorSupplier = parserDecoratorSupplier; if (needsMapperService(indexSettings, indexCreationContext)) { assert indexAnalyzers != null; this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this)); @@ -561,7 +565,8 @@ public synchronized IndexShard createShard( snapshotCommitSupplier, System::nanoTime, indexCommitListener, - mapperMetrics + mapperMetrics, + parserDecoratorSupplier ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f84ac22cd78e4..0166ade52abb9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -146,6 +146,7 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.plugins.IndexStorePlugin; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; @@ -218,6 +219,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // Package visible for testing final CircuitBreakerService circuitBreakerService; + private final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier; + private final SearchOperationListener searchOperationListener; private final ShardBulkStats bulkOperationListener; @@ -326,7 +329,8 @@ public IndexShard( final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier, final LongSupplier relativeTimeInNanosSupplier, final Engine.IndexCommitListener indexCommitListener, - final MapperMetrics mapperMetrics + final MapperMetrics mapperMetrics, + final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -365,6 +369,8 @@ public IndexShard( state = IndexShardState.CREATED; this.path = path; this.circuitBreakerService = circuitBreakerService; + this.parserDecoratorSupplier = parserDecoratorSupplier; + /* create engine config */ logger.debug("state: [CREATED]"); @@ -2015,7 +2021,14 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o index.getAutoGeneratedIdTimestamp(), true, origin, - new SourceToParse(index.id(), index.source(), XContentHelper.xContentType(index.source()), index.routing()) + new SourceToParse( + index.id(), + index.source(), + XContentHelper.xContentType(index.source()), + index.routing(), + Collections.emptyMap(), + parserDecoratorSupplier.newMeteringParserDecorator(index) + ) ); } case DELETE -> { diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 27d832241bfed..f0a26070389e5 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -134,6 +134,8 @@ import org.elasticsearch.plugins.FieldPredicate; import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; @@ -174,6 +176,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -233,6 +236,7 @@ public class IndicesService extends AbstractLifecycleComponent private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); private final MapperRegistry mapperRegistry; + private final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier; private final NamedWriteableRegistry namedWriteableRegistry; private final Map snapshotCommitSuppliers; private final IndexingMemoryController indexingMemoryController; @@ -285,6 +289,7 @@ protected void doStart() { this.indicesRequestCache = new IndicesRequestCache(settings); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = builder.mapperRegistry; + this.parserDecoratorSupplier = builder.parserDecoratorSupplier; this.namedWriteableRegistry = builder.namedWriteableRegistry; indexingMemoryController = new IndexingMemoryController( settings, @@ -751,7 +756,8 @@ private synchronized IndexService createIndexService( recoveryStateFactories, loadSlowLogFieldProvider(), mapperMetrics, - searchOperationListeners + searchOperationListeners, + parserDecoratorSupplier ); for (IndexingOperationListener operationListener : indexingOperationListeners) { indexModule.addIndexOperationListener(operationListener); @@ -830,7 +836,8 @@ public synchronized MapperService createIndexMapperServiceForValidation(IndexMet recoveryStateFactories, loadSlowLogFieldProvider(), mapperMetrics, - searchOperationListeners + searchOperationListeners, + XContentMeteringParserDecoratorSupplier.NOOP ); pluginsService.forEach(p -> p.onIndexModule(indexModule)); return indexModule.newIndexMapperService(clusterService, parserConfig, mapperRegistry, scriptService); diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java index 08d1b5ce3a96c..e5cb212f4235c 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java @@ -32,6 +32,8 @@ import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -45,6 +47,7 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; public class IndicesServiceBuilder { @@ -76,6 +79,7 @@ public class IndicesServiceBuilder { CheckedBiConsumer requestCacheKeyDifferentiator; MapperMetrics mapperMetrics; List searchOperationListener = List.of(); + XContentMeteringParserDecoratorSupplier parserDecoratorSupplier; public IndicesServiceBuilder settings(Settings settings) { this.settings = settings; @@ -188,6 +192,11 @@ public IndicesServiceBuilder searchOperationListeners(List XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) { - return XContentMeteringParserDecorator.NOOP; - } } diff --git a/server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeReporter.java b/server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeReporter.java index af998c7c27947..9c836fc9428ad 100644 --- a/server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeReporter.java +++ b/server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeReporter.java @@ -9,10 +9,12 @@ package org.elasticsearch.plugins.internal; -import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.engine.Engine; /** * An interface to allow performing an action when parsing and indexing has been completed + * + * TODO: Should this be dropped in favor of {@link org.elasticsearch.index.shard.IndexingOperationListener}? */ public interface DocumentSizeReporter { /** @@ -22,12 +24,14 @@ public interface DocumentSizeReporter { }; /** - * An action to be performed upon finished indexing. + * An action to be performed upon finished parsing. + * Note: Corresponds to {@link org.elasticsearch.index.shard.IndexingOperationListener#preIndex} */ - default void onParsingCompleted(ParsedDocument parsedDocument) {} + default void onParsingCompleted(Engine.Index index) {} /** * An action to be performed upon finished indexing. + * Note: Corresponds to {@link org.elasticsearch.index.shard.IndexingOperationListener#postIndex} */ - default void onIndexingCompleted(ParsedDocument parsedDocument) {} + default void onIndexingCompleted(Engine.Index index) {} } diff --git a/server/src/main/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorSupplier.java b/server/src/main/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorSupplier.java new file mode 100644 index 0000000000000..ae32fd6e6f816 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorSupplier.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.plugins.internal; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.index.translog.Translog; + +public interface XContentMeteringParserDecoratorSupplier { + XContentMeteringParserDecoratorSupplier NOOP = new XContentMeteringParserDecoratorSupplier() { + }; + + /** + * @return a parser decorator + */ + default XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) { + return XContentMeteringParserDecorator.NOOP; + } + + /** + * @return a parser decorator + */ + default XContentMeteringParserDecorator newMeteringParserDecorator(Translog.Index index) { + return XContentMeteringParserDecorator.NOOP; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index b389e33993b9b..17698ee44f09a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -123,7 +123,7 @@ public void testExecuteBulkIndexRequest() throws Exception { DocumentParsingProvider documentParsingProvider = mock(); XContentMeteringParserDecorator parserDecorator = mock(); - when(documentParsingProvider.newMeteringParserDecorator(any())).thenReturn(parserDecorator); + when(documentParsingProvider.newMeteringParserDecorator(any(IndexRequest.class))).thenReturn(parserDecorator); when(parserDecorator.decorate(any())).then(i -> i.getArgument(0)); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); @@ -190,7 +190,7 @@ public void testExecuteBulkIndexRequest() throws Exception { assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT)); assertThat(replicaRequest, equalTo(primaryRequest)); - verify(documentParsingProvider).newMeteringParserDecorator(any()); + verify(documentParsingProvider).newMeteringParserDecorator(any(IndexRequest.class)); verify(parserDecorator).decorate(any()); // Assert that the document count is still 1 @@ -660,7 +660,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { assertThat(failure.getCause(), equalTo(err)); assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT)); - verify(documentParsingProvider, times(retries + 1)).newMeteringParserDecorator(any()); + verify(documentParsingProvider, times(retries + 1)).newMeteringParserDecorator(any(IndexRequest.class)); } @SuppressWarnings("unchecked") diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 49a4d519c0ea4..8b77abd891878 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -86,6 +86,7 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.IndexStorePlugin; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.test.ClusterServiceUtils; @@ -239,7 +240,8 @@ public void testWrapperIsBound() throws IOException { Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + XContentMeteringParserDecoratorSupplier.NOOP ); module.setReaderWrapper(s -> new Wrapper()); @@ -267,7 +269,8 @@ public void testRegisterIndexStore() throws IOException { Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + XContentMeteringParserDecoratorSupplier.NOOP ); final IndexService indexService = newIndexService(module); @@ -293,7 +296,8 @@ public void testDirectoryWrapper() throws IOException { Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + XContentMeteringParserDecoratorSupplier.NOOP ); module.setDirectoryWrapper(new TestDirectoryWrapper()); @@ -647,7 +651,8 @@ public void testRegisterCustomRecoveryStateFactory() throws IOException { recoveryStateFactories, mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + XContentMeteringParserDecoratorSupplier.NOOP ); final IndexService indexService = newIndexService(module); @@ -670,7 +675,8 @@ public void testIndexCommitListenerIsBound() throws IOException, ExecutionExcept Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + XContentMeteringParserDecoratorSupplier.NOOP ); final AtomicLong lastAcquiredPrimaryTerm = new AtomicLong(); @@ -773,7 +779,8 @@ private static IndexModule createIndexModule( Collections.emptyMap(), mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, - emptyList() + emptyList(), + XContentMeteringParserDecoratorSupplier.NOOP ); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 5b9a9e7a4efee..05c6db791838a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -78,6 +78,7 @@ import org.elasticsearch.indices.recovery.StartRecoveryRequest; import org.elasticsearch.indices.recovery.plan.PeerOnlyRecoveryPlannerService; import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService; +import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.ShardGeneration; @@ -547,7 +548,8 @@ protected IndexShard newShard( IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, relativeTimeSupplier, null, - MapperMetrics.NOOP + MapperMetrics.NOOP, + XContentMeteringParserDecoratorSupplier.NOOP ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true;