diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPAFUnitTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPAFUnitTests.java index 15da7043bd87..ab59ad363e12 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPAFUnitTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPAFUnitTests.java @@ -421,8 +421,8 @@ private RxDocumentServiceRequest constructRxDocumentServiceRequestInstance( false, collectionLink, new SerializationDiagnosticsContext()), - new AvailabilityStrategyContext(true, true) - ) + new AvailabilityStrategyContext(true, true), + new AtomicBoolean(false)) ); return request; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPCBUnitTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPCBUnitTests.java index 11d14758f352..17cd37b72acd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPCBUnitTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPCBUnitTests.java @@ -1056,7 +1056,8 @@ private RxDocumentServiceRequest constructRxDocumentServiceRequestInstance( false, collectionLink, new SerializationDiagnosticsContext()), - new AvailabilityStrategyContext(false, false))); + new AvailabilityStrategyContext(false, false), + new AtomicBoolean(false))); return request; } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java index 3312dcba3eda..339dddf811e6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java @@ -299,8 +299,8 @@ private static RxDocumentServiceRequest createRequest( true, collectionResourceId, new SerializationDiagnosticsContext()), - new AvailabilityStrategyContext(true, false) - )); + new AvailabilityStrategyContext(true, false), + new AtomicBoolean(false))); } else { request.requestContext.setCrossRegionAvailabilityContext( new CrossRegionAvailabilityContextForRxDocumentServiceRequest( @@ -310,8 +310,8 @@ private static RxDocumentServiceRequest createRequest( false, collectionResourceId, new SerializationDiagnosticsContext()), - new AvailabilityStrategyContext(false, false) - )); + new AvailabilityStrategyContext(false, false), + new AtomicBoolean(false))); } return request; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java index dc2adf1dce7b..5da9e11f76ff 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; @@ -179,13 +180,17 @@ private RxDocumentServiceRequest createDocumentServiceRequest() { options); if (request.requestContext != null) { + + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); + request.requestContext.setExcludeRegions(options.getExcludedRegions()); request.requestContext.setKeywordIdentifiers(options.getKeywordIdentifiers()); request.requestContext.setCrossRegionAvailabilityContext( new CrossRegionAvailabilityContextForRxDocumentServiceRequest( new FeedOperationContextForCircuitBreaker(new ConcurrentHashMap<>(), false, collectionLink), null, - new AvailabilityStrategyContext(false, false))); + new AvailabilityStrategyContext(false, false), + shouldAddHubRegionProcessingOnlyHeader)); } return request; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index ca9731f86e50..2a9c3d2f798e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -111,7 +111,7 @@ public Mono shouldRetry(Exception e) { return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); } - return this.shouldRetryOnEndpointFailureAsync(false, true, false); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, true, false); } // Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress) @@ -268,6 +268,19 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(true); } + if (!this.globalEndpointManager.canUseMultipleWriteLocations(request)) { + + if (request.requestContext != null) { + + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext + = request.requestContext.getCrossRegionAvailabilityContext(); + + if (crossRegionAvailabilityContext != null) { + crossRegionAvailabilityContext.setShouldAddHubRegionProcessingOnlyHeader(true); + } + } + } + return ShouldRetryResult.retryAfter(Duration.ZERO); } } @@ -282,11 +295,6 @@ private Mono shouldRetryOnEndpointFailureAsync(boolean isRead Mono refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh, usePreferredLocations); - // if PPAF is enabled, mark pk-range as unavailable and force a retry - if (this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryMarkEndpointAsUnavailableForPartitionKeyRange(this.request, false)) { - return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); - } - // Some requests may be in progress when the endpoint manager and client are closed. // In that case, the request won't succeed since the http client is closed. // Therefore just skip the retry here to avoid the delay because retrying won't go through in the end. @@ -516,7 +524,22 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) { // Resolve the endpoint for the request and pin the resolution to the resolved endpoint // This enables marking the endpoint unavailability on endpoint failover/unreachability - this.regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request); + + boolean isInHubRegionDiscoveryMode = false; + + if (request.requestContext != null) { + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext + = request.requestContext.getCrossRegionAvailabilityContext(); + + if (crossRegionAvailabilityContext != null) { + if (crossRegionAvailabilityContext.shouldAddHubRegionProcessingOnlyHeader()) { + isInHubRegionDiscoveryMode = true; + request.getHeaders().put(HttpConstants.HttpHeaders.HUB_REGION_PROCESSING_ONLY, "true"); + } + } + } + + this.regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request, isInHubRegionDiscoveryMode); if (request.requestContext != null) { request.requestContext.routeToLocation(this.regionalRoutingContext); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java index 8053957e3051..712c610bc2e9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java @@ -11,6 +11,8 @@ public class CrossRegionAvailabilityContextForRxDocumentServiceRequest { private final AtomicBoolean hasPerPartitionAutomaticFailoverBeenAppliedForReads = new AtomicBoolean(false); + private final AtomicBoolean shouldAddHubRegionProcessingOnlyHeader; + private final FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker; private final PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker; @@ -20,11 +22,13 @@ public class CrossRegionAvailabilityContextForRxDocumentServiceRequest { public CrossRegionAvailabilityContextForRxDocumentServiceRequest( FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker, PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker, - AvailabilityStrategyContext availabilityStrategyContext) { + AvailabilityStrategyContext availabilityStrategyContext, + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader) { this.feedOperationContextForCircuitBreaker = feedOperationContextForCircuitBreaker; this.pointOperationContextForCircuitBreaker = pointOperationContextForCircuitBreaker; this.availabilityStrategyContext = availabilityStrategyContext; + this.shouldAddHubRegionProcessingOnlyHeader = shouldAddHubRegionProcessingOnlyHeader; } public FeedOperationContextForCircuitBreaker getFeedOperationContextForCircuitBreaker() { @@ -54,4 +58,12 @@ public void setPerPartitionAutomaticFailoverAppliedStatusForReads(boolean perPar public boolean hasPerPartitionAutomaticFailoverBeenAppliedForReads() { return this.hasPerPartitionAutomaticFailoverBeenAppliedForReads.get(); } + + public void setShouldAddHubRegionProcessingOnlyHeader(boolean shouldAddHubRegionProcessingOnlyHeader) { + this.shouldAddHubRegionProcessingOnlyHeader.set(shouldAddHubRegionProcessingOnlyHeader); + } + + public boolean shouldAddHubRegionProcessingOnlyHeader() { + return this.shouldAddHubRegionProcessingOnlyHeader.get(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index ec0ceb536615..cdbbaa45bfcc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -154,7 +154,11 @@ public static Mono getDatabaseAccountFromAnyLocationsAsync( } public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request) { - RegionalRoutingContext serviceEndpoints = this.locationCache.resolveServiceEndpoint(request); + return this.resolveServiceEndpoint(request, false); + } + + public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request, boolean isInHubRegionDiscoveryMode) { + RegionalRoutingContext serviceEndpoints = this.locationCache.resolveServiceEndpoint(request, isInHubRegionDiscoveryMode); if (request.faultInjectionRequestContext != null) { // TODO: integrate thin client into fault injection request.faultInjectionRequestContext.setRegionalRoutingContextToRoute(serviceEndpoints); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 174b155485f8..b70b2812ee30 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -294,6 +294,9 @@ public static class HttpHeaders { // Throughput bucket header public static final String THROUGHPUT_BUCKET = "x-ms-cosmos-throughput-bucket"; + + // Region affinity headers + public static final String HUB_REGION_PROCESSING_ONLY = "x-ms-cosmos-hub-region-processing-only"; } public static class A_IMHeaderValues { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 9b035b28dff6..37cb7b83f46f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -2245,6 +2245,8 @@ private Mono getBatchDocumentRequest(DocumentClientRet MetadataDiagnosticsContext metadataDiagnosticsContext = BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics); + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); + request.requestContext.setCrossRegionAvailabilityContext( new CrossRegionAvailabilityContextForRxDocumentServiceRequest( @@ -2254,7 +2256,8 @@ private Mono getBatchDocumentRequest(DocumentClientRet false, documentCollectionLink, serializationDiagnosticsContext), - null)); + null, + shouldAddHubRegionProcessingOnlyHeader)); return this.collectionCache.resolveCollectionAsync(metadataDiagnosticsContext, request) .flatMap(documentCollectionValueHolder -> { @@ -2637,7 +2640,7 @@ private Mono> createDocumentCore( crossRegionAvailabilityContextForRxDocumentServiceRequest), finalRetryPolicyInstance), scopedDiagnosticsFactory - ), requestReference, endToEndPolicyConfig); + ), requestReference); } private Mono> createDocumentInternal( @@ -2699,6 +2702,8 @@ private Mono> createDocumentInternal( requestRetryPolicy, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + documentServiceRequestReference.set(request); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -2747,10 +2752,7 @@ private static Mono getPointOperationResponseMonoWithE2ETimeout( private Mono handleCircuitBreakingFeedbackForPointOperation( Mono response, - AtomicReference requestReference, - CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) { - - applyEndToEndLatencyPolicyCfgToRequestContext(requestReference.get(), effectiveEndToEndPolicyConfig); + AtomicReference requestReference) { return response .doOnSuccess(ignore -> { @@ -3022,7 +3024,7 @@ private Mono> upsertDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), finalRetryPolicyInstance), - scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); + scopedDiagnosticsFactory), requestReference); } private Mono> upsertDocumentInternal( @@ -3083,6 +3085,8 @@ private Mono> upsertDocumentInternal( retryPolicyInstance, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(request); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -3163,7 +3167,7 @@ private Mono> replaceDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), finalRequestRetryPolicy), - scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); + scopedDiagnosticsFactory), requestReference); } private Mono> replaceDocumentInternal( @@ -3245,7 +3249,7 @@ private Mono> replaceDocumentCore( clientContextOverride, requestReference, crossRegionAvailabilityContextForRequest), - requestRetryPolicy), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); + requestRetryPolicy), requestReference); } private Mono> replaceDocumentInternal( @@ -3382,6 +3386,8 @@ private Mono> replaceDocumentInternal( retryPolicyInstance, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(req); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -3498,7 +3504,7 @@ private Mono> patchDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), documentClientRetryPolicy), - scopedDiagnosticsFactory), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); + scopedDiagnosticsFactory), requestReference); } private Mono> patchDocumentInternal( @@ -3614,6 +3620,8 @@ private Mono> patchDocumentInternal( retryPolicyInstance, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(req); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -3710,7 +3718,7 @@ private Mono> deleteDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), requestRetryPolicy), - scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); + scopedDiagnosticsFactory), requestReference); } private Mono> deleteDocumentInternal( @@ -3786,6 +3794,8 @@ private Mono> deleteDocumentInternal( retryPolicyInstance, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(req); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -3902,7 +3912,7 @@ private Mono> readDocumentCore( crossRegionAvailabilityContextForRequest), retryPolicyInstance), scopedDiagnosticsFactory - ), requestReference, endToEndPolicyConfig); + ), requestReference); } private Mono> readDocumentInternal( @@ -3974,6 +3984,8 @@ private Mono> readDocumentInternal( retryPolicyInstance, preResolvedPartionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(req); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -5215,7 +5227,7 @@ public Mono executeBatchRequest(String collectionLink, requestReference), documentClientRetryPolicy), scopedDiagnosticsFactory ), - requestReference, endToEndPolicyConfig); + requestReference); } private Mono executeStoredProcedureInternal(String storedProcedureLink, @@ -5271,6 +5283,7 @@ private Mono executeBatchRequestInternal(String collectionL Mono responseObservable = requestObs.flatMap(request -> { requestReference.set(request); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); return create(request, requestRetryPolicy, getOperationContextAndListenerTuple(options)); }); @@ -7230,6 +7243,7 @@ private Mono> wrapPointOperationWithAvailabilityStrat nonNullRequestOptions); AtomicBoolean isOperationSuccessful = new AtomicBoolean(false); + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); if (orderedApplicableRegionsForSpeculation.size() < 2) { // There is at most one applicable region - no hedging possible @@ -7247,7 +7261,8 @@ private Mono> wrapPointOperationWithAvailabilityStrat = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( null, pointOperationContextForCircuitBreakerForMainRequest, - availabilityStrategyContextForMainRequest); + availabilityStrategyContextForMainRequest, + shouldAddHubRegionProcessingOnlyHeader); return callback.apply(nonNullRequestOptions, endToEndPolicyConfig, innerDiagnosticsFactory, crossRegionAvailabilityContextForMainRequest); } @@ -7283,7 +7298,8 @@ private Mono> wrapPointOperationWithAvailabilityStrat = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( null, pointOperationContextForCircuitBreakerForMainRequest, - availabilityStrategyContextForMainRequest); + availabilityStrategyContextForMainRequest, + shouldAddHubRegionProcessingOnlyHeader); Mono initialMonoAcrossAllRegions = callback.apply(clonedOptions, endToEndPolicyConfig, diagnosticsFactory, crossRegionAvailabilityContextForMainRequest) @@ -7327,7 +7343,8 @@ private Mono> wrapPointOperationWithAvailabilityStrat CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForHedgedRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( null, pointOperationContextForCircuitBreakerForHedgedRequest, - availabilityStrategyContextForHedgedRequest); + availabilityStrategyContextForHedgedRequest, + shouldAddHubRegionProcessingOnlyHeader); Mono regionalCrossRegionRetryMono = callback.apply(clonedOptions, endToEndPolicyConfig, diagnosticsFactory, crossRegionAvailabilityContextForHedgedRequest) @@ -7505,7 +7522,7 @@ private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatenc checkNotNull(networkRequestTimeout, "Argument 'networkRequestTimeout' cannot be null!"); - Duration overallE2eLatencyTimeout = networkRequestTimeout.plus(Utils.ONE_SECOND); + Duration overallE2eLatencyTimeout = Duration.ofSeconds(500); Duration threshold = Utils.min(networkRequestTimeout.dividedBy(2), Utils.ONE_SECOND); Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); @@ -7514,7 +7531,7 @@ private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatenc .build(); } else { - Duration httpNetworkRequestTimeout = connectionPolicy.getHttpNetworkRequestTimeout(); + Duration httpNetworkRequestTimeout = Duration.ofSeconds(500); checkNotNull(httpNetworkRequestTimeout, "Argument 'httpNetworkRequestTimeout' cannot be null!"); @@ -7649,6 +7666,8 @@ private Mono executeFeedOperationWithAvailabilityStrategy( this.getEffectiveEndToEndOperationLatencyPolicyConfig( req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType); + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); + req.requestContext.setEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfig); List initialExcludedRegions = req.requestContext.getExcludeRegions(); @@ -7676,7 +7695,8 @@ private Mono executeFeedOperationWithAvailabilityStrategy( = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( feedOperationContextForCircuitBreakerForRequestOutsideOfAvailabilityStrategyFlow, null, - availabilityStrategyContext); + availabilityStrategyContext, + shouldAddHubRegionProcessingOnlyHeader); req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); @@ -7697,7 +7717,8 @@ private Mono executeFeedOperationWithAvailabilityStrategy( CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( feedOperationContextForCircuitBreakerForParentRequestInAvailabilityStrategyFlow, null, - availabilityStrategyContext); + availabilityStrategyContext, + shouldAddHubRegionProcessingOnlyHeader); req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); @@ -7728,7 +7749,8 @@ private Mono executeFeedOperationWithAvailabilityStrategy( CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequestForNonHedgedRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( feedOperationContextForCircuitBreakerForNonHedgedRequest, null, - availabilityStrategyContextForNonHedgedRequest); + availabilityStrategyContextForNonHedgedRequest, + shouldAddHubRegionProcessingOnlyHeader); clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForNonHedgedRequest); @@ -7771,8 +7793,8 @@ private Mono executeFeedOperationWithAvailabilityStrategy( = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( feedOperationContextForCircuitBreakerForHedgedRequest, null, - availabilityStrategyContextForHedgedRequest - ); + availabilityStrategyContextForHedgedRequest, + shouldAddHubRegionProcessingOnlyHeader); clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForHedgedRequest); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 4c71d49fd5e6..786e08199d19 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -1185,6 +1185,10 @@ public void setThroughputBucket(Integer throughputBucket) { } } + public void setHubRegionProcessingOnly(boolean hubRegionProcessingOnly) { + this.headers.put(HttpConstants.HttpHeaders.HUB_REGION_PROCESSING_ONLY, Boolean.toString(hubRegionProcessingOnly)); + } + public Duration getResponseTimeout() { return responseTimeout; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java index 0e9dfa7b86de..38d889c8a2df 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java @@ -597,7 +597,8 @@ public enum RntbdRequestHeader implements RntbdHeader { ChangeFeedWireFormatVersion((short) 0x00B2, RntbdTokenType.String, false), PriorityLevel((short) 0x00BF, RntbdTokenType.Byte, false), GlobalDatabaseAccountName((short) 0x00CE, RntbdTokenType.String, false), - ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false); + ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false), + HubRegionProcessingOnly((short)0x00EF, RntbdTokenType.Byte , false); public static final List thinClientHeadersInOrderList = Arrays.asList( EffectivePartitionKey, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java index 0c3f9615312f..6f6e46ee695d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java @@ -133,6 +133,7 @@ final class RntbdRequestHeaders extends RntbdTokenStream { this.addPriorityLevel(headers); this.addGlobalDatabaseAccountName(headers); this.addThroughputBucket(headers); + this.addHubRegionProcessingOnly(headers); // Normal headers (Strings, Ints, Longs, etc.) @@ -294,6 +295,8 @@ private RntbdToken getCorrelatedActivityId() { private RntbdToken getThroughputBucket() { return this.get(RntbdRequestHeader.ThroughputBucket); } + private RntbdToken getHubRegionProcessingOnly() { return this.get(RntbdRequestHeader.HubRegionProcessingOnly); } + private RntbdToken getGlobalDatabaseAccountName() { return this.get(RntbdRequestHeader.GlobalDatabaseAccountName); } @@ -795,8 +798,7 @@ private void addPriorityLevel(final Map headers) } } - private void addThroughputBucket(final Map headers) - { + private void addThroughputBucket(final Map headers) { final String value = headers.get(HttpHeaders.THROUGHPUT_BUCKET); if (StringUtils.isNotEmpty(value)) { @@ -805,6 +807,15 @@ private void addThroughputBucket(final Map headers) } } + private void addHubRegionProcessingOnly(final Map headers) { + final String value = headers.get(HttpHeaders.HUB_REGION_PROCESSING_ONLY); + + if (StringUtils.isNotEmpty(value)) { + final boolean hubRegionProcessingOnly = Boolean.parseBoolean(value); + this.getHubRegionProcessingOnly().setValue(hubRegionProcessingOnly); + } + } + private void addGlobalDatabaseAccountName(final Map headers) { final String value = headers.get(HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java index d50643f0e5a9..048178fba9ea 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java @@ -336,6 +336,13 @@ private void clear() { this.warnLevelLoggedCounts.set(0); } + public boolean shouldAddHubRegionProcessingOnlyHeader(RxDocumentServiceRequest request) { + return this.isPerPartitionAutomaticFailoverEnabled() && + request != null && + request.getOperationType().isReadOnlyOperation() && + !request.isMetadataRequest(); + } + private static void logAsWarnOrDebug(String message, AtomicInteger warnLogThreshold) { // warnLogThreshold is not atomic still but with interleaved // updates there would be few extra warn logs in the worst case diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java index 6d61cca536d1..7c09bff5adc0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java @@ -173,6 +173,10 @@ public void onDatabaseAccountRead(DatabaseAccount databaseAccount) { * @return Resolved getEndpoint */ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request) { + return this.resolveServiceEndpoint(request, false); + } + + public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request, boolean isInHubRegionDiscoveryMode) { Objects.requireNonNull(request.requestContext, "RxDocumentServiceRequest.requestContext is required and cannot be null."); @@ -190,6 +194,13 @@ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest re DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; if (this.enableEndpointDiscovery && !currentLocationInfo.availableWriteLocations.isEmpty()) { + + if (isInHubRegionDiscoveryMode && !currentLocationInfo.availableReadLocations.isEmpty()) { + locationIndex = locationIndex % currentLocationInfo.availableReadLocations.size(); + String potentialWriteLocation = currentLocationInfo.availableReadLocations.get(locationIndex); + return currentLocationInfo.availableReadRegionalRoutingContextsByRegionName.get(potentialWriteLocation); + } + locationIndex = Math.min(locationIndex%2, currentLocationInfo.availableWriteLocations.size()-1); String writeLocation = currentLocationInfo.availableWriteLocations.get(locationIndex); return currentLocationInfo.availableWriteRegionalRoutingContextsByRegionName.get(writeLocation); @@ -203,6 +214,7 @@ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest re } } + public UnmodifiableList getApplicableWriteRegionRoutingContexts(RxDocumentServiceRequest request) { return this.getApplicableWriteRegionRoutingContexts(request, request.requestContext.getExcludeRegions(), request.requestContext.getUnavailableRegionsForPartition()); }