Skip to content

Commit c2d316a

Browse files
committed
Remove obsolete resolving logic from TRA (#49685)
This stems from a time where index requests were directly forwarded to TransportReplicationAction. Nowadays they are wrapped in a BulkShardRequest, and this logic is obsolete. In contrast to prior PR (#49647), this PR also fixes (see b3697cc) a situation where the previous index expression logic had an interesting side effect. For bulk requests (which had resolveIndex = false), the reroute phase was waiting for the index to appear in case where it was not present, and for all other replication requests (resolveIndex = true) it would right away throw an IndexNotFoundException while resolving the name and exit. With #49647, every replication request was now waiting for the index to appear, which was problematic when the given index had just been deleted (e.g. deleting a follower index while it's still receiving requests from the leader, where these requests would now wait up to a minute for the index to appear). This PR now adds b3697cc on top of that prior PR to make sure to reestablish some of the prior behavior where the reroute phase waits for the bulk request for the index to appear. That logic was in place to ensure that when an index was created and not all nodes had learned about it yet, that the bulk would not fail somewhere in the reroute phase. This is now only restricted to the situation where the current node has an older cluster state than the one that coordinated the bulk request (which checks that the index is present). This also means that when an index is deleted, we will no longer unnecessarily wait up to the timeout for the index o appear, and instead fail the request. Closes #20279
1 parent 4edb2e7 commit c2d316a

27 files changed

+149
-153
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3232
import org.elasticsearch.cluster.block.ClusterBlock;
3333
import org.elasticsearch.cluster.block.ClusterBlocks;
34-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3534
import org.elasticsearch.cluster.service.ClusterService;
3635
import org.elasticsearch.common.inject.Inject;
3736
import org.elasticsearch.common.io.stream.StreamInput;
@@ -58,8 +57,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
5857
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
5958
final ClusterService clusterService, final IndicesService indicesService,
6059
final ThreadPool threadPool, final ShardStateAction stateAction,
61-
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
62-
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
60+
final ActionFilters actionFilters) {
61+
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters,
6362
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
6463
}
6564

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.support.replication.ReplicationResponse;
2525
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2626
import org.elasticsearch.cluster.action.shard.ShardStateAction;
27-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2827
import org.elasticsearch.cluster.service.ClusterService;
2928
import org.elasticsearch.common.inject.Inject;
3029
import org.elasticsearch.common.io.stream.StreamInput;
@@ -44,9 +43,9 @@ public class TransportShardFlushAction
4443
@Inject
4544
public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
4645
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
47-
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
46+
ActionFilters actionFilters) {
4847
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
49-
actionFilters, indexNameExpressionResolver, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
48+
actionFilters, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
5049
}
5150

5251
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.action.support.replication.ReplicationResponse;
2626
import org.elasticsearch.action.support.replication.TransportReplicationAction;
2727
import org.elasticsearch.cluster.action.shard.ShardStateAction;
28-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2928
import org.elasticsearch.cluster.service.ClusterService;
3029
import org.elasticsearch.common.inject.Inject;
3130
import org.elasticsearch.common.io.stream.StreamInput;
@@ -46,9 +45,9 @@ public class TransportShardRefreshAction
4645
@Inject
4746
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
4847
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
49-
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
48+
ActionFilters actionFilters) {
5049
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
51-
indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
50+
BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
5251
}
5352

5453
@Override

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ public String getDescription() {
118118
return stringBuilder.toString();
119119
}
120120

