Skip to content

Commit 08c74d4

Browse files
committed
Only link delayed transport AFTER real transport has called transportReady().
If TransportSet fails to connect a transport (i.e., transportShutdown() is called without a preceding transportReady()), it automatically schedules a connection attempt to the next address, unless it has reached the end of the address list, in which case it fails the delayed transport. This reduces stream errors caused by bad addresses appearing before good addresses in the resolved address list. Before this change, TransportSet returned the real transport on the first call to obtainActiveTransport(). After this change, it returns the delayed transport instead.
1 parent 27d8489 commit 08c74d4

File tree

5 files changed

+236
-120
lines changed

5 files changed

+236
-120
lines changed

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.LinkedHashSet;
4646
import java.util.concurrent.Executor;
4747

48+
import javax.annotation.Nullable;
4849
import javax.annotation.concurrent.GuardedBy;
4950

5051
/**
@@ -232,6 +233,12 @@ public String getLogId() {
232233
return GrpcUtil.getLogId(this);
233234
}
234235

236+
@VisibleForTesting
237+
@Nullable
238+
Supplier<ClientTransport> getTransportSupplier() {
239+
return transportSupplier;
240+
}
241+
235242
private class PendingStream extends DelayedStream {
236243
private final MethodDescriptor<?, ?> method;
237244
private final Metadata headers;

core/src/main/java/io/grpc/internal/TransportSet.java

Lines changed: 53 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.google.common.annotations.VisibleForTesting;
3535
import com.google.common.base.Preconditions;
3636
import com.google.common.base.Stopwatch;
37-
import com.google.common.base.Throwables;
3837

3938
import io.grpc.EquivalentAddressGroup;
4039
import io.grpc.LoadBalancer;
@@ -44,7 +43,6 @@
4443
import java.util.ArrayList;
4544
import java.util.Collection;
4645
import java.util.List;
47-
import java.util.concurrent.Callable;
4846
import java.util.concurrent.ScheduledExecutorService;
4947
import java.util.concurrent.ScheduledFuture;
5048
import java.util.concurrent.TimeUnit;
@@ -108,20 +106,16 @@ final class TransportSet implements WithLogId {
108106
/*
109107
* The transport for new outgoing requests.
110108
* - If shutdown == true, activeTransport is null (shutdown)
111-
* - Otherwise, if delayedTransport != null,
112-
* activeTransport is delayedTransport (waiting to connect)
109+
* - Otherwise, if a connection is pending or connecting,
110+
* activeTransport is a DelayedClientTransport
113111
* - Otherwise, activeTransport is either null (initially or when idle)
114-
* or points to a real transport (when connecting or connected).
112+
* or points to a real transport (when ready).
115113
*
116114
* 'lock' must be held when assigning to it.
117115
*/
118116
@Nullable
119117
private volatile ManagedClientTransport activeTransport;
120118

