Skip to content

Commit 25efd50

Browse files
committed
Fix checkpoints during uploads not being applied
1 parent 3403329 commit 25efd50

File tree

5 files changed

+180
-50
lines changed

5 files changed

+180
-50
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { Schema } from '../db/schema/Schema.js';
1515
import { BaseObserver } from '../utils/BaseObserver.js';
1616
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1717
import { mutexRunExclusive } from '../utils/mutex.js';
18-
import { throttleTrailing } from '../utils/throttle.js';
18+
import { throttleTrailing } from '../utils/async.js';
1919
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2020
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
2121
import { runOnSchemaChange } from './runOnSchemaChange.js';

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

+63-44
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import Logger, { ILogger } from 'js-logger';
22

3-
import { SyncPriorityStatus, SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
3+
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
44
import { AbortOperation } from '../../../utils/AbortOperation.js';
55
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
6-
import { throttleLeadingTrailing } from '../../../utils/throttle.js';
6+
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
77
import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
88
import { CrudEntry } from '../bucket/CrudEntry.js';
99
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
@@ -161,6 +161,7 @@ export abstract class AbstractStreamingSyncImplementation
161161
protected abortController: AbortController | null;
162162
protected crudUpdateListener?: () => void;
163163
protected streamingSyncPromise?: Promise<void>;
164+
private pendingCrudUpload?: Promise<void>;
164165

165166
syncStatus: SyncStatus;
166167
triggerCrudUpload: () => void;
@@ -181,10 +182,16 @@ export abstract class AbstractStreamingSyncImplementation
181182
this.abortController = null;
182183

183184
this.triggerCrudUpload = throttleLeadingTrailing(() => {
184-
if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) {
185+
if (!this.syncStatus.connected || this.pendingCrudUpload != null) {
185186
return;
186187
}
187-
this._uploadAllCrud();
188+
189+
this.pendingCrudUpload = new Promise((resolve) => {
190+
this._uploadAllCrud().finally(() => {
191+
this.pendingCrudUpload = undefined;
192+
resolve();
193+
});
194+
});
188195
}, this.options.crudUploadThrottleMs!);
189196
}
190197

@@ -582,30 +589,12 @@ The next upload iteration will be delayed.`);
582589
await this.options.adapter.removeBuckets([...bucketsToDelete]);
583590
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
584591
} else if (isStreamingSyncCheckpointComplete(line)) {
585-
this.logger.debug('Checkpoint complete', targetCheckpoint);
586-
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
587-
if (!result.checkpointValid) {
588-
// This means checksums failed. Start again with a new checkpoint.
589-
// TODO: better back-off
590-
await new Promise((resolve) => setTimeout(resolve, 50));
592+
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
593+
if (result.endIteration) {
591594
return { retry: true };
592-
} else if (!result.ready) {
593-
// Checksums valid, but need more data for a consistent checkpoint.
594-
// Continue waiting.
595-
// landing here the whole time
596-
} else {
595+
} else if (result.applied) {
597596
appliedCheckpoint = targetCheckpoint;
598-
this.logger.debug('validated checkpoint', appliedCheckpoint);
599-
this.updateSyncStatus({
600-
connected: true,
601-
lastSyncedAt: new Date(),
602-
dataFlow: {
603-
downloading: false,
604-
downloadError: undefined
605-
}
606-
});
607597
}
608-
609598
validatedCheckpoint = targetCheckpoint;
610599
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
611600
const priority = line.partial_checkpoint_complete.priority;
@@ -617,7 +606,8 @@ The next upload iteration will be delayed.`);
617606
await new Promise((resolve) => setTimeout(resolve, 50));
618607
return { retry: true };
619608
} else if (!result.ready) {
620-
// Need more data for a consistent partial sync within a priority - continue waiting.
609+
// If we have pending uploads, we can't complete new checkpoints outside of priority 0.
610+
// We'll resolve this for a complete checkpoint.
621611
} else {
622612
// We'll keep on downloading, but can report that this priority is synced now.
623613
this.logger.debug('partial checkpoint validation succeeded');
@@ -707,26 +697,13 @@ The next upload iteration will be delayed.`);
707697
}
708698
});
709699
} else if (validatedCheckpoint === targetCheckpoint) {
710-
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
711-
if (!result.checkpointValid) {
712-
// This means checksums failed. Start again with a new checkpoint.
713-
// TODO: better back-off
714-
await new Promise((resolve) => setTimeout(resolve, 50));
700+
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
701+
if (result.endIteration) {
702+
// TODO: Why is this one retry: false? That's the only change from when we receive
703+
// the line above?
715704
return { retry: false };
716-
} else if (!result.ready) {
717-
// Checksums valid, but need more data for a consistent checkpoint.
718-
// Continue waiting.
719-
} else {
705+
} else if (result.applied) {
720706
appliedCheckpoint = targetCheckpoint;
721-
this.updateSyncStatus({
722-
connected: true,
723-
lastSyncedAt: new Date(),
724-
priorityStatusEntries: [],
725-
dataFlow: {
726-
downloading: false,
727-
downloadError: undefined
728-
}
729-
});
730707
}
731708
}
732709
}
@@ -738,6 +715,48 @@ The next upload iteration will be delayed.`);
738715
});
739716
}
740717