121+
@Override
122+
protected BulkShardRequest routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
123+
return super.routedBasedOnClusterVersion(routedBasedOnClusterVersion);
124+
}
125+
121126
@Override
122127
public void onRetry() {
123128
for (BulkItemRequest item : items) {

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ protected void doRun() {
490490
requests.toArray(new BulkItemRequest[requests.size()]));
491491
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
492492
bulkShardRequest.timeout(bulkRequest.timeout());
493+
bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
493494
if (task != null) {
494495
bulkShardRequest.setParentTask(nodeId, task.getId());
495496
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
4444
import org.elasticsearch.cluster.action.shard.ShardStateAction;
4545
import org.elasticsearch.cluster.metadata.IndexMetaData;
46-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4746
import org.elasticsearch.cluster.metadata.MappingMetaData;
4847
import org.elasticsearch.cluster.service.ClusterService;
4948
import org.elasticsearch.common.bytes.BytesReference;
@@ -91,10 +90,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
9190
@Inject
9291
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
9392
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
94-
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
95-
IndexNameExpressionResolver indexNameExpressionResolver) {
93+
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
9694
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
97-
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
95+
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
9896
this.updateHelper = updateHelper;
9997
this.mappingUpdatedAction = mappingUpdatedAction;
10098
}
@@ -109,11 +107,6 @@ protected BulkShardResponse newResponseInstance(StreamInput in) throws IOExcepti
109107
return new BulkShardResponse(in);
110108
}
111109

112-
@Override
113-
protected boolean resolveIndex() {
114-
return false;
115-
}
116-
117110
@Override
118111
protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
119112
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.action.support.replication.TransportWriteAction;
2828
import org.elasticsearch.cluster.action.shard.ShardStateAction;
2929
import org.elasticsearch.cluster.block.ClusterBlockLevel;
30-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3130
import org.elasticsearch.cluster.routing.ShardRouting;
3231
import org.elasticsearch.cluster.service.ClusterService;
3332
import org.elasticsearch.common.inject.Inject;
@@ -55,10 +54,9 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
5554
@Inject
5655
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
5756
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
58-
ShardStateAction shardStateAction, ActionFilters actionFilters,
59-
IndexNameExpressionResolver indexNameExpressionResolver) {
57+
ShardStateAction shardStateAction, ActionFilters actionFilters) {
6058
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
61-
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
59+
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
6260
true /* we should never reject resync because of thread pool capacity on primary */);
6361
}
6462

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public final Request waitForActiveShards(final int waitForActiveShards) {
169169
* Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}
170170
*/
171171
@SuppressWarnings("unchecked")
172-
Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
172+
protected Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
173173
this.routedBasedOnClusterVersion = routedBasedOnClusterVersion;
174174
return (Request) this;
175175
}

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 37 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,8 @@
4141
import org.elasticsearch.cluster.block.ClusterBlockException;
4242
import org.elasticsearch.cluster.block.ClusterBlockLevel;
4343
import org.elasticsearch.cluster.metadata.IndexMetaData;
44-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4544
import org.elasticsearch.cluster.node.DiscoveryNode;
4645
import org.elasticsearch.cluster.routing.AllocationId;
47-
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
4846
import org.elasticsearch.cluster.routing.ShardRouting;
4947
import org.elasticsearch.cluster.service.ClusterService;
5048
import org.elasticsearch.common.Nullable;
@@ -105,7 +103,6 @@ public abstract class TransportReplicationAction<
105103
protected final ClusterService clusterService;
106104
protected final ShardStateAction shardStateAction;
107105
protected final IndicesService indicesService;
108-
protected final IndexNameExpressionResolver indexNameExpressionResolver;
109106
protected final TransportRequestOptions transportOptions;
110107
protected final String executor;
111108

@@ -118,19 +115,17 @@ public abstract class TransportReplicationAction<
118115
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
119116
ClusterService clusterService, IndicesService indicesService,
120117
ThreadPool threadPool, ShardStateAction shardStateAction,
121-
ActionFilters actionFilters,
122-
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
118+
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
123119
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
124120
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
125-
indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
121+
requestReader, replicaRequestReader, executor, false, false);
126122
}
127123

128124

