Skip to content

Commit 944ee93

Browse files
fix: CRUD Upload Queue Monitoring (#311)
1 parent d5f755d commit 944ee93

File tree

6 files changed

+166
-22
lines changed

6 files changed

+166
-22
lines changed

.changeset/afraid-apples-learn.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/common': patch
3+
'@powersync/web': patch
4+
'@powersync/react-native': patch
5+
---
6+
7+
Fixed issue where sequentially mutating the same row multiple times could cause the CRUD upload queue monitoring to think CRUD operations have not been processed correctly by the `BackendConnector` `uploadData` method.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import Logger, { ILogger } from 'js-logger';
33
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
44
import { AbortOperation } from '../../../utils/AbortOperation.js';
55
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
6+
import { throttleLeadingTrailing } from '../../../utils/throttle.js';
67
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
78
import { CrudEntry } from '../bucket/CrudEntry.js';
89
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
@@ -16,7 +17,6 @@ import {
1617
isStreamingSyncCheckpointDiff,
1718
isStreamingSyncData
1819
} from './streaming-sync-types.js';
19-
import { throttleLeadingTrailing } from '../../../utils/throttle.js';
2020

2121
export enum LockType {
2222
CRUD = 'crud',
@@ -230,7 +230,7 @@ export abstract class AbstractStreamingSyncImplementation
230230
*/
231231
const nextCrudItem = await this.options.adapter.nextCrudItem();
232232
if (nextCrudItem) {
233-
if (nextCrudItem.id == checkedCrudItem?.id) {
233+
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
234234
// This will force a higher log level than exceptions which are caught here.
235235
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
236236
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.

packages/web/src/db/PowerSyncDatabase.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import {
1616
import { Mutex } from 'async-mutex';
1717
import { WASQLiteOpenFactory } from './adapters/wa-sqlite/WASQLiteOpenFactory';
1818
import {
19-
ResolvedWebSQLOpenOptions,
2019
DEFAULT_WEB_SQL_FLAGS,
20+
ResolvedWebSQLOpenOptions,
2121
resolveWebSQLFlags,
2222
WebSQLFlags
2323
} from './adapters/web-sql-flags';

packages/web/tests/stream.test.ts

+42-18
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,39 @@
1-
import { Schema, TableV2, column } from '@powersync/common';
1+
import { Schema, Table, column } from '@powersync/common';
2+
import { WebPowerSyncOpenFactoryOptions } from '@powersync/web';
23
import Logger from 'js-logger';
34
import { v4 as uuid } from 'uuid';
45
import { beforeAll, describe, expect, it, vi } from 'vitest';
56
import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory';
67

8+
type UnwrapPromise<T> = T extends Promise<infer U> ? U : T;
9+
10+
export type ConnectedDatabaseUtils = UnwrapPromise<ReturnType<typeof generateConnectedDatabase>>;
11+
export type GenerateConnectedDatabaseOptions = {
12+
powerSyncOptions: Partial<WebPowerSyncOpenFactoryOptions>;
13+
};
14+
715
const UPLOAD_TIMEOUT_MS = 3000;
816

9-
export async function generateConnectedDatabase({ useWebWorker } = { useWebWorker: true }) {
17+
export const DEFAULT_CONNECTED_POWERSYNC_OPTIONS = {
18+
powerSyncOptions: {
19+
dbFilename: 'test-stream-connection.db',
20+
flags: {
21+
enableMultiTabs: false,
22+
useWebWorker: true
23+
},
24+
// Makes tests faster
25+
crudUploadThrottleMs: 0,
26+
schema: new Schema({
27+
users: new Table({ name: column.text })
28+
})
29+
}
30+
};
31+
32+
export async function generateConnectedDatabase(
33+
options: GenerateConnectedDatabaseOptions = DEFAULT_CONNECTED_POWERSYNC_OPTIONS
34+
) {
35+
const { powerSyncOptions } = options;
36+
const { powerSyncOptions: defaultPowerSyncOptions } = DEFAULT_CONNECTED_POWERSYNC_OPTIONS;
1037
/**
1138
* Very basic implementation of a listener pattern.
1239
* Required since we cannot extend multiple classes.
@@ -16,24 +43,14 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
1643
const uploadSpy = vi.spyOn(connector, 'uploadData');
1744
const remote = new MockRemote(connector, () => callbacks.forEach((c) => c()));
1845

19-
const users = new TableV2({
20-
name: column.text
21-
});
22-
23-
const schema = new Schema({
24-
users
25-
});
26-
2746
const factory = new MockStreamOpenFactory(
2847
{
29-
dbFilename: 'test-stream-connection.db',
48+
...defaultPowerSyncOptions,
49+
...powerSyncOptions,
3050
flags: {
31-
enableMultiTabs: false,
32-
useWebWorker
33-
},
34-
// Makes tests faster
35-
crudUploadThrottleMs: 0,
36-
schema
51+
...(defaultPowerSyncOptions.flags ?? {}),
52+
...(powerSyncOptions.flags ?? {})
53+
}
3754
},
3855
remote
3956
);
@@ -83,7 +100,14 @@ describe('Streaming', () => {
83100
test: (createConnectedDatabase: () => ReturnType<typeof generateConnectedDatabase>) => Promise<void>
84101
) => {
85102
const funcWithWebWorker = generateConnectedDatabase;
86-
const funcWithoutWebWorker = () => generateConnectedDatabase({ useWebWorker: false });
103+
const funcWithoutWebWorker = () =>
104+
generateConnectedDatabase({
105+
powerSyncOptions: {
106+
flags: {
107+
useWebWorker: false
108+
}
109+
}
110+
});
87111

88112
it(`${name} - with web worker`, () => test(funcWithWebWorker));
89113
it(`${name} - without web worker`, () => test(funcWithoutWebWorker));

packages/web/tests/uploads.test.ts

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import Logger from 'js-logger';
2+
import p from 'p-defer';
3+
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
4+
import { ConnectedDatabaseUtils, generateConnectedDatabase } from './stream.test';
5+
6+
// Don't want to actually export the warning string from the package
7+
const PARTIAL_WARNING = 'Potentially previously uploaded CRUD entries are still present';
8+
9+
describe('CRUD Uploads', () => {
10+
let connectedUtils: ConnectedDatabaseUtils;
11+
const logger = Logger.get('crud-logger');
12+
13+
beforeAll(() => Logger.useDefaults());
14+
15+
beforeEach(async () => {
16+
connectedUtils = await generateConnectedDatabase({
17+
powerSyncOptions: {
18+
logger,
19+
crudUploadThrottleMs: 1_000,
20+
flags: {
21+
enableMultiTabs: false
22+
}
23+
}
24+
});
25+
});
26+
27+
afterEach(async () => {
28+
connectedUtils.remote.streamController?.close();
29+
await connectedUtils.powersync.disconnectAndClear();
30+
await connectedUtils.powersync.close();
31+
});
32+
33+
it('should warn for missing upload operations in uploadData', async () => {
34+
const { powersync, uploadSpy } = connectedUtils;
35+
const loggerSpy = vi.spyOn(logger, 'warn');
36+
37+
const deferred = p();
38+
39+
uploadSpy.mockImplementation(async (db) => {
40+
// This upload method does not perform an upload
41+
deferred.resolve();
42+
});
43+
44+
// Create something with CRUD in it.
45+
await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven']);
46+
47+
// The empty upload handler should have been called
48+
// Timeouts seem to be weird in Vitest Browser mode.
49+
// This makes the check below more stable.
50+
await deferred.promise;
51+
52+
await vi.waitFor(
53+
() => {
54+
expect(loggerSpy.mock.calls.find((logArgs) => logArgs[0].includes(PARTIAL_WARNING))).exist;
55+
},
56+
{
57+
timeout: 500
58+
}
59+
);
60+
});
61+
62+
it('should immediately upload sequential transactions', async () => {
63+
const { powersync, uploadSpy } = connectedUtils;
64+
const loggerSpy = vi.spyOn(logger, 'warn');
65+
66+
const deferred = p();
67+
68+
uploadSpy.mockImplementation(async (db) => {
69+
const nextTransaction = await db.getNextCrudTransaction();
70+
if (!nextTransaction) {
71+
return;
72+
}
73+
// Mockingly delete the crud op in order to progress through the CRUD queue
74+
for (const op of nextTransaction.crud) {
75+
await db.execute(`DELETE FROM ps_crud WHERE id = ?`, [op.clientId]);
76+
}
77+
78+
deferred.resolve();
79+
});
80+
81+
// Create the first item
82+
await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven']);
83+
84+
// Modify the first item in a new transaction
85+
await powersync.execute(`
86+
UPDATE
87+
users
88+
SET
89+
name = 'Mugi'
90+
WHERE
91+
name = 'steven'`);
92+
93+
// Create a second item
94+
await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven2']);
95+
96+
// The empty upload handler should have been called.
97+
// Timeouts seem to be weird in Vitest Browser mode.
98+
// This makes the check below more stable.
99+
await deferred.promise;
100+
101+
await vi.waitFor(
102+
() => {
103+
expect(uploadSpy.mock.calls.length).eq(3);
104+
},
105+
{
106+
timeout: 5_000
107+
}
108+
);
109+
110+
expect(loggerSpy.mock.calls.find((logArgs) => logArgs[0].includes(PARTIAL_WARNING))).toBeUndefined;
111+
});
112+
});

packages/web/tests/utils/MockStreamOpenFactory.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,15 @@ export class MockedStreamPowerSync extends PowerSyncDatabase {
133133
connector: PowerSyncBackendConnector
134134
): AbstractStreamingSyncImplementation {
135135
return new WebStreamingSyncImplementation({
136+
logger: this.options.logger,
136137
adapter: this.bucketStorageAdapter,
137138
remote: this.remote,
138139
uploadCrud: async () => {
139140
await this.waitForReady();
140141
await connector.uploadData(this);
141142
},
142143
identifier: this.database.name,
143-
retryDelayMs: 0
144+
retryDelayMs: this.options.crudUploadThrottleMs ?? 0 // The zero here makes tests faster
144145
});
145146
}
146147
}

0 commit comments

Comments
 (0)