From f2f809ea280ffba217451da894a5899f1cec02ab Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Fri, 2 Dec 2022 17:36:25 -0800 Subject: [PATCH] Fix 1.x compatibility bug with stored Tasks (#5412) (#5440) When the new 'cancelled' field was introduced it was a miss not to increment the version number on the mapping definitions for the .tasks index. This commit fixes that oversight, as well as modifies the existing backward compatiblity test to ensure that it will catch future mistakes like this one. Closes #5376 Signed-off-by: Andrew Ross (cherry picked from commit 4616dfac5382062a5149c799554dda4ff601369f) Signed-off-by: Andrew Ross Signed-off-by: Andrew Ross --- CHANGELOG.md | 1 + .../upgrades/SystemIndicesUpgradeIT.java | 51 ++++++++++++------- .../opensearch/tasks/TaskResultsService.java | 2 +- .../opensearch/tasks/task-index-mapping.json | 2 +- 4 files changed, 35 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19b3a5e2cdee1..f090146cc4777 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Deprecated ### Removed ### Fixed +- Fix 1.x compatibility bug with stored Tasks ([#5412](https://github.com/opensearch-project/OpenSearch/pull/5412)) ### Security ## [2.4] diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/SystemIndicesUpgradeIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/SystemIndicesUpgradeIT.java index c50af0084b000..22b6b4a201877 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/SystemIndicesUpgradeIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/SystemIndicesUpgradeIT.java @@ -34,13 +34,17 @@ import org.opensearch.LegacyESVersion; import org.opensearch.Version; +import org.hamcrest.MatcherAssert; import org.opensearch.client.Request; +import org.opensearch.client.Response; import org.opensearch.client.ResponseException; import org.opensearch.test.XContentTestUtils.JsonMapView; +import java.io.IOException; import java.util.Map; import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ENFORCEMENT_VERSION; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -68,25 +72,7 @@ public void testSystemIndicesUpgrades() throws Exception { } client().performRequest(bulk); - // start a async reindex job - Request reindex = new Request("POST", "/_reindex"); - reindex.setJsonEntity( - "{\n" + - " \"source\":{\n" + - " \"index\":\"test_index_old\"\n" + - " },\n" + - " \"dest\":{\n" + - " \"index\":\"test_index_reindex\"\n" + - " }\n" + - "}"); - reindex.addParameter("wait_for_completion", "false"); - Map response = entityAsMap(client().performRequest(reindex)); - String taskId = (String) response.get("task"); - - // wait for task - Request getTask = new Request("GET", "/_tasks/" + taskId); - getTask.addParameter("wait_for_completion", "true"); - client().performRequest(getTask); + createAndVerifyStoredTask(); // make sure .tasks index exists Request getTasksIndex = new Request("GET", "/.tasks"); @@ -121,6 +107,8 @@ public void testSystemIndicesUpgrades() throws Exception { assertThat(client().performRequest(putAliasRequest).getStatusLine().getStatusCode(), is(200)); } } else if (CLUSTER_TYPE == ClusterType.UPGRADED) { + createAndVerifyStoredTask(); + assertBusy(() -> { Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata"); Map indices = new JsonMapView(entityAsMap(client().performRequest(clusterStateRequest))) @@ -152,4 +140,29 @@ public void testSystemIndicesUpgrades() throws Exception { }); } } + + /** + * Completed tasks get persisted into the .tasks index, so this method waits + * until the task is completed in order to verify that it has been successfully + * written to the index and can be retrieved. + */ + private static void createAndVerifyStoredTask() throws Exception { + // Use update by query to create an async task + final Request updateByQueryRequest = new Request("POST", "/test_index_old/_update_by_query"); + updateByQueryRequest.addParameter("wait_for_completion", "false"); + final Response updateByQueryResponse = client().performRequest(updateByQueryRequest); + MatcherAssert.assertThat(updateByQueryResponse.getStatusLine().getStatusCode(), equalTo(200)); + final String taskId = (String) entityAsMap(updateByQueryResponse).get("task"); + + // wait for task to complete + waitUntil(() -> { + try { + final Response getTaskResponse = client().performRequest(new Request("GET", "/_tasks/" + taskId)); + MatcherAssert.assertThat(getTaskResponse.getStatusLine().getStatusCode(), equalTo(200)); + return (Boolean) entityAsMap(getTaskResponse).get("completed"); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskResultsService.java b/server/src/main/java/org/opensearch/tasks/TaskResultsService.java index 66d3aeb748cf7..accc02624f71c 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResultsService.java @@ -86,7 +86,7 @@ public class TaskResultsService { public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; - public static final int TASK_RESULT_MAPPING_VERSION = 3; // must match version in task-index-mapping.json + public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json /** * The backoff policy to use when saving a task result fails. The total wait diff --git a/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json b/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json index 54e9d39902f03..58b6b2d3bc873 100644 --- a/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json +++ b/server/src/main/resources/org/opensearch/tasks/task-index-mapping.json @@ -1,7 +1,7 @@ { "_doc" : { "_meta": { - "version": 3 + "version": 4 }, "dynamic" : "strict", "properties" : {