Skip to content

Commit c6734a8

Browse files
committed
Only link delayed transport AFTER real transport has called transportReady().
If TransportSet fails to connect a transport (i.e., transportShutdown() is called without a preceding transportReady()), TransportSet will automatically schedule a reconnection to the next address, unless it has reached the end of the address list, in which case it will fail the delayed transport. This reduces stream errors caused by bad addresses appearing before good addresses in the resolved address list. TODO: add a unit test for this logic in TransportSetTest.
1 parent 016d4a5 commit c6734a8

File tree

4 files changed

+239
-142
lines changed

4 files changed

+239
-142
lines changed

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.LinkedHashSet;
4646
import java.util.concurrent.Executor;
4747

48+
import javax.annotation.Nullable;
4849
import javax.annotation.concurrent.GuardedBy;
4950

5051
/**
@@ -232,6 +233,12 @@ public String getLogId() {
232233
return GrpcUtil.getLogId(this);
233234
}
234235

236+
@VisibleForTesting
237+
@Nullable
238+
Supplier<ClientTransport> getTransportSupplier() {
239+
return transportSupplier;
240+
}
241+
235242
private class PendingStream extends DelayedStream {
236243
private final MethodDescriptor<?, ?> method;
237244
private final Metadata headers;

core/src/main/java/io/grpc/internal/TransportSet.java

Lines changed: 60 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.google.common.annotations.VisibleForTesting;
3535
import com.google.common.base.Preconditions;
3636
import com.google.common.base.Stopwatch;
37-
import com.google.common.base.Throwables;
3837

3938
import io.grpc.EquivalentAddressGroup;
4039
import io.grpc.LoadBalancer;
@@ -44,7 +43,6 @@
4443
import java.util.ArrayList;
4544
import java.util.Collection;
4645
import java.util.List;
47-
import java.util.concurrent.Callable;
4846
import java.util.concurrent.ScheduledExecutorService;
4947
import java.util.concurrent.ScheduledFuture;
5048
import java.util.concurrent.TimeUnit;
@@ -59,8 +57,9 @@
5957
* Transports for a single {@link SocketAddress}.
6058
*/
6159
@ThreadSafe
62-
final class TransportSet implements WithLogId {
60+
final class TransportSet {
6361
private static final Logger log = Logger.getLogger(TransportSet.class.getName());
62+
6463
private static final ClientTransport SHUTDOWN_TRANSPORT =
6564
new FailingClientTransport(Status.UNAVAILABLE.withDescription("TransportSet is shutdown"));
6665

@@ -108,20 +107,16 @@ final class TransportSet implements WithLogId {
108107
/*
109108
* The transport for new outgoing requests.
110109
* - If shutdown == true, activeTransport is null (shutdown)
111-
* - Otherwise, if delayedTransport != null,
112-
* activeTransport is delayedTransport (waiting to connect)
110+
* - Otherwise, if a connection is pending or connecting,
111+
* activeTransport is a DelayedClientTransport
113112
* - Otherwise, activeTransport is either null (initially or when idle)
114-
* or points to a real transport (when connecting or connected).
113+
* or points to a real transport (when ready).
115114
*
116115
* 'lock' must be held when assigning to it.
117116
*/
118117
@Nullable
119118
private volatile ManagedClientTransport activeTransport;
120119

121-
@GuardedBy("lock")
122-
@Nullable
123-
private DelayedClientTransport delayedTransport;
124-
125120
TransportSet(EquivalentAddressGroup addressGroup, String authority,
126121
LoadBalancer<ClientTransport> loadBalancer, BackoffPolicy.Provider backoffPolicyProvider,
127122
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
@@ -155,40 +150,24 @@ final ClientTransport obtainActiveTransport() {
155150
if (savedTransport != null) {
156151
return savedTransport;
157152
}
158-
Callable<ClientTransport> immediateConnectionTask = null;
159153
synchronized (lock) {
160154
// Check again, since it could have changed before acquiring the lock
161155
if (activeTransport == null) {
162156
if (shutdown) {
163157
return SHUTDOWN_TRANSPORT;
164158
}
165-
delayedTransport = new DelayedClientTransport();
159+
DelayedClientTransport delayedTransport = new DelayedClientTransport();
166160
transports.add(delayedTransport);
167161
delayedTransport.start(new BaseTransportListener(delayedTransport));
168162
activeTransport = delayedTransport;
169-
immediateConnectionTask = scheduleConnection();
163+
scheduleConnection(delayedTransport);
170164
}
171-
savedTransport = activeTransport;
165+
return activeTransport;
172166
}
173-
if (immediateConnectionTask != null) {
174-
try {
175-
return immediateConnectionTask.call();
176-
} catch (Exception e) {
177-
throw Throwables.propagate(e);
178-
}
179-
}
180-
return savedTransport;
181167
}
182168

183-
/**
184-
* Schedule a task that creates a new transport.
185-
*
186-
* @return if not {@code null}, caller should run the returned callable outside of lock. The
187-
* callable returns the real transport that has been created.
188-
*/
189-
@Nullable
190169
@GuardedBy("lock")
191-
private Callable<ClientTransport> scheduleConnection() {
170+
private void scheduleConnection(final DelayedClientTransport delayedTransport) {
192171
Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(),
193172
"previous reconnectTask is not done");
194173

@@ -203,47 +182,20 @@ private Callable<ClientTransport> scheduleConnection() {
203182
nextAddressIndex = 0;
204183
}
205184

206-
final Callable<ClientTransport> createTransportCallable = new Callable<ClientTransport>() {
185+
Runnable createTransportRunnable = new Runnable() {
207186
@Override
208-
public ClientTransport call() {
209-
DelayedClientTransport savedDelayedTransport;
210-
ManagedClientTransport newActiveTransport;
211-
boolean savedShutdown;
187+
public void run() {
212188
synchronized (lock) {
213-
savedShutdown = shutdown;
214189
reconnectTask = null;
215190
if (currentAddressIndex == 0) {
216191
backoffWatch.reset().start();
217192
}
218-
newActiveTransport = transportFactory.newClientTransport(address, authority);
219-
if (log.isLoggable(Level.FINE)) {
220-
log.log(Level.FINE, "[{0}] Created {1} for {2}",
221-
new Object[] {getLogId(), newActiveTransport.getLogId(), address});
222-
}
223-
transports.add(newActiveTransport);
224-
newActiveTransport.start(
225-
new TransportListener(newActiveTransport, address));
226-
if (shutdown) {
227-
// If TransportSet already shutdown, newActiveTransport is only to take care of pending
228-
// streams in delayedTransport, but will not serve new streams, and it will be shutdown
229-
// as soon as it's set to the delayedTransport.
230-
// activeTransport should have already been set to null by shutdown(). We keep it null.
231-
Preconditions.checkState(activeTransport == null,
232-
"Unexpected non-null activeTransport");
233-
} else {
234-
activeTransport = newActiveTransport;
235-
}
236-
savedDelayedTransport = delayedTransport;
237-
delayedTransport = null;
238-
}
239-
savedDelayedTransport.setTransport(newActiveTransport);
240-
// This delayed transport will terminate and be removed from transports.
241-
savedDelayedTransport.shutdown();
242-
if (savedShutdown) {
243-
// See comments in the synchronized block above on why we shutdown here.
244-
newActiveTransport.shutdown();
193+
ManagedClientTransport transport =
194+
transportFactory.newClientTransport(address, authority);
195+
log.log(Level.FINE, "Created transport {0} for {1}", new Object[] {transport, address});
196+
transports.add(transport);
197+
transport.start(new TransportListener(transport, delayedTransport, address));
245198
}
246-
return newActiveTransport;
247199
}
248200
};
249201

@@ -259,27 +211,13 @@ public ClientTransport call() {
259211
}
260212
}
261213
firstAttempt = false;
262-
if (log.isLoggable(Level.FINE)) {
263-
log.log(Level.FINE, "[{0}] Scheduling connection after {1} ms for {2}",
264-
new Object[]{getLogId(), delayMillis, address});
265-
}
266214
if (delayMillis <= 0) {
267215
reconnectTask = null;
268216
// No back-off this time.
269-
// Note createTransportRunnable is not supposed to run under the lock.
270-
return createTransportCallable;
217+
createTransportRunnable.run();
271218
} else {
272219
reconnectTask = scheduledExecutor.schedule(
273-
new Runnable() {
274-
@Override public void run() {
275-
try {
276-
createTransportCallable.call();
277-
} catch (Exception e) {
278-
throw Throwables.propagate(e);
279-
}
280-
}
281-
}, delayMillis, TimeUnit.MILLISECONDS);
282-
return null;
220+
createTransportRunnable, delayMillis, TimeUnit.MILLISECONDS);
283221
}
284222
}
285223

@@ -301,7 +239,6 @@ final void shutdown() {
301239
if (transports.isEmpty()) {
302240
runCallback = true;
303241
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
304-
Preconditions.checkState(delayedTransport == null, "Should have no delayedTransport");
305242
} // else: the callback will be run once all transports have been terminated
306243
}
307244
if (savedActiveTransport != null) {
@@ -320,11 +257,6 @@ private void cancelReconnectTask() {
320257
}
321258
}
322259

323-
@Override
324-
public String getLogId() {
325-
return GrpcUtil.getLogId(this);
326-
}
327-
328260
/** Shared base for both delayed and real transports. */
329261
private class BaseTransportListener implements ManagedClientTransport.Listener {
330262
protected final ManagedClientTransport transport;
@@ -345,9 +277,6 @@ public void transportTerminated() {
345277
synchronized (lock) {
346278
transports.remove(transport);
347279
if (shutdown && transports.isEmpty()) {
348-
if (log.isLoggable(Level.FINE)) {
349-
log.log(Level.FINE, "[{0}] Terminated", getLogId());
350-
}
351280
runCallback = true;
352281
cancelReconnectTask();
353282
}
@@ -361,56 +290,75 @@ public void transportTerminated() {
361290
/** Listener for real transports. */
362291
private class TransportListener extends BaseTransportListener {
363292
private final SocketAddress address;
293+
private final DelayedClientTransport delayedTransport;
364294

365-
public TransportListener(ManagedClientTransport transport, SocketAddress address) {
295+
public TransportListener(ManagedClientTransport transport,
296+
DelayedClientTransport delayedTransport, SocketAddress address) {
366297
super(transport);
367298
this.address = address;
368-
}
369-
370-
private boolean isAttachedToActiveTransport() {
371-
return activeTransport == transport;
299+
this.delayedTransport = delayedTransport;
372300
}
373301

374302
@Override
375303
public void transportReady() {
376-
if (log.isLoggable(Level.FINE)) {
377-
log.log(Level.FINE, "[{0}] {1} for {2} is ready",
378-
new Object[] {getLogId(), transport.getLogId(), address});
379-
}
304+
log.log(Level.FINE, "Transport {0} for {1} is ready", new Object[] {transport, address});
380305
super.transportReady();
306+
boolean savedShutdown;
381307
synchronized (lock) {
382-
if (isAttachedToActiveTransport()) {
383-
firstAttempt = true;
308+
savedShutdown = shutdown;
309+
firstAttempt = true;
310+
if (shutdown) {
311+
// If TransportSet already shutdown, transport is only to take care of pending
312+
// streams in delayedTransport, but will not serve new streams, and it will be shutdown
313+
// as soon as it's set to the delayedTransport.
314+
// activeTransport should have already been set to null by shutdown(). We keep it null.
315+
Preconditions.checkState(activeTransport == null,
316+
"Unexpected non-null activeTransport");
317+
} else if (activeTransport == delayedTransport) {
318+
activeTransport = transport;
384319
}
385320
}
321+
delayedTransport.setTransport(transport);
322+
// This delayed transport will terminate and be removed from transports.
323+
delayedTransport.shutdown();
324+
if (savedShutdown) {
325+
// See comments in the synchronized block above on why we shutdown here.
326+
transport.shutdown();
327+
}
386328
loadBalancer.handleTransportReady(addressGroup);
387329
}
388330

389331
@Override
390332
public void transportShutdown(Status s) {
391-
if (log.isLoggable(Level.FINE)) {
392-
log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}",
393-
new Object[] {getLogId(), transport.getLogId(), address, s});
394-
}
333+
log.log(Level.FINE, "Transport {0} for {1} is being shutdown with {2}",
334+
new Object[] {transport, address, s});
395335
super.transportShutdown(s);
396336
synchronized (lock) {
397-
if (isAttachedToActiveTransport()) {
337+
if (activeTransport == transport) {
398338
activeTransport = null;
339+
} else if (activeTransport == delayedTransport) {
340+
// Continue reconnect if there are still addresses to try.
341+
// Fail if all addresses have been tried and failed in a row.
342+
if (nextAddressIndex == 0) {
343+
delayedTransport.setTransport(new FailingClientTransport(s));
344+
delayedTransport.shutdown();
345+
activeTransport = null;
346+
} else {
347+
scheduleConnection(delayedTransport);
348+
}
399349
}
400350
}
401351
loadBalancer.handleTransportShutdown(addressGroup, s);
402352
}
403353

404354
@Override
405355
public void transportTerminated() {
406-
if (log.isLoggable(Level.FINE)) {
407-
log.log(Level.FINE, "[{0}] {1} for {2} is terminated",
408-
new Object[] {getLogId(), transport.getLogId(), address});
409-
}
356+
log.log(Level.FINE, "Transport {0} for {1} is terminated",
357+
new Object[] {transport, address});
410358
super.transportTerminated();
411-
Preconditions.checkState(!isAttachedToActiveTransport(),
412-
"Listener is still attached to activeTransport. "
413-
+ "Seems transportTerminated was not called.");
359+
Preconditions.checkState(activeTransport != transport,
360+
"activeTransport still points to the delayedTransport. "
361+
+ "Seems transportShutdown() was not called.");
414362
}
415363
}
416364

0 commit comments

Comments
 (0)