Skip to content

Default enablement of Threshold-Based Availability Strategy with Per-Partition Automatic Failover. #45267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
* Fixed diagnostics issue where operations in Gateway mode hitting end-to-end timeout would not capture diagnostics correctly. - [PR 44099](https://github.com/Azure/azure-sdk-for-java/pull/44099)
* Enabled `excludeRegions` to be applied for `QueryPlan` calls. - [PR 45196](https://github.com/Azure/azure-sdk-for-java/pull/45196)
* Fixed the behavior to correctly perform partition-level failover through circuit breaker for operations enabled with threshold-based availability strategy. - [PR 45244](https://github.com/Azure/azure-sdk-for-java/pull/45244)
* Fixed an issue where `QueryPlan` calls are indefinitely retried in the same region when there are connectivity or response delays with / from the Gateway. - [PR 45267](https://github.com/Azure/azure-sdk-for-java/pull/45267)

#### Other Changes
* Added the `vectorIndexShardKeys` to the vectorIndexSpec for QuantizedFlat and DiskANN vector search. - [PR 44007](https://github.com/Azure/azure-sdk-for-java/pull/44007)
* Added user agent suffixing if Per-Partition Automatic Failover or Per-Partition Circuit Breaker are enabled at client scope. - [PR 45197](https://github.com/Azure/azure-sdk-for-java/pull/45197)
* Enable threshold-based availability strategy for reads by default when Per-Partition Automatic Failover is also enabled. - [PR 45267](https://github.com/Azure/azure-sdk-for-java/pull/45267)

### 4.68.0 (2025-03-20)

Expand Down
16 changes: 16 additions & 0 deletions sdk/cosmos/azure-cosmos/docs/TimeoutAndRetriesConfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,19 @@
| 410 | 1000 | NO | N/A | N/A | N/A | 1 | N/A | |
| 410 | 1002 | NO | N/A | N/A | N/A | 1 | N/A | Only applies to `Query`, `ChangeFeed` |
| 400 | 1001 | NO | N/A | N/A | N/A | 1 | N/A | |

### Per-Partition Automatic Failover (PPAF) defaults

With PPAF enabled, the SDK will also enable threshold-based availability strategy with defaults as below:

#### Threshold-based availability strategy defaults

NOTE: 6s was chosen as in `Direct` Connection Mode, the connect timeout and network request timeout are 5s. This will allow the SDK to do at least 1 in-region retry. In Gateway mode, the Gateway performs the in-region retries on behalf of the SDK within the same time bound.

| Operation Type | Connection Mode | End-to-end timeout | Threshold duration | Threshold step duration |
|----------------|-----------------|--------------------|--------------------|-------------------------|
| Read | Direct | 6s | 1s | 500ms |
| Query | Direct | 6s | 1s | 500ms |
| Read | Gateway | 6s | 1s | 500ms |
| Query | Gateway | 6s | 1s | 500ms |

Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ public class Configs {
private static final String E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF = "COSMOS.E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF";
private static final String E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF_VARIABLE = "COSMOS_E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF";

private static final String DEFAULT_IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "true";
private static final String IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF";
private static final String IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE = "COSMOS_IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF";

private static final String COSMOS_DISABLE_IMDS_ACCESS = "COSMOS.DISABLE_IMDS_ACCESS";
private static final String COSMOS_DISABLE_IMDS_ACCESS_VARIABLE = "COSMOS_DISABLE_IMDS_ACCESS";
private static final boolean COSMOS_DISABLE_IMDS_ACCESS_DEFAULT = false;
Expand Down Expand Up @@ -1167,4 +1171,14 @@ public static String getEmulatorHost() {
emptyToNull(System.getenv().get(EMULATOR_HOST_VARIABLE)),
DEFAULT_EMULATOR_HOST));
}

public static boolean isReadAvailabilityStrategyEnabledWithPpaf() {
String isReadAvailabilityStrategyEnabledWithPpaf = System.getProperty(
IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF,
firstNonNull(
emptyToNull(System.getenv().get(IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE)),
DEFAULT_IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF));

return Boolean.parseBoolean(isReadAvailabilityStrategyEnabledWithPpaf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosDiagnosticsContext;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.CosmosItemSerializer;
import com.azure.cosmos.CosmosOperationPolicy;
Expand Down Expand Up @@ -192,6 +193,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization

private static final String DUMMY_SQL_QUERY = "this is dummy and only used in creating " +
"ParallelDocumentQueryExecutioncontext, but not used";

private final static ObjectMapper mapper = Utils.getSimpleObjectMapper();
private final CosmosItemSerializer defaultCustomSerializer;
private final static Logger logger = LoggerFactory.getLogger(RxDocumentClientImpl.class);
Expand Down Expand Up @@ -266,7 +268,8 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
private final boolean sessionCapturingDisabled;
private final boolean isRegionScopedSessionCapturingEnabledOnClientOrSystemConfig;
private List<CosmosOperationPolicy> operationPolicies;
private AtomicReference<CosmosAsyncClient> cachedCosmosAsyncClientSnapshot;
private final AtomicReference<CosmosAsyncClient> cachedCosmosAsyncClientSnapshot;
private final CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads;

public RxDocumentClientImpl(URI serviceEndpoint,
String masterKeyOrResourceToken,
Expand Down Expand Up @@ -600,6 +603,10 @@ private RxDocumentClientImpl(URI serviceEndpoint,
this.queryPlanCache = new ConcurrentHashMap<>();
this.apiType = apiType;
this.clientTelemetryConfig = clientTelemetryConfig;
this.ppafEnforcedE2ELatencyPolicyConfigForReads = evaluatePpafEnforcedE2eLatencyPolicyCfgForReads(
this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
this.connectionPolicy
);
} catch (RuntimeException e) {
logger.error("unexpected failure in initializing client.", e);
close();
Expand Down Expand Up @@ -2463,7 +2470,7 @@ private Mono<ResourceResponse<Document>> createDocumentCore(
crossRegionAvailabilityContextForRxDocumentServiceRequest),
requestRetryPolicy),
scopedDiagnosticsFactory
), requestReference);
), requestReference, endToEndPolicyConfig);
}

private Mono<ResourceResponse<Document>> createDocumentInternal(
Expand Down Expand Up @@ -2573,7 +2580,10 @@ private static <T> Mono<T> getPointOperationResponseMonoWithE2ETimeout(

private <T> Mono<T> handleCircuitBreakingFeedbackForPointOperation(
Mono<T> response,
AtomicReference<RxDocumentServiceRequest> requestReference) {
AtomicReference<RxDocumentServiceRequest> requestReference,
CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) {

applyEndToEndLatencyPolicyCfgToRequestContext(requestReference.get(), effectiveEndToEndPolicyConfig);

return response
.doOnSuccess(ignore -> {
Expand Down Expand Up @@ -2766,6 +2776,23 @@ private static CosmosException getNegativeTimeoutException(CosmosDiagnostics cos
return exception;
}

private static void applyEndToEndLatencyPolicyCfgToRequestContext(RxDocumentServiceRequest rxDocumentServiceRequest, CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) {

if (rxDocumentServiceRequest == null) {
return;
}

if (rxDocumentServiceRequest.requestContext == null) {
return;
}

if (effectiveEndToEndPolicyConfig == null) {
return;
}

rxDocumentServiceRequest.requestContext.setEndToEndOperationLatencyPolicyConfig(effectiveEndToEndPolicyConfig);
}

@Override
public Mono<ResourceResponse<Document>> upsertDocument(String collectionLink, Object document,
RequestOptions options, boolean disableAutomaticIdGeneration) {
Expand Down Expand Up @@ -2828,7 +2855,7 @@ private Mono<ResourceResponse<Document>> upsertDocumentCore(
requestReference,
crossRegionAvailabilityContextForRequest),
finalRetryPolicyInstance),
scopedDiagnosticsFactory), requestReference);
scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig);
}

private Mono<ResourceResponse<Document>> upsertDocumentInternal(
Expand Down Expand Up @@ -2971,7 +2998,7 @@ private Mono<ResourceResponse<Document>> replaceDocumentCore(
requestReference,
crossRegionAvailabilityContextForRequest),
requestRetryPolicy),
scopedDiagnosticsFactory), requestReference);
scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig);
}

private Mono<ResourceResponse<Document>> replaceDocumentInternal(
Expand Down Expand Up @@ -3053,7 +3080,7 @@ private Mono<ResourceResponse<Document>> replaceDocumentCore(
clientContextOverride,
requestReference,
crossRegionAvailabilityContextForRequest),
requestRetryPolicy), requestReference);
requestRetryPolicy), requestReference, cosmosEndToEndOperationLatencyPolicyConfig);
}