718+
private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
719+
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
720+
if (!result.checkpointValid) {
721+
this.logger.debug('Checksum mismatch in checkpoint, will reconnect');
722+
// This means checksums failed. Start again with a new checkpoint.
723+
// TODO: better back-off
724+
await new Promise((resolve) => setTimeout(resolve, 50));
725+
return { applied: false, endIteration: true };
726+
} else if (!result.ready) {
727+
// We have pending entries in the local upload queue or are waiting to confirm a write
728+
// checkpoint. See if that is happening right now.
729+
const pending = this.pendingCrudUpload;
730+
if (pending != null) {
731+
await Promise.race([pending, onAbortPromise(abort)]);
732+
}
733+
734+
if (abort.aborted || pending == null) {
735+
return { applied: false, endIteration: true };
736+
}
737+
738+
// Try again now that uploads have completed.
739+
result = await this.options.adapter.syncLocalDatabase(checkpoint);
740+
}
741+
742+
if (result.checkpointValid && result.ready) {
743+
this.logger.debug('validated checkpoint', checkpoint);
744+
this.updateSyncStatus({
745+
connected: true,
746+
lastSyncedAt: new Date(),
747+
dataFlow: {
748+
downloading: false,
749+
downloadError: undefined
750+
}
751+
});
752+
753+
return { applied: true, endIteration: false };
754+
} else {
755+
this.logger.debug('Could not apply checkpoint even after waiting for uploads. Waiting for next sync complete line.');
756+
return { applied: false, endIteration: false };
757+
}
758+
}
759+
741760
protected updateSyncStatus(options: SyncStatusOptions) {
742761
const updatedStatus = new SyncStatus({
743762
connected: options.connected ?? this.syncStatus.connected,

packages/common/src/utils/throttle.ts renamed to packages/common/src/utils/async.ts

+10
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,13 @@ export function throttleLeadingTrailing(func: () => void, wait: number) {
4848
}
4949
};
5050
}
51+
52+
export function onAbortPromise(signal: AbortSignal): Promise<void> {
53+
return new Promise<void>((resolve) => {
54+
if (signal.aborted) {
55+
resolve();
56+
} else {
57+
signal.onabort = () => resolve();
58+
}
59+
})
60+
}

packages/web/tests/stream.test.ts

+95
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,101 @@ describe('Streaming', { sequential: true }, () => {
4646
})
4747
)
4848
);
49+
50+
it('Should handle checkpoints during the upload process', async () => {
51+
const { powersync, remote, uploadSpy } = await generateConnectedDatabase();
52+
expect(powersync.connected).toBe(true);
53+
54+
let resolveUploadPromise: () => void;
55+
let resolveUploadStartedPromise: () => void;
56+
const completeUploadPromise = new Promise<void>((resolve) => {
57+
resolveUploadPromise = resolve
58+
});
59+
const uploadStartedPromise = new Promise<void>((resolve) => {
60+
resolveUploadStartedPromise = resolve
61+
});
62+
63+
async function expectUserRows(amount: number) {
64+
const row = await powersync.get<{r: number}>('SELECT COUNT(*) AS r FROM users');
65+
expect(row.r).toBe(amount);
66+
}
67+
68+
uploadSpy.mockImplementation(async (db) => {
69+
const batch = await db.getCrudBatch();
70+
if (!batch) return;
71+
72+
resolveUploadStartedPromise();
73+
await completeUploadPromise;
74+
await batch?.complete();
75+
});
76+
77+
// trigger an upload
78+
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['from local']);
79+
await expectUserRows(1);
80+
await uploadStartedPromise;
81+
82+
// A connector could have uploaded data (triggering a checkpoint) before finishing
83+
remote.enqueueLine({
84+
checkpoint: {
85+
write_checkpoint: '1',
86+
last_op_id: '2',
87+
buckets: [{ bucket: 'a', priority: 3, checksum: 0 }]
88+
}
89+
});
90+
remote.generateCheckpoint.mockImplementation(() => {
91+
return {
92+
data: {
93+
write_checkpoint: '1',
94+
}
95+
};
96+
});
97+
98+
remote.enqueueLine({
99+
data: {
100+
bucket: 'a',
101+
data: [
102+
{
103+
checksum: 0,
104+
op_id: '1',
105+
op: 'PUT',
106+
object_id: '1',
107+
object_type: 'users',
108+
data: '{"id": "test1", "name": "from local"}'
109+
},
110+
{
111+
checksum: 0,
112+
op_id: '2',
113+
op: 'PUT',
114+
object_id: '2',
115+
object_type: 'users',
116+
data: '{"id": "test1", "name": "additional entry"}'
117+
},
118+
]
119+
}
120+
});
121+
remote.enqueueLine({
122+
checkpoint_complete: {
123+
last_op_id: '2'
124+
}
125+
});
126+
127+
// Give the sync client some time to process these
128+
await new Promise<void>((resolve) => setTimeout(resolve, 500));
129+
130+
// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
131+
await expectUserRows(1);
132+
133+
// Mark the upload as completed. This should trigger a write_checkpoint.json request
134+
resolveUploadPromise!();
135+
await vi.waitFor(() => {
136+
expect(remote.generateCheckpoint.mock.calls.length).equals(1);
137+
});
138+
139+
// Completing the upload should also make the checkpoint visible without it being sent again.
140+
await vi.waitFor(async () => {
141+
await expectUserRows(2);
142+
});
143+
});
49144
});
50145

