diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index e9fda4e9ec3..bdade61efc6 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -95,6 +95,7 @@ import io.grpc.internal.RetriableStream.ChannelBufferMeter; import io.grpc.internal.RetriableStream.Throttle; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -167,11 +168,9 @@ public Result selectConfig(PickSubchannelArgs args) { @Nullable private final ChannelCredentials originalChannelCreds; private final ClientTransportFactory transportFactory; - private final ClientTransportFactory oobTransportFactory; private final RestrictedScheduledExecutor scheduledExecutor; private final Executor executor; private final ObjectPool executorPool; - private final ObjectPool balancerRpcExecutorPool; private final ExecutorHolder balancerRpcExecutorHolder; private final ExecutorHolder offloadExecutorHolder; private final TimeProvider timeProvider; @@ -240,9 +239,6 @@ public void uncaughtException(Thread t, Throwable e) { private Collection> pendingCalls; private final Object pendingCallsInUseObject = new Object(); - // Must be mutated from syncContext - private final Set oobChannels = new HashSet<>(1, .75f); - // reprocess() must be run from syncContext private final DelayedClientTransport delayedTransport; private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry @@ -312,9 +308,6 @@ private void maybeShutdownNowSubchannels() { for (InternalSubchannel subchannel : subchannels) { subchannel.shutdownNow(SHUTDOWN_NOW_STATUS); } - for (OobChannel oobChannel : oobChannels) { - oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS); - } } } @@ -334,7 +327,6 @@ public void run() { builder.setTarget(target).setState(channelStateManager.getState()); List children = new ArrayList<>(); children.addAll(subchannels); - children.addAll(oobChannels); builder.setSubchannels(children); ret.set(builder.build()); } @@ -564,8 +556,6 @@ ClientStream newSubstream( new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool")); this.transportFactory = new CallCredentialsApplyingTransportFactory( clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder); - this.oobTransportFactory = new CallCredentialsApplyingTransportFactory( - clientTransportFactory, null, this.offloadExecutorHolder); this.scheduledExecutor = new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService()); maxTraceEvents = builder.maxTraceEvents; @@ -604,8 +594,8 @@ ClientStream newSubstream( this.nameResolverArgs = nameResolverArgsBuilder.build(); this.nameResolver = getNameResolver( targetUri, authorityOverride, nameResolverProvider, nameResolverArgs); - this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"); - this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool); + this.balancerRpcExecutorHolder = new ExecutorHolder( + checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool")); this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext); this.delayedTransport.start(delayedTransportListener); this.backoffPolicyProvider = backoffPolicyProvider; @@ -1187,7 +1177,7 @@ private void maybeTerminateChannel() { if (terminated) { return; } - if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) { + if (shutdown.get() && subchannels.isEmpty()) { channelLogger.log(ChannelLogLevel.INFO, "Terminated"); channelz.removeRootChannel(this); executorPool.returnObject(executor); @@ -1201,13 +1191,6 @@ private void maybeTerminateChannel() { } } - // Must be called from syncContext - private void handleInternalSubchannelState(ConnectivityStateInfo newState) { - if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { - refreshNameResolution(); - } - } - @Override public ConnectivityState getState(boolean requestConnection) { ConnectivityState savedChannelState = channelStateManager.getState(); @@ -1253,9 +1236,6 @@ public void run() { for (InternalSubchannel subchannel : subchannels) { subchannel.resetConnectBackoff(); } - for (OobChannel oobChannel : oobChannels) { - oobChannel.resetConnectBackoff(); - } } } @@ -1413,86 +1393,28 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, Stri @Override public ManagedChannel createOobChannel(List addressGroup, String authority) { - // TODO(ejona): can we be even stricter? Like terminating? - checkState(!terminated, "Channel is terminated"); - long oobChannelCreationTime = timeProvider.currentTimeNanos(); - InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null); - InternalLogId subchannelLogId = - InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority); - ChannelTracer oobChannelTracer = - new ChannelTracer( - oobLogId, maxTraceEvents, oobChannelCreationTime, - "OobChannel for " + addressGroup); - final OobChannel oobChannel = new OobChannel( - authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(), - syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider); - channelTracer.reportEvent(new ChannelTrace.Event.Builder() - .setDescription("Child OobChannel created") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(oobChannelCreationTime) - .setChannelRef(oobChannel) - .build()); - ChannelTracer subchannelTracer = - new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime, - "Subchannel for " + addressGroup); - ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider); - final class ManagedOobChannelCallback extends InternalSubchannel.Callback { - @Override - void onTerminated(InternalSubchannel is) { - oobChannels.remove(oobChannel); - channelz.removeSubchannel(is); - oobChannel.handleSubchannelTerminated(); - maybeTerminateChannel(); - } - - @Override - void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { - // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's - // state and refresh name resolution if necessary. - handleInternalSubchannelState(newState); - oobChannel.handleSubchannelStateChange(newState); - } - } - - final InternalSubchannel internalSubchannel = new InternalSubchannel( - CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(), - authority, userAgent, backoffPolicyProvider, oobTransportFactory, - oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext, - // All callback methods are run from syncContext - new ManagedOobChannelCallback(), - channelz, - callTracerFactory.create(), - subchannelTracer, - subchannelLogId, - subchannelLogger, - transportFilters, - target, - lbHelper.getMetricRecorder()); - oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() - .setDescription("Child Subchannel created") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(oobChannelCreationTime) - .setSubchannelRef(internalSubchannel) - .build()); - channelz.addSubchannel(oobChannel); - channelz.addSubchannel(internalSubchannel); - oobChannel.setSubchannel(internalSubchannel); - final class AddOobChannel implements Runnable { - @Override - public void run() { - if (terminating) { - oobChannel.shutdown(); - } - if (!terminated) { - // If channel has not terminated, it will track the subchannel and block termination - // for it. - oobChannels.add(oobChannel); - } - } - } - - syncContext.execute(new AddOobChannel()); - return oobChannel; + NameResolverRegistry nameResolverRegistry = new NameResolverRegistry(); + OobNameResolverProvider resolverProvider = + new OobNameResolverProvider(authority, addressGroup, syncContext); + nameResolverRegistry.register(resolverProvider); + // We could use a hard-coded target, as the name resolver won't actually use this string. + // However, that would make debugging less clear, as we use the target to identify the + // channel. + String target; + try { + target = new URI("oob", "", "/" + authority, null, null).toString(); + } catch (URISyntaxException ex) { + // Any special characters in the path will be percent encoded. So this should be impossible. + throw new AssertionError(ex); + } + ManagedChannel delegate = createResolvingOobChannelBuilder( + target, new DefaultChannelCreds(), nameResolverRegistry) + // TODO(zdapeng): executors should not outlive the parent channel. + .executor(balancerRpcExecutorHolder.getExecutor()) + .idleTimeout(Integer.MAX_VALUE, TimeUnit.SECONDS) + .disableRetry() + .build(); + return new OobChannel(delegate, resolverProvider); } @Deprecated @@ -1504,11 +1426,17 @@ public ManagedChannelBuilder createResolvingOobChannelBuilder(String target) .overrideAuthority(getAuthority()); } - // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated - // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz. @Override public ManagedChannelBuilder createResolvingOobChannelBuilder( final String target, final ChannelCredentials channelCreds) { + return createResolvingOobChannelBuilder(target, channelCreds, nameResolverRegistry); + } + + // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated + // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz. + private ManagedChannelBuilder createResolvingOobChannelBuilder( + final String target, final ChannelCredentials channelCreds, + NameResolverRegistry nameResolverRegistry) { checkNotNull(channelCreds, "channelCreds"); final class ResolvingOobChannelBuilder @@ -1641,6 +1569,19 @@ public ChannelCredentials withoutBearerTokens() { } } + static final class OobChannel extends ForwardingManagedChannel { + private final OobNameResolverProvider resolverProvider; + + public OobChannel(ManagedChannel delegate, OobNameResolverProvider resolverProvider) { + super(delegate); + this.resolverProvider = checkNotNull(resolverProvider, "resolverProvider"); + } + + public void updateAddresses(List eags) { + resolverProvider.updateAddresses(eags); + } + } + final class NameResolverListener extends NameResolver.Listener2 { final LbHelperImpl helper; final NameResolver resolver; diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java deleted file mode 100644 index b2272a89672..00000000000 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Copyright 2016 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.internal; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import io.grpc.Attributes; -import io.grpc.CallOptions; -import io.grpc.ClientCall; -import io.grpc.ClientStreamTracer; -import io.grpc.ConnectivityState; -import io.grpc.ConnectivityStateInfo; -import io.grpc.Context; -import io.grpc.EquivalentAddressGroup; -import io.grpc.InternalChannelz; -import io.grpc.InternalChannelz.ChannelStats; -import io.grpc.InternalChannelz.ChannelTrace; -import io.grpc.InternalInstrumented; -import io.grpc.InternalLogId; -import io.grpc.InternalWithLogId; -import io.grpc.LoadBalancer; -import io.grpc.LoadBalancer.FixedResultPicker; -import io.grpc.LoadBalancer.PickResult; -import io.grpc.LoadBalancer.Subchannel; -import io.grpc.LoadBalancer.SubchannelPicker; -import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.internal.ClientCallImpl.ClientStreamProvider; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.concurrent.ThreadSafe; - -/** - * A ManagedChannel backed by a single {@link InternalSubchannel} and used for {@link LoadBalancer} - * to its own RPC needs. - */ -@ThreadSafe -final class OobChannel extends ManagedChannel implements InternalInstrumented { - private static final Logger log = Logger.getLogger(OobChannel.class.getName()); - - private InternalSubchannel subchannel; - private AbstractSubchannel subchannelImpl; - private SubchannelPicker subchannelPicker; - - private final InternalLogId logId; - private final String authority; - private final DelayedClientTransport delayedTransport; - private final InternalChannelz channelz; - private final ObjectPool executorPool; - private final Executor executor; - private final ScheduledExecutorService deadlineCancellationExecutor; - private final CountDownLatch terminatedLatch = new CountDownLatch(1); - private volatile boolean shutdown; - private final CallTracer channelCallsTracer; - private final ChannelTracer channelTracer; - private final TimeProvider timeProvider; - - private final ClientStreamProvider transportProvider = new ClientStreamProvider() { - @Override - public ClientStream newStream(MethodDescriptor method, - CallOptions callOptions, Metadata headers, Context context) { - ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( - callOptions, headers, 0, /* isTransparentRetry= */ false, - /* isHedging= */ false); - Context origContext = context.attach(); - // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't - // matter here because OOB communication should be sparse, and it's not on application RPC's - // critical path. - try { - return delayedTransport.newStream(method, headers, callOptions, tracers); - } finally { - context.detach(origContext); - } - } - }; - - OobChannel( - String authority, ObjectPool executorPool, - ScheduledExecutorService deadlineCancellationExecutor, SynchronizationContext syncContext, - CallTracer callsTracer, ChannelTracer channelTracer, InternalChannelz channelz, - TimeProvider timeProvider) { - this.authority = checkNotNull(authority, "authority"); - this.logId = InternalLogId.allocate(getClass(), authority); - this.executorPool = checkNotNull(executorPool, "executorPool"); - this.executor = checkNotNull(executorPool.getObject(), "executor"); - this.deadlineCancellationExecutor = checkNotNull( - deadlineCancellationExecutor, "deadlineCancellationExecutor"); - this.delayedTransport = new DelayedClientTransport(executor, syncContext); - this.channelz = Preconditions.checkNotNull(channelz); - this.delayedTransport.start(new ManagedClientTransport.Listener() { - @Override - public void transportShutdown(Status s, DisconnectError e) { - // Don't care - } - - @Override - public void transportTerminated() { - subchannelImpl.shutdown(); - } - - @Override - public void transportReady() { - // Don't care - } - - @Override - public Attributes filterTransport(Attributes attributes) { - return attributes; - } - - @Override - public void transportInUse(boolean inUse) { - // Don't care - } - }); - this.channelCallsTracer = callsTracer; - this.channelTracer = checkNotNull(channelTracer, "channelTracer"); - this.timeProvider = checkNotNull(timeProvider, "timeProvider"); - } - - // Must be called only once, right after the OobChannel is created. - void setSubchannel(final InternalSubchannel subchannel) { - log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel}); - this.subchannel = subchannel; - subchannelImpl = new AbstractSubchannel() { - @Override - public void shutdown() { - subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown")); - } - - @Override - InternalInstrumented getInstrumentedInternalSubchannel() { - return subchannel; - } - - @Override - public void requestConnection() { - subchannel.obtainActiveTransport(); - } - - @Override - public List getAllAddresses() { - return subchannel.getAddressGroups(); - } - - @Override - public Attributes getAttributes() { - return Attributes.EMPTY; - } - - @Override - public Object getInternalSubchannel() { - return subchannel; - } - }; - - subchannelPicker = new FixedResultPicker(PickResult.withSubchannel(subchannelImpl)); - delayedTransport.reprocess(subchannelPicker); - } - - void updateAddresses(List eag) { - subchannel.updateAddresses(eag); - } - - @Override - public ClientCall newCall( - MethodDescriptor methodDescriptor, CallOptions callOptions) { - return new ClientCallImpl<>(methodDescriptor, - callOptions.getExecutor() == null ? executor : callOptions.getExecutor(), - callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer, null); - } - - @Override - public String authority() { - return authority; - } - - @Override - public boolean isTerminated() { - return terminatedLatch.getCount() == 0; - } - - @Override - public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException { - return terminatedLatch.await(time, unit); - } - - @Override - public ConnectivityState getState(boolean requestConnectionIgnored) { - if (subchannel == null) { - return ConnectivityState.IDLE; - } - return subchannel.getState(); - } - - @Override - public ManagedChannel shutdown() { - shutdown = true; - delayedTransport.shutdown(Status.UNAVAILABLE.withDescription("OobChannel.shutdown() called")); - return this; - } - - @Override - public boolean isShutdown() { - return shutdown; - } - - @Override - public ManagedChannel shutdownNow() { - shutdown = true; - delayedTransport.shutdownNow( - Status.UNAVAILABLE.withDescription("OobChannel.shutdownNow() called")); - return this; - } - - void handleSubchannelStateChange(final ConnectivityStateInfo newState) { - channelTracer.reportEvent( - new ChannelTrace.Event.Builder() - .setDescription("Entering " + newState.getState() + " state") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(timeProvider.currentTimeNanos()) - .build()); - switch (newState.getState()) { - case READY: - case IDLE: - delayedTransport.reprocess(subchannelPicker); - break; - case TRANSIENT_FAILURE: - delayedTransport.reprocess( - new FixedResultPicker(PickResult.withError(newState.getStatus()))); - break; - default: - // Do nothing - } - } - - // must be run from channel executor - void handleSubchannelTerminated() { - channelz.removeSubchannel(this); - // When delayedTransport is terminated, it shuts down subchannel. Therefore, at this point - // both delayedTransport and subchannel have terminated. - executorPool.returnObject(executor); - terminatedLatch.countDown(); - } - - @VisibleForTesting - Subchannel getSubchannel() { - return subchannelImpl; - } - - InternalSubchannel getInternalSubchannel() { - return subchannel; - } - - @Override - public ListenableFuture getStats() { - final SettableFuture ret = SettableFuture.create(); - final ChannelStats.Builder builder = new ChannelStats.Builder(); - channelCallsTracer.updateBuilder(builder); - channelTracer.updateBuilder(builder); - builder - .setTarget(authority) - .setState(subchannel.getState()) - .setSubchannels(Collections.singletonList(subchannel)); - ret.set(builder.build()); - return ret; - } - - @Override - public InternalLogId getLogId() { - return logId; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("logId", logId.getId()) - .add("authority", authority) - .toString(); - } - - @Override - public void resetConnectBackoff() { - subchannel.resetConnectBackoff(); - } -} diff --git a/core/src/main/java/io/grpc/internal/OobNameResolverProvider.java b/core/src/main/java/io/grpc/internal/OobNameResolverProvider.java new file mode 100644 index 00000000000..408b92e0c84 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/OobNameResolverProvider.java @@ -0,0 +1,121 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static java.util.Objects.requireNonNull; + +import io.grpc.EquivalentAddressGroup; +import io.grpc.NameResolver; +import io.grpc.NameResolverProvider; +import io.grpc.StatusOr; +import io.grpc.SynchronizationContext; +import java.net.URI; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +/** + * A provider that is passed addresses and relays those addresses to its created resolvers. + */ +final class OobNameResolverProvider extends NameResolverProvider { + private final String authority; + private final SynchronizationContext parentSyncContext; + // Only accessed from parentSyncContext + @SuppressWarnings("JdkObsolete") // LinkedList uses O(n) memory, including after deletions + private final Collection resolvers = new LinkedList<>(); + // Only accessed from parentSyncContext + private List lastEags; + + public OobNameResolverProvider( + String authority, List eags, SynchronizationContext syncContext) { + this.authority = requireNonNull(authority, "authority"); + this.lastEags = requireNonNull(eags, "eags"); + this.parentSyncContext = requireNonNull(syncContext, "syncContext"); + } + + @Override + public String getDefaultScheme() { + return "oob"; + } + + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; // Doesn't matter, as we expect only one provider in the registry + } + + public void updateAddresses(List eags) { + requireNonNull(eags, "eags"); + parentSyncContext.execute(() -> { + this.lastEags = eags; + for (OobNameResolver resolver : resolvers) { + resolver.updateAddresses(eags); + } + }); + } + + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + return new OobNameResolver(args.getSynchronizationContext()); + } + + final class OobNameResolver extends NameResolver { + private final SynchronizationContext syncContext; + // Null before started, and after shutdown. Only accessed from syncContext + private Listener2 listener; + + public OobNameResolver(SynchronizationContext syncContext) { + this.syncContext = requireNonNull(syncContext, "syncContext"); + } + + @Override + public String getServiceAuthority() { + return authority; + } + + @Override + public void start(Listener2 listener) { + this.listener = requireNonNull(listener, "listener"); + parentSyncContext.execute(() -> { + resolvers.add(this); + updateAddresses(lastEags); + }); + } + + void updateAddresses(List eags) { + parentSyncContext.throwIfNotInThisSynchronizationContext(); + syncContext.execute(() -> { + if (listener == null) { + return; + } + listener.onResult2(ResolutionResult.newBuilder() + .setAddressesOrError(StatusOr.fromValue(lastEags)) + .build()); + }); + } + + @Override + public void shutdown() { + this.listener = null; + parentSyncContext.execute(() -> resolvers.remove(this)); + } + } +} diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 7b3e725991c..731fac94044 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -285,10 +285,6 @@ public String getPolicyName() { @Mock private ClientCall.Listener mockCallListener3; @Mock - private ClientCall.Listener mockCallListener4; - @Mock - private ClientCall.Listener mockCallListener5; - @Mock private ObjectPool executorPool; @Mock private ObjectPool balancerRpcExecutorPool; @@ -817,47 +813,6 @@ public void channelzMembership_subchannel() throws Exception { assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); } - @Test - public void channelzMembership_oob() throws Exception { - createChannel(); - OobChannel oob = (OobChannel) helper.createOobChannel( - Collections.singletonList(addressGroup), AUTHORITY); - // oob channels are not root channels - assertNull(channelz.getRootChannel(oob.getLogId().getId())); - assertTrue(channelz.containsSubchannel(oob.getLogId())); - assertThat(getStats(channel).subchannels).containsExactly(oob); - assertTrue(channelz.containsSubchannel(oob.getLogId())); - - AbstractSubchannel subchannel = (AbstractSubchannel) oob.getSubchannel(); - assertTrue( - channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); - assertThat(getStats(oob).subchannels) - .containsExactly(subchannel.getInstrumentedInternalSubchannel()); - assertTrue( - channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); - - oob.getSubchannel().requestConnection(); - MockClientTransportInfo transportInfo = transports.poll(); - assertNotNull(transportInfo); - assertTrue(channelz.containsClientSocket(transportInfo.transport.getLogId())); - - // terminate transport - transportInfo.listener.transportShutdown(Status.INTERNAL, - SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); - transportInfo.listener.transportTerminated(); - assertFalse(channelz.containsClientSocket(transportInfo.transport.getLogId())); - - // terminate oobchannel - oob.shutdown(); - assertFalse(channelz.containsSubchannel(oob.getLogId())); - assertThat(getStats(channel).subchannels).isEmpty(); - assertFalse( - channelz.containsSubchannel(subchannel.getInstrumentedInternalSubchannel().getLogId())); - - // channel still appears - assertNotNull(channelz.getRootChannel(channel.getLogId().getId())); - } - @Test public void callsAndShutdown() { subtestCallsAndShutdown(false, false); @@ -1801,7 +1756,7 @@ public void subchannelsNoConnectionShutdownNow() { channel.shutdownNow(); verify(mockLoadBalancer).shutdown(); - // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. + // Channel's shutdownNow() will call shutdownNow() on all subchannels. // Therefore, channel is terminated without relying on LoadBalancer to shutdown subchannels. assertTrue(channel.isTerminated()); verify(mockTransportFactory, never()) @@ -1809,114 +1764,6 @@ public void subchannelsNoConnectionShutdownNow() { any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); } - @Test - public void oobchannels() { - createChannel(); - - ManagedChannel oob1 = helper.createOobChannel( - Collections.singletonList(addressGroup), "oob1authority"); - ManagedChannel oob2 = helper.createOobChannel( - Collections.singletonList(addressGroup), "oob2authority"); - verify(balancerRpcExecutorPool, times(2)).getObject(); - - assertEquals("oob1authority", oob1.authority()); - assertEquals("oob2authority", oob2.authority()); - - // OOB channels create connections lazily. A new call will initiate the connection. - Metadata headers = new Metadata(); - ClientCall call = oob1.newCall(method, CallOptions.DEFAULT); - call.start(mockCallListener, headers); - verify(mockTransportFactory) - .newClientTransport( - eq(socketAddress), - eq(new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)), - isA(ChannelLogger.class)); - MockClientTransportInfo transportInfo = transports.poll(); - assertNotNull(transportInfo); - - assertEquals(0, balancerRpcExecutor.numPendingTasks()); - transportInfo.listener.transportReady(); - assertEquals(1, balancerRpcExecutor.runDueTasks()); - verify(transportInfo.transport).newStream( - same(method), same(headers), same(CallOptions.DEFAULT), - ArgumentMatchers.any()); - - // The transport goes away - transportInfo.listener.transportShutdown(Status.UNAVAILABLE, - SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); - transportInfo.listener.transportTerminated(); - - // A new call will trigger a new transport - ClientCall call2 = oob1.newCall(method, CallOptions.DEFAULT); - call2.start(mockCallListener2, headers); - ClientCall call3 = - oob1.newCall(method, CallOptions.DEFAULT.withWaitForReady()); - call3.start(mockCallListener3, headers); - verify(mockTransportFactory, times(2)).newClientTransport( - eq(socketAddress), - eq(new ClientTransportOptions().setAuthority("oob1authority").setUserAgent(USER_AGENT)), - isA(ChannelLogger.class)); - transportInfo = transports.poll(); - assertNotNull(transportInfo); - - // This transport fails - Status transportError = Status.UNAVAILABLE.withDescription("Connection refused"); - assertEquals(0, balancerRpcExecutor.numPendingTasks()); - transportInfo.listener.transportShutdown(transportError, - SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); - assertTrue(balancerRpcExecutor.runDueTasks() > 0); - - // Fail-fast RPC will fail, while wait-for-ready RPC will still be pending - verify(mockCallListener2).onClose(same(transportError), any(Metadata.class)); - verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); - - // Shutdown - assertFalse(oob1.isShutdown()); - assertFalse(oob2.isShutdown()); - oob1.shutdown(); - oob2.shutdownNow(); - assertTrue(oob1.isShutdown()); - assertTrue(oob2.isShutdown()); - assertTrue(oob2.isTerminated()); - verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService()); - - // New RPCs will be rejected. - assertEquals(0, balancerRpcExecutor.numPendingTasks()); - ClientCall call4 = oob1.newCall(method, CallOptions.DEFAULT); - ClientCall call5 = oob2.newCall(method, CallOptions.DEFAULT); - call4.start(mockCallListener4, headers); - call5.start(mockCallListener5, headers); - assertTrue(balancerRpcExecutor.runDueTasks() > 0); - verify(mockCallListener4).onClose(statusCaptor.capture(), any(Metadata.class)); - Status status4 = statusCaptor.getValue(); - assertEquals(Status.Code.UNAVAILABLE, status4.getCode()); - verify(mockCallListener5).onClose(statusCaptor.capture(), any(Metadata.class)); - Status status5 = statusCaptor.getValue(); - assertEquals(Status.Code.UNAVAILABLE, status5.getCode()); - - // The pending RPC will still be pending - verify(mockCallListener3, never()).onClose(any(Status.class), any(Metadata.class)); - - // This will shutdownNow() the delayed transport, terminating the pending RPC - assertEquals(0, balancerRpcExecutor.numPendingTasks()); - oob1.shutdownNow(); - assertTrue(balancerRpcExecutor.runDueTasks() > 0); - verify(mockCallListener3).onClose(any(Status.class), any(Metadata.class)); - - // Shut down the channel, and it will not terminated because OOB channel has not. - channel.shutdown(); - assertFalse(channel.isTerminated()); - // Delayed transport has already terminated. Terminating the transport terminates the - // subchannel, which in turn terimates the OOB channel, which terminates the channel. - assertFalse(oob1.isTerminated()); - verify(balancerRpcExecutorPool).returnObject(balancerRpcExecutor.getScheduledExecutorService()); - transportInfo.listener.transportTerminated(); - assertTrue(oob1.isTerminated()); - assertTrue(channel.isTerminated()); - verify(balancerRpcExecutorPool, times(2)) - .returnObject(balancerRpcExecutor.getScheduledExecutorService()); - } - @Test public void oobChannelHasNoChannelCallCredentials() { Metadata.Key metadataKey = @@ -1968,7 +1815,7 @@ public void oobChannelHasNoChannelCallCredentials() { balancerRpcExecutor.runDueTasks(); verify(transportInfo.transport).newStream( - same(method), same(headers), same(callOptions), + same(method), same(headers), ArgumentMatchers.any(), ArgumentMatchers.any()); assertThat(headers.getAll(metadataKey)).containsExactly(callCredValue); oob.shutdownNow(); @@ -2095,76 +1942,6 @@ public SwapChannelCredentialsResult answer(InvocationOnMock invocation) { oob.shutdownNow(); } - @Test - public void oobChannelsWhenChannelShutdownNow() { - createChannel(); - ManagedChannel oob1 = helper.createOobChannel( - Collections.singletonList(addressGroup), "oob1Authority"); - ManagedChannel oob2 = helper.createOobChannel( - Collections.singletonList(addressGroup), "oob2Authority"); - - oob1.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata()); - oob2.newCall(method, CallOptions.DEFAULT).start(mockCallListener2, new Metadata()); - - assertThat(transports).hasSize(2); - MockClientTransportInfo ti1 = transports.poll(); - MockClientTransportInfo ti2 = transports.poll(); - - ti1.listener.transportReady(); - ti2.listener.transportReady(); - - channel.shutdownNow(); - verify(ti1.transport).shutdownNow(any(Status.class)); - verify(ti2.transport).shutdownNow(any(Status.class)); - - ti1.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"), - SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); - ti2.listener.transportShutdown(Status.UNAVAILABLE.withDescription("shutdown now"), - SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); - ti1.listener.transportTerminated(); - - assertFalse(channel.isTerminated()); - ti2.listener.transportTerminated(); - assertTrue(channel.isTerminated()); - } - - @Test - public void oobChannelsNoConnectionShutdown() { - createChannel(); - ManagedChannel oob1 = helper.createOobChannel( - Collections.singletonList(addressGroup), "oob1Authority"); - ManagedChannel oob2 = helper.createOobChannel( - Collections.singletonList(addressGroup), "oob2Authority"); - channel.shutdown(); - - verify(mockLoadBalancer).shutdown(); - oob1.shutdown(); - assertTrue(oob1.isTerminated()); - assertFalse(channel.isTerminated()); - oob2.shutdown(); - assertTrue(oob2.isTerminated()); - assertTrue(channel.isTerminated()); - verify(mockTransportFactory, never()) - .newClientTransport( - any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); - } - - @Test - public void oobChannelsNoConnectionShutdownNow() { - createChannel(); - helper.createOobChannel(Collections.singletonList(addressGroup), "oob1Authority"); - helper.createOobChannel(Collections.singletonList(addressGroup), "oob2Authority"); - channel.shutdownNow(); - - verify(mockLoadBalancer).shutdown(); - assertTrue(channel.isTerminated()); - // Channel's shutdownNow() will call shutdownNow() on all subchannels and oobchannels. - // Therefore, channel is terminated without relying on LoadBalancer to shutdown oobchannels. - verify(mockTransportFactory, never()) - .newClientTransport( - any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); - } - @Test public void subchannelChannel_normalUsage() { createChannel(); @@ -2310,69 +2087,6 @@ public void lbHelper_getNonDefaultNameResolverRegistry() { .isNotSameInstanceAs(NameResolverRegistry.getDefaultRegistry()); } - @Test - public void refreshNameResolution_whenOobChannelConnectionFailed_notIdle() { - subtestNameResolutionRefreshWhenConnectionFailed(false); - } - - @Test - public void notRefreshNameResolution_whenOobChannelConnectionFailed_idle() { - subtestNameResolutionRefreshWhenConnectionFailed(true); - } - - private void subtestNameResolutionRefreshWhenConnectionFailed(boolean isIdle) { - FakeNameResolverFactory nameResolverFactory = - new FakeNameResolverFactory.Builder(expectedUri) - .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress))) - .build(); - channelBuilder.nameResolverFactory(nameResolverFactory); - createChannel(); - OobChannel oobChannel = (OobChannel) helper.createOobChannel( - Collections.singletonList(addressGroup), "oobAuthority"); - oobChannel.getSubchannel().requestConnection(); - - MockClientTransportInfo transportInfo = transports.poll(); - assertNotNull(transportInfo); - - FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0); - - if (isIdle) { - channel.enterIdle(); - // Entering idle mode will result in a new resolver - resolver = nameResolverFactory.resolvers.remove(0); - } - - assertEquals(0, nameResolverFactory.resolvers.size()); - - int expectedRefreshCount = 0; - - // Transport closed when connecting - assertEquals(expectedRefreshCount, resolver.refreshCalled); - transportInfo.listener.transportShutdown(Status.UNAVAILABLE, - SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); - // When channel enters idle, new resolver is created but not started. - if (!isIdle) { - expectedRefreshCount++; - } - assertEquals(expectedRefreshCount, resolver.refreshCalled); - - timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS); - transportInfo = transports.poll(); - assertNotNull(transportInfo); - - transportInfo.listener.transportReady(); - - // Transport closed when ready - assertEquals(expectedRefreshCount, resolver.refreshCalled); - transportInfo.listener.transportShutdown(Status.UNAVAILABLE, - SimpleDisconnectError.SUBCHANNEL_SHUTDOWN); - // When channel enters idle, new resolver is created but not started. - if (!isIdle) { - expectedRefreshCount++; - } - assertEquals(expectedRefreshCount, resolver.refreshCalled); - } - /** * Test that information such as the Call's context, MethodDescriptor, authority, executor are * propagated to newStream() and applyRequestMetadata(). @@ -3525,48 +3239,6 @@ public void channelTracing_subchannelStateChangeEvent() throws Exception { .build()); } - @Test - public void channelTracing_oobChannelStateChangeEvent() throws Exception { - channelBuilder.maxTraceEvents(10); - createChannel(); - OobChannel oobChannel = (OobChannel) helper.createOobChannel( - Collections.singletonList(addressGroup), "authority"); - timer.forwardNanos(1234); - oobChannel.handleSubchannelStateChange( - ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING)); - assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() - .setDescription("Entering CONNECTING state") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(timer.getTicker().read()) - .build()); - } - - @Test - public void channelTracing_oobChannelCreationEvents() throws Exception { - channelBuilder.maxTraceEvents(10); - createChannel(); - timer.forwardNanos(1234); - OobChannel oobChannel = (OobChannel) helper.createOobChannel( - Collections.singletonList(addressGroup), "authority"); - assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder() - .setDescription("Child OobChannel created") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(timer.getTicker().read()) - .setChannelRef(oobChannel) - .build()); - assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder() - .setDescription("OobChannel for [[[test-addr]/{}]] created") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(timer.getTicker().read()) - .build()); - assertThat(getStats(oobChannel.getInternalSubchannel()).channelTrace.events).contains( - new ChannelTrace.Event.Builder() - .setDescription("Subchannel for [[[test-addr]/{}]] created") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(timer.getTicker().read()) - .build()); - } - @Test public void channelsAndSubchannels_instrumented_state() throws Exception { createChannel(); @@ -3682,115 +3354,6 @@ private void channelsAndSubchannels_instrumented0(boolean success) throws Except } } - @Test - public void channelsAndSubchannels_oob_instrumented_success() throws Exception { - channelsAndSubchannels_oob_instrumented0(true); - } - - @Test - public void channelsAndSubchannels_oob_instrumented_fail() throws Exception { - channelsAndSubchannels_oob_instrumented0(false); - } - - private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception { - // set up - ClientStream mockStream = mock(ClientStream.class); - createChannel(); - - OobChannel oobChannel = (OobChannel) helper.createOobChannel( - Collections.singletonList(addressGroup), "oobauthority"); - AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel(); - FakeClock callExecutor = new FakeClock(); - CallOptions options = - CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService()); - ClientCall call = oobChannel.newCall(method, options); - Metadata headers = new Metadata(); - - // Channel stat bumped when ClientCall.start() called - assertEquals(0, getStats(oobChannel).callsStarted); - call.start(mockCallListener, headers); - assertEquals(1, getStats(oobChannel).callsStarted); - - MockClientTransportInfo transportInfo = transports.poll(); - ConnectionClientTransport mockTransport = transportInfo.transport; - ManagedClientTransport.Listener transportListener = transportInfo.listener; - when(mockTransport.newStream( - same(method), same(headers), any(CallOptions.class), - ArgumentMatchers.any())) - .thenReturn(mockStream); - - // subchannel stat bumped when call gets assigned to it - assertEquals(0, getStats(oobSubchannel).callsStarted); - transportListener.transportReady(); - callExecutor.runDueTasks(); - verify(mockStream).start(streamListenerCaptor.capture()); - assertEquals(1, getStats(oobSubchannel).callsStarted); - - ClientStreamListener streamListener = streamListenerCaptor.getValue(); - call.halfClose(); - - // closing stream listener affects subchannel stats immediately - assertEquals(0, getStats(oobSubchannel).callsSucceeded); - assertEquals(0, getStats(oobSubchannel).callsFailed); - streamListener.closed(success ? Status.OK : Status.UNKNOWN, PROCESSED, new Metadata()); - if (success) { - assertEquals(1, getStats(oobSubchannel).callsSucceeded); - assertEquals(0, getStats(oobSubchannel).callsFailed); - } else { - assertEquals(0, getStats(oobSubchannel).callsSucceeded); - assertEquals(1, getStats(oobSubchannel).callsFailed); - } - - // channel stats bumped when the ClientCall.Listener is notified - assertEquals(0, getStats(oobChannel).callsSucceeded); - assertEquals(0, getStats(oobChannel).callsFailed); - callExecutor.runDueTasks(); - if (success) { - assertEquals(1, getStats(oobChannel).callsSucceeded); - assertEquals(0, getStats(oobChannel).callsFailed); - } else { - assertEquals(0, getStats(oobChannel).callsSucceeded); - assertEquals(1, getStats(oobChannel).callsFailed); - } - // oob channel is separate from the original channel - assertEquals(0, getStats(channel).callsSucceeded); - assertEquals(0, getStats(channel).callsFailed); - } - - @Test - public void channelsAndSubchannels_oob_instrumented_name() throws Exception { - createChannel(); - - String authority = "oobauthority"; - OobChannel oobChannel = (OobChannel) helper.createOobChannel( - Collections.singletonList(addressGroup), authority); - assertEquals(authority, getStats(oobChannel).target); - } - - @Test - public void channelsAndSubchannels_oob_instrumented_state() throws Exception { - createChannel(); - - OobChannel oobChannel = (OobChannel) helper.createOobChannel( - Collections.singletonList(addressGroup), "oobauthority"); - assertEquals(IDLE, getStats(oobChannel).state); - - oobChannel.getSubchannel().requestConnection(); - assertEquals(CONNECTING, getStats(oobChannel).state); - - MockClientTransportInfo transportInfo = transports.poll(); - ManagedClientTransport.Listener transportListener = transportInfo.listener; - - transportListener.transportReady(); - assertEquals(READY, getStats(oobChannel).state); - - // oobchannel state is separate from the ManagedChannel - assertEquals(CONNECTING, getStats(channel).state); - channel.shutdownNow(); - assertEquals(SHUTDOWN, getStats(channel).state); - assertEquals(SHUTDOWN, getStats(oobChannel).state); - } - @Test public void binaryLogInstalled() throws Exception { final SettableFuture intercepted = SettableFuture.create(); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 2fc2a492ac8..5ed84ade2f8 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -386,11 +386,12 @@ private void useFallbackBackends() { } private void shutdownLbComm() { + shutdownLbRpc(); if (lbCommChannel != null) { - lbCommChannel.shutdown(); + // The channel should have no RPCs at this point + lbCommChannel.shutdownNow(); lbCommChannel = null; } - shutdownLbRpc(); } private void shutdownLbRpc() {