Skip to content

Commit ec5137e

Browse files
authored
Merge pull request #558 from powersync-ja/fix-download-race-condition
Fix checkpoints during uploads not being applied
2 parents 3403329 + 5f877a3 commit ec5137e

File tree

6 files changed

+196
-68
lines changed

6 files changed

+196
-68
lines changed

.changeset/violet-falcons-build.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fix a race condition causing sync changes during uploads not to be applied.

packages/common/src/client/AbstractPowerSyncDatabase.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { Schema } from '../db/schema/Schema.js';
1515
import { BaseObserver } from '../utils/BaseObserver.js';
1616
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1717
import { mutexRunExclusive } from '../utils/mutex.js';
18-
import { throttleTrailing } from '../utils/throttle.js';
18+
import { throttleTrailing } from '../utils/async.js';
1919
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2020
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
2121
import { runOnSchemaChange } from './runOnSchemaChange.js';

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

+74-62
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import Logger, { ILogger } from 'js-logger';
22

3-
import { SyncPriorityStatus, SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
3+
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
44
import { AbortOperation } from '../../../utils/AbortOperation.js';
55
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
6-
import { throttleLeadingTrailing } from '../../../utils/throttle.js';
6+
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
77
import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
88
import { CrudEntry } from '../bucket/CrudEntry.js';
99
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
@@ -161,6 +161,7 @@ export abstract class AbstractStreamingSyncImplementation
161161
protected abortController: AbortController | null;
162162
protected crudUpdateListener?: () => void;
163163
protected streamingSyncPromise?: Promise<void>;
164+
private pendingCrudUpload?: Promise<void>;
164165

165166
syncStatus: SyncStatus;
166167
triggerCrudUpload: () => void;
@@ -181,10 +182,16 @@ export abstract class AbstractStreamingSyncImplementation
181182
this.abortController = null;
182183

183184
this.triggerCrudUpload = throttleLeadingTrailing(() => {
184-
if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) {
185+
if (!this.syncStatus.connected || this.pendingCrudUpload != null) {
185186
return;
186187
}
187-
this._uploadAllCrud();
188+
189+
this.pendingCrudUpload = new Promise((resolve) => {
190+
this._uploadAllCrud().finally(() => {
191+
this.pendingCrudUpload = undefined;
192+
resolve();
193+
});
194+
});
188195
}, this.options.crudUploadThrottleMs!);
189196
}
190197

