Skip to content

[WIP] Meter docs when replaying translog to capture RA-S #118341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -108,9 +108,7 @@ public IndexResult index(Index index) throws IOException {
config().getMapperService(),
DocumentSizeAccumulator.EMPTY_INSTANCE
);
ParsedDocument parsedDocument = index.parsedDoc();
documentParsingReporter.onIndexingCompleted(parsedDocument);

documentParsingReporter.onIndexingCompleted(index);
return result;
}
});
Expand All @@ -125,7 +123,7 @@ public TestDocumentParsingProviderPlugin() {}
public DocumentParsingProvider getDocumentParsingProvider() {
return new DocumentParsingProvider() {
@Override
public <T> XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) {
public XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) {
return new TestXContentMeteringParserDecorator(0L);
}

Expand All @@ -150,8 +148,8 @@ public TestDocumentSizeReporter(String indexName) {
}

@Override
public void onIndexingCompleted(ParsedDocument parsedDocument) {
long delta = parsedDocument.getNormalizedSize();
public void onIndexingCompleted(Engine.Index index) {
long delta = index.parsedDoc().getNormalizedSize();
if (delta > XContentMeteringParserDecorator.UNKNOWN_SIZE) {
COUNTER.addAndGet(delta);
}
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -175,6 +176,8 @@ public interface DirectoryWrapper {
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
private final MapperMetrics mapperMetrics;
private final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier;


/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -195,8 +198,9 @@ public IndexModule(
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final SlowLogFieldProvider slowLogFieldProvider,
final MapperMetrics mapperMetrics,
final List<SearchOperationListener> searchOperationListeners
) {
final List<SearchOperationListener> searchOperationListeners,
final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
Expand All @@ -209,6 +213,7 @@ public IndexModule(
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
this.mapperMetrics = mapperMetrics;
this.parserDecoratorSupplier = parserDecoratorSupplier;
}

/**
Expand Down Expand Up @@ -528,6 +533,7 @@ public IndexService newIndexService(
eventListener,
readerWrapperFactory,
mapperRegistry,
parserDecoratorSupplier,
indicesFieldDataCache,
searchOperationListeners,
indexOperationListeners,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final Engine.IndexCommitListener indexCommitListener;
private final IndexCache indexCache;
private final MapperService mapperService;
private final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier;
private final XContentParserConfiguration parserConfiguration;
private final NamedWriteableRegistry namedWriteableRegistry;
private final SimilarityService similarityService;
Expand Down Expand Up @@ -184,6 +186,7 @@ public IndexService(
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
XContentMeteringParserDecoratorSupplier parserDecoratorSupplier,
IndicesFieldDataCache indicesFieldDataCache,
List<SearchOperationListener> searchOperationListeners,
List<IndexingOperationListener> indexingOperationListeners,
Expand Down Expand Up @@ -211,6 +214,7 @@ public IndexService(
this.valuesSourceRegistry = valuesSourceRegistry;
this.snapshotCommitSupplier = snapshotCommitSupplier;
this.indexAnalyzers = indexAnalyzers;
this.parserDecoratorSupplier = parserDecoratorSupplier;
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
Expand Down Expand Up @@ -561,7 +565,8 @@ public synchronized IndexShard createShard(
snapshotCommitSupplier,
System::nanoTime,
indexCommitListener,
mapperMetrics
mapperMetrics,
parserDecoratorSupplier
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
17 changes: 15 additions & 2 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -218,6 +219,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// Package visible for testing
final CircuitBreakerService circuitBreakerService;

private final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier;

private final SearchOperationListener searchOperationListener;

private final ShardBulkStats bulkOperationListener;
Expand Down Expand Up @@ -326,7 +329,8 @@ public IndexShard(
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
final LongSupplier relativeTimeInNanosSupplier,
final Engine.IndexCommitListener indexCommitListener,
final MapperMetrics mapperMetrics
final MapperMetrics mapperMetrics,
final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -365,6 +369,8 @@ public IndexShard(
state = IndexShardState.CREATED;
this.path = path;
this.circuitBreakerService = circuitBreakerService;
this.parserDecoratorSupplier = parserDecoratorSupplier;

/* create engine config */
logger.debug("state: [CREATED]");

Expand Down Expand Up @@ -2015,7 +2021,14 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
index.getAutoGeneratedIdTimestamp(),
true,
origin,
new SourceToParse(index.id(), index.source(), XContentHelper.xContentType(index.source()), index.routing())
new SourceToParse(
index.id(),
index.source(),
XContentHelper.xContentType(index.source()),
index.routing(),
Collections.emptyMap(),
parserDecoratorSupplier.newMeteringParserDecorator(index)
)
);
}
case DELETE -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@
import org.elasticsearch.plugins.FieldPredicate;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -174,6 +176,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -233,6 +236,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final AtomicInteger numUncompletedDeletes = new AtomicInteger();
private final OldShardsStats oldShardsStats = new OldShardsStats();
private final MapperRegistry mapperRegistry;
private final XContentMeteringParserDecoratorSupplier parserDecoratorSupplier;
private final NamedWriteableRegistry namedWriteableRegistry;
private final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers;
private final IndexingMemoryController indexingMemoryController;
Expand Down Expand Up @@ -285,6 +289,7 @@ protected void doStart() {
this.indicesRequestCache = new IndicesRequestCache(settings);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = builder.mapperRegistry;
this.parserDecoratorSupplier = builder.parserDecoratorSupplier;
this.namedWriteableRegistry = builder.namedWriteableRegistry;
indexingMemoryController = new IndexingMemoryController(
settings,
Expand Down Expand Up @@ -751,7 +756,8 @@ private synchronized IndexService createIndexService(
recoveryStateFactories,
loadSlowLogFieldProvider(),
mapperMetrics,
searchOperationListeners
searchOperationListeners,
parserDecoratorSupplier
);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
Expand Down Expand Up @@ -830,7 +836,8 @@ public synchronized MapperService createIndexMapperServiceForValidation(IndexMet
recoveryStateFactories,
loadSlowLogFieldProvider(),
mapperMetrics,
searchOperationListeners
searchOperationListeners,
XContentMeteringParserDecoratorSupplier.NOOP
);
pluginsService.forEach(p -> p.onIndexModule(indexModule));
return indexModule.newIndexMapperService(clusterService, parserConfig, mapperRegistry, scriptService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecoratorSupplier;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.internal.ShardSearchRequest;
Expand All @@ -45,6 +47,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class IndicesServiceBuilder {
Expand Down Expand Up @@ -76,6 +79,7 @@ public class IndicesServiceBuilder {
CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator;
MapperMetrics mapperMetrics;
List<SearchOperationListener> searchOperationListener = List.of();
XContentMeteringParserDecoratorSupplier parserDecoratorSupplier;

public IndicesServiceBuilder settings(Settings settings) {
this.settings = settings;
Expand Down Expand Up @@ -188,6 +192,11 @@ public IndicesServiceBuilder searchOperationListeners(List<SearchOperationListen
return this;
}

public IndicesServiceBuilder parserDecoratorSupplier(XContentMeteringParserDecoratorSupplier parserDecoratorSupplier) {
this.parserDecoratorSupplier = parserDecoratorSupplier;
return this;
}

public IndicesService build() {
Objects.requireNonNull(settings);
Objects.requireNonNull(pluginsService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ private void construct(
.requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator())
.mapperMetrics(mapperMetrics)
.searchOperationListeners(searchOperationListeners)
.parserDecoratorSupplier(documentParsingProvider)
.build();

final var parameters = new IndexSettingProvider.Parameters(clusterService, indicesService::createIndexMapperServiceForValidation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.translog.Translog;

/**
* An interface to provide instances of document parsing observer and reporter
*/
public interface DocumentParsingProvider {
public interface DocumentParsingProvider extends XContentMeteringParserDecoratorSupplier {
DocumentParsingProvider EMPTY_INSTANCE = new DocumentParsingProvider() {
};

Expand All @@ -36,11 +37,4 @@ default DocumentSizeReporter newDocumentSizeReporter(
default DocumentSizeAccumulator createDocumentSizeAccumulator() {
return DocumentSizeAccumulator.EMPTY_INSTANCE;
}

/**
* @return an observer
*/
default <T> XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) {
return XContentMeteringParserDecorator.NOOP;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@

package org.elasticsearch.plugins.internal;

import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.engine.Engine;

/**
* An interface to allow performing an action when parsing and indexing has been completed
*
* TODO: Should this be dropped in favor of {@link org.elasticsearch.index.shard.IndexingOperationListener}?
*/
public interface DocumentSizeReporter {
/**
Expand All @@ -22,12 +24,14 @@ public interface DocumentSizeReporter {
};

/**
* An action to be performed upon finished indexing.
* An action to be performed upon finished parsing.
* Note: Corresponds to {@link org.elasticsearch.index.shard.IndexingOperationListener#preIndex}
*/
default void onParsingCompleted(ParsedDocument parsedDocument) {}
default void onParsingCompleted(Engine.Index index) {}

/**
* An action to be performed upon finished indexing.
* Note: Corresponds to {@link org.elasticsearch.index.shard.IndexingOperationListener#postIndex}
*/
default void onIndexingCompleted(ParsedDocument parsedDocument) {}
default void onIndexingCompleted(Engine.Index index) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.plugins.internal;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.translog.Translog;

public interface XContentMeteringParserDecoratorSupplier {
XContentMeteringParserDecoratorSupplier NOOP = new XContentMeteringParserDecoratorSupplier() {
};

/**
* @return a parser decorator
*/
default XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) {
return XContentMeteringParserDecorator.NOOP;
}

/**
* @return a parser decorator
*/
default XContentMeteringParserDecorator newMeteringParserDecorator(Translog.Index index) {
return XContentMeteringParserDecorator.NOOP;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testExecuteBulkIndexRequest() throws Exception {

DocumentParsingProvider documentParsingProvider = mock();
XContentMeteringParserDecorator parserDecorator = mock();
when(documentParsingProvider.newMeteringParserDecorator(any())).thenReturn(parserDecorator);
when(documentParsingProvider.newMeteringParserDecorator(any(IndexRequest.class))).thenReturn(parserDecorator);
when(parserDecorator.decorate(any())).then(i -> i.getArgument(0));

BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
Expand Down Expand Up @@ -190,7 +190,7 @@ public void testExecuteBulkIndexRequest() throws Exception {
assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));

assertThat(replicaRequest, equalTo(primaryRequest));
verify(documentParsingProvider).newMeteringParserDecorator(any());
verify(documentParsingProvider).newMeteringParserDecorator(any(IndexRequest.class));
verify(parserDecorator).decorate(any());

// Assert that the document count is still 1
Expand Down Expand Up @@ -660,7 +660,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
assertThat(failure.getCause(), equalTo(err));
assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));

verify(documentParsingProvider, times(retries + 1)).newMeteringParserDecorator(any());
verify(documentParsingProvider, times(retries + 1)).newMeteringParserDecorator(any(IndexRequest.class));
}

@SuppressWarnings("unchecked")
Expand Down
Loading