Skip to content

Commit 70d2d6a

Browse files
Delay searchable snapshot allocation during shutdown (#86153)
During a node shutdown, searchable snapshot shards should not be reallocated to other nodes, since doing so leads to data being downloaded. Instead, the allocator now waits for the shutdown reallocation delay before reallocating these shards. Fixes #85052
1 parent 46db6a5 commit 70d2d6a

File tree

5 files changed

+186
-16
lines changed

5 files changed

+186
-16
lines changed

docs/changelog/86153.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 86153
2+
summary: Delay searchable snapshot allocation during shutdown
3+
area: "Snapshot/Restore"
4+
type: bug
5+
issues:
6+
- 85052

server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -250,27 +250,40 @@ public AllocateUnassignedDecision makeAllocationDecision(
250250
// if we didn't manage to find *any* data (regardless of matching sizes), and the replica is
251251
// unassigned due to a node leaving, so we delay allocation of this replica to see if the
252252
// node with the shard copy will rejoin so we can re-use the copy it has
253-
logger.debug("{}: allocation of [{}] is delayed", unassignedShard.shardId(), unassignedShard);
254-
long remainingDelayMillis = 0L;
255-
long totalDelayMillis = 0L;
256-
if (explain) {
257-
UnassignedInfo unassignedInfo = unassignedShard.unassignedInfo();
258-
Metadata metadata = allocation.metadata();
259-
IndexMetadata indexMetadata = metadata.index(unassignedShard.index());
260-
totalDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).getMillis();
261-
long remainingDelayNanos = unassignedInfo.getRemainingDelay(
262-
System.nanoTime(),
263-
indexMetadata.getSettings(),
264-
metadata.nodeShutdowns()
265-
);
266-
remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis();
267-
}
268-
return AllocateUnassignedDecision.delayed(remainingDelayMillis, totalDelayMillis, nodeDecisions);
253+
return delayedDecision(unassignedShard, allocation, logger, nodeDecisions);
269254
}
270255

271256
return AllocateUnassignedDecision.NOT_TAKEN;
272257
}
273258

