Skip to content

Commit

Permalink
Make requests_per_second configurable to throttle reindexing (elastic…
Browse files Browse the repository at this point in the history
…#120207) (elastic#120300)

* Make requests_per_second configurable to throttle reindexing

* Update docs/changelog/120207.yaml

* Add restrictions to prevent zero or negative rate limit

Also allow -1 as infinite

* PR Changes - Switch to cluster settings for rate limit retrieval
  • Loading branch information
lukewhiting authored Jan 16, 2025
1 parent 2b1210f commit dc3aaf1
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 28 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/120207.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120207
summary: Make `requests_per_second` configurable to throttle reindexing
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -493,31 +493,4 @@ private static String getIndexUUID(String index) {
.get(IndexMetadata.SETTING_INDEX_UUID);
}

public void testGenerateDestIndexName_noDotPrefix() {
String sourceIndex = "sourceindex";
String expectedDestIndex = "migrated-sourceindex";
String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
assertEquals(expectedDestIndex, actualDestIndex);
}

public void testGenerateDestIndexName_withDotPrefix() {
String sourceIndex = ".sourceindex";
String expectedDestIndex = ".migrated-sourceindex";
String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
assertEquals(expectedDestIndex, actualDestIndex);
}

public void testGenerateDestIndexName_withHyphen() {
String sourceIndex = "source-index";
String expectedDestIndex = "migrated-source-index";
String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
assertEquals(expectedDestIndex, actualDestIndex);
}

public void testGenerateDestIndexName_withUnderscore() {
String sourceIndex = "source_index";
String expectedDestIndex = "migrated-source_index";
String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
assertEquals(expectedDestIndex, actualDestIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.function.Supplier;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING;
import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;

public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {
Expand Down Expand Up @@ -160,6 +161,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
public List<Setting<?>> getSettings() {
List<Setting<?>> pluginSettings = new ArrayList<>();
pluginSettings.add(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
pluginSettings.add(REINDEX_MAX_REQUESTS_PER_SECOND_SETTING);
return pluginSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
Expand All @@ -46,8 +47,37 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
ReindexDataStreamIndexAction.Request,
ReindexDataStreamIndexAction.Response> {

public static final String REINDEX_MAX_REQUESTS_PER_SECOND_KEY = "migrate.data_stream_reindex_max_request_per_second";

public static final Setting<Float> REINDEX_MAX_REQUESTS_PER_SECOND_SETTING = new Setting<>(
REINDEX_MAX_REQUESTS_PER_SECOND_KEY,
Float.toString(10f),
s -> {
if (s.equals("-1")) {
return Float.POSITIVE_INFINITY;
} else {
return Float.parseFloat(s);
}
},
value -> {
if (value <= 0f) {
throw new IllegalArgumentException(
"Failed to parse value ["
+ value
+ "] for setting ["
+ REINDEX_MAX_REQUESTS_PER_SECOND_KEY
+ "] "
+ "must be greater than 0 or -1 for infinite"
);
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private static final Logger logger = LogManager.getLogger(ReindexDataStreamIndexTransportAction.class);
private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false);

private final ClusterService clusterService;
private final Client client;

Expand Down Expand Up @@ -176,14 +206,16 @@ private void createIndex(
client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage));
}

private void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener, TaskId parentTaskId) {
// Visible for testing
void reindex(String sourceIndexName, String destIndexName, ActionListener<BulkByScrollResponse> listener, TaskId parentTaskId) {
logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName);
var reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndexName);
reindexRequest.getSearchRequest().allowPartialSearchResults(false);
reindexRequest.getSearchRequest().source().fetchSource(true);
reindexRequest.setDestIndex(destIndexName);
reindexRequest.setParentTask(parentTaskId);
reindexRequest.setRequestsPerSecond(clusterService.getClusterSettings().get(REINDEX_MAX_REQUESTS_PER_SECOND_SETTING));
reindexRequest.setSlices(0); // equivalent to slices=auto in rest api
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.util.Collections;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private TransportService transportService;
@Mock
private ClusterService clusterService;
@Mock
private ActionFilters actionFilters;
@Mock
private Client client;

@InjectMocks
private ReindexDataStreamIndexTransportAction action;

@Captor
private ArgumentCaptor<ReindexRequest> request;

private AutoCloseable mocks;

@Before
public void setUp() throws Exception {
super.setUp();
mocks = MockitoAnnotations.openMocks(this);
}

@After
public void tearDown() throws Exception {
super.tearDown();
mocks.close();
}

public void testGenerateDestIndexName_noDotPrefix() {
String sourceIndex = "sourceindex";
String expectedDestIndex = "migrated-sourceindex";
String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
assertEquals(expectedDestIndex, actualDestIndex);
}

public void testGenerateDestIndexName_withDotPrefix() {
String sourceIndex = ".sourceindex";
String expectedDestIndex = ".migrated-sourceindex";
String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
assertEquals(expectedDestIndex, actualDestIndex);
}

public void testGenerateDestIndexName_withHyphen() {
String sourceIndex = "source-index";
String expectedDestIndex = "migrated-source-index";
String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
assertEquals(expectedDestIndex, actualDestIndex);
}

public void testGenerateDestIndexName_withUnderscore() {
String sourceIndex = "source_index";
String expectedDestIndex = "migrated-source_index";
String actualDestIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex);
assertEquals(expectedDestIndex, actualDestIndex);
}

public void testReindexIncludesRateLimit() {
var targetRateLimit = randomFloatBetween(1, 100, true);
Settings settings = Settings.builder()
.put(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING.getKey(), targetRateLimit)
.build();

String sourceIndex = randomAlphanumericOfLength(10);
String destIndex = randomAlphanumericOfLength(10);
ActionListener<BulkByScrollResponse> listener = ActionListener.noop();
TaskId taskId = TaskId.EMPTY_TASK_ID;

when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(
settings,
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
)
);

doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener));

action.reindex(sourceIndex, destIndex, listener, taskId);

ReindexRequest requestValue = request.getValue();

assertEquals(targetRateLimit, requestValue.getRequestsPerSecond(), 0.0);
}

public void testReindexIncludesInfiniteRateLimit() {
Settings settings = Settings.builder()
.put(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING.getKey(), "-1")
.build();

String sourceIndex = randomAlphanumericOfLength(10);
String destIndex = randomAlphanumericOfLength(10);
ActionListener<BulkByScrollResponse> listener = ActionListener.noop();
TaskId taskId = TaskId.EMPTY_TASK_ID;

when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(
settings,
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
)
);
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), eq(listener));

