Skip to content

Commit eccd231

Browse files
committed
Don't hold a lock in DelayedStream when calling realStream
Our current lock ordering rules prohibit holding a lock when calling the channel and stream. This change avoids the lock for both DelayedClientTransport and DelayedStream. It is effectively a rewrite of DelayedStream. The fixes to ClientCallImpl were to ensure sane state in DelayedStream. Fixes #1510
1 parent b9c1232 commit eccd231

File tree

5 files changed

+308
-229
lines changed

5 files changed

+308
-229
lines changed

core/src/main/java/io/grpc/internal/ClientCallImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package io.grpc.internal;
3333

34+
import static com.google.common.base.Preconditions.checkArgument;
3435
import static com.google.common.base.Preconditions.checkNotNull;
3536
import static com.google.common.base.Preconditions.checkState;
3637
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -208,11 +209,11 @@ public void runInContext() {
208209
stream.setAuthority(callOptions.getAuthority());
209210
}
210211
stream.setCompressor(compressor);
212+
213+
stream.start(new ClientStreamListenerImpl(observer));
211214
if (compressor != Codec.Identity.NONE) {
212215
stream.setMessageCompression(true);
213216
}
214-
215-
stream.start(new ClientStreamListenerImpl(observer));
216217
// Delay any sources of cancellation after start(), because most of the transports are broken if
217218
// they receive cancel before start. Issue #1343 has more details
218219

@@ -269,6 +270,7 @@ private static Long getRemainingTimeoutNanos(@Nullable Long deadlineNanoTime) {
269270
@Override
270271
public void request(int numMessages) {
271272
Preconditions.checkState(stream != null, "Not started");
273+
checkArgument(numMessages >= 0, "Number requested must be non-negative");
272274
stream.request(numMessages);
273275
}
274276

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -80,44 +80,41 @@ public void start(Listener listener) {
8080
@Override
8181
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) {
8282
Supplier<ClientTransport> supplier = transportSupplier;
83+
if (supplier == null) {
84+
synchronized (lock) {
85+
// Check again, since it may have changed while waiting for lock
86+
supplier = transportSupplier;
87+
if (supplier == null && !shutdown) {
88+
PendingStream pendingStream = new PendingStream(method, headers);
89+
pendingStreams.add(pendingStream);
90+
return pendingStream;
91+
}
92+
}
93+
}
8394
if (supplier != null) {
8495
return supplier.get().newStream(method, headers);
8596
}
86-
synchronized (lock) {
87-
// Check again, since it may have changed while waiting for lock
88-
supplier = transportSupplier;
89-
if (supplier != null) {
90-
return supplier.get().newStream(method, headers);
91-
}
92-
if (!shutdown) {
93-
PendingStream pendingStream = new PendingStream(method, headers);
94-
pendingStreams.add(pendingStream);
95-
return pendingStream;
96-
}
97-
}
9897
return new FailingClientStream(Status.UNAVAILABLE.withDescription("transport shutdown"));
9998
}
10099

101100
@Override
102101
public void ping(final PingCallback callback, Executor executor) {
103102
Supplier<ClientTransport> supplier = transportSupplier;
103+
if (supplier == null) {
104+
synchronized (lock) {
105+
// Check again, since it may have changed while waiting for lock
106+
supplier = transportSupplier;
107+
if (supplier == null && !shutdown) {
108+
PendingPing pendingPing = new PendingPing(callback, executor);
109+
pendingPings.add(pendingPing);
110+
return;
111+
}
112+
}
113+
}
104114
if (supplier != null) {
105115
supplier.get().ping(callback, executor);
106116
return;
107117
}
108-
synchronized (lock) {
109-
// Check again, since it may have changed while waiting for lock
110-
supplier = transportSupplier;
111-
if (supplier != null) {
112-
supplier.get().ping(callback, executor);
113-
return;
114-
}
115-
if (!shutdown) {
116-
PendingPing pendingPing = new PendingPing(callback, executor);
117-
pendingPings.add(pendingPing);
118-
return;
119-
}
120-
}
121118
executor.execute(new Runnable() {
122119
@Override public void run() {
123120
callback.onFailure(

0 commit comments

Comments
 (0)