Skip to content

Commit e571bd9

Browse files
authored
Merge pull request #923 from cjihrig/expose-server
grpc-js: expose Server implementation publicly
2 parents 170b72f + 68fbffa commit e571bd9

File tree

15 files changed

+165
-100
lines changed

15 files changed

+165
-100
lines changed

packages/grpc-js/src/compression-filter.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,7 @@ class UnknownHandler extends CompressionHandler {
138138
compressMessage(message: Buffer): Promise<Buffer> {
139139
return Promise.reject<Buffer>(
140140
new Error(
141-
`Received message compressed wth unsupported compression method ${
142-
this.compressionName
143-
}`
141+
`Received message compressed wth unsupported compression method ${this.compressionName}`
144142
)
145143
);
146144
}

packages/grpc-js/src/index.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import {
3838
Serialize,
3939
} from './make-client';
4040
import { Metadata } from './metadata';
41+
import { Server } from './server';
4142
import { KeyCertPair, ServerCredentials } from './server-credentials';
4243
import { StatusBuilder } from './status-builder';
4344

@@ -259,10 +260,7 @@ export const setLogVerbosity = (verbosity: LogVerbosity): void => {
259260
logging.setLoggerVerbosity(verbosity);
260261
};
261262

262-
export const Server = (options: any) => {
263-
throw new Error('Not yet implemented');
264-
};
265-
263+
export { Server };
266264
export { ServerCredentials };
267265
export { KeyCertPair };
268266

packages/grpc-js/src/server.ts

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ export class Server {
9191
string,
9292
UntypedHandler
9393
>();
94+
private sessions = new Set<http2.ServerHttp2Session>();
9495
private started = false;
9596

9697
constructor(options?: object) {}
@@ -223,7 +224,22 @@ export class Server {
223224
}
224225

225226
forceShutdown(): void {
226-
throw new Error('Not yet implemented');
227+
// Close the server if it is still running.
228+
if (this.http2Server && this.http2Server.listening) {
229+
this.http2Server.close();
230+
}
231+
232+
this.started = false;
233+
234+
// Always destroy any available sessions. It's possible that one or more
235+
// tryShutdown() calls are in progress. Don't wait on them to finish.
236+
this.sessions.forEach(session => {
237+
// Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to
238+
// recognize destroy(code) as a valid signature.
239+
// tslint:disable-next-line:no-any
240+
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
241+
});
242+
this.sessions.clear();
227243
}
228244

229245
register<RequestType, ResponseType>(
@@ -259,17 +275,34 @@ export class Server {
259275
}
260276

261277
tryShutdown(callback: (error?: Error) => void): void {
262-
callback = typeof callback === 'function' ? callback : noop;
278+
let pendingChecks = 0;
263279

264-
if (this.http2Server === null) {
265-
callback(new Error('server is not running'));
266-
return;
280+
function maybeCallback(): void {
281+
pendingChecks--;
282+
283+
if (pendingChecks === 0) {
284+
callback();
285+
}
267286
}
268287

269-
this.http2Server.close((err?: Error) => {
270-
this.started = false;
271-
callback(err);
288+
// Close the server if necessary.
289+
this.started = false;
290+
291+
if (this.http2Server && this.http2Server.listening) {
292+
pendingChecks++;
293+
this.http2Server.close(maybeCallback);
294+
}
295+
296+
// If any sessions are active, close them gracefully.
297+
pendingChecks += this.sessions.size;
298+
this.sessions.forEach(session => {
299+
session.close(maybeCallback);
272300
});
301+
302+
// If the server is closed and there are no active sessions, just call back.
303+
if (pendingChecks === 0) {
304+
callback();
305+
}
273306
}
274307

275308
addHttp2Port(): void {
@@ -341,7 +374,11 @@ export class Server {
341374
}
342375
} catch (err) {
343376
const call = new Http2ServerCallStream(stream, null!);
344-
err.code = Status.INTERNAL;
377+
378+
if (err.code === undefined) {
379+
err.code = Status.INTERNAL;
380+
}
381+
345382
call.sendError(err);
346383
}
347384
}
@@ -352,6 +389,8 @@ export class Server {
352389
session.destroy();
353390
return;
354391
}
392+
393+
this.sessions.add(session);
355394
});
356395
}
357396
}

packages/grpc-js/test/test-call-stream.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,7 @@ describe('CallStream', () => {
269269
frameLengths: range(0, 20).map(() => 1),
270270
},
271271
].forEach((testCase: { description: string; frameLengths: number[] }) => {
272-
it(`should handle a short message where ${
273-
testCase.description
274-
}`, done => {
272+
it(`should handle a short message where ${testCase.description}`, done => {
275273
const callStream = new Http2CallStream(
276274
'foo',
277275
{} as Http2Channel,

packages/grpc-js/test/test-server-deadlines.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@ import * as assert from 'assert';
2121
import * as path from 'path';
2222

2323
import * as grpc from '../src';
24-
import { ServerCredentials } from '../src';
24+
import { Server, ServerCredentials } from '../src';
2525
import { ServiceError } from '../src/call';
2626
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
27-
import { Server } from '../src/server';
2827
import {
2928
sendUnaryData,
3029
ServerUnaryCall,

packages/grpc-js/test/test-server-errors.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ import * as assert from 'assert';
2121
import { join } from 'path';
2222

2323
import * as grpc from '../src';
24+
import { Server } from '../src';
2425
import { ServiceError } from '../src/call';
2526
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
26-
import { Server } from '../src/server';
2727
import {
2828
sendUnaryData,
2929
ServerDuplexStream,
@@ -386,9 +386,9 @@ describe('Other conditions', () => {
386386
});
387387
});
388388

389-
after(done => {
389+
after(() => {
390390
client.close();
391-
server.tryShutdown(done);
391+
server.forceShutdown();
392392
});
393393

394394
describe('Server receiving bad input', () => {

packages/grpc-js/test/test-server.ts

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@ import * as http2 from 'http2';
2323
import * as path from 'path';
2424

2525
import * as grpc from '../src';
26-
import { ServerCredentials } from '../src';
26+
import { Server, ServerCredentials } from '../src';
2727
import { ServiceError } from '../src/call';
2828
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
29-
import { Server } from '../src/server';
3029
import { sendUnaryData, ServerUnaryCall } from '../src/server-call';
3130

3231
import { loadProtoFile } from './common';
@@ -128,17 +127,6 @@ describe('Server', () => {
128127
});
129128
});
130129

131-
describe('tryShutdown', () => {
132-
it('calls back with an error if the server is not running', done => {
133-
const server = new Server();
134-
135-
server.tryShutdown(err => {
136-
assert(err !== undefined && err.message === 'server is not running');
137-
done();
138-
});
139-
});
140-
});
141-
142130
describe('start', () => {
143131
let server: Server;
144132

@@ -238,10 +226,6 @@ describe('Server', () => {
238226
server.addProtoService();
239227
}, /Not implemented. Use addService\(\) instead/);
240228

241-
assert.throws(() => {
242-
server.forceShutdown();
243-
}, /Not yet implemented/);
244-
245229
assert.throws(() => {
246230
server.addHttp2Port();
247231
}, /Not yet implemented/);

test/api/connectivity_test.js

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,29 @@ describe('Reconnection', function() {
6262
let server1;
6363
let server2;
6464
let port;
65-
before(function() {
65+
before(function(done) {
6666
server1 = new serverGrpc.Server();
6767
server1.addService(TestService, serviceImpl);
6868
server2 = new serverGrpc.Server();
6969
server2.addService(TestService, serviceImpl);
70-
port = server1.bind('localhost:0', serverCreds);
71-
server1.start();
72-
client = new TestServiceClient(`localhost:${port}`, clientCreds);
70+
server1.bindAsync('localhost:0', serverCreds, (err, _port) => {
71+
assert.ifError(err);
72+
server1.start();
73+
port = _port;
74+
client = new TestServiceClient(`localhost:${port}`, clientCreds);
75+
done();
76+
});
7377
});
7478
after(function() {
79+
client.close();
7580
server1.forceShutdown();
7681
server2.forceShutdown();
7782
});
78-
it('Should end with either OK or UNAVAILABLE when querying a server that is shutting down', function(done) {
83+
it.skip('Should end with either OK or UNAVAILABLE when querying a server that is shutting down', function(done) {
84+
this.timeout(10000);
7985
let pendingCalls = 0;
8086
let testDone = false;
87+
let callInterval;
8188
function maybeDone() {
8289
if (testDone && pendingCalls === 0) {
8390
done();
@@ -86,16 +93,20 @@ describe('Reconnection', function() {
8693
client.unary({}, (err, data) => {
8794
assert.ifError(err);
8895
server1.tryShutdown(() => {
89-
server2.bind(`localhost:${port}`, serverCreds);
90-
server2.start();
91-
client.unary({}, (err, data) => {
96+
server2.bindAsync(`localhost:${port}`, serverCreds, (err) => {
9297
assert.ifError(err);
93-
clearInterval(callInterval);
94-
testDone = true;
95-
maybeDone();
98+
server2.start();
99+
const metadata = new clientGrpc.Metadata({ waitForReady: true });
100+
client.unary({}, metadata, (err, data) => {
101+
assert.ifError(err);
102+
clearInterval(callInterval);
103+
testDone = true;
104+
maybeDone();
105+
});
96106
});
97107
});
98-
let callInterval = setInterval(() => {
108+
callInterval = setInterval(() => {
109+
assert.strictEqual(testDone, false);
99110
pendingCalls += 1;
100111
client.unary({}, (err, data) => {
101112
pendingCalls -= 1;
@@ -107,4 +118,4 @@ describe('Reconnection', function() {
107118
}, 0);
108119
});
109120
});
110-
});
121+
});

test/api/error_test.js

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ describe('Client malformed response handling', function() {
3737
var server;
3838
var client;
3939
var badArg = Buffer.from([0xFF]);
40-
before(function() {
40+
before(function(done) {
4141
var malformed_test_service = {
4242
unary: {
4343
path: '/TestService/Unary',
@@ -93,9 +93,12 @@ describe('Client malformed response handling', function() {
9393
});
9494
}
9595
});
96-
var port = server.bind('localhost:0', serverInsecureCreds);
97-
client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
98-
server.start();
96+
server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
97+
assert.ifError(err);
98+
client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
99+
server.start();
100+
done();
101+
});
99102
});
100103
after(function() {
101104
server.forceShutdown();
@@ -141,7 +144,7 @@ describe('Client malformed response handling', function() {
141144
}
142145
var client;
143146
var server;
144-
before(function() {
147+
before(function(done) {
145148
var malformed_test_service = {
146149
unary: {
147150
path: '/TestService/Unary',
@@ -197,9 +200,12 @@ describe('Client malformed response handling', function() {
197200
});
198201
}
199202
});
200-
var port = server.bind('localhost:0', serverInsecureCreds);
201-
client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
202-
server.start();
203+
server.bindAsync('localhost:0', serverInsecureCreds, (err, port) => {
204+
assert.ifError(err);
205+
client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
206+
server.start();
207+
done();
208+
});
203209
});
204210
after(function() {
205211
server.forceShutdown();
@@ -244,7 +250,7 @@ describe('Client malformed response handling', function() {
244250
var client;
245251
var server;
246252
var port;
247-
before(function() {
253+
before(function(done) {
248254
server = new serverGrpc.Server();
249255
var trailer_metadata = new serverGrpc.Metadata();
250256
trailer_metadata.add('trailer-present', 'yes');
@@ -323,9 +329,13 @@ describe('Client malformed response handling', function() {
323329
});
324330
}
325331
});
326-
port = server.bind('localhost:0', serverInsecureCreds);
327-
client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
328-
server.start();
332+
server.bindAsync('localhost:0', serverInsecureCreds, (err, _port) => {
333+
assert.ifError(err);
334+
port = _port;
335+
client = new TestServiceClient('localhost:' + port, clientInsecureCreds);
336+
server.start();
337+
done();
338+
});
329339
});
330340
after(function() {
331341
server.forceShutdown();

test/api/interop_helper/server.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
'use strict';
2020

21+
const assert = require('assert');
2122
const interopServer = require('../../interop/interop_server.js');
2223

23-
const serverObj = interopServer.getServer(0, true);
24-
serverObj.server.start();
25-
process.send({port: serverObj.port});
26-
// The only message from the driver should be to stop the server
27-
process.on('message', (message) => {
28-
serverObj.server.forceShutdown();
24+
interopServer.getServer(0, true, (err, serverObj) => {
25+
assert.ifError(err);
26+
serverObj.server.start();
27+
process.send({port: serverObj.port});
28+
// The only message from the driver should be to stop the server
29+
process.on('message', (message) => {
30+
serverObj.server.forceShutdown();
31+
});
2932
});

0 commit comments

Comments
 (0)