51146
function describeStreamingTests(createConnectedDatabase: () => Promise<ConnectedDatabaseUtils>) {

packages/web/tests/utils/MockStreamOpenFactory.ts

+11-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
WebPowerSyncOpenFactoryOptions,
1919
WebStreamingSyncImplementation
2020
} from '@powersync/web';
21+
import { MockedFunction, vi } from 'vitest';
2122

2223
export class TestConnector implements PowerSyncBackendConnector {
2324
async fetchCredentials(): Promise<PowerSyncCredentials> {
@@ -35,12 +36,21 @@ export class TestConnector implements PowerSyncBackendConnector {
3536
export class MockRemote extends AbstractRemote {
3637
streamController: ReadableStreamDefaultController<StreamingSyncLine> | null;
3738
errorOnStreamStart = false;
39+
generateCheckpoint: MockedFunction<() => any>;
40+
3841
constructor(
3942
connector: RemoteConnector,
4043
protected onStreamRequested: () => void
4144
) {
4245
super(connector);
4346
this.streamController = null;
47+
this.generateCheckpoint = vi.fn(() => {
48+
return {
49+
data: {
50+
write_checkpoint: '1000',
51+
}
52+
};
53+
});
4454
}
4555

4656
async getBSON(): Promise<BSONImplementation> {
@@ -53,11 +63,7 @@ export class MockRemote extends AbstractRemote {
5363
async get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
5464
// mock a response for write checkpoint API
5565
if (path.includes('checkpoint')) {
56-
return {
57-
data: {
58-
write_checkpoint: '1000'
59-
}
60-
};
66+
return this.generateCheckpoint();
6167
}
6268
throw new Error('Not implemented');
6369
}

0 commit comments

Comments
 (0)