Skip to content

Commit 2bb268a

Browse files
authored
Merge pull request #868 from cjihrig/bidi
grpc-js: add bidirectional streaming RPC support
2 parents e5acca5 + 1aa1152 commit 2bb268a

File tree

4 files changed

+114
-3
lines changed

4 files changed

+114
-3
lines changed

packages/grpc-js/src/server-call.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,21 @@ export class ServerWritableStreamImpl<RequestType, ResponseType> extends
204204
export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
205205
implements ServerDuplexStream<RequestType, ResponseType> {
206206
cancelled: boolean;
207+
private trailingMetadata: Metadata;
207208

208209
constructor(
209210
private call: Http2ServerCallStream<RequestType, ResponseType>,
210-
public metadata: Metadata, private _serialize: Serialize<ResponseType>,
211-
private _deserialize: Deserialize<RequestType>) {
211+
public metadata: Metadata, public serialize: Serialize<ResponseType>,
212+
public deserialize: Deserialize<RequestType>) {
212213
super({objectMode: true});
213214
this.cancelled = false;
215+
this.trailingMetadata = new Metadata();
216+
this.call.setupReadable(this);
217+
218+
this.on('error', (err) => {
219+
this.call.sendError(err as ServiceError);
220+
this.end();
221+
});
214222
}
215223

216224
getPeer(): string {
@@ -222,6 +230,14 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> extends Duplex
222230
}
223231
}
224232

233+
ServerDuplexStreamImpl.prototype._read =
234+
ServerReadableStreamImpl.prototype._read;
235+
ServerDuplexStreamImpl.prototype._write =
236+
ServerWritableStreamImpl.prototype._write;
237+
ServerDuplexStreamImpl.prototype._final =
238+
ServerWritableStreamImpl.prototype._final;
239+
ServerDuplexStreamImpl.prototype.end = ServerWritableStreamImpl.prototype.end;
240+
225241

226242
// Unary response callback signature.
227243
export type sendUnaryData<ResponseType> =

packages/grpc-js/src/server.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,5 +355,12 @@ function handleBidiStreaming<RequestType, ResponseType>(
355355
call: Http2ServerCallStream<RequestType, ResponseType>,
356356
handler: BidiStreamingHandler<RequestType, ResponseType>,
357357
metadata: Metadata): void {
358-
throw new Error('not implemented yet');
358+
const stream = new ServerDuplexStreamImpl<RequestType, ResponseType>(
359+
call, metadata, handler.serialize, handler.deserialize);
360+
361+
if (call.cancelled) {
362+
return;
363+
}
364+
365+
handler.func(stream);
359366
}

packages/grpc-js/test/test-server-errors.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ describe('Client malformed response handling', () => {
147147
done();
148148
});
149149
});
150+
151+
it('should get an INTERNAL status with a bidi stream call', (done) => {
152+
const call = client.bidiStream();
153+
154+
call.on('data', noop);
155+
call.on('error', (err: ServiceError) => {
156+
assert(err);
157+
assert.strictEqual(err.code, grpc.status.INTERNAL);
158+
done();
159+
});
160+
161+
call.write({});
162+
call.end();
163+
});
150164
});
151165

152166
describe('Server serialization failure handling', () => {
@@ -444,6 +458,23 @@ describe('Other conditions', () => {
444458
done();
445459
});
446460
});
461+
462+
it('should respond correctly to a bidi stream', (done) => {
463+
const call = misbehavingClient.bidiStream();
464+
465+
call.on('data', (data: any) => {
466+
assert.fail(data);
467+
});
468+
469+
call.on('error', (err: ServiceError) => {
470+
assert(err);
471+
assert.strictEqual(err.code, grpc.status.INTERNAL);
472+
done();
473+
});
474+
475+
call.write(badArg);
476+
call.end();
477+
});
447478
});
448479

449480
describe('Trailing metadata', () => {
@@ -561,6 +592,33 @@ describe('Other conditions', () => {
561592
done();
562593
});
563594
});
595+
596+
it('should be present when a bidi stream succeeds', (done) => {
597+
const call = client.bidiStream();
598+
599+
call.write({error: false});
600+
call.write({error: false});
601+
call.end();
602+
call.on('data', noop);
603+
call.on('status', (status: grpc.StatusObject) => {
604+
assert.strictEqual(status.code, grpc.status.OK);
605+
assert.deepStrictEqual(status.metadata.get('trailer-present'), ['yes']);
606+
done();
607+
});
608+
});
609+
610+
it('should be present when a bidi stream fails', (done) => {
611+
const call = client.bidiStream();
612+
613+
call.write({error: false});
614+
call.write({error: true});
615+
call.end();
616+
call.on('data', noop);
617+
call.on('error', (error: ServiceError) => {
618+
assert.deepStrictEqual(error.metadata.get('trailer-present'), ['yes']);
619+
done();
620+
});
621+
});
564622
});
565623

566624
describe('Error object should contain the status', () => {
@@ -597,6 +655,20 @@ describe('Other conditions', () => {
597655
});
598656
});
599657

658+
it('for a bidi stream call', (done) => {
659+
const call = client.bidiStream();
660+
661+
call.write({error: false});
662+
call.write({error: true});
663+
call.end();
664+
call.on('data', noop);
665+
call.on('error', (error: ServiceError) => {
666+
assert.strictEqual(error.code, grpc.status.UNKNOWN);
667+
assert.strictEqual(error.details, 'Requested error');
668+
done();
669+
});
670+
});
671+
600672
it('for a UTF-8 error message', (done) => {
601673
client.unary(
602674
{error: true, message: '測試字符串'},

packages/grpc-js/test/test-server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,22 @@ describe('Server', () => {
286286
done();
287287
});
288288
});
289+
290+
it('should respond to a bidi call with UNIMPLEMENTED', (done) => {
291+
const call = client.divMany();
292+
293+
call.on('data', (value: any) => {
294+
assert.fail('No messages expected');
295+
});
296+
297+
call.on('error', (err: ServiceError) => {
298+
assert(err);
299+
assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED);
300+
done();
301+
});
302+
303+
call.end();
304+
});
289305
});
290306
});
291307

0 commit comments

Comments
 (0)