Skip to content

Commit 5a1c3d9

Browse files
committed
Add jitter() setting to DatumLoader to add random delays on parallel requests, and help avoid rate throttling errors.
1 parent 88293da commit 5a1c3d9

File tree

4 files changed

+209
-6
lines changed

4 files changed

+209
-6
lines changed

src/main/net/jsonClientSupport.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,16 @@ export default abstract class JsonClientSupport<API extends UrlHelper, T> {
5555
*
5656
* @param url the URL to request.
5757
* @param signUrl the URL to sign (might be different to `url` if a proxy is used)
58+
* @param delay an optional number of milliseconds to sleep before initiating the request
5859
* @returns a function that accepts a callback argument
5960
*/
6061
protected requestor<V>(
6162
url: string,
62-
signUrl?: string
63+
signUrl?: string,
64+
delay?: number
6365
): (cb: LoaderDataCallbackFn<V>) => void {
6466
const auth = this.authBuilder;
65-
return (cb: LoaderDataCallbackFn<V>) => {
67+
const fn = (cb: LoaderDataCallbackFn<V>) => {
6668
const headers: any = {
6769
Accept: "application/json",
6870
};
@@ -101,5 +103,13 @@ export default abstract class JsonClientSupport<API extends UrlHelper, T> {
101103
}, errorHandler);
102104
}, errorHandler);
103105
};
106+
if (delay && delay > 0) {
107+
return (cb: LoaderDataCallbackFn<V>) => {
108+
setTimeout(() => {
109+
fn.call(this, cb);
110+
}, delay);
111+
};
112+
}
113+
return fn;
104114
}
105115
}

src/main/tool/datumLoader.ts

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ interface QueryResultsData {
2121
}
2222

2323
const DEFAULT_PAGE_SIZE: number = 1000;
24+
const DEFAULT_JITTER: number = 150;
2425

2526
/**
2627
* An enumeration of loader state values.
@@ -88,6 +89,12 @@ export default class DatumLoader
8889
*/
8990
#concurrency: number;
9091

92+
/**
93+
* When > 0 then add a random amount of milliseconds up to this amount before initiating
94+
* parallel requests (thus #concurrency must also be configured).
95+
*/
96+
#jitter: number;
97+
9198
/**
9299
* A queue to use for parallel mode, when `concurrency` configured > 0.
93100
*/
@@ -123,6 +130,7 @@ export default class DatumLoader
123130
this.#readingsMode = false;
124131
this.#proxyUrl = null;
125132
this.#concurrency = 0;
133+
this.#jitter = DEFAULT_JITTER;
126134
this.#state = DatumLoaderState.Ready;
127135
}
128136

@@ -152,12 +160,41 @@ export default class DatumLoader
152160
if (value === undefined) {
153161
return this.#concurrency;
154162
}
155-
if (!isNaN(value) && Number(value) > 0) {
163+
if (!isNaN(value) && Number(value) >= 0) {
156164
this.#concurrency = Number(value);
157165
}
158166
return this;
159167
}
160168

