Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ private RxDocumentServiceRequest constructRxDocumentServiceRequestInstance(
false,
collectionLink,
new SerializationDiagnosticsContext()),
new AvailabilityStrategyContext(true, true)
)
new AvailabilityStrategyContext(true, true),
new AtomicBoolean(false))
);

return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,8 @@ private RxDocumentServiceRequest constructRxDocumentServiceRequestInstance(
false,
collectionLink,
new SerializationDiagnosticsContext()),
new AvailabilityStrategyContext(false, false)));
new AvailabilityStrategyContext(false, false),
new AtomicBoolean(false)));

return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ private static RxDocumentServiceRequest createRequest(
true,
collectionResourceId,
new SerializationDiagnosticsContext()),
new AvailabilityStrategyContext(true, false)
));
new AvailabilityStrategyContext(true, false),
new AtomicBoolean(false)));
} else {
request.requestContext.setCrossRegionAvailabilityContext(
new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
Expand All @@ -310,8 +310,8 @@ private static RxDocumentServiceRequest createRequest(
false,
collectionResourceId,
new SerializationDiagnosticsContext()),
new AvailabilityStrategyContext(false, false)
));
new AvailabilityStrategyContext(false, false),
new AtomicBoolean(false)));
}

return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -179,13 +180,17 @@ private RxDocumentServiceRequest createDocumentServiceRequest() {
options);

if (request.requestContext != null) {

AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false);

request.requestContext.setExcludeRegions(options.getExcludedRegions());
request.requestContext.setKeywordIdentifiers(options.getKeywordIdentifiers());
request.requestContext.setCrossRegionAvailabilityContext(
new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
new FeedOperationContextForCircuitBreaker(new ConcurrentHashMap<>(), false, collectionLink),
null,
new AvailabilityStrategyContext(false, false)));
new AvailabilityStrategyContext(false, false),
shouldAddHubRegionProcessingOnlyHeader));
}

return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}

return this.shouldRetryOnEndpointFailureAsync(false, true, false);
return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, true, false);
}

// Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress)
Expand Down Expand Up @@ -268,6 +268,19 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ
crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(true);
}

if (!this.globalEndpointManager.canUseMultipleWriteLocations(request)) {

if (request.requestContext != null) {

CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext
= request.requestContext.getCrossRegionAvailabilityContext();

if (crossRegionAvailabilityContext != null) {
crossRegionAvailabilityContext.setShouldAddHubRegionProcessingOnlyHeader(true);
}
}
}

return ShouldRetryResult.retryAfter(Duration.ZERO);
}
}
Expand All @@ -282,11 +295,6 @@ private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRead

Mono<Void> refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh, usePreferredLocations);

// if PPAF is enabled, mark pk-range as unavailable and force a retry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this getting removed?

if (this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryMarkEndpointAsUnavailableForPartitionKeyRange(this.request, false)) {
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}

// Some requests may be in progress when the endpoint manager and client are closed.
// In that case, the request won't succeed since the http client is closed.
// Therefore just skip the retry here to avoid the delay because retrying won't go through in the end.
Expand Down Expand Up @@ -516,7 +524,22 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) {

// Resolve the endpoint for the request and pin the resolution to the resolved endpoint
// This enables marking the endpoint unavailability on endpoint failover/unreachability
this.regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request);

boolean isInHubRegionDiscoveryMode = false;

if (request.requestContext != null) {
CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext
= request.requestContext.getCrossRegionAvailabilityContext();

if (crossRegionAvailabilityContext != null) {
if (crossRegionAvailabilityContext.shouldAddHubRegionProcessingOnlyHeader()) {
isInHubRegionDiscoveryMode = true;
request.getHeaders().put(HttpConstants.HttpHeaders.HUB_REGION_PROCESSING_ONLY, "true");
}
}
}

this.regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request, isInHubRegionDiscoveryMode);

if (request.requestContext != null) {
request.requestContext.routeToLocation(this.regionalRoutingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public class CrossRegionAvailabilityContextForRxDocumentServiceRequest {

private final AtomicBoolean hasPerPartitionAutomaticFailoverBeenAppliedForReads = new AtomicBoolean(false);

private final AtomicBoolean shouldAddHubRegionProcessingOnlyHeader;

private final FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker;

private final PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker;
Expand All @@ -20,11 +22,13 @@ public class CrossRegionAvailabilityContextForRxDocumentServiceRequest {
public CrossRegionAvailabilityContextForRxDocumentServiceRequest(
FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker,
PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker,
AvailabilityStrategyContext availabilityStrategyContext) {
AvailabilityStrategyContext availabilityStrategyContext,
AtomicBoolean shouldAddHubRegionProcessingOnlyHeader) {

this.feedOperationContextForCircuitBreaker = feedOperationContextForCircuitBreaker;
this.pointOperationContextForCircuitBreaker = pointOperationContextForCircuitBreaker;
this.availabilityStrategyContext = availabilityStrategyContext;
this.shouldAddHubRegionProcessingOnlyHeader = shouldAddHubRegionProcessingOnlyHeader;
}

public FeedOperationContextForCircuitBreaker getFeedOperationContextForCircuitBreaker() {
Expand Down Expand Up @@ -54,4 +58,12 @@ public void setPerPartitionAutomaticFailoverAppliedStatusForReads(boolean perPar
public boolean hasPerPartitionAutomaticFailoverBeenAppliedForReads() {
return this.hasPerPartitionAutomaticFailoverBeenAppliedForReads.get();
}

public void setShouldAddHubRegionProcessingOnlyHeader(boolean shouldAddHubRegionProcessingOnlyHeader) {
this.shouldAddHubRegionProcessingOnlyHeader.set(shouldAddHubRegionProcessingOnlyHeader);
}

public boolean shouldAddHubRegionProcessingOnlyHeader() {
return this.shouldAddHubRegionProcessingOnlyHeader.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ public static Mono<DatabaseAccount> getDatabaseAccountFromAnyLocationsAsync(
}

public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request) {
RegionalRoutingContext serviceEndpoints = this.locationCache.resolveServiceEndpoint(request);
return this.resolveServiceEndpoint(request, false);
}

public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request, boolean isInHubRegionDiscoveryMode) {
RegionalRoutingContext serviceEndpoints = this.locationCache.resolveServiceEndpoint(request, isInHubRegionDiscoveryMode);
if (request.faultInjectionRequestContext != null) {
// TODO: integrate thin client into fault injection
request.faultInjectionRequestContext.setRegionalRoutingContextToRoute(serviceEndpoints);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ public static class HttpHeaders {

// Throughput bucket header
public static final String THROUGHPUT_BUCKET = "x-ms-cosmos-throughput-bucket";

// Region affinity headers
public static final String HUB_REGION_PROCESSING_ONLY = "x-ms-cosmos-hub-region-processing-only";
}

public static class A_IMHeaderValues {
Expand Down
Loading
Loading