From dc3aaf1dc10d0c22ae867f118a51209555908055 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 16 Jan 2025 17:09:25 +0000 Subject: [PATCH] Make requests_per_second configurable to throttle reindexing (#120207) (#120300) * Make requests_per_second configurable to throttle reindexing * Update docs/changelog/120207.yaml * Add restrictions to prevent zero or negative rate limit Also allow -1 as infinite * PR Changes - Switch to cluster settings for rate limit retrieval --- docs/changelog/120207.yaml | 5 + ...indexDatastreamIndexTransportActionIT.java | 27 --- .../xpack/migrate/MigratePlugin.java | 2 + ...ReindexDataStreamIndexTransportAction.java | 34 ++- ...exDataStreamIndexTransportActionTests.java | 206 ++++++++++++++++++ 5 files changed, 246 insertions(+), 28 deletions(-) create mode 100644 docs/changelog/120207.yaml create mode 100644 x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java diff --git a/docs/changelog/120207.yaml b/docs/changelog/120207.yaml new file mode 100644 index 0000000000000..c01dfc6aecf78 --- /dev/null +++ b/docs/changelog/120207.yaml @@ -0,0 +1,5 @@ +pr: 120207 +summary: Make `requests_per_second` configurable to throttle reindexing +area: Data streams +type: enhancement +issues: [] diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index 32e31929fc2c1..e5d766c971185 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -493,31 +493,4 @@ private static String getIndexUUID(String index) { .get(IndexMetadata.SETTING_INDEX_UUID); } - public void testGenerateDestIndexName_noDotPrefix() { - String sourceIndex = "sourceindex"; - String expectedDestIndex = "migrated-sourceindex"; - String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); - assertEquals(expectedDestIndex, actualDestIndex); - } - - public void testGenerateDestIndexName_withDotPrefix() { - String sourceIndex = ".sourceindex"; - String expectedDestIndex = ".migrated-sourceindex"; - String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); - assertEquals(expectedDestIndex, actualDestIndex); - } - - public void testGenerateDestIndexName_withHyphen() { - String sourceIndex = "source-index"; - String expectedDestIndex = "migrated-source-index"; - String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); - assertEquals(expectedDestIndex, actualDestIndex); - } - - public void testGenerateDestIndexName_withUnderscore() { - String sourceIndex = "source_index"; - String expectedDestIndex = "migrated-source_index"; - String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); - assertEquals(expectedDestIndex, actualDestIndex); - } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index 55ec4065be8c4..93b90e551e721 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -59,6 +59,7 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING; import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING; public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin { @@ -160,6 +161,7 @@ public List> getPersistentTasksExecutor( public List> getSettings() { List> pluginSettings = new ArrayList<>(); pluginSettings.add(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING); + pluginSettings.add(REINDEX_MAX_REQUESTS_PER_SECOND_SETTING); return pluginSettings; } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 7bb440bc52a15..65e4214d808dc 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -46,8 +47,37 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio ReindexDataStreamIndexAction.Request, ReindexDataStreamIndexAction.Response> { + public static final String REINDEX_MAX_REQUESTS_PER_SECOND_KEY = "migrate.data_stream_reindex_max_request_per_second"; + + public static final Setting REINDEX_MAX_REQUESTS_PER_SECOND_SETTING = new Setting<>( + REINDEX_MAX_REQUESTS_PER_SECOND_KEY, + Float.toString(10f), + s -> { + if (s.equals("-1")) { + return Float.POSITIVE_INFINITY; + } else { + return Float.parseFloat(s); + } + }, + value -> { + if (value <= 0f) { + throw new IllegalArgumentException( + "Failed to parse value [" + + value + + "] for setting [" + + REINDEX_MAX_REQUESTS_PER_SECOND_KEY + + "] " + + "must be greater than 0 or -1 for infinite" + ); + } + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(ReindexDataStreamIndexTransportAction.class); private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false); + private final ClusterService clusterService; private final Client client; @@ -176,7 +206,8 @@ private void createIndex( client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage)); } - private void reindex(String sourceIndexName, String destIndexName, ActionListener listener, TaskId parentTaskId) { + // Visible for testing + void reindex(String sourceIndexName, String destIndexName, ActionListener listener, TaskId parentTaskId) { logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName); var reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices(sourceIndexName); @@ -184,6 +215,7 @@ private void reindex(String sourceIndexName, String destIndexName, ActionListene reindexRequest.getSearchRequest().source().fetchSource(true); reindexRequest.setDestIndex(destIndexName); reindexRequest.setParentTask(parentTaskId); + reindexRequest.setRequestsPerSecond(clusterService.getClusterSettings().get(REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)); reindexRequest.setSlices(0); // equivalent to slices=auto in rest api client.execute(ReindexAction.INSTANCE, reindexRequest, listener); } diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java new file mode 100644 index 0000000000000..99e1031dec3a2 --- /dev/null +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java @@ -0,0 +1,206 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.Before; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +public class ReindexDataStreamIndexTransportActionTests extends ESTestCase { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private TransportService transportService; + @Mock + private ClusterService clusterService; + @Mock + private ActionFilters actionFilters; + @Mock + private Client client; + + @InjectMocks + private ReindexDataStreamIndexTransportAction action; + + @Captor + private ArgumentCaptor request; + + private AutoCloseable mocks; + + @Before + public void setUp() throws Exception { + super.setUp(); + mocks = MockitoAnnotations.openMocks(this); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + mocks.close(); + } + + public void testGenerateDestIndexName_noDotPrefix() { + String sourceIndex = "sourceindex"; + String expectedDestIndex = "migrated-sourceindex"; + String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); + assertEquals(expectedDestIndex, actualDestIndex); + } + + public void testGenerateDestIndexName_withDotPrefix() { + String sourceIndex = ".sourceindex"; + String expectedDestIndex = ".migrated-sourceindex"; + String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); + assertEquals(expectedDestIndex, actualDestIndex); + } + + public void testGenerateDestIndexName_withHyphen() { + String sourceIndex = "source-index"; + String expectedDestIndex = "migrated-source-index"; + String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); + assertEquals(expectedDestIndex, actualDestIndex); + } + + public void testGenerateDestIndexName_withUnderscore() { + String sourceIndex = "source_index"; + String expectedDestIndex = "migrated-source_index"; + String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); + assertEquals(expectedDestIndex, actualDestIndex); + } + + public void testReindexIncludesRateLimit() { + var targetRateLimit = randomFloatBetween(1, 100, true); + Settings settings = Settings.builder() + .put(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING.getKey(), targetRateLimit) + .build(); + + String sourceIndex = randomAlphanumericOfLength(10); + String destIndex = randomAlphanumericOfLength(10); + ActionListener listener = ActionListener.noop(); + TaskId taskId = TaskId.EMPTY_TASK_ID; + + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings( + settings, + Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING) + ) + ); + + doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener)); + + action.reindex(sourceIndex, destIndex, listener, taskId); + + ReindexRequest requestValue = request.getValue(); + + assertEquals(targetRateLimit, requestValue.getRequestsPerSecond(), 0.0); + } + + public void testReindexIncludesInfiniteRateLimit() { + Settings settings = Settings.builder() + .put(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING.getKey(), "-1") + .build(); + + String sourceIndex = randomAlphanumericOfLength(10); + String destIndex = randomAlphanumericOfLength(10); + ActionListener listener = ActionListener.noop(); + TaskId taskId = TaskId.EMPTY_TASK_ID; + + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings( + settings, + Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING) + ) + ); + doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener)); + + action.reindex(sourceIndex, destIndex, listener, taskId); + + ReindexRequest requestValue = request.getValue(); + + assertEquals(Float.POSITIVE_INFINITY, requestValue.getRequestsPerSecond(), 0.0); + } + + public void testReindexZeroRateLimitThrowsError() { + Settings settings = Settings.builder() + .put(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING.getKey(), "0") + .build(); + + String sourceIndex = randomAlphanumericOfLength(10); + String destIndex = randomAlphanumericOfLength(10); + ActionListener listener = ActionListener.noop(); + TaskId taskId = TaskId.EMPTY_TASK_ID; + + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings( + settings, + Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING) + ) + ); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> action.reindex(sourceIndex, destIndex, listener, taskId) + ); + assertEquals( + "Failed to parse value [0.0] for setting [migrate.data_stream_reindex_max_request_per_second]" + + " must be greater than 0 or -1 for infinite", + e.getMessage() + ); + } + + public void testReindexNegativeRateLimitThrowsError() { + float targetRateLimit = randomFloatBetween(-100, -1, true); + Settings settings = Settings.builder() + .put(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING.getKey(), targetRateLimit) + .build(); + + String sourceIndex = randomAlphanumericOfLength(10); + String destIndex = randomAlphanumericOfLength(10); + ActionListener listener = ActionListener.noop(); + TaskId taskId = TaskId.EMPTY_TASK_ID; + + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings( + settings, + Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING) + ) + ); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> action.reindex(sourceIndex, destIndex, listener, taskId) + ); + assertEquals( + "Failed to parse value [" + + targetRateLimit + + "] for setting [migrate.data_stream_reindex_max_request_per_second]" + + " must be greater than 0 or -1 for infinite", + e.getMessage() + ); + } +}