Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,43 @@ protected void updateOverallBalancingState() {
overallState = TRANSIENT_FAILURE;
}

// gRFC A61: trigger lazy initialization outside picker when no READY child exists,
// at least one child is in TRANSIENT_FAILURE, and no child is currently CONNECTING
maybeTriggerIdleChildConnection(numReady, numTF, numConnecting, numIdle);

RingHashPicker picker =
new RingHashPicker(syncContext, ring, getChildLbStates(), requestHashHeaderKey, random);
getHelper().updateBalancingState(overallState, picker);
this.currentConnectivityState = overallState;
}

/**
* Triggers proactive connection of an IDLE child load balancer when the following conditions are
* met.
*
* <ul>
* <li>there is no READY child
* <li>at least one child is in TRANSIENT_FAILURE
* <li>no child is currently CONNECTING
* <li>there exists at least one IDLE child
* </ul>
*
* <p>This corresponds to the proactive connection logic described in gRFC A61, where recovery
* from TRANSIENT_FAILURE must be triggered outside the picker when no active connection attempt
* is in progress.
*/
private void maybeTriggerIdleChildConnection(
int numReady, int numTF, int numConnecting, int numIdle) {
if (numReady == 0 && numTF > 0 && numConnecting == 0 && numIdle > 0) {
for (ChildLbState child : getChildLbStates()) {
if (child.getCurrentState() == ConnectivityState.IDLE) {
child.getLb().requestConnection();
return;
}
}
}
}

@Override
protected ChildLbState createChildLbState(Object key) {
return new ChildLbState(key, lazyLbFactory);
Expand Down
62 changes: 47 additions & 15 deletions xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void aggregateSubchannelStates_connectingReadyIdleFailure() {
inOrder.verify(helper).refreshNameResolution();
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any());
}
verifyConnection(0);
verifyConnection(1);
}

private void verifyConnection(int times) {
Expand Down Expand Up @@ -537,7 +537,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha
// Bring one subchannel to TRANSIENT_FAILURE.
deliverSubchannelUnreachable(getSubChannel(servers.get(0)));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(1);

// Pick subchannel with random hash does trigger connection by walking the ring
// and choosing the first (at most one) IDLE subchannel along the way.
Expand Down Expand Up @@ -583,7 +583,7 @@ public void skipFailingHosts_pickNextNonFailingHost() {
getSubChannel(servers.get(0)),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(helper, atLeastOnce()).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());

PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
Expand Down Expand Up @@ -649,7 +649,7 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
ConnectivityStateInfo.forTransientFailure(
Status.PERMISSION_DENIED.withDescription("permission denied")));
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(2);
PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel
assertThat(result.getStatus().isOk()).isTrue();
int expectedCount = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 0 : 1;
Expand Down Expand Up @@ -721,7 +721,7 @@ public void allSubchannelsInTransientFailure() {
}
verify(helper, atLeastOnce())
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(2);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = getDefaultPickSubchannelArgsForServer(0);
Expand All @@ -740,12 +740,13 @@ public void firstSubchannelIdle() {
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
initializeLbSubchannels(config, servers);

// Go to TF does nothing, though PF will try to reconnect after backoff
// As per gRFC A61, entering TF triggers a proactive connection attempt
// on an IDLE subchannel because no other subchannel is currently CONNECTING.
deliverSubchannelState(getSubchannel(servers, 1),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(1);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
Expand Down Expand Up @@ -796,7 +797,7 @@ public void firstSubchannelFailure() {
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(1);

// Per GRFC A61 Picking subchannel should no longer request connections that were failing
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
Expand All @@ -805,8 +806,6 @@ public void firstSubchannelFailure() {
assertThat(result.getStatus().isOk()).isTrue();
assertThat(result.getSubchannel()).isNull();
verify(subchannelList.get(0), never()).requestConnection(); // In TF
verify(subchannelList.get(1)).requestConnection();
verify(subchannelList.get(2), never()).requestConnection(); // Not one of the first 2
}

@Test
Expand All @@ -824,7 +823,7 @@ public void secondSubchannelConnecting() {

Subchannel firstSubchannel = getSubchannel(servers, 0);
deliverSubchannelUnreachable(firstSubchannel);
verifyConnection(0);
verifyConnection(1);

deliverSubchannelState(getSubchannel(servers, 2), CSI_CONNECTING);
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
Expand All @@ -833,7 +832,7 @@ public void secondSubchannelConnecting() {
// Picking subchannel when idle triggers connection.
deliverSubchannelState(getSubchannel(servers, 2),
ConnectivityStateInfo.forNonError(IDLE));
verifyConnection(0);
verifyConnection(1);
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
Expand All @@ -857,7 +856,7 @@ public void secondSubchannelFailure() {
deliverSubchannelUnreachable(firstSubchannel);
deliverSubchannelUnreachable(getSubchannel(servers, 2));
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(2);

// Picking subchannel triggers connection.
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
Expand Down Expand Up @@ -887,7 +886,7 @@ public void thirdSubchannelConnecting() {
deliverSubchannelState(getSubchannel(servers, 1), CSI_CONNECTING);
verify(helper, atLeastOnce())
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(2);

// Picking subchannel should not trigger connection per gRFC A61.
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
Expand All @@ -909,7 +908,7 @@ public void stickyTransientFailure() {
deliverSubchannelUnreachable(firstSubchannel);

verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(0);
verifyConnection(1);

reset(helper);
deliverSubchannelState(firstSubchannel, ConnectivityStateInfo.forNonError(IDLE));
Expand Down Expand Up @@ -1127,6 +1126,39 @@ public void config_equalsTester() {
.testEquals();
}

@Test
public void tfWithoutConnectingChild_triggersIdleChildConnection() {
RingHashConfig config = new RingHashConfig(10, 100, "");
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1);

initializeLbSubchannels(config, servers);

Subchannel tfSubchannel = getSubchannel(servers, 0);
Subchannel idleSubchannel = getSubchannel(servers, 1);

deliverSubchannelUnreachable(tfSubchannel);

Subchannel requested = connectionRequestedQueue.poll();
assertThat(requested).isSameInstanceAs(idleSubchannel);
assertThat(connectionRequestedQueue.poll()).isNull();
}

@Test
public void tfWithReadyChild_doesNotTriggerIdleChildConnection() {
RingHashConfig config = new RingHashConfig(10, 100, "");
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);

initializeLbSubchannels(config, servers);

Subchannel tfSubchannel = getSubchannel(servers, 0);
Subchannel readySubchannel = getSubchannel(servers, 1);

deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
deliverSubchannelUnreachable(tfSubchannel);

assertThat(connectionRequestedQueue.poll()).isNull();
}

private List<Subchannel> initializeLbSubchannels(RingHashConfig config,
List<EquivalentAddressGroup> servers, InitializationFlags... initFlags) {

Expand Down