Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ private RxDocumentServiceRequest constructRxDocumentServiceRequestInstance(
false,
collectionLink,
new SerializationDiagnosticsContext()),
new AvailabilityStrategyContext(true, true)
)
new AvailabilityStrategyContext(true, true),
new AtomicBoolean(false))
);

return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,8 @@ private RxDocumentServiceRequest constructRxDocumentServiceRequestInstance(
false,
collectionLink,
new SerializationDiagnosticsContext()),
new AvailabilityStrategyContext(false, false)));
new AvailabilityStrategyContext(false, false),
new AtomicBoolean(false)));

return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ private static RxDocumentServiceRequest createRequest(
true,
collectionResourceId,
new SerializationDiagnosticsContext()),
new AvailabilityStrategyContext(true, false)
));
new AvailabilityStrategyContext(true, false),
new AtomicBoolean(false)));
} else {
request.requestContext.setCrossRegionAvailabilityContext(
new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
Expand All @@ -310,8 +310,8 @@ private static RxDocumentServiceRequest createRequest(
false,
collectionResourceId,
new SerializationDiagnosticsContext()),
new AvailabilityStrategyContext(false, false)
));
new AvailabilityStrategyContext(false, false),
new AtomicBoolean(false)));
}

return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -179,13 +180,17 @@ private RxDocumentServiceRequest createDocumentServiceRequest() {
options);

if (request.requestContext != null) {

AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false);

request.requestContext.setExcludeRegions(options.getExcludedRegions());
request.requestContext.setKeywordIdentifiers(options.getKeywordIdentifiers());
request.requestContext.setCrossRegionAvailabilityContext(
new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
new FeedOperationContextForCircuitBreaker(new ConcurrentHashMap<>(), false, collectionLink),
null,
new AvailabilityStrategyContext(false, false)));
new AvailabilityStrategyContext(false, false),
shouldAddHubRegionProcessingOnlyHeader));
}

