Skip to content

Commit d209a34

Browse files
authored
Merge branch 'grpc:master' into master
2 parents 1247129 + 271c848 commit d209a34

34 files changed

+2584
-609
lines changed

packages/grpc-js-xds/interop/xds-interop-client.ts

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,27 @@ class CallStatsTracker {
180180
}
181181
}
182182

183+
class RecentTimestampList {
184+
private timeList: bigint[] = [];
185+
private nextIndex = 0;
186+
187+
constructor(private readonly size: number) {}
188+
189+
isFull() {
190+
return this.timeList.length === this.size;
191+
}
192+
193+
insertTimestamp(timestamp: bigint) {
194+
this.timeList[this.nextIndex] = timestamp;
195+
this.nextIndex = (this.nextIndex + 1) % this.size;
196+
}
197+
198+
getSpan(): bigint {
199+
const lastIndex = (this.nextIndex + this.size - 1) % this.size;
200+
return this.timeList[lastIndex] - this.timeList[this.nextIndex];
201+
}
202+
}
203+
183204
type CallType = 'EmptyCall' | 'UnaryCall';
184205

185206
interface ClientConfiguration {
@@ -246,27 +267,34 @@ const callTimeHistogram: {[callType: string]: {[status: number]: number[]}} = {
246267
EmptyCall: {}
247268
}
248269

249-
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
270+
/**
271+
* Timestamps output by process.hrtime.bigint() are a bigint number of
272+
* nanoseconds. This is the representation of 1 second in that context.
273+
*/
274+
const TIMESTAMP_ONE_SECOND = BigInt(1e9);
275+
276+
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker, callStartTimestamps: RecentTimestampList) {
250277
const callEnumName = callTypeEnumMapReverse[type];
251278
addAccumulatedCallStarted(callEnumName);
252279
const notifier = callStatsTracker.startCall();
253280
let gotMetadata: boolean = false;
254281
let hostname: string | null = null;
255282
let completed: boolean = false;
256283
let completedWithError: boolean = false;
257-
const startTime = process.hrtime();
284+
const startTime = process.hrtime.bigint();
258285
const deadline = new Date();
259286
deadline.setSeconds(deadline.getSeconds() + currentConfig.timeoutSec);
260287
const callback = (error: grpc.ServiceError | undefined, value: Empty__Output | undefined) => {
261288
const statusCode = error?.code ?? grpc.status.OK;
262-
const duration = process.hrtime(startTime);
289+
const duration = process.hrtime.bigint() - startTime;
290+
const durationSeconds = Number(duration / TIMESTAMP_ONE_SECOND) | 0;
263291
if (!callTimeHistogram[type][statusCode]) {
264292
callTimeHistogram[type][statusCode] = [];
265293
}
266-
if (callTimeHistogram[type][statusCode][duration[0]]) {
267-
callTimeHistogram[type][statusCode][duration[0]] += 1;
294+
if (callTimeHistogram[type][statusCode][durationSeconds]) {
295+
callTimeHistogram[type][statusCode][durationSeconds] += 1;
268296
} else {
269-
callTimeHistogram[type][statusCode][duration[0]] = 1;
297+
callTimeHistogram[type][statusCode][durationSeconds] = 1;
270298
}
271299
addAccumulatedCallEnded(callEnumName, statusCode);
272300
if (error) {
@@ -301,13 +329,28 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
301329
}
302330
}
303331
});
304-
332+
/* callStartTimestamps tracks the last N timestamps of started calls, where N
333+
* is the target QPS. If the measured span of time between the first and last
334+
* of those N calls is greater than 1 second, we make another call
335+
* ~immediately to correct for that. */
336+
callStartTimestamps.insertTimestamp(startTime);
337+
if (callStartTimestamps.isFull()) {
338+
if (callStartTimestamps.getSpan() > TIMESTAMP_ONE_SECOND) {
339+
setImmediate(() => {
340+
makeSingleRequest(client, type, failOnFailedRpcs, callStatsTracker, callStartTimestamps);
341+
});
342+
}
343+
}
305344
}
306345