action.reindex(sourceIndex, destIndex, listener, taskId);

ReindexRequest requestValue = request.getValue();

assertEquals(Float.POSITIVE_INFINITY, requestValue.getRequestsPerSecond(), 0.0);
}

public void testReindexZeroRateLimitThrowsError() {
Settings settings = Settings.builder()
.put(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING.getKey(), "0")
.build();

String sourceIndex = randomAlphanumericOfLength(10);
String destIndex = randomAlphanumericOfLength(10);
ActionListener<BulkByScrollResponse> listener = ActionListener.noop();
TaskId taskId = TaskId.EMPTY_TASK_ID;

when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(
settings,
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
)
);

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> action.reindex(sourceIndex, destIndex, listener, taskId)
);
assertEquals(
"Failed to parse value [0.0] for setting [migrate.data_stream_reindex_max_request_per_second]"
+ " must be greater than 0 or -1 for infinite",
e.getMessage()
);
}

public void testReindexNegativeRateLimitThrowsError() {
float targetRateLimit = randomFloatBetween(-100, -1, true);
Settings settings = Settings.builder()
.put(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING.getKey(), targetRateLimit)
.build();

String sourceIndex = randomAlphanumericOfLength(10);
String destIndex = randomAlphanumericOfLength(10);
ActionListener<BulkByScrollResponse> listener = ActionListener.noop();
TaskId taskId = TaskId.EMPTY_TASK_ID;

when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(
settings,
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
)
);

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> action.reindex(sourceIndex, destIndex, listener, taskId)
);
assertEquals(
"Failed to parse value ["
+ targetRateLimit
+ "] for setting [migrate.data_stream_reindex_max_request_per_second]"
+ " must be greater than 0 or -1 for infinite",
e.getMessage()
);
}
}

0 comments on commit dc3aaf1

Please sign in to comment.