121-
@GuardedBy("lock")
122-
@Nullable
123-
private DelayedClientTransport delayedTransport;
124-
125119
TransportSet(EquivalentAddressGroup addressGroup, String authority,
126120
LoadBalancer<ClientTransport> loadBalancer, BackoffPolicy.Provider backoffPolicyProvider,
127121
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
@@ -155,40 +149,24 @@ final ClientTransport obtainActiveTransport() {
155149
if (savedTransport != null) {
156150
return savedTransport;
157151
}
158-
Callable<ClientTransport> immediateConnectionTask = null;
159152
synchronized (lock) {
160153
// Check again, since it could have changed before acquiring the lock
161154
if (activeTransport == null) {
162155
if (shutdown) {
163156
return SHUTDOWN_TRANSPORT;
164157
}
165-
delayedTransport = new DelayedClientTransport();
158+
DelayedClientTransport delayedTransport = new DelayedClientTransport();
166159
transports.add(delayedTransport);
167160
delayedTransport.start(new BaseTransportListener(delayedTransport));
168161
activeTransport = delayedTransport;
169-
immediateConnectionTask = scheduleConnection();
170-
}
171-
savedTransport = activeTransport;
172-
}
173-
if (immediateConnectionTask != null) {
174-
try {
175-
return immediateConnectionTask.call();
176-
} catch (Exception e) {
177-
throw Throwables.propagate(e);
162+
scheduleConnection(delayedTransport);
178163
}
164+
return activeTransport;
179165
}
180-
return savedTransport;
181166
}
182167

183-
/**
184-
* Schedule a task that creates a new transport.
185-
*
186-
* @return if not {@code null}, caller should run the returned callable outside of lock. The
187-
* callable returns the real transport that has been created.
188-
*/
189-
@Nullable
190168
@GuardedBy("lock")
191-
private Callable<ClientTransport> scheduleConnection() {
169+
private void scheduleConnection(final DelayedClientTransport delayedTransport) {
192170
Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(),
193171
"previous reconnectTask is not done");
194172

@@ -203,47 +181,23 @@ private Callable<ClientTransport> scheduleConnection() {
203181
nextAddressIndex = 0;
204182
}
205183

206-
final Callable<ClientTransport> createTransportCallable = new Callable<ClientTransport>() {
184+
Runnable createTransportRunnable = new Runnable() {
207185
@Override
208-
public ClientTransport call() {
209-
DelayedClientTransport savedDelayedTransport;
210-
ManagedClientTransport newActiveTransport;
211-
boolean savedShutdown;
186+
public void run() {
212187
synchronized (lock) {
213-
savedShutdown = shutdown;
214188
reconnectTask = null;
215189
if (currentAddressIndex == 0) {
216190
backoffWatch.reset().start();
217191
}
218-
newActiveTransport = transportFactory.newClientTransport(address, authority);
192+
ManagedClientTransport transport =
193+
transportFactory.newClientTransport(address, authority);
219194
if (log.isLoggable(Level.FINE)) {
220195
log.log(Level.FINE, "[{0}] Created {1} for {2}",
221-
new Object[] {getLogId(), newActiveTransport.getLogId(), address});
196+
new Object[] {getLogId(), transport.getLogId(), address});
222197
}
223-
transports.add(newActiveTransport);
224-
newActiveTransport.start(
225-
new TransportListener(newActiveTransport, address));
226-
if (shutdown) {
227-
// If TransportSet already shutdown, newActiveTransport is only to take care of pending
228-
// streams in delayedTransport, but will not serve new streams, and it will be shutdown
229-
// as soon as it's set to the delayedTransport.
230-
// activeTransport should have already been set to null by shutdown(). We keep it null.
231-
Preconditions.checkState(activeTransport == null,
232-
"Unexpected non-null activeTransport");
233-
} else {
234-
activeTransport = newActiveTransport;
235-
}
236-
savedDelayedTransport = delayedTransport;
237-
delayedTransport = null;
238-
}
239-
savedDelayedTransport.setTransport(newActiveTransport);
240-
// This delayed transport will terminate and be removed from transports.
241-
savedDelayedTransport.shutdown();
242-
if (savedShutdown) {
243-
// See comments in the synchronized block above on why we shutdown here.
244-
newActiveTransport.shutdown();
198+
transports.add(transport);
199+
transport.start(new TransportListener(transport, delayedTransport, address));
245200
}
246-
return newActiveTransport;
247201
}
248202
};
249203

@@ -266,20 +220,10 @@ public ClientTransport call() {
266220
if (delayMillis <= 0) {
267221
reconnectTask = null;
268222
// No back-off this time.
269-
// Note createTransportRunnable is not supposed to run under the lock.
270-
return createTransportCallable;
223+
createTransportRunnable.run();
271224
} else {
272225
reconnectTask = scheduledExecutor.schedule(
273-
new Runnable() {
274-
@Override public void run() {
275-
try {
276-
createTransportCallable.call();
277-
} catch (Exception e) {
278-
throw Throwables.propagate(e);
279-
}
280-
}
281-
}, delayMillis, TimeUnit.MILLISECONDS);
282-
return null;
226+
createTransportRunnable, delayMillis, TimeUnit.MILLISECONDS);
283227
}
284228
}
285229

@@ -301,7 +245,6 @@ final void shutdown() {
301245
if (transports.isEmpty()) {
302246
runCallback = true;
303247
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
304-
Preconditions.checkState(delayedTransport == null, "Should have no delayedTransport");
305248
} // else: the callback will be run once all transports have been terminated
306249
}
307250
if (savedActiveTransport != null) {
@@ -361,14 +304,13 @@ public void transportTerminated() {
361304
/** Listener for real transports. */
362305
private class TransportListener extends BaseTransportListener {
363306
private final SocketAddress address;
307+
private final DelayedClientTransport delayedTransport;
364308

365-
public TransportListener(ManagedClientTransport transport, SocketAddress address) {
309+
public TransportListener(ManagedClientTransport transport,
310+
DelayedClientTransport delayedTransport, SocketAddress address) {
366311
super(transport);
367312
this.address = address;
368-
}
369-
370-
private boolean isAttachedToActiveTransport() {
371-
return activeTransport == transport;
313+
this.delayedTransport = delayedTransport;
372314
}
373315

374316
@Override
@@ -378,11 +320,28 @@ public void transportReady() {
378320
new Object[] {getLogId(), transport.getLogId(), address});
379321
}
380322
super.transportReady();
323+
boolean savedShutdown;
381324
synchronized (lock) {
382-
if (isAttachedToActiveTransport()) {
383-
firstAttempt = true;
325+
savedShutdown = shutdown;
326+
firstAttempt = true;
327+
if (shutdown) {
328+
// If TransportSet already shutdown, transport is only to take care of pending
329+
// streams in delayedTransport, but will not serve new streams, and it will be shutdown
330+
// as soon as it's set to the delayedTransport.
331+
// activeTransport should have already been set to null by shutdown(). We keep it null.
332+
Preconditions.checkState(activeTransport == null,
333+
"Unexpected non-null activeTransport");
334+
} else if (activeTransport == delayedTransport) {
335+
activeTransport = transport;
384336
}
385337
}
338+
delayedTransport.setTransport(transport);
339+
// This delayed transport will terminate and be removed from transports.
340+
delayedTransport.shutdown();
341+
if (savedShutdown) {
342+
// See comments in the synchronized block above on why we shutdown here.
343+
transport.shutdown();
344+
}
386345
loadBalancer.handleTransportReady(addressGroup);
387346
}
388347

@@ -394,8 +353,18 @@ public void transportShutdown(Status s) {
394353
}
395354
super.transportShutdown(s);
396355
synchronized (lock) {
397-
if (isAttachedToActiveTransport()) {
356+
if (activeTransport == transport) {
398357
activeTransport = null;
358+
} else if (activeTransport == delayedTransport) {
359+
// Continue reconnect if there are still addresses to try.
360+
// Fail if all addresses have been tried and failed in a row.
361+
if (nextAddressIndex == 0) {
362+
delayedTransport.setTransport(new FailingClientTransport(s));
363+
delayedTransport.shutdown();
364+
activeTransport = null;
365+
} else {
366+
scheduleConnection(delayedTransport);
367+
}
399368
}
400369
}
401370
loadBalancer.handleTransportShutdown(addressGroup, s);
@@ -408,9 +377,9 @@ public void transportTerminated() {
408377
new Object[] {getLogId(), transport.getLogId(), address});
409378
}
410379
super.transportTerminated();
411-
Preconditions.checkState(!isAttachedToActiveTransport(),
412-
"Listener is still attached to activeTransport. "
413-
+ "Seems transportTerminated was not called.");
380+
Preconditions.checkState(activeTransport != transport,
381+
"activeTransport still points to the delayedTransport. "
382+
+ "Seems transportShutdown() was not called.");
414383
}
415384
}
416385

0 commit comments

Comments
 (0)