Skip to content

Commit 5b187ba

Browse files
authored
Merge pull request #908 from murgatroid99/pure_js_goaway_handling
Pure JS: Fixed two bugs with goaway handling
2 parents d015d1c + 1ee218c commit 5b187ba

File tree

4 files changed

+120
-9
lines changed

4 files changed

+120
-9
lines changed

packages/grpc-js/src/call-stream.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,16 @@ export class Http2CallStream extends Duplex implements Call {
130130
private endCall(status: StatusObject): void {
131131
if (this.finalStatus === null) {
132132
this.finalStatus = status;
133-
this.emit('status', status);
133+
/* We do this asynchronously to ensure that no async function is in the
134+
* call stack when we return control to the application. If an async
135+
* function is in the call stack, any exception thrown by the application
136+
* (or our tests) will bubble up and turn into promise rejection, which
137+
* will result in an UnhandledPromiseRejectionWarning. Because that is
138+
* a warning, the error will be effectively swallowed and execution will
139+
* continue */
140+
process.nextTick(() => {
141+
this.emit('status', status);
142+
});
134143
}
135144
}
136145

@@ -298,10 +307,10 @@ export class Http2CallStream extends Duplex implements Call {
298307
stream.on('end', () => {
299308
this.tryPush(null);
300309
});
301-
stream.on('close', async errorCode => {
310+
stream.on('close', async () => {
302311
let code: Status;
303312
let details = '';
304-
switch (errorCode) {
313+
switch (stream.rstCode) {
305314
case http2.constants.NGHTTP2_REFUSED_STREAM:
306315
code = Status.UNAVAILABLE;
307316
break;
@@ -329,11 +338,9 @@ export class Http2CallStream extends Duplex implements Call {
329338
this.endCall({ code, details, metadata: new Metadata() });
330339
});
331340
stream.on('error', (err: Error) => {
332-
this.endCall({
333-
code: Status.INTERNAL,
334-
details: 'Internal HTTP2 error',
335-
metadata: new Metadata(),
336-
});
341+
/* We need an error handler here to stop "Uncaught Error" exceptions
342+
* from bubbling up. However, errors here should all correspond to
343+
* "close" events, where we will handle the error more granularly */
337344
});
338345
if (!this.pendingRead) {
339346
stream.pause();

packages/grpc-js/src/subchannel.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ export class Http2SubChannel extends EventEmitter implements SubChannel {
7878
this.stopKeepalivePings();
7979
this.emit('close');
8080
});
81+
this.session.on('goaway', () => {
82+
this.stopKeepalivePings();
83+
this.emit('close');
84+
});
8185
this.userAgent = userAgent;
8286

8387
if (channelArgs['grpc.keepalive_time_ms']) {

packages/grpc-js/test/test-call-stream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ describe('CallStream', () => {
196196
reject(e);
197197
}
198198
});
199-
http2Stream.emit('close', Number(key));
199+
http2Stream.rstCode = Number(key);
200+
http2Stream.emit('close');
200201
});
201202
});
202203
});

test/api/connectivity_test.js

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2019 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
const options = {
19+
keepCase: true,
20+
longs: String,
21+
enums: String,
22+
defaults: true,
23+
oneofs: true
24+
};
25+
const path = require('path');
26+
const fs = require('fs');
27+
const assert = require('assert');
28+
const _ = require('lodash');
29+
const anyGrpc = require('../any_grpc');
30+
const clientGrpc = anyGrpc.client;
31+
const serverGrpc = anyGrpc.server;
32+
const protoLoader = require('../../packages/proto-loader', options);
33+
const testServiceDef = protoLoader.loadSync(__dirname + '/../proto/test_service.proto');
34+
const TestService = serverGrpc.loadPackageDefinition(testServiceDef).TestService.service;
35+
const TestServiceClient = clientGrpc.loadPackageDefinition(testServiceDef).TestService;
36+
37+
const clientCreds = clientGrpc.credentials.createInsecure();
38+
const serverCreds = serverGrpc.ServerCredentials.createInsecure();
39+
40+
const serviceImpl = {
41+
unary: function(call, cb) {
42+
cb(null, {});
43+
},
44+
clientStream: function(stream, cb){
45+
stream.on('data', function(data) {});
46+
stream.on('end', function() {
47+
cb(null, {});
48+
});
49+
},
50+
serverStream: function(stream) {
51+
stream.end();
52+
},
53+
bidiStream: function(stream) {
54+
stream.on('data', function(data) {});
55+
stream.on('end', function() {
56+
stream.end();
57+
});
58+
}
59+
};
60+
61+
describe('Reconnection', function() {
62+
let server1;
63+
let server2;
64+
let port;
65+
before(function() {
66+
server1 = new serverGrpc.Server();
67+
server1.addService(TestService, serviceImpl);
68+
server2 = new serverGrpc.Server();
69+
server2.addService(TestService, serviceImpl);
70+
port = server1.bind('localhost:0', serverCreds);
71+
server1.start();
72+
client = new TestServiceClient(`localhost:${port}`, clientCreds);
73+
});
74+
after(function() {
75+
server1.forceShutdown();
76+
server2.forceShutdown();
77+
});
78+
it('Should end with either OK or UNAVAILABLE when querying a server that is shutting down', function(done) {
79+
client.unary({}, (err, data) => {
80+
assert.ifError(err);
81+
server1.tryShutdown(() => {
82+
server2.bind(`localhost:${port}`, serverCreds);
83+
server2.start();
84+
client.unary({}, (err, data) => {
85+
assert.ifError(err);
86+
clearInterval(callInterval);
87+
done();
88+
});
89+
});
90+
let callInterval = setInterval(() => {
91+
client.unary({}, (err, data) => {
92+
if (err) {
93+
assert.strictEqual(err.code, clientGrpc.status.UNAVAILABLE);
94+
}
95+
});
96+
}, 0);
97+
});
98+
});
99+
});

0 commit comments

Comments
 (0)