return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) {
logger.info("Endpoint not writable. Will refresh cache and retry ", e);

if (this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryMarkEndpointAsUnavailableForPartitionKeyRange(this.request, false)) {
if (this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryMarkEndpointAsUnavailableForPartitionKeyRange(this.request, false, true)) {
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}

Expand Down Expand Up @@ -266,6 +266,19 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ

checkNotNull(request.requestContext, "Argument 'crossRegionAvailabilityContextForRequest' cannot be null!");
crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(true);

if (this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.shouldAddHubRegionProcessingOnlyHeader(request)) {

if (request.requestContext != null) {

CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext
= request.requestContext.getCrossRegionAvailabilityContext();

if (crossRegionAvailabilityContext != null) {
crossRegionAvailabilityContext.setShouldAddHubRegionProcessingOnlyHeader(true);
}
}
}
}

return ShouldRetryResult.retryAfter(Duration.ZERO);
Expand All @@ -282,11 +295,6 @@ private Mono<ShouldRetryResult> shouldRetryOnEndpointFailureAsync(boolean isRead

Mono<Void> refreshLocationCompletable = this.refreshLocation(isReadRequest, forceRefresh, usePreferredLocations);

// if PPAF is enabled, mark pk-range as unavailable and force a retry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this getting removed?

if (this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryMarkEndpointAsUnavailableForPartitionKeyRange(this.request, false)) {
return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
}

// Some requests may be in progress when the endpoint manager and client are closed.
// In that case, the request won't succeed since the http client is closed.
// Therefore just skip the retry here to avoid the delay because retrying won't go through in the end.
Expand Down Expand Up @@ -520,6 +528,15 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) {

if (request.requestContext != null) {
request.requestContext.routeToLocation(this.regionalRoutingContext);

CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext
= request.requestContext.getCrossRegionAvailabilityContext();

if (crossRegionAvailabilityContext != null) {
if (crossRegionAvailabilityContext.shouldAddHubRegionProcessingOnlyHeader()) {
request.getHeaders().put(HttpConstants.HttpHeaders.HUB_REGION_PROCESSING_ONLY, "true");
}
}
}

// In case PPAF is enabled and a location override exists for the partition key range assigned to the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public class CrossRegionAvailabilityContextForRxDocumentServiceRequest {

private final AtomicBoolean hasPerPartitionAutomaticFailoverBeenAppliedForReads = new AtomicBoolean(false);

private final AtomicBoolean shouldAddHubRegionProcessingOnlyHeader;

private final FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker;

private final PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker;
Expand All @@ -20,11 +22,13 @@ public class CrossRegionAvailabilityContextForRxDocumentServiceRequest {
public CrossRegionAvailabilityContextForRxDocumentServiceRequest(
FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker,
PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker,
AvailabilityStrategyContext availabilityStrategyContext) {
AvailabilityStrategyContext availabilityStrategyContext,
AtomicBoolean shouldAddHubRegionProcessingOnlyHeader) {

this.feedOperationContextForCircuitBreaker = feedOperationContextForCircuitBreaker;
this.pointOperationContextForCircuitBreaker = pointOperationContextForCircuitBreaker;
this.availabilityStrategyContext = availabilityStrategyContext;
this.shouldAddHubRegionProcessingOnlyHeader = shouldAddHubRegionProcessingOnlyHeader;
}

public FeedOperationContextForCircuitBreaker getFeedOperationContextForCircuitBreaker() {
Expand Down Expand Up @@ -54,4 +58,12 @@ public void setPerPartitionAutomaticFailoverAppliedStatusForReads(boolean perPar
public boolean hasPerPartitionAutomaticFailoverBeenAppliedForReads() {
return this.hasPerPartitionAutomaticFailoverBeenAppliedForReads.get();
}

public void setShouldAddHubRegionProcessingOnlyHeader(boolean shouldAddHubRegionProcessingOnlyHeader) {
this.shouldAddHubRegionProcessingOnlyHeader.set(shouldAddHubRegionProcessingOnlyHeader);
}

public boolean shouldAddHubRegionProcessingOnlyHeader() {
return this.shouldAddHubRegionProcessingOnlyHeader.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ public static class HttpHeaders {

// Throughput bucket header
public static final String THROUGHPUT_BUCKET = "x-ms-cosmos-throughput-bucket";

// Region affinity headers
public static final String HUB_REGION_PROCESSING_ONLY = "x-ms-cosmos-hub-region-processing-only";
}

public static class A_IMHeaderValues {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2245,6 +2245,8 @@ private Mono<RxDocumentServiceRequest> getBatchDocumentRequest(DocumentClientRet

MetadataDiagnosticsContext metadataDiagnosticsContext = BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics);

AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false);

request.requestContext.setCrossRegionAvailabilityContext(

new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
Expand All @@ -2254,7 +2256,8 @@ private Mono<RxDocumentServiceRequest> getBatchDocumentRequest(DocumentClientRet
false,
documentCollectionLink,
serializationDiagnosticsContext),
null));
null,
shouldAddHubRegionProcessingOnlyHeader));

return this.collectionCache.resolveCollectionAsync(metadataDiagnosticsContext, request)
.flatMap(documentCollectionValueHolder -> {
Expand Down Expand Up @@ -7230,6 +7233,7 @@ private Mono<ResourceResponse<Document>> wrapPointOperationWithAvailabilityStrat
nonNullRequestOptions);

AtomicBoolean isOperationSuccessful = new AtomicBoolean(false);
AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false);

if (orderedApplicableRegionsForSpeculation.size() < 2) {
// There is at most one applicable region - no hedging possible
Expand All @@ -7247,7 +7251,8 @@ private Mono<ResourceResponse<Document>> wrapPointOperationWithAvailabilityStrat
= new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
null,
pointOperationContextForCircuitBreakerForMainRequest,
availabilityStrategyContextForMainRequest);
availabilityStrategyContextForMainRequest,
shouldAddHubRegionProcessingOnlyHeader);

return callback.apply(nonNullRequestOptions, endToEndPolicyConfig, innerDiagnosticsFactory, crossRegionAvailabilityContextForMainRequest);
}
Expand Down Expand Up @@ -7283,7 +7288,8 @@ private Mono<ResourceResponse<Document>> wrapPointOperationWithAvailabilityStrat
= new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
null,
pointOperationContextForCircuitBreakerForMainRequest,
availabilityStrategyContextForMainRequest);
availabilityStrategyContextForMainRequest,
shouldAddHubRegionProcessingOnlyHeader);

Mono<NonTransientPointOperationResult> initialMonoAcrossAllRegions =
callback.apply(clonedOptions, endToEndPolicyConfig, diagnosticsFactory, crossRegionAvailabilityContextForMainRequest)
Expand Down Expand Up @@ -7327,7 +7333,8 @@ private Mono<ResourceResponse<Document>> wrapPointOperationWithAvailabilityStrat
CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForHedgedRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
null,
pointOperationContextForCircuitBreakerForHedgedRequest,
availabilityStrategyContextForHedgedRequest);
availabilityStrategyContextForHedgedRequest,
shouldAddHubRegionProcessingOnlyHeader);

