Skip to content

Commit 6780fe2

Browse files
authored
Merge pull request #502 from powersync-ja/feat/bucket-priorities
Support bucket priorities
2 parents 4f3df42 + e04c040 commit 6780fe2

24 files changed

+508
-222
lines changed

.changeset/friendly-chairs-camp.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Support bucket priorities

packages/common/src/client/AbstractPowerSyncDatabase.ts

+51-12
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
UpdateNotification,
1010
isBatchedUpdateNotification
1111
} from '../db/DBAdapter.js';
12-
import { SyncStatus } from '../db/crud/SyncStatus.js';
12+
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
1313
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
1414
import { Schema } from '../db/schema/Schema.js';
1515
import { BaseObserver } from '../utils/BaseObserver.js';
@@ -146,6 +146,11 @@ export const isPowerSyncDatabaseOptionsWithSettings = (test: any): test is Power
146146
return typeof test == 'object' && isSQLOpenOptions(test.database);
147147
};
148148

149+
/**
150+
* The priority used by the core extension to indicate that a full sync was completed.
151+
*/
152+
const FULL_SYNC_PRIORITY = 2147483647;
153+
149154
export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDBListener> {
150155
/**
151156
* Transactions should be queued in the DBAdapter, but we also want to prevent
@@ -260,16 +265,30 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
260265
}
261266

262267
/**
268+
* Wait for the first sync operation to complete.
269+
*
270+
* @argument request Either an abort signal (after which the promise will complete regardless of
271+
* whether a full sync was completed) or an object providing an abort signal and a priority target.
272+
* When a priority target is set, the promise may complete when all buckets with the given (or higher)
273+
* priorities have been synchronized. This can be earlier than a complete sync.
263274
* @returns A promise which will resolve once the first full sync has completed.
264275
*/
265-
async waitForFirstSync(signal?: AbortSignal): Promise<void> {
266-
if (this.currentStatus.hasSynced) {
276+
async waitForFirstSync(request?: AbortSignal | { signal?: AbortSignal; priority?: number }): Promise<void> {
277+
const signal = request instanceof AbortSignal ? request : request?.signal;
278+
const priority = request && 'priority' in request ? request.priority : undefined;
279+
280+
const statusMatches =
281+
priority === undefined
282+
? (status: SyncStatus) => status.hasSynced
283+
: (status: SyncStatus) => status.statusForPriority(priority).hasSynced;
284+
285+
if (statusMatches(this.currentStatus)) {
267286
return;
268287
}
269288
return new Promise((resolve) => {
270289
const dispose = this.registerListener({
271290
statusChanged: (status) => {
272-
if (status.hasSynced) {
291+
if (statusMatches(status)) {
273292
dispose();
274293
resolve();
275294
}
@@ -329,14 +348,33 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
329348
}
330349

331350
protected async updateHasSynced() {
332-
const result = await this.database.get<{ synced_at: string | null }>(
333-
'SELECT powersync_last_synced_at() as synced_at'
351+
const result = await this.database.getAll<{ priority: number; last_synced_at: string }>(
352+
'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC'
334353
);
335-
const hasSynced = result.synced_at != null;
336-
const syncedAt = result.synced_at != null ? new Date(result.synced_at! + 'Z') : undefined;
354+
let lastCompleteSync: Date | undefined;
355+
const priorityStatuses: SyncPriorityStatus[] = [];
356+
357+
for (const { priority, last_synced_at } of result) {
358+
const parsedDate = new Date(last_synced_at + 'Z');
359+
360+
if (priority == FULL_SYNC_PRIORITY) {
361+
// This lowest-possible priority represents a complete sync.
362+
lastCompleteSync = parsedDate;
363+
} else {
364+
priorityStatuses.push({ priority, hasSynced: true, lastSyncedAt: parsedDate });
365+
}
366+
}
367+
368+
const hasSynced = lastCompleteSync != null;
369+
const updatedStatus = new SyncStatus({
370+
...this.currentStatus.toJSON(),
371+
hasSynced,
372+
priorityStatuses,
373+
lastSyncedAt: lastCompleteSync
374+
});
337375

338-
if (hasSynced != this.currentStatus.hasSynced) {
339-
this.currentStatus = new SyncStatus({ ...this.currentStatus.toJSON(), hasSynced, lastSyncedAt: syncedAt });
376+
if (!updatedStatus.isEqual(this.currentStatus)) {
377+
this.currentStatus = updatedStatus;
340378
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
341379
}
342380
}
@@ -379,7 +417,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
379417
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
380418
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
381419
return {
382-
retryDelayMs: options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
420+
retryDelayMs:
421+
options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
383422
crudUploadThrottleMs:
384423
options?.crudUploadThrottleMs ?? this.options.crudUploadThrottleMs ?? DEFAULT_CRUD_UPLOAD_THROTTLE_MS
385424
};
@@ -401,7 +440,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
401440

402441
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
403442
retryDelayMs,
404-
crudUploadThrottleMs,
443+
crudUploadThrottleMs
405444
});
406445
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
407446
statusChanged: (status) => {

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ import { CrudBatch } from './CrudBatch.js';
33
import { CrudEntry, OpId } from './CrudEntry.js';
44
import { SyncDataBatch } from './SyncDataBatch.js';
55

6+
export interface BucketDescription {
7+
name: string;
8+
priority: number;
9+
}
10+
611
export interface Checkpoint {
712
last_op_id: OpId;
813
buckets: BucketChecksum[];
@@ -27,6 +32,7 @@ export interface SyncLocalDatabaseResult {
2732

2833
export interface BucketChecksum {
2934
bucket: string;
35+
priority?: number;
3036
/**
3137
* 32-bit unsigned hash.
3238
*/
@@ -60,7 +66,10 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
6066

6167
getBucketStates(): Promise<BucketState[]>;
6268

63-
syncLocalDatabase(checkpoint: Checkpoint): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;
69+
syncLocalDatabase(
70+
checkpoint: Checkpoint,
71+
priority?: number
72+
): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;
6473

6574
nextCrudItem(): Promise<CrudEntry | undefined>;
6675
hasCrud(): Promise<boolean>;

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

+39-10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapt
44
import { BaseObserver } from '../../../utils/BaseObserver.js';
55
import { MAX_OP_ID } from '../../constants.js';
66
import {
7+
BucketChecksum,
78
BucketState,
89
BucketStorageAdapter,
910
BucketStorageListener,
@@ -135,8 +136,8 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
135136
return completed;
136137
}
137138

138-
async syncLocalDatabase(checkpoint: Checkpoint): Promise<SyncLocalDatabaseResult> {
139-
const r = await this.validateChecksums(checkpoint);
139+
async syncLocalDatabase(checkpoint: Checkpoint, priority?: number): Promise<SyncLocalDatabaseResult> {
140+
const r = await this.validateChecksums(checkpoint, priority);
140141
if (!r.checkpointValid) {
141142
this.logger.error('Checksums failed for', r.checkpointFailures);
142143
for (const b of r.checkpointFailures ?? []) {
@@ -145,19 +146,23 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
145146
return { ready: false, checkpointValid: false, checkpointFailures: r.checkpointFailures };
146147
}
147148

148-
const bucketNames = checkpoint.buckets.map((b) => b.bucket);
149+
const buckets = checkpoint.buckets;
150+
if (priority !== undefined) {
151+
buckets.filter((b) => hasMatchingPriority(priority, b));
152+
}
153+
const bucketNames = buckets.map((b) => b.bucket);
149154
await this.writeTransaction(async (tx) => {
150155
await tx.execute(`UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))`, [
151156
checkpoint.last_op_id,
152157
JSON.stringify(bucketNames)
153158
]);
154159

155-
if (checkpoint.write_checkpoint) {
160+
if (priority == null && checkpoint.write_checkpoint) {
156161
await tx.execute("UPDATE ps_buckets SET last_op = ? WHERE name = '$local'", [checkpoint.write_checkpoint]);
157162
}
158163
});
159164

160-
const valid = await this.updateObjectsFromBuckets(checkpoint);
165+
const valid = await this.updateObjectsFromBuckets(checkpoint, priority);
161166
if (!valid) {
162167
this.logger.debug('Not at a consistent checkpoint - cannot update local db');
163168
return { ready: false, checkpointValid: true };
@@ -176,21 +181,41 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
176181
*
177182
* This includes creating new tables, dropping old tables, and copying data over from the oplog.
178183
*/
179-
private async updateObjectsFromBuckets(checkpoint: Checkpoint) {
184+
private async updateObjectsFromBuckets(checkpoint: Checkpoint, priority: number | undefined) {
185+
let arg = '';
186+
if (priority !== undefined) {
187+
const affectedBuckets: string[] = [];
188+
for (const desc of checkpoint.buckets) {
189+
if (hasMatchingPriority(priority, desc)) {
190+
affectedBuckets.push(desc.bucket);
191+
}
192+
}
193+
194+
arg = JSON.stringify({ priority, buckets: affectedBuckets });
195+
}
196+
180197
return this.writeTransaction(async (tx) => {
181198
const { insertId: result } = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
182199
'sync_local',
183-
''
200+
arg
184201
]);
185202
return result == 1;
186203
});
187204
}
188205

189-
async validateChecksums(checkpoint: Checkpoint): Promise<SyncLocalDatabaseResult> {
190-
const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [JSON.stringify(checkpoint)]);
206+
async validateChecksums(checkpoint: Checkpoint, priority: number | undefined): Promise<SyncLocalDatabaseResult> {
207+
if (priority !== undefined) {
208+
// Only validate the buckets within the priority we care about
209+
const newBuckets = checkpoint.buckets.filter((cs) => hasMatchingPriority(priority, cs));
210+
checkpoint = { ...checkpoint, buckets: newBuckets };
211+
}
212+
213+
const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [
214+
JSON.stringify({ ...checkpoint })
215+
]);
191216

192217
const resultItem = rs.rows?.item(0);
193-
this.logger.debug('validateChecksums result item', resultItem);
218+
this.logger.debug('validateChecksums priority, checkpoint, result item', priority, checkpoint, resultItem);
194219
if (!resultItem) {
195220
return {
196221
checkpointValid: false,
@@ -367,3 +392,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
367392
// No-op for now
368393
}
369394
}
395+
396+
function hasMatchingPriority(priority: number, bucket: BucketChecksum) {
397+
return bucket.priority != null && bucket.priority <= priority;
398+
}

0 commit comments

Comments
 (0)