private Mono<ResourceResponse<Document>> replaceDocumentInternal(
Expand Down Expand Up @@ -3229,7 +3256,17 @@ private CosmosEndToEndOperationLatencyPolicyConfig getEffectiveEndToEndOperation
return null;
}

return this.cosmosEndToEndOperationLatencyPolicyConfig;
if (this.cosmosEndToEndOperationLatencyPolicyConfig != null) {
return this.cosmosEndToEndOperationLatencyPolicyConfig;
}

// If request options level and client-level e2e latency policy config,
// rely on PPAF enforced defaults
if (operationType.isReadOnlyOperation()) {
return this.ppafEnforcedE2ELatencyPolicyConfigForReads;
}

return null;
}

@Override
Expand Down Expand Up @@ -3292,7 +3329,7 @@ private Mono<ResourceResponse<Document>> patchDocumentCore(
requestReference,
crossRegionAvailabilityContextForRequest),
documentClientRetryPolicy),
scopedDiagnosticsFactory), requestReference);
scopedDiagnosticsFactory), requestReference, cosmosEndToEndOperationLatencyPolicyConfig);
}

private Mono<ResourceResponse<Document>> patchDocumentInternal(
Expand Down Expand Up @@ -3501,7 +3538,7 @@ private Mono<ResourceResponse<Document>> deleteDocumentCore(
requestReference,
crossRegionAvailabilityContextForRequest),
requestRetryPolicy),
scopedDiagnosticsFactory), requestReference);
scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig);
}

