@@ -339,13 +339,18 @@ public void testClusterHealthNotFoundIndex() throws IOException {
createIndex("index", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest("notexisted-index");
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(true));
assertThat(response.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
assertNoIndices(response);
// After the fix, querying a non-existent index should return 404 instead of a timeout
OpenSearchException exception = expectThrows(
OpenSearchException.class,
() -> execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync)
);

assertThat(exception.status(), equalTo(RestStatus.NOT_FOUND));
assertThat(
exception.getMessage(),
equalTo("OpenSearch exception [type=index_not_found_exception, reason=no such index [notexisted-index]]")
);
}

public void testRemoteInfo() throws Exception {
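The hunk above flips the high-level REST client contract for health checks that name a missing index: instead of waiting out the timeout and returning a RED response, the call now fails with a 404. A minimal caller-side sketch of the new behavior, assuming an already-initialized RestHighLevelClient (the index name and helper method are illustrative, not part of the PR):

    // Hedged sketch: translate the new 404 into a caller-friendly error.
    static ClusterHealthStatus healthOrNotFound(RestHighLevelClient client) throws IOException {
        ClusterHealthRequest request = new ClusterHealthRequest("logs-does-not-exist");
        request.timeout("5s");
        try {
            return client.cluster().health(request, RequestOptions.DEFAULT).getStatus();
        } catch (OpenSearchException e) {
            // With this change the server answers NOT_FOUND up front rather than
            // burning the full timeout and reporting a RED, timed-out cluster.
            if (e.status() == RestStatus.NOT_FOUND) {
                throw new IllegalArgumentException("no such index: logs-does-not-exist", e);
            }
            throw e;
        }
    }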
@@ -46,6 +46,7 @@
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

@@ -55,7 +56,9 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

@@ -83,19 +86,20 @@ public void testSimpleLocalHealth() {

public void testHealth() {
logger.info("--> running cluster health on an index that does not exists");
IndexNotFoundException exception = expectThrows(
IndexNotFoundException.class,
() -> client().admin().cluster().prepareHealth("test1").setWaitForYellowStatus().setTimeout("1s").execute().actionGet()
);
assertThat(exception.getMessage(), equalTo("no such index [test1]"));

logger.info("--> running cluster wide health");
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth("test1")
.setWaitForYellowStatus()
.setTimeout("1s")
.prepareHealth()
.setWaitForGreenStatus()
.setTimeout("10s")
.execute()
.actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(true));
assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED));
assertThat(healthResponse.getIndices().isEmpty(), equalTo(true));

logger.info("--> running cluster wide health");
healthResponse = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10s").execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(healthResponse.getIndices().isEmpty(), equalTo(true));
@@ -110,17 +114,37 @@ public void testHealth() {
assertThat(healthResponse.getIndices().get("test1").getStatus(), equalTo(ClusterHealthStatus.GREEN));

logger.info("--> running cluster health on an index that does exists and an index that doesn't exists");
healthResponse = client().admin()
.cluster()
.prepareHealth("test1", "test2")
.setWaitForYellowStatus()
.setTimeout("1s")
.execute()
.actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(true));
assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED));
assertThat(healthResponse.getIndices().get("test1").getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(healthResponse.getIndices().size(), equalTo(1));
IndexNotFoundException exception2 = expectThrows(
IndexNotFoundException.class,
() -> client().admin().cluster().prepareHealth("test1", "test2").setWaitForYellowStatus().setTimeout("1s").execute().actionGet()
);
assertThat(exception2.getMessage(), equalTo("no such index [test2]"));
}

public void testHealthOnMissingIndexTimeout() {
logger.info("--> testing cluster health timeout returns 404 for missing index");
// This test specifically validates that when waiting for a non-existent index,
// the onTimeout() callback correctly returns IndexNotFoundException instead of
// a timeout response with RED status
long startTime = System.currentTimeMillis();
IndexNotFoundException exception = expectThrows(
IndexNotFoundException.class,
() -> client().admin()
.cluster()
.prepareHealth("missing-index-" + randomAlphaOfLength(5))
.setWaitForGreenStatus()
.setTimeout("2s") // Use a longer timeout to ensure onTimeout() fires
.execute()
.actionGet()
);
long elapsedTime = System.currentTimeMillis() - startTime;

// Verify we got the correct exception
assertThat(exception.getMessage(), containsString("no such index"));

// Verify that we actually waited for the timeout (should be close to 2 seconds)
// This confirms the onTimeout() callback was triggered
assertThat("Expected timeout to be triggered", elapsedTime, greaterThanOrEqualTo(1500L));
}

public void testHealthWithClosedIndices() {
@@ -341,7 +365,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
createIndex("index");
assertFalse(client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().get().isTimedOut());

// at this point the original health response should not have returned: there was never a point where the index was green AND
// at this point the original health response should not have returned: there
// was never a point where the index was green AND
// the cluster-manager had processed all pending tasks above LANGUID priority.
assertFalse(healthResponseFuture.isDone());
keepSubmittingTasks.set(false);
@@ -356,10 +381,14 @@ public void testHealthOnClusterManagerFailover() throws Exception {
final String node = internalCluster().startDataOnlyNode();
final boolean withIndex = randomBoolean();
if (withIndex) {
// Create index with many shards to provoke the health request to wait (for green) while cluster-manager is being shut down.
// Notice that this is set to 0 after the test completed starting a number of health requests and cluster-manager restarts.
// This ensures that the cluster is yellow when the health request is made, making the health request wait on the observer,
// triggering a call to observer.onClusterServiceClose when cluster-manager is shutdown.
// Create index with many shards to provoke the health request to wait (for
// green) while cluster-manager is being shut down.
// Notice that this is set to 0 after the test completed starting a number of
// health requests and cluster-manager restarts.
// This ensures that the cluster is yellow when the health request is made,
// making the health request wait on the observer,
// triggering a call to observer.onClusterServiceClose when cluster-manager is
// shutdown.
createIndex(
"test",
Settings.builder()
@@ -370,7 +399,8 @@ );
);
}
final List<ActionFuture<ClusterHealthResponse>> responseFutures = new ArrayList<>();
// Run a few health requests concurrent to cluster-manager fail-overs against a data-node
// Run a few health requests concurrent to cluster-manager fail-overs against a
// data-node
// to make sure cluster-manager failover is handled without exceptions
final int iterations = withIndex ? 10 : 20;
for (int i = 0; i < iterations; ++i) {
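Taken together, the rewritten tests in this file pin down a split in behavior rather than a blanket change: a missing index now surfaces as an exception, while an existing index that merely fails to reach the waited-for status still produces a timed-out response. A hedged sketch of both outcomes, assuming the same OpenSearchIntegTestCase scaffolding as above (index names and timeouts are illustrative):

    // 1) Requested index does not exist: the request fails with a 404.
    expectThrows(
        IndexNotFoundException.class,
        () -> client().admin().cluster().prepareHealth("no-such-index").setWaitForGreenStatus().setTimeout("1s").get()
    );

    // 2) Requested index exists but cannot reach the waited-for status: the
    //    request still completes, with isTimedOut() == true, as before the fix.
    ClusterHealthResponse response = client().admin()
        .cluster()
        .prepareHealth("existing-but-yellow-index")
        .setWaitForGreenStatus()
        .setTimeout("1s")
        .get();
    assertTrue(response.isTimedOut());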
@@ -15,10 +15,10 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalTestCluster;
@@ -40,6 +40,7 @@

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
@@ -646,7 +647,8 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// Check cluster health for weighed in node, health check should return a response with 200 status code
// Check cluster health for weighed in node, health check should return a
// response with 200 status code
ClusterHealthResponse nodeLocalHealth = client(nodes_in_zone_a.get(0)).admin()
.cluster()
.prepareHealth()
@@ -655,25 +657,29 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep
.get();
assertFalse(nodeLocalHealth.isTimedOut());

// Check cluster health for weighed away node, health check should respond with an exception
// Check cluster health for weighed away node, health check should respond with
// an exception
NodeWeighedAwayException ex = expectThrows(
NodeWeighedAwayException.class,
() -> client(nodes_in_zone_c.get(0)).admin().cluster().prepareHealth().setLocal(true).setEnsureNodeWeighedIn(true).get()
);
assertTrue(ex.getMessage().contains("local node is weighed away"));

logger.info("--> running cluster health on an index that does not exists");
ClusterHealthResponse healthResponse = client(nodes_in_zone_c.get(0)).admin()
.cluster()
.prepareHealth("test1")
.setLocal(true)
.setEnsureNodeWeighedIn(true)
.setTimeout("1s")
.execute()
.actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(true));
assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED));
assertThat(healthResponse.getIndices().isEmpty(), equalTo(true));
// After the fix, querying a non-existent index should return 404 instead of
// a timeout
IndexNotFoundException exception = expectThrows(
IndexNotFoundException.class,
() -> client(nodes_in_zone_c.get(0)).admin()
.cluster()
.prepareHealth("test1")
.setLocal(true)
.setEnsureNodeWeighedIn(true)
.setTimeout("1s")
.execute()
.actionGet()
);
assertThat(exception.getMessage(), containsString("no such index [test1]"));

Set<String> nodesInOneSide = Stream.of(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0), nodes_in_zone_c.get(0))
.collect(Collectors.toCollection(HashSet::new));
@@ -689,14 +695,16 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep
networkDisruption.startDisrupting();

assertBusy(() -> {
// Check cluster health for weighed in node when cluster manager is not discovered, health check should
// Check cluster health for weighed in node when cluster manager is not
// discovered, health check should
// return a response with 503 status code
assertThrows(
ClusterManagerNotDiscoveredException.class,
() -> client(nodes_in_zone_a.get(0)).admin().cluster().prepareHealth().setLocal(true).setEnsureNodeWeighedIn(true).get()
);

// Check cluster health for weighed away node when cluster manager is not discovered, health check should
// Check cluster health for weighed away node when cluster manager is not
// discovered, health check should
// return a response with 503 status code
assertThrows(
ClusterManagerNotDiscoveredException.class,
@@ -779,7 +787,8 @@ public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());

// check weighted routing metadata after node restart, ensure node comes healthy after restart
// check weighted routing metadata after node restart, ensure node comes healthy
// after restart
internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
@@ -878,8 +887,10 @@ public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception

/**
* https://github.com/opensearch-project/OpenSearch/issues/18817
* For regression in custom string query preference with awareness attributes enabled.
* We expect preference will consistently route to the same shard replica. However, when awareness attributes
* For regression in custom string query preference with awareness attributes
* enabled.
* We expect preference will consistently route to the same shard replica.
* However, when awareness attributes
* are configured this does not hold.
*/
public void testCustomPreferenceShardIdCombination() {
@@ -915,10 +926,12 @@ public void testCustomPreferenceShardIdCombination() {
refreshAndWaitForReplication("test_index");

/*
Execute the same match all query with custom string preference.
For each search and each shard in the response we record the node on which the shard was located.
Given the custom string preference, we expect each shard or each search should report the exact same node id.
Otherwise, the custom string pref is not producing consistent shard routing.
* Execute the same match all query with custom string preference.
* For each search and each shard in the response we record the node on which
* the shard was located.
* Given the custom string preference, we expect each shard or each search
* should report the exact same node id.
* Otherwise, the custom string pref is not producing consistent shard routing.
*/
Map<String, Set<String>> shardToNodes = new HashMap<>();
for (int i = 0; i < 20; i++) {
@@ -935,8 +948,9 @@ public void testCustomPreferenceShardIdCombination() {
}

/*
If more than one node was responsible for serving a request for a given shard,
then there was a regression in the custom preference string.
* If more than one node was responsible for serving a request for a given
* shard,
* then there was a regression in the custom preference string.
*/
logger.info("--> shard to node mappings: {}", shardToNodes);
for (Map.Entry<String, Set<String>> entry : shardToNodes.entrySet()) {
@@ -318,7 +318,21 @@ public void onClusterServiceClose() {

@Override
public void onTimeout(TimeValue timeout) {
listener.onResponse(getResponse(request, observer.setAndGetObservedState(), waitCount, TimeoutState.TIMED_OUT));
ClusterState finalState = observer.setAndGetObservedState();

// Check if timeout was due to missing indices
if (CollectionUtils.isEmpty(request.indices()) == false) {
try {
indexNameExpressionResolver.concreteIndexNames(finalState, IndicesOptions.strictExpand(), request);
} catch (IndexNotFoundException e) {
// Indices still don't exist after timeout - return 404 instead of timed out response
listener.onFailure(e);
return;
}
}

// Normal timeout response for other reasons
listener.onResponse(getResponse(request, finalState, waitCount, TimeoutState.TIMED_OUT));
}
};
observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
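The 404 mapping needs no extra REST plumbing: IndexNotFoundException already carries RestStatus.NOT_FOUND and the "no such index [...]" message the tests assert on, so failing the listener is sufficient. A small standalone sketch of that contract (run with -ea; the class name is illustrative):

    import org.opensearch.core.rest.RestStatus;
    import org.opensearch.index.IndexNotFoundException;

    public class IndexNotFoundContract {
        public static void main(String[] args) {
            IndexNotFoundException e = new IndexNotFoundException("missing-index");
            // Surfaces as HTTP 404 once the transport action calls listener.onFailure(e).
            assert e.status() == RestStatus.NOT_FOUND;
            // Message format matches the assertions in the tests above.
            assert e.getMessage().contains("no such index [missing-index]");
        }
    }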