259+
/**
260+
* Return a delayed decision, filling in the right amount of remaining time if decisions are debugged/explained.
261+
*/
262+
public static AllocateUnassignedDecision delayedDecision(
263+
ShardRouting unassignedShard,
264+
RoutingAllocation allocation,
265+
Logger logger,
266+
List<NodeAllocationResult> nodeDecisions
267+
) {
268+
boolean explain = allocation.debugDecision();
269+
logger.debug("{}: allocation of [{}] is delayed", unassignedShard.shardId(), unassignedShard);
270+
long remainingDelayMillis = 0L;
271+
long totalDelayMillis = 0L;
272+
if (explain) {
273+
UnassignedInfo unassignedInfo = unassignedShard.unassignedInfo();
274+
Metadata metadata = allocation.metadata();
275+
IndexMetadata indexMetadata = metadata.index(unassignedShard.index());
276+
totalDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).getMillis();
277+
long remainingDelayNanos = unassignedInfo.getRemainingDelay(
278+
System.nanoTime(),
279+
indexMetadata.getSettings(),
280+
metadata.nodeShutdowns()
281+
);
282+
remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis();
283+
}
284+
return AllocateUnassignedDecision.delayed(remainingDelayMillis, totalDelayMillis, nodeDecisions);
285+
}
286+
274287
/**
275288
* Determines if the shard can be allocated on at least one node based on the allocation deciders.
276289
*

x-pack/plugin/searchable-snapshots/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies {
1414
compileOnly project(path: xpackModule('core'))
1515
implementation project(path: 'preallocate')
1616
internalClusterTestImplementation(testArtifact(project(xpackModule('core'))))
17+
internalClusterTestImplementation(project(path: xpackModule('shutdown')))
1718
internalClusterTestImplementation(project(path: ':modules:reindex'))
1819
}
1920

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.searchablesnapshots.allocation;
9+
10+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
11+
import org.elasticsearch.client.internal.Requests;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
14+
import org.elasticsearch.cluster.node.DiscoveryNode;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.common.util.CollectionUtils;
17+
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.snapshots.SnapshotId;
19+
import org.elasticsearch.test.ESIntegTestCase;
20+
import org.elasticsearch.test.InternalTestCluster;
21+
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;
22+
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
23+
import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService;
24+
import org.elasticsearch.xpack.shutdown.DeleteShutdownNodeAction;
25+
import org.elasticsearch.xpack.shutdown.PutShutdownNodeAction;
26+
import org.elasticsearch.xpack.shutdown.ShutdownPlugin;
27+
import org.hamcrest.Matchers;
28+
29+
import java.util.ArrayList;
30+
import java.util.Collection;
31+
import java.util.List;
32+
import java.util.Locale;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.concurrent.ExecutionException;
36+
import java.util.stream.Collectors;
37+
import java.util.stream.StreamSupport;
38+
39+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
40+
41+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2)
42+
public class SearchableSnapshotShutdownIntegTests extends BaseSearchableSnapshotsIntegTestCase {
43+
44+
@Override
45+
protected Collection<Class<? extends Plugin>> nodePlugins() {
46+
return CollectionUtils.appendToCopy(super.nodePlugins(), ShutdownPlugin.class);
47+
}
48+
49+
@Override
50+
protected int numberOfShards() {
51+
// use 1 shard per index and instead use multiple indices to have multiple shards.
52+
return 1;
53+
}
54+
55+
public void testAllocationDisabledDuringShutdown() throws Exception {
56+
final List<String> restoredIndexNames = setupMountedIndices();
57+
final String[] restoredIndexNamesArray = restoredIndexNames.toArray(String[]::new);
58+
final Set<String> indexNodes = restoredIndexNames.stream()
59+
.flatMap(index -> internalCluster().nodesInclude(index).stream())
60+
.collect(Collectors.toSet());
61+
final ClusterState state = client().admin().cluster().prepareState().clear().setRoutingTable(true).setNodes(true).get().getState();
62+
final Map<String, String> nodeNameToId = state.getNodes()
63+
.stream()
64+
.collect(Collectors.toMap(DiscoveryNode::getName, DiscoveryNode::getId));
65+
66+
for (String indexNode : indexNodes) {
67+
final String indexNodeId = nodeNameToId.get(indexNode);
68+
putShutdown(indexNodeId);
69+
final int shards = (int) StreamSupport.stream(state.routingTable().allShards(restoredIndexNamesArray).spliterator(), false)
70+
.filter(s -> indexNodeId.equals(s.currentNodeId()))
71+
.count();
72+
assert shards > 0;
73+
74+
assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
75+
assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
76+
waitForBlobCacheFillsToComplete();
77+
final CacheService cacheService = internalCluster().getInstance(CacheService.class, indexNode);
78+
cacheService.synchronizeCache();
79+
80+
logger.info("--> Restarting [{}/{}]", indexNodeId, indexNode);
81+
internalCluster().restartNode(indexNode, new InternalTestCluster.RestartCallback() {
82+
@Override
83+
public Settings onNodeStopped(String nodeName) throws Exception {
84+
assertBusy(() -> {
85+
ClusterHealthResponse response = client().admin()
86+
.cluster()
87+
.health(Requests.clusterHealthRequest(restoredIndexNamesArray))
88+
.actionGet();
89+
assertThat(response.getUnassignedShards(), Matchers.equalTo(shards));
90+
});
91+
return super.onNodeStopped(nodeName);
92+
}
93+
});
94+
// leave shutdown in place for some nodes to check that shards get assigned anyway.
95+
if (randomBoolean()) {
96+
removeShutdown(indexNodeId);
97+
}
98+
}
99+
100+
ensureGreen(restoredIndexNamesArray);
101+
}
102+
103+
private List<String> setupMountedIndices() throws Exception {
104+
int count = between(1, 10);
105+
List<String> restoredIndices = new ArrayList<>();
106+
final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
107+
createRepository(repositoryName, "mock");
108+
109+
for (int i = 0; i < count; ++i) {
110+
final String indexName = "index_" + i;
111+
createAndPopulateIndex(indexName, Settings.builder());
112+
113+
final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-" + i, List.of(indexName)).snapshotId();
114+
assertAcked(client().admin().indices().prepareDelete(indexName));
115+
restoredIndices.add(mountSnapshot(repositoryName, snapshotId.getName(), indexName, Settings.EMPTY));
116+
}
117+
return restoredIndices;
118+
}
119+
120+
private void putShutdown(String nodeToRestartId) throws InterruptedException, ExecutionException {
121+
PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
122+
nodeToRestartId,
123+
SingleNodeShutdownMetadata.Type.RESTART,
124+
this.getTestName(),
125+
null,
126+
null
127+
);
128+
assertTrue(client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get().isAcknowledged());
129+
}
130+
131+
private void removeShutdown(String node) throws ExecutionException, InterruptedException {
132+
assertTrue(client().execute(DeleteShutdownNodeAction.INSTANCE, new DeleteShutdownNodeAction.Request(node)).get().isAcknowledged());
133+
}
134+
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/allocation/SearchableSnapshotAllocator.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.metadata.IndexMetadata;
1818
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
1919
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
20+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2021
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.cluster.node.DiscoveryNodes;
2223
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -341,11 +342,26 @@ private AllocateUnassignedDecision decideAllocation(RoutingAllocation allocation
341342
);
342343
return AllocateUnassignedDecision.yes(nodeWithHighestMatch.node(), null, nodeDecisions, true);
343344
}
345+
} else if (isDelayedDueToNodeRestart(allocation, shardRouting)) {
346+
return ReplicaShardAllocator.delayedDecision(shardRouting, allocation, logger, nodeDecisions);
344347
}
348+
345349
// TODO: do we need handling of delayed allocation for leaving replicas here?
346350
return AllocateUnassignedDecision.NOT_TAKEN;
347351
}
348352

353+
private boolean isDelayedDueToNodeRestart(RoutingAllocation allocation, ShardRouting shardRouting) {
354+
if (shardRouting.unassignedInfo().isDelayed()) {
355+
String lastAllocatedNodeId = shardRouting.unassignedInfo().getLastAllocatedNodeId();
356+
if (lastAllocatedNodeId != null) {
357+
SingleNodeShutdownMetadata nodeShutdownMetadata = allocation.nodeShutdowns().get(lastAllocatedNodeId);
358+
return nodeShutdownMetadata != null && nodeShutdownMetadata.getType() == SingleNodeShutdownMetadata.Type.RESTART;
359+
}
360+
}
361+
362+
return false;
363+
}
364+
349365
@Override
350366
public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting shardRouting, RoutingAllocation routingAllocation) {
351367
assert shardRouting.unassigned();

0 commit comments

Comments
 (0)