private Mono<ResourceResponse<Document>> deleteDocumentInternal(
Expand Down Expand Up @@ -3689,7 +3726,7 @@ private Mono<ResourceResponse<Document>> readDocumentCore(
crossRegionAvailabilityContextForRequest),
retryPolicyInstance),
scopedDiagnosticsFactory
), requestReference);
), requestReference, endToEndPolicyConfig);
}

private Mono<ResourceResponse<Document>> readDocumentInternal(
Expand Down Expand Up @@ -4880,7 +4917,7 @@ public Mono<CosmosBatchResponse> executeBatchRequest(String collectionLink,
requestReference), documentClientRetryPolicy),
scopedDiagnosticsFactory
),
requestReference);
requestReference, endToEndPolicyConfig);
}

private Mono<StoredProcedureResponse> executeStoredProcedureInternal(String storedProcedureLink,
Expand Down Expand Up @@ -7142,6 +7179,49 @@ private static boolean isNonTransientResultForHedging(int statusCode, int subSta
return false;
}

private static CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatencyPolicyCfgForReads(
GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover,
ConnectionPolicy connectionPolicy) {

if (!globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) {
return null;
}

if (Configs.isReadAvailabilityStrategyEnabledWithPpaf()) {

if (connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT) {
Duration networkRequestTimeout = connectionPolicy.getTcpNetworkRequestTimeout();

checkNotNull(networkRequestTimeout, "Argument 'networkRequestTimeout' cannot be null!");

Duration overallE2eLatencyTimeout = networkRequestTimeout.plus(Utils.ONE_SECOND);
Duration threshold = Utils.min(networkRequestTimeout.dividedBy(2), Utils.ONE_SECOND);
Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND);

return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout)
.availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep))
.build();
} else {

Duration httpNetworkRequestTimeout = connectionPolicy.getHttpNetworkRequestTimeout();

checkNotNull(httpNetworkRequestTimeout, "Argument 'httpNetworkRequestTimeout' cannot be null!");

// 6s was chosen to accommodate for control-plane hot path read timeout retries (like QueryPlan / PartitionKeyRange)
Duration overallE2eLatencyTimeout = Utils.min(Utils.SIX_SECONDS, httpNetworkRequestTimeout);

Duration threshold = Utils.min(overallE2eLatencyTimeout.dividedBy(2), Utils.ONE_SECOND);
Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND);

