Skip to content

Commit 32dc7e3

Browse files
ChriztiaanChristiaan Landman
and
Christiaan Landman
authored
update: added a mechanism for throttling callback executions (#190)
Co-authored-by: Christiaan Landman <[email protected]>
1 parent dc9f3b0 commit 32dc7e3

File tree

6 files changed

+134
-10
lines changed

6 files changed

+134
-10
lines changed

.changeset/forty-avocados-yell.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/web': major
3+
---
4+
5+
Bump version for Stable release.

.changeset/itchy-gifts-dance.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Added a mechanism for throttling watch callback executions.

packages/common/src/client/AbstractPowerSyncDatabase.ts

+8-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
StreamingSyncImplementation,
2929
PowerSyncConnectionOptions
3030
} from './sync/stream/AbstractStreamingSyncImplementation';
31+
import { ControlledExecutor } from '../utils/ControlledExecutor';
3132

3233
export interface DisconnectAndClearOptions {
3334
/** When set to false, data in local-only tables is preserved. */
@@ -74,7 +75,7 @@ export interface WatchHandler {
7475
}
7576

7677
export interface WatchOnChangeHandler {
77-
onChange: (event: WatchOnChangeEvent) => void;
78+
onChange: (event: WatchOnChangeEvent) => Promise<void> | void;
7879
onError?: (error: Error) => void;
7980
}
8081

@@ -845,12 +846,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
845846
const changedTables = new Set<string>();
846847
const throttleMs = resolvedOptions.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS;
847848

849+
const executor = new ControlledExecutor(async (e: WatchOnChangeEvent) => {
850+
await onChange(e);
851+
});
852+
848853
const flushTableUpdates = throttle(
849854
() =>
850855
this.handleTableChanges(changedTables, watchedTables, (intersection) => {
851856
if (resolvedOptions?.signal?.aborted) return;
852-
853-
onChange({ changedTables: intersection });
857+
executor.schedule({ changedTables: intersection });
854858
}),
855859
throttleMs,
856860
{ leading: false, trailing: true }
@@ -869,6 +873,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
869873
});
870874

871875
resolvedOptions.signal?.addEventListener('abort', () => {
876+
executor.dispose();
872877
dispose();
873878
});
874879

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
export interface ControlledExecutorOptions {
2+
/**
3+
* If throttling is enabled, it ensures only one task runs at a time,
4+
* and only one additional task can be scheduled to run after the current task completes. The pending task will be overwritten by the latest task.
5+
* Enabled by default.
6+
*/
7+
throttleEnabled?: boolean;
8+
}
9+
10+
export class ControlledExecutor<T> {
11+
private task: (param: T) => Promise<void> | void;
12+
13+
/**
14+
* Represents the currently running task, which could be a Promise or undefined if no task is running.
15+
*/
16+
private runningTask: undefined | (Promise<void> | void);
17+
18+
private pendingTaskParam: T | undefined;
19+
20+
/**
21+
* Flag to determine if throttling is enabled, which controls whether tasks are queued or run immediately.
22+
*/
23+
private isThrottling: boolean;
24+
25+
private closed: boolean;
26+
27+
constructor(task: (param: T) => Promise<void> | void, options?: ControlledExecutorOptions) {
28+
this.task = task;
29+
const { throttleEnabled = true } = options ?? {};
30+
this.isThrottling = throttleEnabled;
31+
this.closed = false;
32+
}
33+
34+
schedule(param: T) {
35+
if (this.closed) {
36+
return;
37+
}
38+
if (!this.isThrottling) {
39+
this.task(param);
40+
return;
41+
}
42+
43+
if (this.runningTask) {
44+
// set or replace the pending task param with latest one
45+
this.pendingTaskParam = param;
46+
return;
47+
}
48+
49+
this.execute(param);
50+
}
51+
52+
dispose() {
53+
this.closed = true;
54+
55+
if (this.runningTask) {
56+
this.runningTask = undefined;
57+
}
58+
}
59+
60+
private async execute(param: T) {
61+
this.runningTask = this.task(param);
62+
await this.runningTask;
63+
this.runningTask = undefined;
64+
65+
if (this.pendingTaskParam) {
66+
const pendingParam = this.pendingTaskParam;
67+
this.pendingTaskParam = undefined;
68+
69+
this.execute(pendingParam);
70+
}
71+
}
72+
}

packages/web/README.md

-4
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,6 @@ This package (`packages/web`) is the PowerSync SDK for JavaScript Web clients. I
1010

1111
See a summary of features [here](https://docs.powersync.com/client-sdk-references/js-web).
1212

13-
## Beta Release
14-
15-
The web SDK package is currently in a Beta release.
16-
1713
# Installation
1814

1915
## Install Package

packages/web/tests/watch.test.ts

+44-3
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ describe('Watch Tests', () => {
146146
const updatesCount = 5;
147147
let receivedUpdatesCount = 0;
148148

149-
const onUpdate = () => receivedUpdatesCount++;
149+
const onUpdate = () => {
150+
receivedUpdatesCount++;
151+
};
150152

151153
powersync.watch(
152154
'SELECT count() AS count FROM assets INNER JOIN customers ON customers.id = assets.customer_id',
@@ -216,7 +218,9 @@ describe('Watch Tests', () => {
216218
const assetsAbortController = new AbortController();
217219

218220
let receivedAssetsUpdatesCount = 0;
219-
const onWatchAssets = () => receivedAssetsUpdatesCount++;
221+
const onWatchAssets = () => {
222+
receivedAssetsUpdatesCount++;
223+
};
220224

221225
powersync.watch(
222226
'SELECT count() AS count FROM assets',
@@ -230,7 +234,9 @@ describe('Watch Tests', () => {
230234
const customersAbortController = new AbortController();
231235

232236
let receivedCustomersUpdatesCount = 0;
233-
const onWatchCustomers = () => receivedCustomersUpdatesCount++;
237+
const onWatchCustomers = () => {
238+
receivedCustomersUpdatesCount++;
239+
};
234240

235241
powersync.watch(
236242
'SELECT count() AS count FROM customers',
@@ -280,4 +286,39 @@ describe('Watch Tests', () => {
280286
await receivedError;
281287
expect(receivedErrorCount).equals(1);
282288
});
289+
290+
it('should throttle watch callback overflow', async () => {
291+
const abortController = new AbortController();
292+
293+
const updatesCount = 25;
294+
295+
let receivedWithManagedOverflowCount = 0;
296+
const onResultOverflow = () => {
297+
receivedWithManagedOverflowCount++;
298+
};
299+
300+
const overflowAbortController = new AbortController();
301+
powersync.watch(
302+
'SELECT count() AS count FROM assets',
303+
[],
304+
{ onResult: onResultOverflow },
305+
{ signal: overflowAbortController.signal, throttleMs: 1 }
306+
);
307+
308+
// Allows us to count the number of updates received without the initial trigger
309+
await new Promise<void>((resolve) => setTimeout(resolve, 1 * throttleDuration));
310+
311+
// Perform a large number of inserts to trigger overflow
312+
for (let i = 0; i < updatesCount; i++) {
313+
powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', uuid()]);
314+
}
315+
316+
await new Promise<void>((resolve) => setTimeout(resolve, 1 * throttleDuration));
317+
318+
abortController.abort();
319+
overflowAbortController.abort();
320+
321+
// Initial onResult plus two left after overflow was throttled for onChange triggers
322+
expect(receivedWithManagedOverflowCount).toBe(3);
323+
});
283324
});

0 commit comments

Comments
 (0)