Mono<NonTransientPointOperationResult> regionalCrossRegionRetryMono =
callback.apply(clonedOptions, endToEndPolicyConfig, diagnosticsFactory, crossRegionAvailabilityContextForHedgedRequest)
Expand Down Expand Up @@ -7649,6 +7656,8 @@ private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
this.getEffectiveEndToEndOperationLatencyPolicyConfig(
req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType);

AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false);

req.requestContext.setEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfig);

List<String> initialExcludedRegions = req.requestContext.getExcludeRegions();
Expand Down Expand Up @@ -7676,7 +7685,8 @@ private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
= new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
feedOperationContextForCircuitBreakerForRequestOutsideOfAvailabilityStrategyFlow,
null,
availabilityStrategyContext);
availabilityStrategyContext,
shouldAddHubRegionProcessingOnlyHeader);

req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest);

Expand All @@ -7697,7 +7707,8 @@ private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
feedOperationContextForCircuitBreakerForParentRequestInAvailabilityStrategyFlow,
null,
availabilityStrategyContext);
availabilityStrategyContext,
shouldAddHubRegionProcessingOnlyHeader);

req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest);

Expand Down Expand Up @@ -7728,7 +7739,8 @@ private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequestForNonHedgedRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
feedOperationContextForCircuitBreakerForNonHedgedRequest,
null,
availabilityStrategyContextForNonHedgedRequest);
availabilityStrategyContextForNonHedgedRequest,
shouldAddHubRegionProcessingOnlyHeader);

clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForNonHedgedRequest);

Expand Down Expand Up @@ -7771,8 +7783,8 @@ private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
= new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
feedOperationContextForCircuitBreakerForHedgedRequest,
null,
availabilityStrategyContextForHedgedRequest
);
availabilityStrategyContextForHedgedRequest,
shouldAddHubRegionProcessingOnlyHeader);

clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForHedgedRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,10 @@ public void setThroughputBucket(Integer throughputBucket) {
}
}

public void setHubRegionProcessingOnly(boolean hubRegionProcessingOnly) {
this.headers.put(HttpConstants.HttpHeaders.HUB_REGION_PROCESSING_ONLY, Boolean.toString(hubRegionProcessingOnly));
}

public Duration getResponseTimeout() {
return responseTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,8 @@ public enum RntbdRequestHeader implements RntbdHeader {
ChangeFeedWireFormatVersion((short) 0x00B2, RntbdTokenType.String, false),
PriorityLevel((short) 0x00BF, RntbdTokenType.Byte, false),
GlobalDatabaseAccountName((short) 0x00CE, RntbdTokenType.String, false),
ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false);
ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false),
HubRegionProcessingOnly((short)0x00EF, RntbdTokenType.Byte , false);

public static final List<RntbdRequestHeader> thinClientHeadersInOrderList = Arrays.asList(
EffectivePartitionKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ final class RntbdRequestHeaders extends RntbdTokenStream<RntbdRequestHeader> {
this.addPriorityLevel(headers);
this.addGlobalDatabaseAccountName(headers);
this.addThroughputBucket(headers);
this.addHubRegionProcessingOnly(headers);

// Normal headers (Strings, Ints, Longs, etc.)

Expand Down Expand Up @@ -294,6 +295,8 @@ private RntbdToken getCorrelatedActivityId() {

private RntbdToken getThroughputBucket() { return this.get(RntbdRequestHeader.ThroughputBucket); }

private RntbdToken getHubRegionProcessingOnly() { return this.get(RntbdRequestHeader.HubRegionProcessingOnly); }

private RntbdToken getGlobalDatabaseAccountName() {
return this.get(RntbdRequestHeader.GlobalDatabaseAccountName);
}
Expand Down Expand Up @@ -795,8 +798,7 @@ private void addPriorityLevel(final Map<String, String> headers)
}
}

private void addThroughputBucket(final Map<String, String> headers)
{
private void addThroughputBucket(final Map<String, String> headers) {
final String value = headers.get(HttpHeaders.THROUGHPUT_BUCKET);

if (StringUtils.isNotEmpty(value)) {
Expand All @@ -805,6 +807,15 @@ private void addThroughputBucket(final Map<String, String> headers)
}
}

private void addHubRegionProcessingOnly(final Map<String, String> headers) {
final String value = headers.get(HttpHeaders.HUB_REGION_PROCESSING_ONLY);

if (StringUtils.isNotEmpty(value)) {
final boolean hubRegionProcessingOnly = Boolean.parseBoolean(value);
this.getHubRegionProcessingOnly().setValue(hubRegionProcessingOnly);
}
}

private void addGlobalDatabaseAccountName(final Map<String, String> headers)
{
final String value = headers.get(HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME);
Expand Down
Loading
Loading