129125
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
130126
ClusterService clusterService, IndicesService indicesService,
131127
ThreadPool threadPool, ShardStateAction shardStateAction,
132-
ActionFilters actionFilters,
133-
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
128+
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
134129
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
135130
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
136131
super(actionName, actionFilters, transportService.getTaskManager());
@@ -139,7 +134,6 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
139134
this.clusterService = clusterService;
140135
this.indicesService = indicesService;
141136
this.shardStateAction = shardStateAction;
142-
this.indexNameExpressionResolver = indexNameExpressionResolver;
143137
this.executor = executor;
144138

145139
this.transportPrimaryAction = actionName + "[p]";
@@ -220,21 +214,10 @@ public ClusterBlockLevel indexBlockLevel() {
220214
return null;
221215
}
222216

223-
/**
224-
* True if provided index should be resolved when resolving request
225-
*/
226-
protected boolean resolveIndex() {
227-
return true;
228-
}
229-
230217
protected TransportRequestOptions transportOptions(Settings settings) {
231218
return TransportRequestOptions.EMPTY;
232219
}
233220

234-
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
235-
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
236-
}
237-
238221
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
239222
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
240223
if (globalBlockLevel != null) {
@@ -649,8 +632,7 @@ public void onFailure(Exception e) {
649632
protected void doRun() {
650633
setPhase(task, "routing");
651634
final ClusterState state = observer.setAndGetObservedState();
652-
final String concreteIndex = concreteIndex(state, request);
653-
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
635+
final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
654636
if (blockException != null) {
655637
if (blockException.retryable()) {
656638
logger.trace("cluster is blocked, scheduling a retry", blockException);
@@ -659,23 +641,47 @@ protected void doRun() {
659641
finishAsFailed(blockException);
660642
}
661643
} else {
662-
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
663-
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
644+
final IndexMetaData indexMetaData = state.metaData().index(request.shardId().getIndex());
664645
if (indexMetaData == null) {
665-
retry(new IndexNotFoundException(concreteIndex));
666-
return;
646+
// ensure that the cluster state on the node is at least as high as the node that decided that the index was there
647+
if (state.version() < request.routedBasedOnClusterVersion()) {
648+
logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
649+
"Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
650+
request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
651+
retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
652+
"] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
653+
request.shardId().getIndexName()));
654+
return;
655+
} else {
656+
finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
657+
return;
658+
}
667659
}
660+
668661
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
669-
throw new IndexClosedException(indexMetaData.getIndex());
662+
finishAsFailed(new IndexClosedException(indexMetaData.getIndex()));
663+
return;
670664
}
671665

672-
// resolve all derived request fields, so we can route and apply it
673-
resolveRequest(indexMetaData, request);
666+
if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
667+
// if the wait for active shard count has not been set in the request,
668+
// resolve it from the index settings
669+
request.waitForActiveShards(indexMetaData.getWaitForActiveShards());
670+
}
674671
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
675672
"request waitForActiveShards must be set in resolveRequest";
676673

677-
final ShardRouting primary = primary(state);
678-
if (retryIfUnavailable(state, primary)) {
674+
final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
675+
if (primary == null || primary.active() == false) {
676+
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
677+
+ "cluster state version [{}]", request.shardId(), actionName, request, state.version());
678+
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
679+
return;
680+
}
681+
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
682+
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
683+
+ "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
684+
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
679685
return;
680686
}
681687
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
@@ -719,27 +725,6 @@ private void performRemoteAction(ClusterState state, ShardRouting primary, Disco
719725
performAction(node, actionName, false, request);
720726
}
721727

722-
private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
723-
if (primary == null || primary.active() == false) {
724-
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
725-
+ "cluster state version [{}]", request.shardId(), actionName, request, state.version());
726-
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
727-
return true;
728-
}
729-
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
730-
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
731-
+ "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
732-
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
733-
return true;
734-
}
735-
return false;
736-
}
737-
738-
private ShardRouting primary(ClusterState state) {
739-
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
740-
return indexShard.primaryShard();
741-
}
742-
743728
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
744729
final TransportRequest requestToPerform) {
745730
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

0 commit comments

Comments
 (0)