return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout)
.availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep))
.build();
}
}

return null;
}

private DiagnosticsClientContext getEffectiveClientContext(DiagnosticsClientContext clientContextOverride) {
if (clientContextOverride != null) {
return clientContextOverride;
Expand Down Expand Up @@ -7258,6 +7338,8 @@ private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
this.getEffectiveEndToEndOperationLatencyPolicyConfig(
req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType);

req.requestContext.setEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfig);

List<String> initialExcludedRegions = req.requestContext.getExcludeRegions();
List<String> orderedApplicableRegionsForSpeculation = this.getApplicableRegionsForSpeculation(
endToEndPolicyConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public class Utils {
public static final Base64.Decoder Base64Decoder = Base64.getDecoder();
public static final Base64.Encoder Base64UrlEncoder = Base64.getUrlEncoder();

public static final Duration ONE_SECOND = Duration.ofSeconds(1);
public static final Duration HALF_SECOND = Duration.ofMillis(500);
public static final Duration SIX_SECONDS = Duration.ofSeconds(6);

private static final ObjectMapper simpleObjectMapperAllowingDuplicatedProperties =
createAndInitializeObjectMapper(true);
private static final ObjectMapper simpleObjectMapperDisallowingDuplicatedProperties =
Expand Down Expand Up @@ -808,4 +812,14 @@ public static boolean shouldAllowUnquotedControlChars() {

return Boolean.parseBoolean(shouldAllowUnquotedControlCharsConfig);
}

public static Duration min(Duration duration1, Duration duration2) {
if (duration1 == null) {
return duration2;
} else if (duration2 == null) {
return duration1;
} else {
return duration1.compareTo(duration2) < 0 ? duration1 : duration2;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ static Mono<PartitionedQueryExecutionInfo> getQueryPlanThroughGatewayAsync(Diagn
BiFunction<Supplier<DocumentClientRetryPolicy>, RxDocumentServiceRequest, Mono<PartitionedQueryExecutionInfo>> executeFunc =
(retryPolicyFactory, req) -> {
DocumentClientRetryPolicy retryPolicyInstance = retryPolicyFactory.get();
retryPolicyInstance.onBeforeSendRequest(req);

return BackoffRetryUtility.executeRetry(() ->
queryClient.executeQueryAsync(req).flatMap(rxDocumentServiceResponse -> {
return BackoffRetryUtility.executeRetry(() -> {
retryPolicyInstance.onBeforeSendRequest(req);
return queryClient.executeQueryAsync(req).flatMap(rxDocumentServiceResponse -> {
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo =
new PartitionedQueryExecutionInfo(
(ObjectNode)rxDocumentServiceResponse.getResponseBody(),
(ObjectNode) rxDocumentServiceResponse.getResponseBody(),
rxDocumentServiceResponse.getGatewayHttpRequestTimeline());
return Mono.just(partitionedQueryExecutionInfo);

}), retryPolicyInstance);
});
}, retryPolicyInstance);
};

return queryClient.executeFeedOperationWithAvailabilityStrategy(
Expand Down
Loading