diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutWithAvailabilityTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutWithAvailabilityTest.java index b39af38a0e61..b73bf1d39759 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutWithAvailabilityTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/EndToEndTimeOutWithAvailabilityTest.java @@ -10,10 +10,13 @@ import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; import com.azure.cosmos.implementation.throughputControl.TestItem; +import com.azure.cosmos.models.CosmosItemIdentity; import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosPatchItemRequestOptions; import com.azure.cosmos.models.CosmosPatchOperations; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.rx.TestSuiteBase; @@ -28,10 +31,6 @@ import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; import com.azure.cosmos.test.faultinjection.IFaultInjectionResult; import com.azure.cosmos.test.implementation.faultinjection.FaultInjectorProvider; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.lang3.StringUtils; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -40,16 +39,16 @@ import org.testng.annotations.Test; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import 
java.util.List; import java.util.Locale; import java.util.Random; -import java.util.UUID; -import static com.azure.cosmos.CosmosDiagnostics.OBJECT_MAPPER; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; public class EndToEndTimeOutWithAvailabilityTest extends TestSuiteBase { private static final int DEFAULT_NUM_DOCUMENTS = 100; @@ -69,7 +68,7 @@ public EndToEndTimeOutWithAvailabilityTest(CosmosClientBuilder clientBuilder) { random = new Random(); } - @BeforeClass(groups = {"multi-master"}, timeOut = SETUP_TIMEOUT * 100) + @BeforeClass(groups = {"multi-master", "multi-region"}, timeOut = SETUP_TIMEOUT * 100) public void beforeClass() throws Exception { System.setProperty("COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_WAIT_TIME_IN_MILLISECONDS", "1000"); System.setProperty("COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_INITIAL_BACKOFF_TIME_IN_MILLISECONDS", "500"); @@ -94,78 +93,184 @@ public void beforeClass() throws Exception { } @Test(groups = {"multi-master"}, dataProvider = "faultInjectionArgProvider", timeOut = TIMEOUT*100) - public void testThresholdAvailabilityStrategy(OperationType operationType, FaultInjectionOperationType faultInjectionOperationType) throws InterruptedException { + public void testThresholdAvailabilityStrategy( + OperationType operationType, + FaultInjectionOperationType faultInjectionOperationType, + boolean ignore) throws InterruptedException { + if (this.preferredRegionList.size() <= 1) { throw new SkipException("excludeRegionTest_SkipFirstPreferredRegion can only be tested for multi-master with multi-regions"); } - if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) { - throw new SkipException("Failure injection only supported for DIRECT mode"); - } + ConnectionMode connectionMode = getClientBuilder().buildConnectionPolicy().getConnectionMode(); + FaultInjectionConnectionType faultInjectionConnectionType = connectionMode == 
ConnectionMode.DIRECT ? + FaultInjectionConnectionType.DIRECT : + FaultInjectionConnectionType.GATEWAY; TestItem createdItem = TestItem.createNewItem(); CosmosItemRequestOptions options = new CosmosItemRequestOptions(); this.cosmosAsyncContainer.createItem(createdItem).block(); - // This is to wait for the item to be replicated to the secondary region - Thread.sleep(2000); - FaultInjectionRule rule = injectFailure(cosmosAsyncContainer, faultInjectionOperationType); - CosmosDiagnostics cosmosDiagnostics = performDocumentOperation(cosmosAsyncContainer, operationType, createdItem, options); - assertThat(cosmosDiagnostics).isNotNull(); - CosmosDiagnosticsContext diagnosticsContext = cosmosDiagnostics.getDiagnosticsContext(); - assertThat(diagnosticsContext).isNotNull(); - assertThat(diagnosticsContext.getContactedRegionNames().size()).isGreaterThan(1); - ObjectNode diagnosticsNode; - try { - if (operationType == OperationType.Query) { - assertThat(cosmosDiagnostics.getClientSideRequestStatistics().iterator().next().getResponseStatisticsList().iterator().next().getRegionName()) - .isEqualTo(regions.get(1).toLowerCase(Locale.ROOT)); - } else { - diagnosticsNode = (ObjectNode) OBJECT_MAPPER.readTree(cosmosDiagnostics.toString()); - assertResponseFromSpeculatedRegion(diagnosticsNode); + // To run the test against the read all and read many variations of query + int maxIterations = (operationType == OperationType.Query) ? 3 : 1; + QueryFlavor[] queryFlavors = QueryFlavor.values(); + + for (int i = 0; i < maxIterations; i++) { + + FaultInjectionRule rule = injectFailure(this.cosmosAsyncContainer, faultInjectionConnectionType); + QueryFlavor queryFlavor = (operationType == OperationType.Query) ? 
queryFlavors[i] : null; + + try { + // This is to wait for the item to be replicated to the secondary region + Thread.sleep(2000); + CosmosDiagnostics cosmosDiagnostics = performDocumentOperation(this.cosmosAsyncContainer, operationType, createdItem, options, false, queryFlavor); + assertThat(cosmosDiagnostics).isNotNull(); + CosmosDiagnosticsContext diagnosticsContext = cosmosDiagnostics.getDiagnosticsContext(); + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getContactedRegionNames().size()).isBetween(1, 2); + assertThat(diagnosticsContext.getStatusCode()).isBetween(200, 204); + assertThat(diagnosticsContext.getDuration()).isLessThanOrEqualTo(Duration.ofSeconds(3)); + + // asserts response is obtained from the second preferred region + assertThat(diagnosticsContext.getContactedRegionNames()).contains(regions.get(1).toLowerCase(Locale.ROOT)); + + } catch (RuntimeException e) { + fail("Operation should have succeeded from the second preferred region.", e); + } finally { + rule.disable(); + } + } + } + + @Test(groups = {"multi-region"}, dataProvider = "faultInjectionArgProvider", timeOut = TIMEOUT*100) + public void testThresholdAvailabilityStrategyForReadsDefaultEnablementWithPpaf( + OperationType operationType, + FaultInjectionOperationType faultInjectionOperationType, + boolean shouldPpafEnforcedReadAvailabilityStrategyBeEnforced) { + + if (this.preferredRegionList.size() <= 1) { + throw new SkipException("excludeRegionTest_SkipFirstPreferredRegion can only be tested for multi-master with multi-regions"); + } + + if (faultInjectionOperationType != FaultInjectionOperationType.READ_ITEM && faultInjectionOperationType != FaultInjectionOperationType.QUERY_ITEM) { + throw new SkipException("testThresholdAvailabilityStrategyForReadsDefaultEnablementWithPpaf only supported for READ and QUERY operations"); + } + + ConnectionMode connectionMode = getClientBuilder().buildConnectionPolicy().getConnectionMode(); + FaultInjectionConnectionType 
faultInjectionConnectionType = connectionMode == ConnectionMode.DIRECT ? + FaultInjectionConnectionType.DIRECT : + FaultInjectionConnectionType.GATEWAY; + + CosmosAsyncClient cosmosAsyncClient; + FaultInjectionRule rule = null; + + // To run the test against the read all and read many variations of query + int maxIterations = (operationType == OperationType.Query) ? 3 : 1; + QueryFlavor[] queryFlavors = QueryFlavor.values(); + + for (int i = 0; i < maxIterations; i++) { + + System.setProperty("COSMOS.IS_PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED", "true"); + + // test opt out behavior - opt in behavior is the default + if (!shouldPpafEnforcedReadAvailabilityStrategyBeEnforced) { + System.setProperty("COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF", "false"); + } + + cosmosAsyncClient = getClientBuilder().preferredRegions(this.preferredRegionList).buildAsyncClient(); + + QueryFlavor queryFlavor = (operationType == OperationType.Query) ? queryFlavors[i] : null; + + try { + CosmosAsyncDatabase asyncDatabase = cosmosAsyncClient.getDatabase(this.cosmosAsyncContainer.getDatabase().getId()); + CosmosAsyncContainer asyncContainer = asyncDatabase.getContainer(this.cosmosAsyncContainer.getId()); + + TestItem createdItem = TestItem.createNewItem(); + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + asyncContainer.createItem(createdItem).block(); + + // This is to wait for the item to be replicated to the secondary region + Thread.sleep(2000); + rule = injectFailure(asyncContainer, faultInjectionConnectionType); + + Instant start = Instant.now(); + + CosmosDiagnostics cosmosDiagnostics + = performDocumentOperation(asyncContainer, operationType, createdItem, options, true, queryFlavor); + + Instant end = Instant.now(); + + assertThat(cosmosDiagnostics).isNotNull(); + CosmosDiagnosticsContext diagnosticsContext = cosmosDiagnostics.getDiagnosticsContext(); + assertThat(diagnosticsContext).isNotNull(); + 
assertThat(diagnosticsContext.getStatusCode()).isBetween(200, 204); + + if (!shouldPpafEnforcedReadAvailabilityStrategyBeEnforced) { + + if (diagnosticsContext.isPointOperation()) { + assertThat(diagnosticsContext.getDuration()).isGreaterThanOrEqualTo(Duration.ofSeconds(30)); + } else { + assertThat(Duration.between(start, end)).isGreaterThanOrEqualTo(Duration.ofSeconds(30)); + } + + } else { + // Default enablement of PPAF-enforced read availability strategy should + // return a success ideally in 2s-3s + // keeping loose enough bounds to ensure test is not flaky + if (diagnosticsContext.isPointOperation()) { + assertThat(diagnosticsContext.getDuration()).isLessThanOrEqualTo(Duration.ofSeconds(10)); + } else { + assertThat(Duration.between(start, end)).isLessThanOrEqualTo(Duration.ofSeconds(15)); + } + } + + // asserts response is obtained from the second preferred region + assertThat(diagnosticsContext.getContactedRegionNames()).contains(regions.get(1).toLowerCase(Locale.ROOT)); + } catch (Exception e) { + fail("Operation should have succeeded from the second preferred region.", e); + } finally { + + System.clearProperty("COSMOS.IS_PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED"); + System.clearProperty("COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"); + + if (rule != null) { + rule.disable(); + } + + safeClose(cosmosAsyncClient); } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } finally { - rule.disable(); } } @DataProvider(name = "faultInjectionArgProvider") public static Object[][] faultInjectionArgProvider() { return new Object[][] { - {OperationType.Read, FaultInjectionOperationType.READ_ITEM}, - {OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM}, - {OperationType.Create, FaultInjectionOperationType.CREATE_ITEM}, - {OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM}, - {OperationType.Query, FaultInjectionOperationType.QUERY_ITEM}, - {OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM} + // 
Operation type, Fault Injection Operation Type, Is PPAF-enforced read availability strategy enabled + {OperationType.Read, FaultInjectionOperationType.READ_ITEM, true}, + {OperationType.Read, FaultInjectionOperationType.READ_ITEM, false}, + {OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false}, + {OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true}, + {OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false}, + {OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true}, + {OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false}, + {OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true} }; } - private void assertResponseFromSpeculatedRegion(ObjectNode diagnosticsNode) { - JsonNode responseStatisticsList = diagnosticsNode.get("responseStatisticsList"); - assertThat(responseStatisticsList.isArray()).isTrue(); - assertThat(responseStatisticsList.size()).isGreaterThan(0); - JsonNode storeResult = responseStatisticsList.get(0).get("storeResult"); - assertThat(storeResult.get("storePhysicalAddress").toString()).contains(StringUtils.deleteWhitespace(regions.get(1).toLowerCase(Locale.ROOT))); - } - private FaultInjectionRule injectFailure( CosmosAsyncContainer container, - FaultInjectionOperationType operationType) { + FaultInjectionConnectionType faultInjectionConnectionType) { FaultInjectionServerErrorResultBuilder faultInjectionResultBuilder = FaultInjectionResultBuilders .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) - .delay(Duration.ofMillis(10000)) - .times(1); + .delay(Duration.ofSeconds(10000)) + .times(10000); IFaultInjectionResult result = faultInjectionResultBuilder.build(); logger.info("Injecting fault: {}", this.preferredRegionList.get(0)); FaultInjectionCondition condition = new FaultInjectionConditionBuilder() - .operationType(operationType) .region(this.preferredRegionList.get(0)) - .connectionType(FaultInjectionConnectionType.DIRECT) + 
.connectionType(faultInjectionConnectionType) .build(); FaultInjectionRule rule = new FaultInjectionRuleBuilder("InjectedResponseDelay") @@ -189,48 +294,78 @@ private EndToEndTimeOutValidationTests.TestObject getDocumentDefinition(String d return doc; } - private List insertDocuments(int documentCount, List partitionKeys, CosmosAsyncContainer container) { - List documentsToInsert = new ArrayList<>(); - - for (int i = 0; i < documentCount; i++) { - documentsToInsert.add( - getDocumentDefinition( - UUID.randomUUID().toString(), - partitionKeys == null ? UUID.randomUUID().toString() : partitionKeys.get(random.nextInt(partitionKeys.size())))); - } - - List documentInserted = bulkInsertBlocking(container, documentsToInsert); - - waitIfNeededForReplicasToCatchUp(this.getClientBuilder()); - - return documentInserted; - } - - @AfterClass(groups = {"multi-master"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = {"multi-master", "multi-region"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(this.clientWithPreferredRegions); System.clearProperty("COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_INITIAL_BACKOFF_TIME_IN_MILLISECONDS"); System.clearProperty("COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_WAIT_TIME_IN_MILLISECONDS"); + System.clearProperty("COSMOS.IS_PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED"); + System.clearProperty("COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"); } private CosmosDiagnostics performDocumentOperation( CosmosAsyncContainer cosmosAsyncContainer, OperationType operationType, TestItem createdItem, - CosmosItemRequestOptions cosmosItemRequestOptions) { - CosmosEndToEndOperationLatencyPolicyConfig config = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(4)) - .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) - .build(); - cosmosItemRequestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(config); - 
cosmosItemRequestOptions.setNonIdempotentWriteRetryPolicy(true, true); + CosmosItemRequestOptions cosmosItemRequestOptions, + boolean ignoreE2E2LatencyCfgOnRequestOptions, + QueryFlavor queryFlavor) { + + CosmosEndToEndOperationLatencyPolicyConfig config = null; + + if (!ignoreE2E2LatencyCfgOnRequestOptions) { + config = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(4)) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(Duration.ofMillis(100), Duration.ofMillis(200))) + .build(); + cosmosItemRequestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(config); + cosmosItemRequestOptions.setNonIdempotentWriteRetryPolicy(true, true); + } if (operationType == OperationType.Query) { + CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions(); queryRequestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(config); + + if (queryFlavor == QueryFlavor.ReadAll) { + + logger.info("Running readAllItems..."); + + FeedResponse response = cosmosAsyncContainer + .readAllItems(queryRequestOptions, TestItem.class) + .byPage() + .blockFirst(); + + assertThat(response).isNotNull(); + + return response.getCosmosDiagnostics(); + } + + if (queryFlavor == QueryFlavor.ReadMany) { + + CosmosReadManyRequestOptions readManyRequestOptions = new CosmosReadManyRequestOptions(); + readManyRequestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(config); + + logger.info("Running readMany..."); + + FeedResponse response = cosmosAsyncContainer + .readMany( + Arrays.asList(new CosmosItemIdentity(new PartitionKey(createdItem.getMypk()), createdItem.getId())), + readManyRequestOptions, + TestItem.class) + .block(); + + assertThat(response).isNotNull(); + return response.getCosmosDiagnostics(); + } + + logger.info("Running query ..."); + String query = String.format("SELECT * from c where c.id = '%s'", createdItem.getId()); FeedResponse itemFeedResponse = cosmosAsyncContainer.queryItems(query, queryRequestOptions, 
TestItem.class).byPage().blockFirst(); + assertThat(itemFeedResponse).isNotNull(); + return itemFeedResponse.getCosmosDiagnostics(); } @@ -243,33 +378,61 @@ private CosmosDiagnostics performDocumentOperation( if (operationType == OperationType.Read) { - return cosmosAsyncContainer.readItem( - createdItem.getId(), - new PartitionKey(createdItem.getMypk()), - cosmosItemRequestOptions, - TestItem.class).block().getDiagnostics(); + CosmosItemResponse response = cosmosAsyncContainer.readItem( + createdItem.getId(), + new PartitionKey(createdItem.getMypk()), + cosmosItemRequestOptions, + TestItem.class) + .block(); + + assertThat(response).isNotNull(); + + return response.getDiagnostics(); } if (operationType == OperationType.Replace) { - return cosmosAsyncContainer.replaceItem( - createdItem, - createdItem.getId(), - new PartitionKey(createdItem.getMypk()), - cosmosItemRequestOptions).block().getDiagnostics(); + CosmosItemResponse response = cosmosAsyncContainer.replaceItem( + createdItem, + createdItem.getId(), + new PartitionKey(createdItem.getMypk()), + cosmosItemRequestOptions) + .block(); + + assertThat(response).isNotNull(); + + return response.getDiagnostics(); } if (operationType == OperationType.Delete) { TestItem toBeDeletedItem = TestItem.createNewItem(); - cosmosAsyncContainer.createItem(toBeDeletedItem).block(); - return cosmosAsyncContainer.deleteItem(toBeDeletedItem, cosmosItemRequestOptions).block().getDiagnostics(); + cosmosAsyncContainer.createItem(toBeDeletedItem, cosmosItemRequestOptions).block(); + CosmosItemResponse response = cosmosAsyncContainer + .deleteItem(toBeDeletedItem, cosmosItemRequestOptions) + .block(); + + assertThat(response).isNotNull(); + + return response.getDiagnostics(); } if (operationType == OperationType.Create) { - return cosmosAsyncContainer.createItem(TestItem.createNewItem(), cosmosItemRequestOptions).block().getDiagnostics(); + CosmosItemResponse response = cosmosAsyncContainer + .createItem(TestItem.createNewItem(), 
cosmosItemRequestOptions) + .block(); + + assertThat(response).isNotNull(); + + return response.getDiagnostics(); } if (operationType == OperationType.Upsert) { - return cosmosAsyncContainer.upsertItem(TestItem.createNewItem(), cosmosItemRequestOptions).block().getDiagnostics(); + CosmosItemResponse response = cosmosAsyncContainer + .upsertItem(TestItem.createNewItem(), cosmosItemRequestOptions) + .block(); + + assertThat(response).isNotNull(); + + return response.getDiagnostics(); } if (operationType == OperationType.Patch) { @@ -281,9 +444,13 @@ private CosmosDiagnostics performDocumentOperation( CosmosPatchItemRequestOptions patchItemRequestOptions = new CosmosPatchItemRequestOptions(); patchItemRequestOptions.setNonIdempotentWriteRetryPolicy(true, true); patchItemRequestOptions.setCosmosEndToEndOperationLatencyPolicyConfig(config); - return cosmosAsyncContainer + CosmosItemResponse response = cosmosAsyncContainer .patchItem(createdItem.getId(), new PartitionKey(createdItem.getMypk()), patchOperations, patchItemRequestOptions, TestItem.class) - .block().getDiagnostics(); + .block(); + + assertThat(response).isNotNull(); + + return response.getDiagnostics(); } } @@ -309,4 +476,8 @@ private List getPreferredRegionList(CosmosAsyncClient client) { return preferredRegionList; } + + private enum QueryFlavor { + Query, ReadAll, ReadMany + } } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 13b624b8f431..a35849946f94 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -13,10 +13,12 @@ * Fixed diagnostics issue where operations in Gateway mode hitting end-to-end timeout would not capture diagnostics correctly. - [PR 44099](https://github.com/Azure/azure-sdk-for-java/pull/44099) * Enabled `excludeRegions` to be applied for `QueryPlan` calls. 
- [PR 45196](https://github.com/Azure/azure-sdk-for-java/pull/45196) * Fixed the behavior to correctly perform partition-level failover through circuit breaker for operations enabled with threshold-based availability strategy. - [PR 45244](https://github.com/Azure/azure-sdk-for-java/pull/45244) +* Fixed an issue where `QueryPlan` calls are indefinitely retried in the same region when there are connectivity or response delays with / from the Gateway. - [PR 45267](https://github.com/Azure/azure-sdk-for-java/pull/45267) #### Other Changes * Added the `vectorIndexShardKeys` to the vectorIndexSpec for QuantizedFlat and DiskANN vector search. - [PR 44007](https://github.com/Azure/azure-sdk-for-java/pull/44007) * Added user agent suffixing if Per-Partition Automatic Failover or Per-Partition Circuit Breaker are enabled at client scope. - [PR 45197](https://github.com/Azure/azure-sdk-for-java/pull/45197) +* Enabled threshold-based availability strategy for reads by default when Per-Partition Automatic Failover is also enabled. - [PR 45267](https://github.com/Azure/azure-sdk-for-java/pull/45267) ### 4.68.0 (2025-03-20) diff --git a/sdk/cosmos/azure-cosmos/docs/TimeoutAndRetriesConfig.md b/sdk/cosmos/azure-cosmos/docs/TimeoutAndRetriesConfig.md index 1e9617daef54..28a4fb74979a 100644 --- a/sdk/cosmos/azure-cosmos/docs/TimeoutAndRetriesConfig.md +++ b/sdk/cosmos/azure-cosmos/docs/TimeoutAndRetriesConfig.md @@ -33,3 +33,17 @@ | 410 | 1000 | NO | N/A | N/A | N/A | 1 | N/A | | | 410 | 1002 | NO | N/A | N/A | N/A | 1 | N/A | Only applies to `Query`, `ChangeFeed` | | 400 | 1001 | NO | N/A | N/A | N/A | 1 | N/A | | + +### Per-Partition Automatic Failover (PPAF) defaults + +With PPAF enabled, the SDK will also enable threshold-based availability strategy for item-based non-write operations (readItem, readMany, readAll, queryItems etc.)
with defaults as below: + +#### Threshold-based availability strategy defaults + +NOTE: 6s was chosen because, in `Direct` connection mode, the connect timeout and network request timeout are 5s. This allows the SDK to do at least 1 in-region retry. In `Gateway` connection mode, the Gateway performs the in-region retries on behalf of the SDK within the same time bound. + +| Connection Mode | End-to-end timeout | Threshold duration | Threshold step duration | +|-----------------|--------------------|--------------------|-------------------------| +| Direct | 6s | 1s | 500ms | +| Gateway | 6s | 1s | 500ms | + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index 29135fb58f62..d819e2f5daa3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -304,6 +304,14 @@ public class Configs { private static final String E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF = "COSMOS.E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF"; private static final String E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF_VARIABLE = "COSMOS_E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF"; + private static final String DEFAULT_IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "true"; + private static final String IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"; + private static final String IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE = "COSMOS_IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"; + + private static final int DEFAULT_WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF = 25; + private static final String WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF = "COSMOS.WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF"; + private static final String
WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF_VARIABLE = "COSMOS_WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF"; + private static final String COSMOS_DISABLE_IMDS_ACCESS = "COSMOS.DISABLE_IMDS_ACCESS"; private static final String COSMOS_DISABLE_IMDS_ACCESS_VARIABLE = "COSMOS_DISABLE_IMDS_ACCESS"; private static final boolean COSMOS_DISABLE_IMDS_ACCESS_DEFAULT = false; @@ -1167,4 +1175,24 @@ public static String getEmulatorHost() { emptyToNull(System.getenv().get(EMULATOR_HOST_VARIABLE)), DEFAULT_EMULATOR_HOST)); } + + public static boolean isReadAvailabilityStrategyEnabledWithPpaf() { + String isReadAvailabilityStrategyEnabledWithPpaf = System.getProperty( + IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF, + firstNonNull( + emptyToNull(System.getenv().get(IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE)), + DEFAULT_IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF)); + + return Boolean.parseBoolean(isReadAvailabilityStrategyEnabledWithPpaf); + } + + public static int getWarnLevelLoggingThresholdForPpaf() { + String warnLevelLoggingThresholdForPpaf = System.getProperty( + WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF, + firstNonNull( + emptyToNull(System.getenv().get(WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF_VARIABLE)), + String.valueOf(DEFAULT_WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF))); + + return Integer.parseInt(warnLevelLoggingThresholdForPpaf); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 5e85f5a26866..8119b07735f3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -14,6 +14,7 @@ import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.CosmosDiagnosticsContext; import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
+import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; import com.azure.cosmos.CosmosException; import com.azure.cosmos.CosmosItemSerializer; import com.azure.cosmos.CosmosOperationPolicy; @@ -192,6 +193,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private static final String DUMMY_SQL_QUERY = "this is dummy and only used in creating " + "ParallelDocumentQueryExecutioncontext, but not used"; + private final static ObjectMapper mapper = Utils.getSimpleObjectMapper(); private final CosmosItemSerializer defaultCustomSerializer; private final static Logger logger = LoggerFactory.getLogger(RxDocumentClientImpl.class); @@ -266,7 +268,8 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private final boolean sessionCapturingDisabled; private final boolean isRegionScopedSessionCapturingEnabledOnClientOrSystemConfig; private List operationPolicies; - private AtomicReference cachedCosmosAsyncClientSnapshot; + private final AtomicReference cachedCosmosAsyncClientSnapshot; + private final CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads; public RxDocumentClientImpl(URI serviceEndpoint, String masterKeyOrResourceToken, @@ -600,6 +603,10 @@ private RxDocumentClientImpl(URI serviceEndpoint, this.queryPlanCache = new ConcurrentHashMap<>(); this.apiType = apiType; this.clientTelemetryConfig = clientTelemetryConfig; + this.ppafEnforcedE2ELatencyPolicyConfigForReads = evaluatePpafEnforcedE2eLatencyPolicyCfgForReads( + this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover, + this.connectionPolicy + ); } catch (RuntimeException e) { logger.error("unexpected failure in initializing client.", e); close(); @@ -2466,7 +2473,7 @@ private Mono> createDocumentCore( crossRegionAvailabilityContextForRxDocumentServiceRequest), requestRetryPolicy), scopedDiagnosticsFactory - ), requestReference); + ), requestReference, endToEndPolicyConfig); } private 
Mono> createDocumentInternal( @@ -2576,7 +2583,10 @@ private static Mono getPointOperationResponseMonoWithE2ETimeout( private Mono handleCircuitBreakingFeedbackForPointOperation( Mono response, - AtomicReference requestReference) { + AtomicReference requestReference, + CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) { + + applyEndToEndLatencyPolicyCfgToRequestContext(requestReference.get(), effectiveEndToEndPolicyConfig); return response .doOnSuccess(ignore -> { @@ -2769,6 +2779,23 @@ private static CosmosException getNegativeTimeoutException(CosmosDiagnostics cos return exception; } + private static void applyEndToEndLatencyPolicyCfgToRequestContext(RxDocumentServiceRequest rxDocumentServiceRequest, CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) { + + if (rxDocumentServiceRequest == null) { + return; + } + + if (rxDocumentServiceRequest.requestContext == null) { + return; + } + + if (effectiveEndToEndPolicyConfig == null) { + return; + } + + rxDocumentServiceRequest.requestContext.setEndToEndOperationLatencyPolicyConfig(effectiveEndToEndPolicyConfig); + } + @Override public Mono> upsertDocument(String collectionLink, Object document, RequestOptions options, boolean disableAutomaticIdGeneration) { @@ -2831,7 +2858,7 @@ private Mono> upsertDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), finalRetryPolicyInstance), - scopedDiagnosticsFactory), requestReference); + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); } private Mono> upsertDocumentInternal( @@ -2974,7 +3001,7 @@ private Mono> replaceDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), requestRetryPolicy), - scopedDiagnosticsFactory), requestReference); + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); } private Mono> replaceDocumentInternal( @@ -3056,7 +3083,7 @@ private Mono> replaceDocumentCore( clientContextOverride, requestReference, 
crossRegionAvailabilityContextForRequest), - requestRetryPolicy), requestReference); + requestRetryPolicy), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); } private Mono> replaceDocumentInternal( @@ -3232,7 +3259,17 @@ private CosmosEndToEndOperationLatencyPolicyConfig getEffectiveEndToEndOperation return null; } - return this.cosmosEndToEndOperationLatencyPolicyConfig; + if (this.cosmosEndToEndOperationLatencyPolicyConfig != null) { + return this.cosmosEndToEndOperationLatencyPolicyConfig; + } + + // If no request-options-level or client-level e2e latency policy config applies, + // rely on PPAF-enforced defaults (read-only operations only) + if (operationType.isReadOnlyOperation()) { + return this.ppafEnforcedE2ELatencyPolicyConfigForReads; + } + + return null; } @Override @@ -3295,7 +3332,7 @@ private Mono> patchDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), documentClientRetryPolicy), - scopedDiagnosticsFactory), requestReference); + scopedDiagnosticsFactory), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); } private Mono> patchDocumentInternal( @@ -3504,7 +3541,7 @@ private Mono> deleteDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), requestRetryPolicy), - scopedDiagnosticsFactory), requestReference); + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); } private Mono> deleteDocumentInternal( @@ -3692,7 +3729,7 @@ private Mono> readDocumentCore( crossRegionAvailabilityContextForRequest), retryPolicyInstance), scopedDiagnosticsFactory - ), requestReference); + ), requestReference, endToEndPolicyConfig); } private Mono> readDocumentInternal( @@ -4883,7 +4920,7 @@ public Mono executeBatchRequest(String collectionLink, requestReference), documentClientRetryPolicy), scopedDiagnosticsFactory ), - requestReference); + requestReference, endToEndPolicyConfig); } private Mono executeStoredProcedureInternal(String storedProcedureLink, @@ -7145,6 +7182,49 @@ private static boolean
isNonTransientResultForHedging(int statusCode, int subSta return false; } + private static CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatencyPolicyCfgForReads( + GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover, + ConnectionPolicy connectionPolicy) { + + if (!globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) { + return null; + } + + if (Configs.isReadAvailabilityStrategyEnabledWithPpaf()) { + + if (connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT) { + Duration networkRequestTimeout = connectionPolicy.getTcpNetworkRequestTimeout(); + + checkNotNull(networkRequestTimeout, "Argument 'networkRequestTimeout' cannot be null!"); + + Duration overallE2eLatencyTimeout = networkRequestTimeout.plus(Utils.ONE_SECOND); + Duration threshold = Utils.min(networkRequestTimeout.dividedBy(2), Utils.ONE_SECOND); + Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); + + return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep)) + .build(); + } else { + + Duration httpNetworkRequestTimeout = connectionPolicy.getHttpNetworkRequestTimeout(); + + checkNotNull(httpNetworkRequestTimeout, "Argument 'httpNetworkRequestTimeout' cannot be null!"); + + // 6s was chosen to accommodate for control-plane hot path read timeout retries (like QueryPlan / PartitionKeyRange) + Duration overallE2eLatencyTimeout = Utils.min(Utils.SIX_SECONDS, httpNetworkRequestTimeout); + + Duration threshold = Utils.min(overallE2eLatencyTimeout.dividedBy(2), Utils.ONE_SECOND); + Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); + + return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout) + .availabilityStrategy(new 
ThresholdBasedAvailabilityStrategy(threshold, thresholdStep)) + .build(); + } + } + + return null; + } + private DiagnosticsClientContext getEffectiveClientContext(DiagnosticsClientContext clientContextOverride) { if (clientContextOverride != null) { return clientContextOverride; @@ -7261,6 +7341,8 @@ private Mono executeFeedOperationWithAvailabilityStrategy( this.getEffectiveEndToEndOperationLatencyPolicyConfig( req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType); + req.requestContext.setEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfig); + List initialExcludedRegions = req.requestContext.getExcludeRegions(); List orderedApplicableRegionsForSpeculation = this.getApplicableRegionsForSpeculation( endToEndPolicyConfig, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java index a8779abb18e6..8d50984eba88 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java @@ -78,6 +78,10 @@ public class Utils { public static final Base64.Decoder Base64Decoder = Base64.getDecoder(); public static final Base64.Encoder Base64UrlEncoder = Base64.getUrlEncoder(); + public static final Duration ONE_SECOND = Duration.ofSeconds(1); + public static final Duration HALF_SECOND = Duration.ofMillis(500); + public static final Duration SIX_SECONDS = Duration.ofSeconds(6); + private static final ObjectMapper simpleObjectMapperAllowingDuplicatedProperties = createAndInitializeObjectMapper(true); private static final ObjectMapper simpleObjectMapperDisallowingDuplicatedProperties = @@ -808,4 +812,14 @@ public static boolean shouldAllowUnquotedControlChars() { return Boolean.parseBoolean(shouldAllowUnquotedControlCharsConfig); } + + public static Duration min(Duration duration1, Duration duration2) { + if 
(duration1 == null) { + return duration2; + } else if (duration2 == null) { + return duration1; + } else { + return duration1.compareTo(duration2) < 0 ? duration1 : duration2; + } + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java index 09ceeb4cfca0..480ef913bd28 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -30,6 +31,7 @@ public class GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover { private final ConcurrentHashMap partitionKeyRangeToEndToEndTimeoutErrorTracker; private final GlobalEndpointManager globalEndpointManager; private final boolean isPerPartitionAutomaticFailoverEnabled; + private final AtomicInteger warnLevelLoggedCounts = new AtomicInteger(0); public GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover( GlobalEndpointManager globalEndpointManager, @@ -47,8 +49,17 @@ public boolean resetEndToEndTimeoutErrorCountIfPossible(RxDocumentServiceRequest return false; } - checkNotNull(request, "Argument 'request' cannot be null!"); - checkNotNull(request.requestContext, "Argument 'request.requestContext' cannot be null!"); + if (request == null) { + logAsWarnOrDebug("Argument 
'request' is null, " + + "hence resetEndToEndTimeoutErrorCountIfPossible cannot be performed", this.warnLevelLoggedCounts); + return false; + } + + if (request.requestContext == null) { + logAsWarnOrDebug("Argument 'request.requestContext' is null, " + + "hence resetEndToEndTimeoutErrorCountIfPossible cannot be performed", this.warnLevelLoggedCounts); + return false; + } if (!isPerPartitionAutomaticFailoverApplicable(request)) { return false; @@ -58,10 +69,14 @@ public boolean resetEndToEndTimeoutErrorCountIfPossible(RxDocumentServiceRequest String resolvedCollectionRid = request.requestContext.resolvedCollectionRid; if (partitionKeyRange == null) { + logAsWarnOrDebug("Argument 'request.requestContext.resolvedPartitionKeyRangeForPerPartitionAutomaticFailover' " + + "is null, hence resetEndToEndTimeoutErrorCountIfPossible cannot be performed", this.warnLevelLoggedCounts); return false; } if (StringUtils.isEmpty(resolvedCollectionRid)) { + logAsWarnOrDebug("Argument 'request.requestContext.resolvedCollectionRid' is null, " + + "hence resetEndToEndTimeoutErrorCountIfPossible cannot be performed", this.warnLevelLoggedCounts); return false; } @@ -82,9 +97,17 @@ public boolean tryAddPartitionLevelLocationOverride(RxDocumentServiceRequest req return false; } - checkNotNull(request, "Argument 'request' cannot be null!"); - checkNotNull(request.requestContext, "Argument 'request.requestContext' cannot be null!"); + if (request == null) { + logAsWarnOrDebug("Argument 'request' is null, " + + "hence tryAddPartitionLevelLocationOverride cannot be performed", this.warnLevelLoggedCounts); + return false; + } + if (request.requestContext == null) { + logAsWarnOrDebug("Argument 'request.requestContext' is null, " + + "hence tryAddPartitionLevelLocationOverride cannot be performed", this.warnLevelLoggedCounts); + return false; + } if (request.getResourceType() != ResourceType.Document) { return false; @@ -98,10 +121,14 @@ public boolean 
tryAddPartitionLevelLocationOverride(RxDocumentServiceRequest req String resolvedCollectionRid = request.requestContext.resolvedCollectionRid; if (partitionKeyRange == null) { + logAsWarnOrDebug("Argument 'request.requestContext.resolvedPartitionKeyRangeForPerPartitionAutomaticFailover' " + + "is null, hence tryAddPartitionLevelLocationOverride cannot be performed", this.warnLevelLoggedCounts); return false; } if (StringUtils.isEmpty(resolvedCollectionRid)) { + logAsWarnOrDebug("Argument 'request.requestContext.resolvedCollectionRid' is null, " + + "hence tryAddPartitionLevelLocationOverride cannot be performed", this.warnLevelLoggedCounts); return false; } @@ -109,7 +136,11 @@ public boolean tryAddPartitionLevelLocationOverride(RxDocumentServiceRequest req CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest = request.requestContext.getCrossRegionAvailabilityContext(); - checkNotNull(crossRegionAvailabilityContextForRequest, "Argument 'crossRegionAvailabilityContextForRequest' cannot be null!"); + if (crossRegionAvailabilityContextForRequest == null) { + logAsWarnOrDebug("Argument 'request.requestContext.getCrossRegionAvailabilityContext()' is null, " + + "hence tryAddPartitionLevelLocationOverride cannot be performed", this.warnLevelLoggedCounts); + return false; + } if (!crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable()) { return false; @@ -150,9 +181,6 @@ public boolean tryMarkEndpointAsUnavailableForPartitionKeyRange(RxDocumentServic return false; } - checkNotNull(request, "Argument 'request' cannot be null!"); - checkNotNull(request.requestContext, "Argument 'request.requestContext' cannot be null!"); - if (!isPerPartitionAutomaticFailoverApplicable(request)) { return false; } @@ -161,10 +189,14 @@ public boolean tryMarkEndpointAsUnavailableForPartitionKeyRange(RxDocumentServic String resolvedCollectionRid = request.requestContext.resolvedCollectionRid; 
if (partitionKeyRange == null) { + logAsWarnOrDebug("Argument 'request.requestContext.resolvedPartitionKeyRangeForPerPartitionAutomaticFailover' " + + "is null, hence tryMarkEndpointAsUnavailableForPartitionKeyRange cannot be performed", this.warnLevelLoggedCounts); return false; } if (StringUtils.isEmpty(resolvedCollectionRid)) { + logAsWarnOrDebug("Argument 'request.requestContext.resolvedCollectionRid' is null, " + + "hence tryMarkEndpointAsUnavailableForPartitionKeyRange cannot be performed", this.warnLevelLoggedCounts); return false; } @@ -234,6 +266,18 @@ public boolean isPerPartitionAutomaticFailoverApplicable(RxDocumentServiceReques return false; } + if (request == null) { + logAsWarnOrDebug("Argument 'request' is null, " + + "hence isPerPartitionAutomaticFailoverApplicable cannot be performed", this.warnLevelLoggedCounts); + return false; + } + + if (request.requestContext == null) { + logAsWarnOrDebug("Argument 'request.requestContext' is null, " + + "hence isPerPartitionAutomaticFailoverApplicable cannot be performed", this.warnLevelLoggedCounts); + return false; + } + if (request.isReadOnlyRequest()) { return false; } @@ -242,14 +286,21 @@ public boolean isPerPartitionAutomaticFailoverApplicable(RxDocumentServiceReques return false; } - checkNotNull(request, "Argument 'request' cannot be null!"); + if (request.getResourceType() == null) { + logAsWarnOrDebug("Argument 'request.getResourceType()' is null, " + + "hence isPerPartitionAutomaticFailoverApplicable cannot be performed", this.warnLevelLoggedCounts); + return false; + } + + if (request.getOperationType() == null) { + logAsWarnOrDebug("Argument 'request.getOperationType()' is null, " + + "hence isPerPartitionAutomaticFailoverApplicable cannot be performed", this.warnLevelLoggedCounts); + return false; + } ResourceType resourceType = request.getResourceType(); OperationType operationType = request.getOperationType(); - checkNotNull(resourceType, "Argument 'resourceType' cannot be null!"); - 
checkNotNull(operationType, "Argument 'operationType' cannot be null!"); - if (request.getOperationType() == OperationType.QueryPlan) { return false; } @@ -264,4 +315,15 @@ public boolean isPerPartitionAutomaticFailoverApplicable(RxDocumentServiceReques return false; } + + private static void logAsWarnOrDebug(String message, AtomicInteger warnLogThreshold) { + // the check-then-increment on warnLogThreshold is not atomic, but with interleaved + // updates there would only be a few extra warn logs in the worst case + if (warnLogThreshold.get() < Configs.getWarnLevelLoggingThresholdForPpaf()) { + logger.warn(message); + warnLogThreshold.incrementAndGet(); + } else { + logger.debug(message); + } + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java index 85023117e481..fdec9762dadf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/QueryPlanRetriever.java @@ -119,17 +119,17 @@ static Mono getQueryPlanThroughGatewayAsync(Diagn BiFunction, RxDocumentServiceRequest, Mono> executeFunc = (retryPolicyFactory, req) -> { DocumentClientRetryPolicy retryPolicyInstance = retryPolicyFactory.get(); - retryPolicyInstance.onBeforeSendRequest(req); - return BackoffRetryUtility.executeRetry(() -> - queryClient.executeQueryAsync(req).flatMap(rxDocumentServiceResponse -> { + return BackoffRetryUtility.executeRetry(() -> { + retryPolicyInstance.onBeforeSendRequest(req); + return queryClient.executeQueryAsync(req).flatMap(rxDocumentServiceResponse -> { PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = new PartitionedQueryExecutionInfo( - (ObjectNode)rxDocumentServiceResponse.getResponseBody(), + (ObjectNode) rxDocumentServiceResponse.getResponseBody(),
rxDocumentServiceResponse.getGatewayHttpRequestTimeline()); return Mono.just(partitionedQueryExecutionInfo); - - }), retryPolicyInstance); + }); + }, retryPolicyInstance); }; return queryClient.executeFeedOperationWithAvailabilityStrategy(