307346
function sendConstantQps(client: TestServiceClient, qps: number, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
347+
const callStartTimestampsTrackers: {[callType: string]: RecentTimestampList} = {};
348+
for (const callType of ['EmptyCall', 'UnaryCall']) {
349+
callStartTimestampsTrackers[callType] = new RecentTimestampList(qps);
350+
}
308351
setInterval(() => {
309352
for (const callType of currentConfig.callTypes) {
310-
makeSingleRequest(client, callType, failOnFailedRpcs, callStatsTracker);
353+
makeSingleRequest(client, callType, failOnFailedRpcs, callStatsTracker, callStartTimestampsTrackers[callType]);
311354
}
312355
}, 1000/qps);
313356
setInterval(() => {

packages/grpc-js-xds/scripts/xds.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ git clone -b master --single-branch --depth=1 https://github.com/grpc/grpc.git
4848

4949
grpc/tools/run_tests/helper_scripts/prep_xds.sh
5050

51+
mkdir -p "${KOKORO_ARTIFACTS_DIR}/github/grpc/reports"
52+
5153
GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weighted_target,round_robin,resolving_load_balancer,subchannel,keepalive,dns_resolver,fault_injection,http_filter,csds \
5254
GRPC_NODE_VERBOSITY=DEBUG \
5355
NODE_XDS_INTEROP_VERBOSITY=1 \
@@ -59,7 +61,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh
5961
--gcp_suffix=$(date '+%s') \
6062
--verbose \
6163
${XDS_V3_OPT-} \
62-
--client_cmd="$(which node) --enable-source-maps grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \
64+
--client_cmd="$(which node) --enable-source-maps --prof --logfile=${KOKORO_ARTIFACTS_DIR}/github/grpc/reports/prof.log grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \
6365
--server=xds:///{server_uri} \
6466
--stats_port={stats_port} \
6567
--qps={qps} \

packages/grpc-js-xds/src/xds-client.ts

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import * as protoLoader from '@grpc/proto-loader';
1919
// This is a non-public, unstable API, but it's very convenient
2020
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util';
21-
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel } from '@grpc/grpc-js';
21+
import { loadPackageDefinition, StatusObject, status, logVerbosity, Metadata, experimental, ChannelOptions, ClientDuplexStream, ServiceError, ChannelCredentials, Channel, connectivityState } from '@grpc/grpc-js';
2222
import * as adsTypes from './generated/ads';
2323
import * as lrsTypes from './generated/lrs';
2424
import { loadBootstrapInfo } from './xds-bootstrap';
@@ -255,6 +255,7 @@ export class XdsClient {
255255
DiscoveryRequest,
256256
DiscoveryResponse__Output
257257
> | null = null;
258+
private receivedAdsResponseOnCurrentStream = false;
258259

259260
private lrsNode: Node | null = null;
260261
private lrsClient: LoadReportingServiceClient | null = null;
@@ -373,6 +374,9 @@ export class XdsClient {
373374
{channelOverride: channel}
374375
);
375376
this.maybeStartAdsStream();
377+
channel.watchConnectivityState(channel.getConnectivityState(false), Infinity, () => {
378+
this.handleAdsConnectivityStateUpdate();
379+
})
376380

377381
this.lrsClient = new protoDefinitions.envoy.service.load_stats.v3.LoadReportingService(
378382
serverUri,
@@ -394,7 +398,29 @@ export class XdsClient {
394398
clearInterval(this.statsTimer);
395399
}
396400

401+
private handleAdsConnectivityStateUpdate() {
402+
if (!this.adsClient) {
403+
return;
404+
}
405+
const state = this.adsClient.getChannel().getConnectivityState(false);
406+
if (state === connectivityState.READY && this.adsCall) {
407+
this.reportAdsStreamStarted();
408+
}
409+
if (state === connectivityState.TRANSIENT_FAILURE) {
410+
this.reportStreamError({
411+
code: status.UNAVAILABLE,
412+
details: 'No connection established to xDS server',
413+
metadata: new Metadata()
414+
});
415+
}
416+
this.adsClient.getChannel().watchConnectivityState(state, Infinity, () => {
417+
this.handleAdsConnectivityStateUpdate();
418+
});
419+
}
420+
397421
private handleAdsResponse(message: DiscoveryResponse__Output) {
422+
this.receivedAdsResponseOnCurrentStream = true;
423+
this.adsBackoff.reset();
398424
let handleResponseResult: {
399425
result: HandleResponseResult;
400426
serviceKind: AdsServiceKind;
@@ -466,7 +492,7 @@ export class XdsClient {
466492
'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details
467493
);
468494
this.adsCall = null;
469-
if (streamStatus.code !== status.OK) {
495+
if (streamStatus.code !== status.OK && !this.receivedAdsResponseOnCurrentStream) {
470496
this.reportStreamError(streamStatus);
471497
}
472498
/* If the backoff timer is no longer running, we do not need to wait any
@@ -496,7 +522,9 @@ export class XdsClient {
496522
if (this.adsCall !== null) {
497523
return;
498524
}
499-
this.adsCall = this.adsClient.StreamAggregatedResources();
525+
this.receivedAdsResponseOnCurrentStream = false;
526+
const metadata = new Metadata({waitForReady: true});
527+
this.adsCall = this.adsClient.StreamAggregatedResources(metadata);
500528
this.adsCall.on('data', (message: DiscoveryResponse__Output) => {
501529
this.handleAdsResponse(message);
502530
});
@@ -515,7 +543,9 @@ export class XdsClient {
515543
this.updateNames(service);
516544
}
517545
}
518-
this.reportAdsStreamStarted();
546+
if (this.adsClient.getChannel().getConnectivityState(false) === connectivityState.READY) {
547+
this.reportAdsStreamStarted();
548+
}
519549
}
520550

521551
private maybeSendAdsMessage(typeUrl: string, resourceNames: string[], responseNonce: string, versionInfo: string, errorMessage?: string) {
@@ -547,10 +577,6 @@ export class XdsClient {
547577
* version info are updated so that it sends the post-update values.
548578
*/
549579
ack(serviceKind: AdsServiceKind) {
550-
/* An ack is the best indication of a successful interaction between the
551-
* client and the server, so we can reset the backoff timer here. */
552-
this.adsBackoff.reset();
553-
554580
this.updateNames(serviceKind);
555581
}
556582

packages/grpc-js-xds/src/xds-stream-state/xds-stream-state.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ export abstract class BaseXdsStreamState<ResponseType> implements XdsStreamState
213213
}
214214

215215
reportAdsStreamStart() {
216+
if (this.isAdsStreamRunning) {
217+
return;
218+
}
216219
this.isAdsStreamRunning = true;
217220
for (const subscriptionEntry of this.subscriptions.values()) {
218221
if (subscriptionEntry.cachedResponse === null) {

packages/grpc-js/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
5959
- `grpc.default_compression_algorithm`
6060
- `grpc.enable_channelz`
6161
- `grpc.dns_min_time_between_resolutions_ms`
62+
- `grpc.enable_retries`
63+
- `grpc.per_rpc_retry_buffer_size`
64+
- `grpc.retry_buffer_size`
6265
- `grpc-node.max_session_memory`
6366
- `channelOverride`
6467
- `channelFactoryOverride`

packages/grpc-js/package.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@grpc/grpc-js",
3-
"version": "1.7.2",
3+
"version": "1.7.3",
44
"description": "gRPC Library for Node - pure JS implementation",
55
"homepage": "https://grpc.io/",
66
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
@@ -17,14 +17,14 @@
1717
"devDependencies": {
1818
"@types/gulp": "^4.0.6",
1919
"@types/gulp-mocha": "0.0.32",
20-
"@types/lodash": "^4.14.108",
20+
"@types/lodash": "^4.14.186",
2121
"@types/mocha": "^5.2.6",
2222
"@types/ncp": "^2.0.1",
2323
"@types/pify": "^3.0.2",
2424
"@types/semver": "^7.3.9",
2525
"clang-format": "^1.0.55",
2626
"execa": "^2.0.3",
27-
"gts": "^2.0.0",
27+
"gts": "^3.1.1",
2828
"gulp": "^4.0.2",
2929
"gulp-mocha": "^6.0.0",
3030
"lodash": "^4.17.4",
@@ -35,7 +35,7 @@
3535
"rimraf": "^3.0.2",
3636
"semver": "^7.3.5",
3737
"ts-node": "^8.3.0",
38-
"typescript": "^3.7.2"
38+
"typescript": "^4.8.4"
3939
},
4040
"contributors": [
4141
{

packages/grpc-js/src/call-credentials.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ export abstract class CallCredentials {
115115
reject(err);
116116
return;
117117
}
118+
if (!headers) {
119+
reject(new Error('Headers not set by metadata plugin'));
120+
return;
121+
}
118122
resolve(headers);
119123
}
120124
);

packages/grpc-js/src/call-interface.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ export interface StatusObject {
3636
metadata: Metadata;
3737
}
3838

39+
export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
40+
metadata: Metadata | null;
41+
}
42+
3943
export const enum WriteFlags {
4044
BufferHint = 1,
4145
NoCompress = 2,

packages/grpc-js/src/channel-options.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ export interface ChannelOptions {
4444
'grpc.default_compression_algorithm'?: CompressionAlgorithms;
4545
'grpc.enable_channelz'?: number;
4646
'grpc.dns_min_time_between_resolutions_ms'?: number;
47+
'grpc.enable_retries'?: number;
48+
'grpc.per_rpc_retry_buffer_size'?: number;
49+
/* This option is pattered like a core option, but the core does not have
50+
* this option. It is closely related to the option
51+
* grpc.per_rpc_retry_buffer_size, which is in the core. The core will likely
52+
* implement this functionality using the ResourceQuota mechanism, so there
53+
* will probably not be any collision or other inconsistency. */
54+
'grpc.retry_buffer_size'?: number;
55+
'grpc.max_connection_age_ms'?: number;
56+
'grpc.max_connection_age_grace_ms'?: number;
4757
'grpc-node.max_session_memory'?: number;
4858
// eslint-disable-next-line @typescript-eslint/no-explicit-any
4959
[key: string]: any;
@@ -71,6 +81,11 @@ export const recognizedOptions = {
7181
'grpc.enable_http_proxy': true,
7282
'grpc.enable_channelz': true,
7383
'grpc.dns_min_time_between_resolutions_ms': true,
84+
'grpc.enable_retries': true,
85+
'grpc.per_rpc_retry_buffer_size': true,
86+
'grpc.retry_buffer_size': true,
87+
'grpc.max_connection_age_ms': true,
88+
'grpc.max_connection_age_grace_ms': true,
7489
'grpc-node.max_session_memory': true,
7590
};
7691

packages/grpc-js/src/client-interceptors.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import { Channel } from './channel';
3434
import { CallOptions } from './client';
3535
import { CallCredentials } from './call-credentials';
3636
import { ClientMethodDefinition } from './make-client';
37+
import { getErrorMessage } from './error';
3738

3839
/**
3940
* Error class associated with passing both interceptors and interceptor
@@ -374,7 +375,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
374375
} catch (e) {
375376
this.call.cancelWithStatus(
376377
Status.INTERNAL,
377-
`Request message serialization failure: ${e.message}`
378+
`Request message serialization failure: ${getErrorMessage(e)}`
378379
);
379380
return;
380381
}
@@ -401,7 +402,7 @@ class BaseInterceptingCall implements InterceptingCallInterface {
401402
} catch (e) {
402403
readError = {
403404
code: Status.INTERNAL,
404-
details: `Response message parsing error: ${e.message}`,
405+
details: `Response message parsing error: ${getErrorMessage(e)}`,
405406
metadata: new Metadata(),
406407
};
407408
this.call.cancelWithStatus(readError.code, readError.details);

packages/grpc-js/src/error.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2022 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
export function getErrorMessage(error: unknown): string {
19+
if (error instanceof Error) {
20+
return error.message;
21+
} else {
22+
return String(error);
23+
}
24+
}
25+
26+
export function getErrorCode(error: unknown): number | null {
27+
if (
28+
typeof error === 'object' &&
29+
error !== null &&
30+
'code' in error &&
31+
typeof (error as Record<string, unknown>).code === 'number'
32+
) {
33+
return (error as Record<string, number>).code;
34+
} else {
35+
return null;
36+
}
37+
}

packages/grpc-js/src/filter-stack.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ export class FilterStackFactory implements FilterFactory<FilterStack> {
8888
this.factories.unshift(...filterFactories);
8989
}
9090

91+
clone(): FilterStackFactory {
92+
return new FilterStackFactory([...this.factories]);
93+
}
94+
9195
createFilter(): FilterStack {
9296
return new FilterStack(
9397
this.factories.map((factory) => factory.createFilter())

0 commit comments

Comments
 (0)