@@ -434,16 +441,8 @@ The next upload iteration will be delayed.`);
434441
if (signal?.aborted) {
435442
break;
436443
}
437-
const { retry } = await this.streamingSyncIteration(nestedAbortController.signal, options);
438-
if (!retry) {
439-
/**
440-
* A sync error ocurred that we cannot recover from here.
441-
* This loop must terminate.
442-
* The nestedAbortController will close any open network requests and streams below.
443-
*/
444-
break;
445-
}
446-
// Continue immediately
444+
await this.streamingSyncIteration(nestedAbortController.signal, options);
445+
// Continue immediately, streamingSyncIteration will wait before completing if necessary.
447446
} catch (ex) {
448447
/**
449448
* Either:
@@ -501,8 +500,8 @@ The next upload iteration will be delayed.`);
501500
protected async streamingSyncIteration(
502501
signal: AbortSignal,
503502
options?: PowerSyncConnectionOptions
504-
): Promise<{ retry?: boolean }> {
505-
return await this.obtainLock({
503+
): Promise<void> {
504+
await this.obtainLock({
506505
type: LockType.SYNC,
507506
signal,
508507
callback: async () => {
@@ -552,7 +551,7 @@ The next upload iteration will be delayed.`);
552551
const line = await stream.read();
553552
if (!line) {
554553
// The stream has closed while waiting
555-
return { retry: true };
554+
return;
556555
}
557556

558557
// A connection is active and messages are being received
@@ -582,30 +581,12 @@ The next upload iteration will be delayed.`);
582581
await this.options.adapter.removeBuckets([...bucketsToDelete]);
583582
await this.options.adapter.setTargetCheckpoint(targetCheckpoint);
584583
} else if (isStreamingSyncCheckpointComplete(line)) {
585-
this.logger.debug('Checkpoint complete', targetCheckpoint);
586-
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
587-
if (!result.checkpointValid) {
588-
// This means checksums failed. Start again with a new checkpoint.
589-
// TODO: better back-off
590-
await new Promise((resolve) => setTimeout(resolve, 50));
591-
return { retry: true };
592-
} else if (!result.ready) {
593-
// Checksums valid, but need more data for a consistent checkpoint.
594-
// Continue waiting.
595-
// landing here the whole time
596-
} else {
584+
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
585+
if (result.endIteration) {
586+
return;
587+
} else if (result.applied) {
597588
appliedCheckpoint = targetCheckpoint;
598-
this.logger.debug('validated checkpoint', appliedCheckpoint);
599-
this.updateSyncStatus({
600-
connected: true,
601-
lastSyncedAt: new Date(),
602-
dataFlow: {
603-
downloading: false,
604-
downloadError: undefined
605-
}
606-
});
607589
}
608-
609590
validatedCheckpoint = targetCheckpoint;
610591
} else if (isStreamingSyncCheckpointPartiallyComplete(line)) {
611592
const priority = line.partial_checkpoint_complete.priority;
@@ -615,9 +596,10 @@ The next upload iteration will be delayed.`);
615596
// This means checksums failed. Start again with a new checkpoint.
616597
// TODO: better back-off
617598
await new Promise((resolve) => setTimeout(resolve, 50));
618-
return { retry: true };
599+
return;
619600
} else if (!result.ready) {
620-
// Need more data for a consistent partial sync within a priority - continue waiting.
601+
// If we have pending uploads, we can't complete new checkpoints outside of priority 0.
602+
// We'll resolve this for a complete checkpoint.
621603
} else {
622604
// We'll keep on downloading, but can report that this priority is synced now.
623605
this.logger.debug('partial checkpoint validation succeeded');
@@ -691,7 +673,7 @@ The next upload iteration will be delayed.`);
691673
* (uses the same one), this should have some delay.
692674
*/
693675
await this.delayRetry();
694-
return { retry: true };
676+
return ;
695677
}
696678
this.triggerCrudUpload();
697679
} else {
@@ -707,37 +689,67 @@ The next upload iteration will be delayed.`);
707689
}
708690
});
709691
} else if (validatedCheckpoint === targetCheckpoint) {
710-
const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!);
711-
if (!result.checkpointValid) {
712-
// This means checksums failed. Start again with a new checkpoint.
713-
// TODO: better back-off
714-
await new Promise((resolve) => setTimeout(resolve, 50));
715-
return { retry: false };
716-
} else if (!result.ready) {
717-
// Checksums valid, but need more data for a consistent checkpoint.
718-
// Continue waiting.
719-
} else {
692+
const result = await this.applyCheckpoint(targetCheckpoint!, signal);
693+
if (result.endIteration) {
694+
return;
695+
} else if (result.applied) {
720696
appliedCheckpoint = targetCheckpoint;
721-
this.updateSyncStatus({
722-
connected: true,
723-
lastSyncedAt: new Date(),
724-
priorityStatusEntries: [],
725-
dataFlow: {
726-
downloading: false,
727-
downloadError: undefined
728-
}
729-
});
730697
}
731698
}
732699
}
733700
}
734701
this.logger.debug('Stream input empty');
735702
// Connection closed. Likely due to auth issue.
736-
return { retry: true };
703+
return;
737704
}
738705
});
739706
}
740707

708+
private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
709+
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
710+
const pending = this.pendingCrudUpload;
711+
712+
if (!result.checkpointValid) {
713+
this.logger.debug('Checksum mismatch in checkpoint, will reconnect');
714+
// This means checksums failed. Start again with a new checkpoint.
715+
// TODO: better back-off
716+
await new Promise((resolve) => setTimeout(resolve, 50));
717+
return { applied: false, endIteration: true };
718+
} else if (!result.ready && pending != null) {
719+
// We have pending entries in the local upload queue or are waiting to confirm a write
720+
// checkpoint, which prevented this checkpoint from applying. Wait for that to complete and
721+
// try again.
722+
this.logger.debug(
723+
'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.'
724+
);
725+
await Promise.race([pending, onAbortPromise(abort)]);
726+
727+
if (abort.aborted) {
728+
return { applied: false, endIteration: true };
729+
}
730+
731+
// Try again now that uploads have completed.
732+
result = await this.options.adapter.syncLocalDatabase(checkpoint);
733+
}
734+
735+
if (result.checkpointValid && result.ready) {
736+
this.logger.debug('validated checkpoint', checkpoint);
737+
this.updateSyncStatus({
738+
connected: true,
739+
lastSyncedAt: new Date(),
740+
dataFlow: {
741+
downloading: false,
742+
downloadError: undefined
743+
}
744+
});
745+
746+
return { applied: true, endIteration: false };
747+
} else {
748+
this.logger.debug('Could not apply checkpoint. Waiting for next sync complete line.');
749+
return { applied: false, endIteration: false };
750+
}
751+
}
752+
741753
protected updateSyncStatus(options: SyncStatusOptions) {
742754
const updatedStatus = new SyncStatus({
743755
connected: options.connected ?? this.syncStatus.connected,

packages/common/src/utils/throttle.ts renamed to packages/common/src/utils/async.ts

+10
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,13 @@ export function throttleLeadingTrailing(func: () => void, wait: number) {
4848
}
4949
};
5050
}
51+
52+
export function onAbortPromise(signal: AbortSignal): Promise<void> {
53+
return new Promise<void>((resolve) => {
54+
if (signal.aborted) {
55+
resolve();
56+
} else {
57+
signal.onabort = () => resolve();
58+
}
59+
});
60+
}

packages/web/tests/stream.test.ts

+95
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,101 @@ describe('Streaming', { sequential: true }, () => {
4646
})
4747
)
4848
);
49+
50+
it('Should handle checkpoints during the upload process', async () => {
51+
const { powersync, remote, uploadSpy } = await generateConnectedDatabase();
52+
expect(powersync.connected).toBe(true);
53+
54+
let resolveUploadPromise: () => void;
55+
let resolveUploadStartedPromise: () => void;
56+
const completeUploadPromise = new Promise<void>((resolve) => {
57+
resolveUploadPromise = resolve;
58+
});
59+
const uploadStartedPromise = new Promise<void>((resolve) => {
60+
resolveUploadStartedPromise = resolve;
61+
});
62+
63+
async function expectUserRows(amount: number) {
64+
const row = await powersync.get<{ r: number }>('SELECT COUNT(*) AS r FROM users');
65+
expect(row.r).toBe(amount);
66+
}
67+
68+
uploadSpy.mockImplementation(async (db) => {
69+
const batch = await db.getCrudBatch();
70+
if (!batch) return;
71+
72+
resolveUploadStartedPromise();
73+
await completeUploadPromise;
74+
await batch?.complete();
75+
});
76+
77+
// trigger an upload
78+
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['from local']);
79+
await expectUserRows(1);
80+
await uploadStartedPromise;
81+
82+
// A connector could have uploaded data (triggering a checkpoint) before finishing
83+
remote.enqueueLine({
84+
checkpoint: {
85+
write_checkpoint: '1',
86+
last_op_id: '2',
87+
buckets: [{ bucket: 'a', priority: 3, checksum: 0 }]
88+
}
89+
});
90+
remote.generateCheckpoint.mockImplementation(() => {
91+
return {
92+
data: {
93+
write_checkpoint: '1'
94+
}
95+
};
96+
});
97+
98+
remote.enqueueLine({
99+
data: {
100+
bucket: 'a',
101+
data: [
102+
{
103+
checksum: 0,
104+
op_id: '1',
105+
op: 'PUT',
106+
object_id: '1',
107+
object_type: 'users',
108+
data: '{"id": "test1", "name": "from local"}'
109+
},
110+
{
111+
checksum: 0,
112+
op_id: '2',
113+
op: 'PUT',
114+
object_id: '2',
115+
object_type: 'users',
116+
data: '{"id": "test1", "name": "additional entry"}'
117+
}
118+
]
119+
}
120+
});
121+
remote.enqueueLine({
122+
checkpoint_complete: {
123+
last_op_id: '2'
124+
}
125+
});
126+
127+
// Give the sync client some time to process these
128+
await new Promise<void>((resolve) => setTimeout(resolve, 500));
129+
130+
// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
131+
await expectUserRows(1);
132+
133+
// Mark the upload as completed. This should trigger a write_checkpoint.json request
134+
resolveUploadPromise!();
135+
await vi.waitFor(() => {
136+
expect(remote.generateCheckpoint.mock.calls.length).equals(1);
137+
});
138+
139+
// Completing the upload should also make the checkpoint visible without it being sent again.
140+
await vi.waitFor(async () => {
141+
await expectUserRows(2);
142+
});
143+
});
49144
});
50145

51146
function describeStreamingTests(createConnectedDatabase: () => Promise<ConnectedDatabaseUtils>) {

packages/web/tests/utils/MockStreamOpenFactory.ts

+11-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
WebPowerSyncOpenFactoryOptions,
1919
WebStreamingSyncImplementation
2020
} from '@powersync/web';
21+
import { MockedFunction, vi } from 'vitest';
2122

2223
export class TestConnector implements PowerSyncBackendConnector {
2324
async fetchCredentials(): Promise<PowerSyncCredentials> {
@@ -35,12 +36,21 @@ export class TestConnector implements PowerSyncBackendConnector {
3536
export class MockRemote extends AbstractRemote {
3637
streamController: ReadableStreamDefaultController<StreamingSyncLine> | null;
3738
errorOnStreamStart = false;
39+
generateCheckpoint: MockedFunction<() => any>;
40+
3841
constructor(
3942
connector: RemoteConnector,
4043
protected onStreamRequested: () => void
4144
) {
4245
super(connector);
4346
this.streamController = null;
47+
this.generateCheckpoint = vi.fn(() => {
48+
return {
49+
data: {
50+
write_checkpoint: '1000'
51+
}
52+
};
53+
});
4454
}
4555

4656
async getBSON(): Promise<BSONImplementation> {
@@ -53,11 +63,7 @@ export class MockRemote extends AbstractRemote {
5363
async get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
5464
// mock a response for write checkpoint API
5565
if (path.includes('checkpoint')) {
56-
return {
57-
data: {
58-
write_checkpoint: '1000'
59-
}
60-
};
66+
return this.generateCheckpoint();
6167
}
6268
throw new Error('Not implemented');
6369
}

0 commit comments

Comments
 (0)