169+
/**
170+
* Get the concurrency jitter value to use for parallel requests.
171+
*
172+
* @returns the current concurrency jitter value (milliseconds); defaults to `150`
173+
*/
174+
jitter(): number;
175+
176+
/**
177+
* Set the concurrency jitter amount to use for parallel requests.
178+
*
179+
* When parallel mode is enabled by setting `concurrency()` to a positive value, a random amount
180+
* of "pause" time can be added before parallel requests are made by configuring this
181+
* to a positive value. This can be helpful to avoid API rate limiting errors.
182+
*
183+
* @param value the concurrency jitter amount to use, in milliseconds, or `0` to disable
184+
* @returns this object
185+
*/
186+
jitter(value: number): this;
187+
188+
jitter(value?: number): number | this {
189+
if (value === undefined) {
190+
return this.#jitter;
191+
}
192+
if (!isNaN(value) && Number(value) >= 0) {
193+
this.#jitter = Number(value);
194+
}
195+
return this;
196+
}
197+
161198
/**
162199
* Get the optional callback function.
163200
*
@@ -485,7 +522,13 @@ export default class DatumLoader
485522
? url.replace(/^[^:]+:\/\/[^/]+/, this.#proxyUrl)
486523
: url;
487524

488-
const query = this.requestor<QueryResultsData>(reqUrl, url);
525+
// add delay for parallel mode if jitter configured
526+
const delay =
527+
q && this.#jitter > 0
528+
? Math.floor(Math.random() * this.#jitter) + 1
529+
: 0;
530+
531+
const query = this.requestor<QueryResultsData>(reqUrl, url, delay);
489532

490533
const handler = (error?: Error, data?: QueryResultsData) => {
491534
if (error) {

src/test/net/jsonClientSupportTests.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ log.level = LogLevel.DEBUG;
1515

1616
class TestClient extends JsonClientSupport<SolarQueryApi, any> {
1717
#url: string;
18+
#jitter?: number;
1819

19-
constructor(api: SolarQueryApi, url: string) {
20+
constructor(api: SolarQueryApi, url: string, jitter?: number) {
2021
super(api);
2122
this.#url = url;
23+
this.#jitter = jitter;
2224
}
2325

2426
/**
@@ -28,7 +30,11 @@ class TestClient extends JsonClientSupport<SolarQueryApi, any> {
2830
*/
2931
fetch(): Promise<any> {
3032
return new Promise((resolve, reject) => {
31-
this.requestor(this.#url)((error, results) => {
33+
this.requestor(
34+
this.#url,
35+
undefined,
36+
this.#jitter
37+
)((error, results) => {
3238
if (error) {
3339
reject(error);
3440
} else {
@@ -71,6 +77,31 @@ test.serial("fetch:http404", async (t) => {
7177
);
7278
});
7379

80+
test.serial("fetch:jitter", async (t) => {
81+
// GIVEN
82+
const url = "http://localhost/foo";
83+
const http = t.context.agent.get("http://localhost");
84+
http.intercept({
85+
path: "/foo",
86+
method: "GET",
87+
}).reply(200, {
88+
success: true,
89+
data: "hi",
90+
});
91+
92+
// WHEN
93+
const client = new TestClient(t.context.api, url, 1000);
94+
const start = new Date();
95+
const result = await client.fetch();
96+
const end = new Date();
97+
98+
t.is(result, "hi", "Requested URL with delay returned data result.");
99+
t.true(
100+
end.getTime() - start.getTime() >= 1000,
101+
"At least delay ms has elapsed."
102+
);
103+
});
104+
74105
test.serial("fetch:result:error:message", async (t) => {
75106
// GIVEN
76107
const url = "http://localhost/foo";

src/test/tool/datumLoaderTests.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,20 @@ test.serial("load:parallel:multiPage:gapDataFirstPage", async (t) => {
10081008
t.deepEqual(results, allResults.slice(0, 4));
10091009
});
10101010

1011+
test.serial("jitter", (t) => {
1012+
const filter = testFilter();
1013+
const loader = new DatumLoader(t.context.api, filter);
1014+
t.truthy(loader);
1015+
t.is(loader.jitter("Ten" as any).jitter(), 150, "NaN ignored");
1016+
t.is(loader.jitter(-1).jitter(), 150, "Negative ignored");
1017+
t.is(
1018+
loader.jitter("500" as any).jitter(),
1019+
500,
1020+
"Number string parsed as number"
1021+
);
1022+
t.is(loader.jitter(1).jitter(), 1, "Positive set");
1023+
});
1024+
10111025
test.serial("load:proxy:multiPage:parallel", (t) => {
10121026
const http = t.context.agent.get("https://query-proxy");
10131027
const allResults = [
@@ -1092,6 +1106,111 @@ test.serial("load:proxy:multiPage:parallel", (t) => {
10921106
.proxyUrl(proxyUrl)
10931107
.paginationSize(2)
10941108
.concurrency(Infinity)
1109+
.jitter(500)
1110+
.includeTotalResultsCount(true);
1111+
t.truthy(loader);
1112+
t.is(loader.jitter(), 500);
1113+
t.is(loader.proxyUrl(), proxyUrl);
1114+
t.is(loader.includeTotalResultsCount(), true);
1115+
1116+
return new Promise((resolve, reject) => {
1117+
loader.load((error, results) => {
1118+
try {
1119+
t.falsy(error, "No error generated.");
1120+
t.deepEqual(results, allResults, "All pages returned.");
1121+
resolve();
1122+
} catch (error) {
1123+
reject(error);
1124+
}
1125+
});
1126+
});
1127+
});
1128+
1129+
test.serial("load:proxy:multiPage:parallel:noJitter", (t) => {
1130+
const http = t.context.agent.get("https://query-proxy");
1131+
const allResults = [
1132+
{
1133+
created: "2017-07-04 12:00:00.000Z",
1134+
nodeId: 123,
1135+
sourceId: "test-source",
1136+
val: 0,
1137+
},
1138+
{
1139+
created: "2017-07-04 13:00:00.000Z",
1140+
nodeId: 123,
1141+
sourceId: "test-source",
1142+
val: 1,
1143+
},
1144+
{
1145+
created: "2017-07-04 14:00:00.000Z",
1146+
nodeId: 123,
1147+
sourceId: "test-source",
1148+
val: 0,
1149+
},
1150+
{
1151+
created: "2017-07-04 15:00:00.000Z",
1152+
nodeId: 123,
1153+
sourceId: "test-source",
1154+
val: 1,
1155+
},
1156+
{
1157+
created: "2017-07-04 16:00:00.000Z",
1158+
nodeId: 123,
1159+
sourceId: "test-source",
1160+
val: 0,
1161+
},
1162+
{
1163+
created: "2017-07-04 17:00:00.000Z",
1164+
nodeId: 123,
1165+
sourceId: "test-source",
1166+
val: 1,
1167+
},
1168+
];
1169+
// expect 3 page queries: one for first page and total result count, 2 more for remaining pages
1170+
http.intercept({
1171+
path: "/path/solarquery/api/v1/pub/datum/list?nodeId=123&sourceId=test-source&startDate=2017-04-01T12%3A00&endDate=2017-05-01T12%3A00&aggregation=Hour&withoutTotalResultsCount=false&max=2",
1172+
method: "GET",
1173+
}).reply(200, {
1174+
success: true,
1175+
data: {
1176+
totalResults: 6,
1177+
startingOffset: 0,
1178+
returnedResultCount: 2,
1179+
results: allResults.slice(0, 2),
1180+
},
1181+
});
1182+
http.intercept({
1183+
path: "/path/solarquery/api/v1/pub/datum/list?nodeId=123&sourceId=test-source&startDate=2017-04-01T12%3A00&endDate=2017-05-01T12%3A00&aggregation=Hour&withoutTotalResultsCount=true&max=2&offset=2",
1184+
method: "GET",
1185+
}).reply(200, {
1186+
success: true,
1187+
data: {
1188+
totalResults: null,
1189+
startingOffset: 2,
1190+
returnedResultCount: 2,
1191+
results: allResults.slice(2, 4),
1192+
},
1193+
});
1194+
http.intercept({
1195+
path: "/path/solarquery/api/v1/pub/datum/list?nodeId=123&sourceId=test-source&startDate=2017-04-01T12%3A00&endDate=2017-05-01T12%3A00&aggregation=Hour&withoutTotalResultsCount=true&max=2&offset=4",
1196+
method: "GET",
1197+
}).reply(200, {
1198+
success: true,
1199+
data: {
1200+
totalResults: null,
1201+
startingOffset: 4,
1202+
returnedResultCount: 2,
1203+
results: allResults.slice(4, 6),
1204+
},
1205+
});
1206+
1207+
const proxyUrl = "https://query-proxy/path";
1208+
const filter = testFilter();
1209+
const loader = new DatumLoader(t.context.api, filter)
1210+
.proxyUrl(proxyUrl)
1211+
.paginationSize(2)
1212+
.concurrency(Infinity)
1213+
.jitter(0)
10951214
.includeTotalResultsCount(true);
10961215
t.truthy(loader);
10971216
t.is(loader.proxyUrl(), proxyUrl);

0 commit comments

Comments
 (0)