diff --git a/.changeset/twelve-bats-repeat.md b/.changeset/twelve-bats-repeat.md new file mode 100644 index 0000000000..20c8007ac3 --- /dev/null +++ b/.changeset/twelve-bats-repeat.md @@ -0,0 +1,6 @@ +--- +"jazz-tools": patch +"cojson": patch +--- + +Resume interrupted CoValue sync on app restart (without requiring CoValues to be manually reloaded) diff --git a/.specs/unsynced-covalues-tracking/design.md b/.specs/unsynced-covalues-tracking/design.md new file mode 100644 index 0000000000..0f92be6b6c --- /dev/null +++ b/.specs/unsynced-covalues-tracking/design.md @@ -0,0 +1,338 @@ +# Design: Track Unsynced CoValues & Resume Sync + +## Overview + +This design implements automatic tracking of CoValues with unsynced changes and provides mechanisms to resume syncing them on app restart. The solution integrates with the existing sync infrastructure and provides reactive APIs for monitoring sync state. + +The core idea is to maintain a set of CoValue IDs that have pending changes not yet fully synced to all persistent server peers. This set is persisted across app restarts and used to: +1. Automatically resume syncing on startup +2. Provide efficient sync state queries +3. Enable reactive sync state subscriptions + +## Architecture / Components + +### 1. Extended StorageAPI Interface + +Extend the existing `StorageAPI` interface to include methods for tracking unsynced CoValues. + +**Location:** `packages/cojson/src/storage/types.ts` + +**New Methods:** +```typescript +export interface StorageAPI { + // ... existing methods ... + + trackCoValuesSyncState(updates: { id: RawCoID; peerId: PeerID; synced: boolean }[]): void; + getUnsyncedCoValueIDs(callback: (data: RawCoID[]) => void); + stopTrackingSyncState(id: RawCoID): void; +} +``` + +**Implementation Strategy:** +- Storage implementations will add these methods to persist the set of unsynced CoValue IDs +- For IndexedDB: Add a new object store `"unsyncedCoValues"` +- For SQLite: Add a table `unsynced_covalues` + +### 2. UnsyncedCoValuesTracker + +A new class that manages the set of unsynced CoValue IDs. + +**Location:** `packages/cojson/src/sync/UnsyncedCoValuesTracker.ts` + +**Responsibilities:** +- Maintain an in-memory Set of unsynced CoValue IDs (including peers pending sync) +- Persist the set to storage using `StorageAPI.trackCoValuesSyncState` if available. Persistence is batched and performed asynchronously, to avoid the cost of N extra storage writes per update (where N is the number of peers). +- Load persisted unsynced CoValues on initialization using `StorageAPI.getUnsyncedCoValueIDs` + `LocalNode.loadCoValueCore` if storage is available +- Notify listeners when the set changes + +**Key Methods:** +- `constructor(getStorage: () => StorageAPI | undefined)`: Initialize with storage getter for persistence +- `add(id: RawCoID, peerId: PeerID)`: Add a CoValue to the unsynced set (queues for batched persistence) +- `remove(id: RawCoID, peerId: PeerID)`: Remove a CoValue from the unsynced set (queues for batched persistence) +- `getAll()`: Returns all unsynced CoValue IDs +- `isAllSynced()`: Check if all CoValues are synced (O(1), returns `size() === 0`) +- `private flush()`: Flush all pending persistence operations in a batch +- `subscribe(id: RawCoID, listener: (synced: boolean) => void)`: Subscribe to changes in whether a CoValue is synced +- `subscribe(listener: (synced: boolean) => void)`: Subscribe to changes in whether all CoValues are synced + +### 3. Integration with SyncManager + +**Location:** `packages/cojson/src/sync.ts` + +**Changes:** +- Add `unsyncedTracker: UnsyncedCoValuesTracker` property to `SyncManager` +- Initialize tracker in `SyncManager` constructor: `new UnsyncedCoValuesTracker(local.storage, this)` +- Update `syncContent()` method to keep track of unsynced CoValues created locally: + ```ts + syncContent(content: NewContentMessage) { + const coValue = this.local.getCoValue(content.id); + + this.storeContent(content); + + this.trackSyncState(coValue.id); + + // ... + } + + trackSyncState(coValueId: RawCoID): void { + for (const peer of this.getPersistentServerPeers()) { + this.unsyncedTracker.add(coValueId, peer.id); + + const unsubscribe = this.syncState.subscribeToPeerUpdates( + coValueId, + peer.id, + (_knownState, syncState) => { + if (syncState.uploaded) { + this.unsyncedTracker.remove(coValueId, peer.id); + unsubscribe(); + } + }, + ); + } + } + ``` +- Update `handleNewContent` method to keep track of unsynced CoValues received from other peers: + ```ts + handleNewContent( + msg: NewContentMessage, + from: PeerState | "storage" | "import", + ) { + const coValue = this.local.getCoValue(msg.id); + const peer = from === "storage" || from === "import" ? undefined : from; + const sourceRole = + from === "storage" + ? "storage" + : from === "import" + ? "import" + : peer?.role; + + // ... + + if (from !== "storage" && hasNewContent) { + this.storeContent(validNewContent); + } + + if (sourceRole === "client" && hasNewContent) { + this.trackSyncState(coValue.id); + } + + // ... + } + ``` +- Add method `async resumeUnsyncedCoValues()` to load and resume syncing unsynced CoValues. This happens asynchronously and doesn't block initialization +- Call `resumeUnsyncedCoValues` as part of `SyncManager.startPeerReconciliation`. + +**`resumeUnsyncedCoValues()` implementation:** + +```typescript +async resumeUnsyncedCoValues(): Promise { + if (!this.local.storage) { + // No storage available, skip resumption + return; + } + + await new Promise((resolve, reject) => { + // Load all persisted unsynced CoValues from storage + this.local.storage!.getUnsyncedCoValueIDs((unsyncedCoValueIDs) => { + if (unsyncedCoValueIDs.length === 0) { + resolve(); + return; + } + + const BATCH_SIZE = 10; + let processed = 0; + + const processBatch = async () => { + const batch = unsyncedCoValueIDs.slice( + processed, + processed + BATCH_SIZE, + ); + + await Promise.all( + batch.map(async (coValueId) => { + try { + // Load the CoValue from storage (this will trigger sync if peers are connected) + const coValue = await this.local.loadCoValueCore(coValueId); + + if (coValue.isAvailable()) { + // CoValue was successfully loaded. Resume tracking sync state for this CoValue + // This will add it back to the tracker and set up subscriptions + this.trackSyncState(coValueId); + } else { + // CoValue not found in storage. Remove all peer entries for this CoValue + this.local.storage!.stopTrackingSyncState(coValueId); + } + } catch (error) { + // Handle errors gracefully - log but don't fail the entire resumption + logger.warn( + `Failed to resume sync for CoValue ${coValueId}:`, + error, + ); + this.local.storage!.stopTrackingSyncState(coValueId); + } + }), + ); + + processed += batch.length; + + if (processed < unsyncedCoValueIDs.length) { + // Process next batch asynchronously to avoid blocking + setTimeout(processBatch, 0); + } else { + resolve(); + } + }; + + processBatch().catch(reject); + }); + }); +} +``` + +### 4. Sync Status Subscriptions + +**Location:** `packages/cojson/src/coValueCore/coValueCore.ts` and `packages/cojson/src/sync.ts` + +**CoValueCore.subscribeToSyncState:** +- Subscribe to changes in whether this specific CoValue is synced +- Uses `syncManager.unsyncedTracker.subscribe(coValueId)` to get notified when the CoValue's sync state changes +- Calls listener immediately with current state on subscription + +**SyncManager.subscribeToSyncState:** +- Subscribe to changes in whether all CoValues are synced +- Uses `syncManager.unsyncedTracker.subscribe()` to get notified when the unsynced set changes +- Calls listener immediately with current state on subscription (check `unsyncedTracker.isAllSynced()`) + +### 5. Refactored waitForSync + +**Location:** `packages/cojson/src/coValueCore/coValueCore.ts` and `packages/cojson/src/sync.ts` + +**Changes:** +- Replace the current implementation of `CoValueCore.waitForSync()` based on CoValue subscription with a simple call to `syncManager.unsyncedTracker.subscribe(coValueId)`. + +## Data Model + +### UnsyncedCoValuesTracker In-Memory Representation + +The tracker maintains an in-memory data structure that maps each unsynced CoValue to the set of peers it's unsynced to: + +```typescript +class UnsyncedCoValuesTracker { + // Map from CoValue ID to Set of Peer IDs that the CoValue is unsynced to + private unsynced: Map> = new Map(); + private coValueListeners: Map void> = new Map(); + private globalListeners: (synced: boolean) => void = new Set(); + private storage?: StorageAPI; +} +``` + +### Storage Layout + +Storage persists unsynced CoValue-to-peer relationships as individual rows, with one row per (CoValue ID, Peer ID) pair. + +**IndexedDB Schema:** +- Object store: `"unsyncedCoValues"` +- Key: `[coValueId, peerId]` (composite key) +- Value: `{ coValueId: RawCoID, peerId: PeerID }` +- Indexes: + - Index on `coValueId` for efficient queries by CoValue + - Index on `peerId` for efficient queries by peer (optional, for cleanup) + +**SQLite Schema:** +```sql +CREATE TABLE unsynced_covalues ( + co_value_id TEXT NOT NULL, + peer_id TEXT NOT NULL, + PRIMARY KEY (co_value_id, peer_id) +); + +CREATE INDEX idx_unsynced_covalues_co_value_id ON unsynced_covalues(co_value_id); +``` + +**Storage Operations:** +- `trackCoValuesSyncState(updates)`: + - Takes an array of operations `{ id: RawCoID, peerId: PeerID, synced: boolean }[]` + - Executes all operations in a single transaction + - For each operation: + - If `synced === true`: DELETE row where `co_value_id = id AND peer_id = peerId` + - If `synced === false`: INSERT OR REPLACE row with `(id, peerId)` +- `getUnsyncedCoValueIDs(callback)`: + - Query all distinct `co_value_id` values (SELECT DISTINCT co_value_id) + - Return array of unique CoValue IDs that have at least one unsynced peer + +**Example Storage Data:** +``` +co_value_id | peer_id +---------------|---------- +co_abc123 | peer1 +co_abc123 | peer2 +co_def456 | peer1 +``` + +This represents: +- `co_abc123` is unsynced to `peer1` and `peer2` +- `co_def456` is unsynced to `peer1` + +**Loading on Startup:** +1. Call `getUnsyncedCoValueIDs()` to get all CoValue IDs with unsynced peers +2. For each CoValue ID, query all peer IDs: `SELECT peer_id WHERE co_value_id = ?` +3. Reconstruct the in-memory Map structure +4. Load each CoValue and resume syncing + +## Error Handling / Testing Strategy + +### Error Handling + +1. **Storage Errors:** + - If persistence fails, log error but continue with in-memory tracking + - On load failure, start with empty set (graceful degradation) + - Don't block LocalNode initialization if persistence fails + +2. **Missing CoValues:** + - Handle gracefully when trying to load non-existent CoValues + +3. **Peer Connection Issues:** + - Continue tracking even when peers are disconnected + - Resume syncing when peers reconnect + +4. **Race Conditions:** + - Use atomic operations for add/remove from Set + - Ensure persistence operations don't interfere with tracking updates + +### Testing Strategy + +1. **Unit Tests:** + - Test `UnsyncedCoValuesTracker` operations + - Test persistence and loading + - Test subscription notifications + - Test sync state determination logic + +2. **Integration Tests:** + - Test integration with `SyncManager` and `SyncStateManager` + - Test that CoValues are tracked when they become unsynced + - Test that CoValues are removed when they become synced + - Test resumption on LocalNode initialization + - Test subscription APIs (`subscribeToSyncState`) + - Test refactored `waitForSync` + +3. **E2E Tests:** + - Test offline/online scenario with partial CoValue loading + - Test that unsynced CoValues are synced after app restart + +4. **Performance Tests:** + - Test with large numbers of unsynced CoValues + - Test persistence/loading performance + - Test subscription performance with many listeners + - Test polling performance in `waitForSync` + +5. **Platform Tests:** + - Test on web (IndexedDB storage) + - Test on Node.js (SQLite storage) + +### Edge Cases + +1. **Very large unsynced set** - Should handle efficiently, without a noticeable impact on app startup +2. **Rapid sync state changes** - Should debounce/throttle persistence +3. **Multiple sessions sharing storage** - IndexedDB is shared across tabs, but this shouldn't be a problem as we don't need to keep the in-memory unsynced CoValue list in sync with storage (we only use storage for persistence +across app restarts) +4. **Storage unavailable** - Should fall back to in-memory only diff --git a/.specs/unsynced-covalues-tracking/requirements.md b/.specs/unsynced-covalues-tracking/requirements.md new file mode 100644 index 0000000000..263674264d --- /dev/null +++ b/.specs/unsynced-covalues-tracking/requirements.md @@ -0,0 +1,66 @@ +# Track Unsynced CoValues & Resume Sync + +## Introduction + +Currently, Jazz only resumes sync on CoValues that are loaded in memory. This creates a problem when users partially upload a CoValue graph, go offline, restart the app and then come back online. In this scenario, only the loaded parts of the CoValue graph resume syncing, leaving unloaded parts unsynced even though they may have pending changes. + +This feature will implement automatic tracking and resumption of sync for all CoValues with pending changes, regardless of whether they are currently loaded in memory. The solution must work across all platforms (web, React Native, Node.js, cloud workers) and be performant enough to run in cloud environments. + +Tracking unsynced CoValues also allows providing reactive APIs for tracking if CoValues have been synced. This includes refactoring `waitForSync` so that we don't need to create a subscription, and adding new subscription APIs for monitoring sync state at different levels of granularity: `CoValueCore.subscribeToSyncState` and `SyncManager.subscribeToSyncState`. + +## User Stories + +### US1: Track Unsynced CoValues +**As a** Jazz user +**I want** Jazz to automatically track CoValues that have unsynced changes +**So that** these CoValues can be synced even if they're not currently loaded in memory + +**Acceptance Criteria:** +- [ ] When a CoValue has local changes that haven't been fully uploaded to at least one peer, it is tracked as unsynced +- [ ] When a CoValue becomes fully synced to all persistent server peers, it is removed from the unsynced tracking +- [ ] The tracking persists across app restarts using platform-appropriate storage + +### US2: Resume Sync on App Start +**As a** Jazz user +**I want** Jazz to automatically resume syncing all unsynced CoValues when the app starts +**So that** all pending changes are eventually synced, even if the CoValues aren't loaded after going back online + +**Acceptance Criteria:** +- [ ] On LocalNode initialization, all previously tracked unsynced CoValues are loaded and syncing is resumed +- [ ] The resumption happens asynchronously and doesn't block LocalNode initialization +- [ ] The resumption is efficient and doesn't cause performance issues when running in sync servers + +### US3: Subscribe to CoValue Sync Status +**As a** Jazz user +**I want** to subscribe to changes in a CoValue's sync state +**So that** I can reactively monitor when a CoValue becomes synced + +**Acceptance Criteria:** +- [ ] `CoValueCore.subscribeToSyncState(listener)` method subscribes to sync state changes +- [ ] The listener receives a boolean indicating if the CoValue is synced to all persistent server peers +- [ ] The method returns an unsubscribe function +- [ ] The subscription uses the unsynced CoValues tracking for efficient updates +- [ ] The listener is called immediately with the current sync state when subscribing + +### US4: Subscribe to All CoValues Sync Status +**As a** Jazz user +**I want** to subscribe to changes in whether all CoValues are synced +**So that** I can reactively determine when all pending changes have been uploaded + +**Acceptance Criteria:** +- [ ] `SyncManager.subscribeToSyncState(listener)` method subscribes to global sync state changes +- [ ] The listener receives a boolean indicating if all CoValues are synced +- [ ] The method returns an unsubscribe function +- [ ] The subscription uses the unsynced CoValues tracking for efficient implementation +- [ ] The subscription works correctly even when some CoValues are not loaded in memory +- [ ] The listener is called immediately with the current sync state when subscribing + +### US5: Refactor waitForSync Without Subscriptions +**As a** Jazz developer +**I want** `waitForSync` to work without creating subscriptions +**So that** it's simpler and more efficient + +**Acceptance Criteria:** +- [ ] `CoValueCore.waitForSync` is refactored to use unsynced CoValues tracking instead of CoValue subscriptions +- [ ] The method maintains backward compatibility with existing API +- [ ] Performance is improved compared to the subscription-based approach diff --git a/.specs/unsynced-covalues-tracking/tasks.md b/.specs/unsynced-covalues-tracking/tasks.md new file mode 100644 index 0000000000..ad4f02bfff --- /dev/null +++ b/.specs/unsynced-covalues-tracking/tasks.md @@ -0,0 +1,117 @@ +# Implementation Tasks + +## Core Infrastructure + +- [x] **Task 1**: Extend `StorageAPI` interface with unsynced CoValues tracking methods (US1) + - Add `trackCoValuesSyncState(updates: { id: RawCoID; peerId: PeerID; synced: boolean }[]): void` to `StorageAPI` interface in `packages/cojson/src/storage/types.ts` + - Add `getUnsyncedCoValueIDs(callback: (data: RawCoID[]) => void)` to `StorageAPI` interface + - Add `stopTrackingSyncState(id: RawCoID): void` to `StorageAPI` interface + +- [x] **Task 2**: Implement `trackCoValuesSyncState`, `getUnsyncedCoValueIDs`, and `stopTrackingSyncState` for IndexedDB storage (US1) + - Add new object store `"unsyncedCoValues"` in IndexedDB schema (upgrade version) + - Implement `trackCoValuesSyncState` in `StorageApiAsync` to batch upsert/delete records in a single transaction + - Implement `getUnsyncedCoValueIDs` to query all unsynced CoValue IDs + - Implement `stopTrackingSyncState` to delete all records for a CoValue ID + - Update `packages/cojson-storage-indexeddb/src/idbNode.ts` for schema migration + +- [x] **Task 3**: Implement `trackCoValuesSyncState`, `getUnsyncedCoValueIDs`, and `stopTrackingSyncState` for SQLite storage (US1) + - Add `unsynced_covalues` table to SQLite schema + - Implement `trackCoValuesSyncState` in `StorageApiAsync` and `StorageApiSync` for SQLite (batch operations in transaction) + - Implement `getUnsyncedCoValueIDs` in `StorageApiAsync` and `StorageApiSync` for SQLite + - Implement `stopTrackingSyncState` in `StorageApiAsync` and `StorageApiSync` for SQLite + - Update SQLite client implementations in `packages/cojson/src/storage/sqlite/` and `packages/cojson/src/storage/sqliteAsync/` + +- [x] **Task 4**: Create `UnsyncedCoValuesTracker` class (US1, US3, US4) + - Create `packages/cojson/src/UnsyncedCoValuesTracker.ts` + - Implement in-memory `Map>` for tracking unsynced CoValues per peer + - Implement `add(id, peerId)`, `remove(id, peerId)`, `getAll()`, `isAllSynced()` methods + - Implement batched persistence using `StorageAPI.trackCoValuesSyncState` with automatic flushing (1s delay or on shutdown) + - Implement `flush()` method to immediately persist pending operations + - Implement `subscribe(id, listener)` for per-CoValue subscriptions + - Implement `subscribe(listener)` for global "all synced" subscriptions + - Handle storage errors gracefully (fallback to in-memory only) + +## Integration with Sync System + +- [x] **Task 5**: Integrate `UnsyncedCoValuesTracker` into `SyncManager` (US1) + - Add `unsyncedTracker` property to `SyncManager` class + - Initialize tracker in `SyncManager` constructor: `new UnsyncedCoValuesTracker(local.storage, this)` + - Create `trackSyncState(coValueId)` helper method that: + - Iterates through all persistent server peers using `getPersistentServerPeers()` + - Calls `unsyncedTracker.add(coValueId, peer.id)` for each peer + - Subscribes to `syncState.subscribeToPeerUpdates()` for each peer to remove from tracker when synced + - Handles unsubscribe cleanup when CoValue becomes synced to a peer + - Update `syncContent()` method to call `trackSyncState(coValue.id)` after storing content + - Update `handleNewContent()` method to call `trackSyncState(coValue.id)` when receiving content from client peers + +- [x] **Task 6**: Implement `resumeUnsyncedCoValues()` method (US2) + - Add `async resumeUnsyncedCoValues()` method to `SyncManager` + - Load persisted unsynced CoValue IDs from storage using `storage.getUnsyncedCoValueIDs()` + - Process CoValues in batches (e.g., 10 at a time) to avoid blocking + - For each unsynced CoValue ID, load the CoValue using `local.loadCoValueCore()` + - If CoValue loads successfully, call `trackSyncState()` to resume tracking + - If CoValue fails to load or is unavailable, call `storage.stopTrackingSyncState()` to clean up + - Use `setTimeout(processBatch, 0)` to yield control between batches + - Call `resumeUnsyncedCoValues()` in `startPeerReconciliation()` method + - Ensure it runs asynchronously and doesn't block initialization + +## Subscription APIs + +- [ ] **Task 7**: Implement `CoValueCore.subscribeToSyncState()` (US3) + - Add `subscribeToSyncState(listener)` method to `CoValueCore` class + - Use `syncManager.unsyncedTracker.subscribe(this.id, listener)` internally + - Call listener immediately with current state (check if CoValue ID is in `unsyncedTracker.getAll()`) + - Return unsubscribe function + - Update `packages/cojson/src/coValueCore/coValueCore.ts` + +- [ ] **Task 8**: Implement `SyncManager.subscribeToSyncState()` (US4) + - Add `subscribeToSyncState(listener)` method to `SyncManager` class + - Use `unsyncedTracker.subscribe(listener)` internally + - Call listener immediately with current state (`unsyncedTracker.isAllSynced()`) + - Return unsubscribe function + - Update `packages/cojson/src/sync.ts` + +## Refactored waitForSync + +- [ ] **Task 9**: Refactor `CoValueCore.waitForSync()` to use tracker (US5) + - Replace current subscription-based implementation + - Use `syncManager.unsyncedTracker.subscribe(this.id, ...)` instead + - Maintain backward compatibility with existing API signature + - Support timeout option if needed + - Update `packages/cojson/src/coValueCore/coValueCore.ts` + +## Testing + +- [ ] **Task 10**: Write unit tests for `UnsyncedCoValuesTracker` (US1, US3, US4) + - Test `add(id, peerId)`, `remove(id, peerId)`, `getAll()`, `isAllSynced()` operations + - Test batched persistence using `StorageAPI.trackCoValuesSyncState` + - Test automatic flushing after delay + - Test `flush()` method for immediate persistence + - Test subscription notifications (both per-CoValue and global) + - Test that listeners are called immediately with current state on subscription + - Test error handling when storage is unavailable (fallback to in-memory only) + - Create `packages/cojson/src/__tests__/UnsyncedCoValuesTracker.test.ts` + +- [ ] **Task 11**: Write integration tests for sync tracking (US1, US2) + - Test that CoValues are tracked when `syncContent()` is called + - Test that CoValues are removed when they become synced to all peers + - Test `resumeUnsyncedCoValues()` loads and resumes syncing + - Test integration with `SyncStateManager` + - Create/update integration tests in `packages/cojson/src/tests/` + +- [ ] **Task 12**: Write tests for subscription APIs (US3, US4) + - Test `CoValueCore.subscribeToSyncState()` notifies on status changes + - Test `SyncManager.subscribeToSyncState()` notifies when all synced + - Test immediate callback with current state on subscription + - Test unsubscribe functionality + +- [ ] **Task 13**: Write tests for refactored `waitForSync` (US5) + - Test that `waitForSync()` resolves when CoValue becomes synced + - Test timeout handling if applicable + - Test backward compatibility with existing usage + +- [ ] **Task 14**: Write E2E test for offline/online scenario (US1, US2) + - Test scenario: user goes offline, makes changes, closes app, reopens app + - Verify unsynced CoValues are tracked and resumed on restart + - Test with partial CoValue loading + - Update or create E2E test in `tests/e2e/` diff --git a/packages/cojson-storage-indexeddb/src/CoJsonIDBTransaction.ts b/packages/cojson-storage-indexeddb/src/CoJsonIDBTransaction.ts index a6072d99cf..3b081b0ff4 100644 --- a/packages/cojson-storage-indexeddb/src/CoJsonIDBTransaction.ts +++ b/packages/cojson-storage-indexeddb/src/CoJsonIDBTransaction.ts @@ -2,13 +2,22 @@ export type StoreName = | "coValues" | "sessions" | "transactions" - | "signatureAfter"; - -// A access unit for the IndexedDB Jazz database -// It's a wrapper around the IDBTransaction object that helps on batching multiple operations -// in a single transaction. + | "signatureAfter" + | "unsyncedCoValues"; + +const DEFAULT_TX_STORES: StoreName[] = [ + "coValues", + "sessions", + "transactions", + "signatureAfter", +]; + +/** + * An access unit for the IndexedDB Jazz database. + * It's a wrapper around the IDBTransaction object that helps on batching multiple operations + * in a single transaction. + */ export class CoJsonIDBTransaction { - db: IDBDatabase; declare tx: IDBTransaction; pendingRequests: ((txEntry: this) => void)[] = []; @@ -20,17 +29,16 @@ export class CoJsonIDBTransaction { failed = false; done = false; - constructor(db: IDBDatabase) { - this.db = db; - + constructor( + public db: IDBDatabase, + // The object stores this transaction will operate on + private storeNames: StoreName[] = DEFAULT_TX_STORES, + ) { this.refresh(); } refresh() { - this.tx = this.db.transaction( - ["coValues", "sessions", "transactions", "signatureAfter"], - "readwrite", - ); + this.tx = this.db.transaction(this.storeNames, "readwrite"); this.tx.oncomplete = () => { this.done = true; @@ -44,12 +52,6 @@ export class CoJsonIDBTransaction { this.tx.abort(); } - startedAt = performance.now(); - isReusable() { - const delta = performance.now() - this.startedAt; - return !this.done && !this.failed && delta <= 100; - } - getObjectStore(name: StoreName) { try { return this.tx.objectStore(name); diff --git a/packages/cojson-storage-indexeddb/src/idbClient.ts b/packages/cojson-storage-indexeddb/src/idbClient.ts index cfb024d17b..8b15d6d2b5 100644 --- a/packages/cojson-storage-indexeddb/src/idbClient.ts +++ b/packages/cojson-storage-indexeddb/src/idbClient.ts @@ -17,12 +17,13 @@ import { CoJsonIDBTransaction, putIndexedDbStore, queryIndexedDbStore, + StoreName, } from "./CoJsonIDBTransaction.js"; export class IDBTransaction implements DBTransactionInterfaceAsync { constructor(private tx: CoJsonIDBTransaction) {} - run( + private async run( handler: (txEntry: CoJsonIDBTransaction) => IDBRequest, ): Promise { return this.tx.handleRequest(handler); @@ -91,6 +92,55 @@ export class IDBTransaction implements DBTransactionInterfaceAsync { }), ); } + + /** + * Get an unsynced CoValue record by coValueId and peerId. + */ + async getUnsyncedCoValueRecord( + coValueId: RawCoID, + peerId: string, + ): Promise< + { rowID: number; coValueId: RawCoID; peerId: string } | undefined + > { + return this.run((tx) => + tx + .getObjectStore("unsyncedCoValues") + .index("uniqueUnsyncedCoValues") + .get([coValueId, peerId]), + ); + } + + /** + * Get all unsynced CoValue records for a given coValueId. + */ + async getAllUnsyncedCoValueRecords( + coValueId: RawCoID, + ): Promise<{ rowID: number; coValueId: RawCoID; peerId: string }[]> { + return this.run((tx) => + tx + .getObjectStore("unsyncedCoValues") + .index("byCoValueId") + .getAll(coValueId), + ); + } + + /** + * Delete an unsynced CoValue record by rowID. + */ + async deleteUnsyncedCoValueRecord(rowID: number): Promise { + await this.run((tx) => tx.getObjectStore("unsyncedCoValues").delete(rowID)); + } + + /** + * Insert or update an unsynced CoValue record. + */ + async putUnsyncedCoValueRecord(record: { + rowID?: number; + coValueId: RawCoID; + peerId: string; + }): Promise { + await this.run((tx) => tx.getObjectStore("unsyncedCoValues").put(record)); + } } export class IDBClient implements DBClientInterfaceAsync { @@ -161,8 +211,9 @@ export class IDBClient implements DBClientInterfaceAsync { async transaction( operationsCallback: (tx: DBTransactionInterfaceAsync) => Promise, + storeNames?: StoreName[], ) { - const tx = new CoJsonIDBTransaction(this.db); + const tx = new CoJsonIDBTransaction(this.db, storeNames); try { await operationsCallback(new IDBTransaction(tx)); @@ -171,4 +222,73 @@ export class IDBClient implements DBClientInterfaceAsync { tx.rollback(); } } + + async trackCoValuesSyncState( + updates: { id: RawCoID; peerId: string; synced: boolean }[], + ): Promise { + if (updates.length === 0) { + return; + } + + await this.transaction( + async (tx) => { + const idbTx = tx as IDBTransaction; + await Promise.all( + updates.map(async (update) => { + const record = await idbTx.getUnsyncedCoValueRecord( + update.id, + update.peerId, + ); + if (update.synced) { + // Delete + if (record) { + await idbTx.deleteUnsyncedCoValueRecord(record.rowID); + } + } else { + // Insert or update + await idbTx.putUnsyncedCoValueRecord( + record + ? { + rowID: record.rowID, + coValueId: update.id, + peerId: update.peerId, + } + : { + coValueId: update.id, + peerId: update.peerId, + }, + ); + } + }), + ); + }, + ["unsyncedCoValues"], + ); + } + + async getUnsyncedCoValueIDs(): Promise { + const records = await queryIndexedDbStore< + { rowID: number; coValueId: RawCoID; peerId: string }[] + >(this.db, "unsyncedCoValues", (store) => store.getAll()); + const uniqueIds = new Set(); + for (const record of records) { + uniqueIds.add(record.coValueId); + } + return Array.from(uniqueIds); + } + + async stopTrackingSyncState(id: RawCoID): Promise { + await this.transaction( + async (tx) => { + const idbTx = tx as IDBTransaction; + const records = await idbTx.getAllUnsyncedCoValueRecords(id); + await Promise.all( + records.map((record) => + idbTx.deleteUnsyncedCoValueRecord(record.rowID), + ), + ); + }, + ["unsyncedCoValues"], + ); + } } diff --git a/packages/cojson-storage-indexeddb/src/idbNode.ts b/packages/cojson-storage-indexeddb/src/idbNode.ts index cac1af0cc4..381be09980 100644 --- a/packages/cojson-storage-indexeddb/src/idbNode.ts +++ b/packages/cojson-storage-indexeddb/src/idbNode.ts @@ -9,7 +9,7 @@ export function internal_setDatabaseName(name: string) { export async function getIndexedDBStorage(name = DATABASE_NAME) { const dbPromise = new Promise((resolve, reject) => { - const request = indexedDB.open(name, 4); + const request = indexedDB.open(name, 5); request.onerror = () => { reject(request.error); }; @@ -47,6 +47,20 @@ export async function getIndexedDBStorage(name = DATABASE_NAME) { keyPath: ["ses", "idx"], }); } + if (ev.oldVersion <= 4) { + const unsyncedCoValues = db.createObjectStore("unsyncedCoValues", { + autoIncrement: true, + keyPath: "rowID", + }); + unsyncedCoValues.createIndex("byCoValueId", "coValueId"); + unsyncedCoValues.createIndex( + "uniqueUnsyncedCoValues", + ["coValueId", "peerId"], + { + unique: true, + }, + ); + } }; }); diff --git a/packages/cojson-storage-indexeddb/src/tests/CoJsonIDBTransaction.test.ts b/packages/cojson-storage-indexeddb/src/tests/CoJsonIDBTransaction.test.ts index cd1887b852..85ad37b7f3 100644 --- a/packages/cojson-storage-indexeddb/src/tests/CoJsonIDBTransaction.test.ts +++ b/packages/cojson-storage-indexeddb/src/tests/CoJsonIDBTransaction.test.ts @@ -23,6 +23,17 @@ describe("CoJsonIDBTransaction", () => { }); db.createObjectStore("transactions", { keyPath: "id" }); db.createObjectStore("signatureAfter", { keyPath: "id" }); + const unsyncedCoValues = db.createObjectStore("unsyncedCoValues", { + keyPath: "rowID", + }); + unsyncedCoValues.createIndex("byCoValueId", "coValueId"); + unsyncedCoValues.createIndex( + "uniqueUnsyncedCoValues", + ["coValueId", "peerId"], + { + unique: true, + }, + ); }; request.onsuccess = () => { @@ -167,4 +178,80 @@ describe("CoJsonIDBTransaction", () => { expect(badTx.failed).toBe(true); }); + + test("transaction with custom stores only includes specified stores", async () => { + const tx = new CoJsonIDBTransaction(db, ["coValues", "sessions"]); + + // Should work with included stores + await tx.handleRequest((tx) => + tx.getObjectStore("coValues").put({ + id: "test1", + value: "hello", + }), + ); + + await tx.handleRequest((tx) => + tx.getObjectStore("sessions").put({ + id: "session1", + data: "session data", + }), + ); + + // Should fail when trying to access a store not included in transaction + await expect( + tx.handleRequest((tx) => + tx.getObjectStore("transactions").put({ + id: "tx1", + data: "tx data", + }), + ), + ).rejects.toThrow( + "Failed to execute 'objectStore' on 'IDBTransaction': The specified object store was not found.", + ); + }); + + test("if no custom stores are provided, transaction uses default stores", async () => { + const tx = new CoJsonIDBTransaction(db); + + await tx.handleRequest((tx) => + tx.getObjectStore("coValues").put({ + id: "test1", + value: "hello", + }), + ); + + await tx.handleRequest((tx) => + tx.getObjectStore("sessions").put({ + id: "session1", + data: "session data", + }), + ); + + await tx.handleRequest((tx) => + tx.getObjectStore("transactions").put({ + id: "tx1", + data: "tx data", + }), + ); + + await tx.handleRequest((tx) => + tx.getObjectStore("signatureAfter").put({ + id: "sig1", + data: "sig data", + }), + ); + + // Should fail when trying to access unsyncedCoValues (not in default) + await expect( + tx.handleRequest((tx) => + tx.getObjectStore("unsyncedCoValues").put({ + rowID: 1, + coValueId: "coValue1", + peerId: "peer1", + }), + ), + ).rejects.toThrow( + "Failed to execute 'objectStore' on 'IDBTransaction': The specified object store was not found.", + ); + }); }); diff --git a/packages/cojson-storage-indexeddb/src/tests/storage.indexeddb.test.ts b/packages/cojson-storage-indexeddb/src/tests/storage.indexeddb.test.ts index 0209f9ea18..d28cf34b5c 100644 --- a/packages/cojson-storage-indexeddb/src/tests/storage.indexeddb.test.ts +++ b/packages/cojson-storage-indexeddb/src/tests/storage.indexeddb.test.ts @@ -1,29 +1,37 @@ -import { LocalNode, StorageApiAsync } from "cojson"; +import { LocalNode, StorageApiAsync, cojsonInternals } from "cojson"; import { WasmCrypto } from "cojson/crypto/WasmCrypto"; -import { afterEach, beforeEach, expect, test, vi } from "vitest"; -import { getIndexedDBStorage } from "../index.js"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { getIndexedDBStorage, internal_setDatabaseName } from "../index.js"; import { toSimplifiedMessages } from "./messagesTestUtils.js"; -import { trackMessages, waitFor } from "./testUtils.js"; +import { + clearObjectStore, + connectToSyncServer, + createTestNode, + trackMessages, + waitFor, +} from "./testUtils.js"; const Crypto = await WasmCrypto.create(); let syncMessages: ReturnType; +const DATABASE_NAME = "jazz-storage"; +internal_setDatabaseName(DATABASE_NAME); + beforeEach(() => { syncMessages = trackMessages(); + cojsonInternals.setSyncStateTrackingBatchDelay(0); + cojsonInternals.setCoValueLoadingRetryDelay(10); }); -afterEach(() => { +afterEach(async () => { syncMessages.restore(); + cojsonInternals.setSyncStateTrackingBatchDelay(1000); + + await clearObjectStore(DATABASE_NAME, "unsyncedCoValues"); }); test("should sync and load data from storage", async () => { - const agentSecret = Crypto.newRandomAgentSecret(); - - const node1 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node1 = createTestNode(); node1.setStorage(await getIndexedDBStorage()); const group = node1.createGroup(); @@ -51,12 +59,7 @@ test("should sync and load data from storage", async () => { node1.gracefulShutdown(); syncMessages.clear(); - const node2 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); - + const node2 = createTestNode({ secret: node1.agentSecret }); node2.setStorage(await getIndexedDBStorage()); const map2 = await node2.load(map.id); @@ -84,13 +87,7 @@ test("should sync and load data from storage", async () => { }); test("should send an empty content message if there is no content", async () => { - const agentSecret = Crypto.newRandomAgentSecret(); - - const node1 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node1 = createTestNode(); node1.setStorage(await getIndexedDBStorage()); @@ -117,11 +114,7 @@ test("should send an empty content message if there is no content", async () => syncMessages.clear(); node1.gracefulShutdown(); - const node2 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node2 = createTestNode({ secret: node1.agentSecret }); node2.setStorage(await getIndexedDBStorage()); @@ -148,13 +141,7 @@ test("should send an empty content message if there is no content", async () => }); test("should load dependencies correctly (group inheritance)", async () => { - const agentSecret = Crypto.newRandomAgentSecret(); - - const node1 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node1 = createTestNode(); node1.setStorage(await getIndexedDBStorage()); const group = node1.createGroup(); @@ -189,11 +176,7 @@ test("should load dependencies correctly (group inheritance)", async () => { syncMessages.clear(); node1.gracefulShutdown(); - const node2 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node2 = createTestNode({ secret: node1.agentSecret }); node2.setStorage(await getIndexedDBStorage()); @@ -223,13 +206,7 @@ test("should load dependencies correctly (group inheritance)", async () => { }); test("should not send the same dependency value twice", async () => { - const agentSecret = Crypto.newRandomAgentSecret(); - - const node1 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node1 = createTestNode(); node1.setStorage(await getIndexedDBStorage()); @@ -250,11 +227,7 @@ test("should not send the same dependency value twice", async () => { syncMessages.clear(); node1.gracefulShutdown(); - const node2 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node2 = createTestNode({ secret: node1.agentSecret }); node2.setStorage(await getIndexedDBStorage()); @@ -289,13 +262,7 @@ test("should not send the same dependency value twice", async () => { }); test("should recover from data loss", async () => { - const agentSecret = Crypto.newRandomAgentSecret(); - - const node1 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node1 = createTestNode(); const storage = await getIndexedDBStorage(); node1.setStorage(storage); @@ -347,11 +314,7 @@ test("should recover from data loss", async () => { syncMessages.clear(); node1.gracefulShutdown(); - const node2 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node2 = createTestNode({ secret: node1.agentSecret }); node2.setStorage(await getIndexedDBStorage()); @@ -386,13 +349,7 @@ test("should recover from data loss", async () => { }); test("should sync multiple sessions in a single content message", async () => { - const agentSecret = Crypto.newRandomAgentSecret(); - - const node1 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node1 = createTestNode(); node1.setStorage(await getIndexedDBStorage()); @@ -406,11 +363,7 @@ test("should sync multiple sessions in a single content message", async () => { node1.gracefulShutdown(); - const node2 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node2 = createTestNode({ secret: node1.agentSecret }); node2.setStorage(await getIndexedDBStorage()); @@ -427,11 +380,7 @@ test("should sync multiple sessions in a single content message", async () => { node2.gracefulShutdown(); - const node3 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node3 = createTestNode({ secret: node1.agentSecret }); syncMessages.clear(); @@ -462,13 +411,7 @@ test("should sync multiple sessions in a single content message", async () => { }); test("large coValue upload streaming", async () => { - const agentSecret = Crypto.newRandomAgentSecret(); - - const node1 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node1 = createTestNode(); node1.setStorage(await getIndexedDBStorage()); @@ -494,11 +437,7 @@ test("large coValue upload streaming", async () => { node1.gracefulShutdown(); - const node2 = new LocalNode( - agentSecret, - Crypto.newRandomSessionID(Crypto.getAgentID(agentSecret)), - Crypto, - ); + const node2 = createTestNode({ secret: node1.agentSecret }); syncMessages.clear(); @@ -603,3 +542,115 @@ test("should sync and load accounts from storage", async () => { expect(node2.getCoValue(accountID).isAvailable()).toBeTruthy(); }); + +describe("sync state persistence", () => { + test("unsynced coValues are asynchronously persisted to storage", async () => { + // Client is not connected to a sync server, so sync will not be completed + const client = createTestNode(); + client.setStorage(await getIndexedDBStorage()); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for the unsynced coValues to be persisted to storage + await new Promise((resolve) => setTimeout(resolve, 500)); + + const unsyncedCoValueIDs = await new Promise((resolve) => + client.storage?.getUnsyncedCoValueIDs(resolve), + ); + expect(unsyncedCoValueIDs).toHaveLength(2); + expect(unsyncedCoValueIDs).toContain(map.id); + expect(unsyncedCoValueIDs).toContain(group.id); + + await client.gracefulShutdown(); + }); + + test("synced coValues are removed from storage", async () => { + const syncServer = createTestNode(); + const client = createTestNode(); + client.setStorage(await getIndexedDBStorage()); + + connectToSyncServer(client, syncServer); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait enough time for the coValue to be synced + await new Promise((resolve) => setTimeout(resolve, 500)); + + const unsyncedCoValueIDs = await new Promise((resolve) => + client.storage?.getUnsyncedCoValueIDs(resolve), + ); + expect(unsyncedCoValueIDs).toHaveLength(0); + expect(client.syncManager.unsyncedTracker.has(map.id)).toBe(false); + + await client.gracefulShutdown(); + await syncServer.gracefulShutdown(); + }); + + test("unsynced coValues are persisted to storage when the node is shutdown", async () => { + const client = createTestNode(); + client.setStorage(await getIndexedDBStorage()); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for local transaction to trigger sync + await new Promise((resolve) => queueMicrotask(resolve)); + + await client.gracefulShutdown(); + + const unsyncedCoValueIDs = await new Promise((resolve) => + client.storage?.getUnsyncedCoValueIDs(resolve), + ); + expect(unsyncedCoValueIDs).toHaveLength(2); + expect(unsyncedCoValueIDs).toContain(map.id); + expect(unsyncedCoValueIDs).toContain(group.id); + }); +}); + +describe("sync resumption", () => { + test("unsynced coValues are resumed when the node is restarted", async () => { + // Client is not connected to a sync server, so sync will not be completed + const node1 = createTestNode(); + const storage = await getIndexedDBStorage(); + node1.setStorage(storage); + + const getUnsyncedCoValueIDsFromStorage = async () => + new Promise((resolve) => + node1.storage?.getUnsyncedCoValueIDs(resolve), + ); + + const group = node1.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for the unsynced coValues to be persisted to storage + await new Promise((resolve) => setTimeout(resolve, 100)); + + const unsyncedTracker = node1.syncManager.unsyncedTracker; + expect(unsyncedTracker.has(map.id)).toBe(true); + expect(await getUnsyncedCoValueIDsFromStorage()).toHaveLength(2); + + node1.gracefulShutdown(); + + // Create second node with the same storage + const node2 = createTestNode(); + node2.setStorage(storage); + + // Connect to sync server + const syncServer = createTestNode(); + connectToSyncServer(node2, syncServer); + + await node2.syncManager.waitForAllCoValuesSync(); + // Wait for sync to resume & complete + await waitFor( + async () => (await getUnsyncedCoValueIDsFromStorage()).length === 0, + ); + + await node2.gracefulShutdown(); + }); +}); diff --git a/packages/cojson-storage-indexeddb/src/tests/testUtils.ts b/packages/cojson-storage-indexeddb/src/tests/testUtils.ts index 1b9b6afa1c..6bcbfc5f55 100644 --- a/packages/cojson-storage-indexeddb/src/tests/testUtils.ts +++ b/packages/cojson-storage-indexeddb/src/tests/testUtils.ts @@ -1,5 +1,11 @@ -import type { RawCoID, SyncMessage } from "cojson"; -import { StorageApiAsync } from "cojson"; +import type { AgentSecret, RawCoID, SessionID, SyncMessage } from "cojson"; +import { + cojsonInternals, + ControlledAgent, + LocalNode, + StorageApiAsync, +} from "cojson"; +import { WasmCrypto } from "cojson/crypto/WasmCrypto"; import { onTestFinished } from "vitest"; export function trackMessages() { @@ -115,3 +121,78 @@ export function waitFor( }, 100); }); } + +export async function clearObjectStore( + dbName: string, + storeName: string, +): Promise { + return new Promise((resolve, reject) => { + const openReq = indexedDB.open(dbName); + + openReq.onerror = () => reject(openReq.error); + + openReq.onsuccess = () => { + const db = openReq.result; + + if (!db.objectStoreNames.contains(storeName)) { + db.close(); + resolve(); + return; + } + + const tx = db.transaction(storeName, "readwrite"); + const store = tx.objectStore(storeName); + + const clearReq = store.clear(); + + clearReq.onerror = () => reject(clearReq.error); + + tx.oncomplete = () => { + db.close(); + resolve(); + }; + + tx.onerror = () => { + db.close(); + reject(tx.error); + }; + + tx.onabort = () => { + db.close(); + reject(tx.error || new Error("Transaction aborted")); + }; + }; + }); +} + +const Crypto = await WasmCrypto.create(); + +export function getAgentAndSessionID( + secret: AgentSecret = Crypto.newRandomAgentSecret(), +): [ControlledAgent, SessionID] { + const sessionID = Crypto.newRandomSessionID(Crypto.getAgentID(secret)); + return [new ControlledAgent(secret, Crypto), sessionID]; +} + +export function createTestNode(opts?: { secret?: AgentSecret }) { + const [admin, session] = getAgentAndSessionID(opts?.secret); + return new LocalNode(admin.agentSecret, session, Crypto); +} + +export function connectToSyncServer( + client: LocalNode, + syncServer: LocalNode, +): void { + const [clientPeer, serverPeer] = cojsonInternals.connectedPeers( + client.currentSessionID, + syncServer.currentSessionID, + { + peer1role: "client", + peer2role: "server", + persistent: true, + }, + ); + + client.syncManager.addPeer(serverPeer); + syncServer.syncManager.addPeer(clientPeer); +} diff --git a/packages/cojson-storage-indexeddb/vitest.config.ts b/packages/cojson-storage-indexeddb/vitest.config.ts index 224a1ebe34..4e28c3f4ee 100644 --- a/packages/cojson-storage-indexeddb/vitest.config.ts +++ b/packages/cojson-storage-indexeddb/vitest.config.ts @@ -7,7 +7,12 @@ export default defineProject({ browser: { enabled: true, provider: playwright(), - instances: [{ browser: "chromium", headless: true }], + instances: [ + { + headless: process.env.HEADLESS !== "false", + browser: "chromium", + }, + ], }, include: ["src/**/*.test.ts"], }, diff --git a/packages/cojson/src/PeerState.ts b/packages/cojson/src/PeerState.ts index e407e64dee..6133403242 100644 --- a/packages/cojson/src/PeerState.ts +++ b/packages/cojson/src/PeerState.ts @@ -1,5 +1,5 @@ import { PeerKnownState } from "./coValueCore/PeerKnownState.js"; -import { RawCoID, SessionID } from "./ids.js"; +import { RawCoID } from "./ids.js"; import { CoValueKnownState } from "./knownState.js"; import { logger } from "./logger.js"; import { Peer, SyncMessage } from "./sync.js"; @@ -181,7 +181,7 @@ export class PeerState { this.closeListeners.clear(); } - gracefulShutdown() { + gracefulShutdown(): void { if (this.closed) { return; } diff --git a/packages/cojson/src/SyncStateManager.ts b/packages/cojson/src/SyncStateManager.ts index 32ed6576a3..a3fcc1906c 100644 --- a/packages/cojson/src/SyncStateManager.ts +++ b/packages/cojson/src/SyncStateManager.ts @@ -16,6 +16,8 @@ export type GlobalSyncStateListenerCallback = ( sync: SyncState, ) => void; +export type CoValueSyncStateListenerCallback = GlobalSyncStateListenerCallback; + export type PeerSyncStateListenerCallback = ( knownState: CoValueKnownState, sync: SyncState, @@ -25,9 +27,13 @@ export class SyncStateManager { constructor(private syncManager: SyncManager) {} private listeners = new Set(); - private listenersByPeers = new Map< + private listenersByCoValues = new Map< + RawCoID, + Set + >(); + private listenersByPeersAndCoValues = new Map< PeerID, - Set + Map> >(); subscribeToUpdates(listener: GlobalSyncStateListenerCallback) { @@ -38,28 +44,67 @@ export class SyncStateManager { }; } + subscribeToCoValueUpdates( + coValueId: RawCoID, + listener: CoValueSyncStateListenerCallback, + ) { + let listeners = this.listenersByCoValues.get(coValueId); + if (!listeners) { + listeners = new Set(); + this.listenersByCoValues.set(coValueId, listeners); + } + listeners.add(listener); + + return () => { + listeners.delete(listener); + if (listeners.size === 0) { + this.listenersByCoValues.delete(coValueId); + } + }; + } + subscribeToPeerUpdates( peerId: PeerID, + coValueId: RawCoID, listener: PeerSyncStateListenerCallback, ) { - const listeners = this.listenersByPeers.get(peerId) ?? new Set(); + let peerMap = this.listenersByPeersAndCoValues.get(peerId); + if (!peerMap) { + peerMap = new Map(); + this.listenersByPeersAndCoValues.set(peerId, peerMap); + } - if (listeners.size === 0) { - this.listenersByPeers.set(peerId, listeners); + let listeners = peerMap.get(coValueId); + if (!listeners) { + listeners = new Set(); + peerMap.set(coValueId, listeners); } listeners.add(listener); return () => { listeners.delete(listener); + if (listeners.size === 0) { + peerMap.delete(coValueId); + if (peerMap.size === 0) { + this.listenersByPeersAndCoValues.delete(peerId); + } + } }; } triggerUpdate(peerId: PeerID, id: RawCoID, knownState: CoValueKnownState) { - const peerListeners = this.listenersByPeers.get(peerId); - - // If we don't have any active listeners do nothing - if (!peerListeners?.size && !this.listeners.size) { + const globalListeners = this.listeners; + const coValueListeners = this.listenersByCoValues.get(id); + const peerMap = this.listenersByPeersAndCoValues.get(peerId); + const coValueAndPeerListeners = peerMap?.get(id); + + if ( + !globalListeners.size && + !coValueListeners?.size && + !coValueAndPeerListeners?.size + ) { + // If we don't have any active listeners do nothing return; } @@ -71,10 +116,16 @@ export class SyncStateManager { listener(peerId, knownState, syncState); } - if (!peerListeners) return; + if (coValueListeners) { + for (const listener of coValueListeners) { + listener(peerId, knownState, syncState); + } + } - for (const listener of peerListeners) { - listener(knownState, syncState); + if (coValueAndPeerListeners) { + for (const listener of coValueAndPeerListeners) { + listener(knownState, syncState); + } } } diff --git a/packages/cojson/src/UnsyncedCoValuesTracker.ts b/packages/cojson/src/UnsyncedCoValuesTracker.ts new file mode 100644 index 0000000000..4f0c7be24f --- /dev/null +++ b/packages/cojson/src/UnsyncedCoValuesTracker.ts @@ -0,0 +1,272 @@ +import type { RawCoID } from "./ids.js"; +import { logger } from "./logger.js"; +import type { PeerID } from "./sync.js"; +import type { StorageAPI } from "./storage/types.js"; + +/** + * Used to track a CoValue that hasn't been synced to any peer, + * because none is currently connected. + */ +const ANY_PEER_ID: PeerID = "any"; + +// Flush pending updates to storage after 200ms +let BATCH_DELAY_MS = 200; + +/** + * Set the delay for flushing pending sync state updates to storage. + * @internal + */ +export function setSyncStateTrackingBatchDelay(delay: number): void { + BATCH_DELAY_MS = delay; +} + +type PendingUpdate = { + id: RawCoID; + peerId: PeerID; + synced: boolean; +}; + +/** + * Tracks CoValues that have unsynced changes to specific peers. + * Maintains an in-memory map and periodically persists to storage. + */ +export class UnsyncedCoValuesTracker { + private unsynced: Map> = new Map(); + private coValueListeners: Map void>> = + new Map(); + // Listeners for global "all synced" status changes + private globalListeners: Set<(synced: boolean) => void> = new Set(); + + // Pending updates to be persisted + private pendingUpdates: PendingUpdate[] = []; + private flushTimer: ReturnType | undefined; + + private storage?: StorageAPI; + + /** + * Add a CoValue as unsynced to a specific peer. + * Triggers persistence if storage is available. + * @returns true if the CoValue was already tracked, false otherwise. + */ + add(id: RawCoID, peerId: PeerID = ANY_PEER_ID): boolean { + if (!this.unsynced.has(id)) { + this.unsynced.set(id, new Set()); + } + const peerSet = this.unsynced.get(id)!; + + const alreadyTracked = peerSet.has(peerId); + if (!alreadyTracked) { + // Only update if this is a new peer + peerSet.add(peerId); + + this.schedulePersist(id, peerId, false); + + this.notifyCoValueListeners(id, false); + this.notifyGlobalListeners(false); + } + + return alreadyTracked; + } + + /** + * Remove a CoValue from being unsynced to a specific peer. + * Triggers persistence if storage is available. + */ + remove(id: RawCoID, peerId: PeerID = ANY_PEER_ID): void { + const peerSet = this.unsynced.get(id); + if (!peerSet || !peerSet.has(peerId)) { + return; + } + + peerSet.delete(peerId); + + // If no more unsynced peers for this CoValue, remove the entry + if (peerSet.size === 0) { + this.unsynced.delete(id); + } + + this.schedulePersist(id, peerId, true); + + const isSynced = !this.unsynced.has(id); + this.notifyCoValueListeners(id, isSynced); + this.notifyGlobalListeners(this.isAllSynced()); + } + + /** + * Remove all tracking for a CoValue (all peers). + * Triggers persistence if storage is available. + */ + removeAll(id: RawCoID): void { + const peerSet = this.unsynced.get(id); + if (!peerSet) { + return; + } + + // Remove all peers for this CoValue + const peersToRemove = Array.from(peerSet); + for (const peerId of peersToRemove) { + this.remove(id, peerId); + } + } + + forcePersist(): Promise | undefined { + return this.flush(); + } + + private schedulePersist(id: RawCoID, peerId: PeerID, synced: boolean): void { + const storage = this.storage; + if (!storage) { + return; + } + + this.pendingUpdates.push({ id, peerId, synced }); + if (!this.flushTimer) { + this.flushTimer = setTimeout(() => { + this.flush(); + }, BATCH_DELAY_MS); + } + } + + /** + * Flush all pending persistence updates in a batch + */ + private flush(): Promise | undefined { + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = undefined; + } + + if (this.pendingUpdates.length === 0) { + return; + } + + const storage = this.storage; + if (!storage) { + return; + } + + const filteredUpdates = this.simplifyPendingUpdates(this.pendingUpdates); + this.pendingUpdates = []; + + return new Promise((resolve) => { + try { + storage.trackCoValuesSyncState(filteredUpdates, () => resolve()); + } catch (error) { + logger.warn("Failed to persist batched unsynced CoValue tracking", { + err: error, + }); + resolve(); + } + }); + } + + /** + * Get all CoValue IDs that have at least one unsynced peer. + */ + getAll(): RawCoID[] { + return Array.from(this.unsynced.keys()); + } + + /** + * Check if all CoValues are synced + */ + isAllSynced(): boolean { + return this.unsynced.size === 0; + } + + /** + * Check if a specific CoValue is tracked as unsynced. + */ + has(id: RawCoID): boolean { + return this.unsynced.has(id); + } + + /** + * Subscribe to changes in whether a specific CoValue is synced. + * The listener is called immediately with the current state. + * @returns Unsubscribe function + */ + subscribe(id: RawCoID, listener: (synced: boolean) => void): () => void; + /** + * Subscribe to changes in whether all CoValues are synced. + * The listener is called immediately with the current state. + * @returns Unsubscribe function + */ + subscribe(listener: (synced: boolean) => void): () => void; + subscribe( + idOrListener: RawCoID | ((synced: boolean) => void), + listener?: (synced: boolean) => void, + ): () => void { + if (typeof idOrListener === "string" && listener) { + const id = idOrListener; + if (!this.coValueListeners.has(id)) { + this.coValueListeners.set(id, new Set()); + } + this.coValueListeners.get(id)!.add(listener); + + // Call immediately with current state + const isSynced = !this.unsynced.has(id); + listener(isSynced); + + return () => { + const listeners = this.coValueListeners.get(id); + if (listeners) { + listeners.delete(listener); + if (listeners.size === 0) { + this.coValueListeners.delete(id); + } + } + }; + } + + const globalListener = idOrListener as (synced: boolean) => void; + this.globalListeners.add(globalListener); + + // Call immediately with current state + globalListener(this.isAllSynced()); + + return () => { + this.globalListeners.delete(globalListener); + }; + } + + setStorage(storage: StorageAPI) { + this.storage = storage; + } + + removeStorage() { + this.storage = undefined; + } + + /** + * Notify all listeners for a specific CoValue about sync status change. + */ + private notifyCoValueListeners(id: RawCoID, synced: boolean): void { + const listeners = this.coValueListeners.get(id); + if (listeners) { + for (const listener of listeners) { + listener(synced); + } + } + } + + /** + * Notify all global listeners about "all synced" status change. + */ + private notifyGlobalListeners(allSynced: boolean): void { + for (const listener of this.globalListeners) { + listener(allSynced); + } + } + + /** + * Keep only the last update for each (id, peerId) combination + */ + private simplifyPendingUpdates(updates: PendingUpdate[]): PendingUpdate[] { + const latestUpdates = new Map(); + for (const update of updates) { + latestUpdates.set(`${update.id}|${update.peerId}`, update); + } + return Array.from(latestUpdates.values()); + } +} diff --git a/packages/cojson/src/exports.ts b/packages/cojson/src/exports.ts index 99e6a702e4..006022e0b7 100644 --- a/packages/cojson/src/exports.ts +++ b/packages/cojson/src/exports.ts @@ -65,12 +65,13 @@ import type { AgentID, RawCoID, SessionID } from "./ids.js"; import type { JsonObject, JsonValue } from "./jsonValue.js"; import type * as Media from "./media.js"; import { isAccountRole } from "./permissions.js"; -import type { Peer, SyncMessage } from "./sync.js"; +import type { Peer, SyncMessage, SyncWhen } from "./sync.js"; import { DisconnectedError, SyncManager, hwrServerPeerSelector, } from "./sync.js"; +import { setSyncStateTrackingBatchDelay } from "./UnsyncedCoValuesTracker.js"; import { emptyKnownState } from "./knownState.js"; import { @@ -125,6 +126,7 @@ export const cojsonInternals = { setCoValueLoadingRetryDelay, setCoValueLoadingMaxRetries, setCoValueLoadingTimeout, + setSyncStateTrackingBatchDelay, ConnectedPeerChannel, textEncoder, textDecoder, @@ -193,6 +195,7 @@ export type { AccountRole, AvailableCoValueCore, PeerState, + SyncWhen, CoValueHeader, }; diff --git a/packages/cojson/src/localNode.ts b/packages/cojson/src/localNode.ts index 18b178c6af..f12d8286d1 100644 --- a/packages/cojson/src/localNode.ts +++ b/packages/cojson/src/localNode.ts @@ -34,7 +34,7 @@ import { AgentSecret, CryptoProvider } from "./crypto/crypto.js"; import { AgentID, RawCoID, SessionID, isAgentID, isRawCoID } from "./ids.js"; import { logger } from "./logger.js"; import { StorageAPI } from "./storage/index.js"; -import { Peer, PeerID, SyncManager } from "./sync.js"; +import { Peer, PeerID, SyncManager, type SyncWhen } from "./sync.js"; import { accountOrAgentIDfromSessionID } from "./typeUtils/accountOrAgentIDfromSessionID.js"; import { expectGroup } from "./typeUtils/expectGroup.js"; import { canBeBranched } from "./coValueCore/branching.js"; @@ -75,6 +75,7 @@ export class LocalNode { agentSecret: AgentSecret, currentSessionID: SessionID, crypto: CryptoProvider, + public readonly syncWhen?: SyncWhen, ) { this.agentSecret = agentSecret; this.currentSessionID = currentSessionID; @@ -94,11 +95,13 @@ export class LocalNode { setStorage(storage: StorageAPI) { this.storage = storage; + this.syncManager.setStorage(storage); } removeStorage() { this.storage?.close(); this.storage = undefined; + this.syncManager.removeStorage(); } hasCoValue(id: RawCoID) { @@ -178,12 +181,14 @@ export class LocalNode { crypto: CryptoProvider; initialAgentSecret?: AgentSecret; peers?: Peer[]; + syncWhen?: SyncWhen; storage?: StorageAPI; }): RawAccount { const { crypto, initialAgentSecret = crypto.newRandomAgentSecret(), peers = [], + syncWhen, } = opts; const accountHeader = accountHeaderForInitialAgentSecret( initialAgentSecret, @@ -195,6 +200,7 @@ export class LocalNode { initialAgentSecret, crypto.newRandomSessionID(accountID as RawAccountID), crypto, + syncWhen, ); if (opts.storage) { @@ -236,6 +242,7 @@ export class LocalNode { static async withNewlyCreatedAccount({ creationProps, peers, + syncWhen, migration, crypto, initialAgentSecret = crypto.newRandomAgentSecret(), @@ -243,6 +250,7 @@ export class LocalNode { }: { creationProps: { name: string }; peers?: Peer[]; + syncWhen?: SyncWhen; migration?: RawAccountMigration; crypto: CryptoProvider; initialAgentSecret?: AgentSecret; @@ -257,6 +265,7 @@ export class LocalNode { crypto, initialAgentSecret, peers, + syncWhen, storage, }); const node = account.core.node; @@ -299,6 +308,7 @@ export class LocalNode { accountSecret, sessionID, peers, + syncWhen, crypto, migration, storage, @@ -307,6 +317,7 @@ export class LocalNode { accountSecret: AgentSecret; sessionID: SessionID | undefined; peers: Peer[]; + syncWhen?: SyncWhen; crypto: CryptoProvider; migration?: RawAccountMigration; storage?: StorageAPI; @@ -318,6 +329,7 @@ export class LocalNode { accountSecret, sessionID || crypto.newRandomSessionID(accountID), crypto, + syncWhen, ); if (storage) { @@ -817,9 +829,9 @@ export class LocalNode { * * @returns Promise of the current pending store operation, if any. */ - gracefulShutdown(): Promise | undefined { - this.syncManager.gracefulShutdown(); + async gracefulShutdown(): Promise { this.garbageCollector?.stop(); + await this.syncManager.gracefulShutdown(); return this.storage?.close(); } } diff --git a/packages/cojson/src/storage/sqlite/client.ts b/packages/cojson/src/storage/sqlite/client.ts index be7b19810c..8629e59b76 100644 --- a/packages/cojson/src/storage/sqlite/client.ts +++ b/packages/cojson/src/storage/sqlite/client.ts @@ -4,6 +4,7 @@ import type { } from "../../coValueCore/verifiedState.js"; import type { Signature } from "../../crypto/crypto.js"; import type { RawCoID, SessionID } from "../../exports.js"; +import type { PeerID } from "../../sync.js"; import { logger } from "../../logger.js"; import type { DBClientInterfaceSync, @@ -193,4 +194,34 @@ export class SQLiteClient this.db.transaction(() => operationsCallback(this)); return undefined; } + + getUnsyncedCoValueIDs(): RawCoID[] { + const rows = this.db.query<{ co_value_id: RawCoID }>( + "SELECT DISTINCT co_value_id FROM unsynced_covalues", + [], + ) as { co_value_id: RawCoID }[]; + return rows.map((row) => row.co_value_id); + } + + trackCoValuesSyncState( + updates: { id: RawCoID; peerId: PeerID; synced: boolean }[], + ): void { + for (const update of updates) { + if (update.synced) { + this.db.run( + "DELETE FROM unsynced_covalues WHERE co_value_id = ? AND peer_id = ?", + [update.id, update.peerId], + ); + } else { + this.db.run( + "INSERT OR REPLACE INTO unsynced_covalues (co_value_id, peer_id) VALUES (?, ?)", + [update.id, update.peerId], + ); + } + } + } + + stopTrackingSyncState(id: RawCoID): void { + this.db.run("DELETE FROM unsynced_covalues WHERE co_value_id = ?", [id]); + } } diff --git a/packages/cojson/src/storage/sqlite/sqliteMigrations.ts b/packages/cojson/src/storage/sqlite/sqliteMigrations.ts index 94681ffc1b..cca21eb3d7 100644 --- a/packages/cojson/src/storage/sqlite/sqliteMigrations.ts +++ b/packages/cojson/src/storage/sqlite/sqliteMigrations.ts @@ -31,6 +31,15 @@ export const migrations: Record = { ) WITHOUT ROWID;`, "ALTER TABLE sessions ADD COLUMN bytesSinceLastSignature INTEGER;", ], + 4: [ + `CREATE TABLE IF NOT EXISTS unsynced_covalues ( + rowID INTEGER PRIMARY KEY, + co_value_id TEXT NOT NULL, + peer_id TEXT NOT NULL, + UNIQUE (co_value_id, peer_id) + );`, + "CREATE INDEX IF NOT EXISTS idx_unsynced_covalues_co_value_id ON unsynced_covalues(co_value_id);", + ], }; type Migration = { diff --git a/packages/cojson/src/storage/sqliteAsync/client.ts b/packages/cojson/src/storage/sqliteAsync/client.ts index 46b2fa0fbf..f07f181df6 100644 --- a/packages/cojson/src/storage/sqliteAsync/client.ts +++ b/packages/cojson/src/storage/sqliteAsync/client.ts @@ -15,6 +15,7 @@ import type { TransactionRow, } from "../types.js"; import type { SQLiteDatabaseDriverAsync } from "./types.js"; +import type { PeerID } from "../../sync.js"; export type RawCoValueRow = { id: RawCoID; @@ -201,4 +202,38 @@ export class SQLiteClientAsync ) { return this.db.transaction(() => operationsCallback(this)); } + + async getUnsyncedCoValueIDs(): Promise { + const rows = await this.db.query<{ co_value_id: RawCoID }>( + "SELECT DISTINCT co_value_id FROM unsynced_covalues", + [], + ); + return rows.map((row) => row.co_value_id); + } + + async trackCoValuesSyncState( + updates: { id: RawCoID; peerId: PeerID; synced: boolean }[], + ): Promise { + await Promise.all( + updates.map(async (update) => { + if (update.synced) { + await this.db.run( + "DELETE FROM unsynced_covalues WHERE co_value_id = ? AND peer_id = ?", + [update.id, update.peerId], + ); + } else { + await this.db.run( + "INSERT OR REPLACE INTO unsynced_covalues (co_value_id, peer_id) VALUES (?, ?)", + [update.id, update.peerId], + ); + } + }), + ); + } + + async stopTrackingSyncState(id: RawCoID): Promise { + await this.db.run("DELETE FROM unsynced_covalues WHERE co_value_id = ?", [ + id, + ]); + } } diff --git a/packages/cojson/src/storage/storageAsync.ts b/packages/cojson/src/storage/storageAsync.ts index 97cf625645..c5bdf59345 100644 --- a/packages/cojson/src/storage/storageAsync.ts +++ b/packages/cojson/src/storage/storageAsync.ts @@ -10,7 +10,7 @@ import { logger, } from "../exports.js"; import { StoreQueue } from "../queue/StoreQueue.js"; -import { NewContentMessage } from "../sync.js"; +import { NewContentMessage, type PeerID } from "../sync.js"; import { CoValueKnownState, emptyKnownState, @@ -392,6 +392,23 @@ export class StorageApiAsync implements StorageAPI { return this.knownStates.waitForSync(id, coValue); } + trackCoValuesSyncState( + updates: { id: RawCoID; peerId: PeerID; synced: boolean }[], + done?: () => void, + ): void { + this.dbClient.trackCoValuesSyncState(updates).then(() => done?.()); + } + + getUnsyncedCoValueIDs( + callback: (unsyncedCoValueIDs: RawCoID[]) => void, + ): void { + this.dbClient.getUnsyncedCoValueIDs().then(callback); + } + + stopTrackingSyncState(id: RawCoID): void { + this.dbClient.stopTrackingSyncState(id); + } + close() { return this.storeQueue.close(); } diff --git a/packages/cojson/src/storage/storageSync.ts b/packages/cojson/src/storage/storageSync.ts index 704d33d9a9..cd51a452a2 100644 --- a/packages/cojson/src/storage/storageSync.ts +++ b/packages/cojson/src/storage/storageSync.ts @@ -10,7 +10,7 @@ import { type StorageAPI, logger, } from "../exports.js"; -import { NewContentMessage } from "../sync.js"; +import { NewContentMessage, type PeerID } from "../sync.js"; import { StorageKnownState } from "./knownState.js"; import { CoValueKnownState, @@ -365,6 +365,25 @@ export class StorageApiSync implements StorageAPI { return this.knownStates.waitForSync(id, coValue); } + trackCoValuesSyncState( + updates: { id: RawCoID; peerId: PeerID; synced: boolean }[], + done?: () => void, + ): void { + this.dbClient.trackCoValuesSyncState(updates); + done?.(); + } + + getUnsyncedCoValueIDs( + callback: (unsyncedCoValueIDs: RawCoID[]) => void, + ): void { + const ids = this.dbClient.getUnsyncedCoValueIDs(); + callback(ids); + } + + stopTrackingSyncState(id: RawCoID): void { + this.dbClient.stopTrackingSyncState(id); + } + close() { return undefined; } diff --git a/packages/cojson/src/storage/types.ts b/packages/cojson/src/storage/types.ts index 397fb42b47..615fd17e71 100644 --- a/packages/cojson/src/storage/types.ts +++ b/packages/cojson/src/storage/types.ts @@ -5,6 +5,7 @@ import type { import { Signature } from "../crypto/crypto.js"; import type { CoValueCore, RawCoID, SessionID } from "../exports.js"; import { NewContentMessage } from "../sync.js"; +import type { PeerID } from "../sync.js"; import { CoValueKnownState } from "../knownState.js"; export type CorrectionCallback = ( @@ -29,6 +30,28 @@ export interface StorageAPI { waitForSync(id: string, coValue: CoValueCore): Promise; + /** + * Track multiple sync status updates. + * Does not guarantee the updates will be applied in order, so only one + * update per CoValue ID + Peer ID combination should be tracked at a time. + */ + trackCoValuesSyncState( + updates: { id: RawCoID; peerId: PeerID; synced: boolean }[], + done?: () => void, + ): void; + + /** + * Get all CoValue IDs that have at least one unsynced peer. + */ + getUnsyncedCoValueIDs( + callback: (unsyncedCoValueIDs: RawCoID[]) => void, + ): void; + + /** + * Stop tracking sync status for a CoValue (remove all peer entries). + */ + stopTrackingSyncState(id: RawCoID): void; + close(): Promise | undefined; } @@ -118,6 +141,14 @@ export interface DBClientInterfaceAsync { transaction( callback: (tx: DBTransactionInterfaceAsync) => Promise, ): Promise; + + trackCoValuesSyncState( + updates: { id: RawCoID; peerId: PeerID; synced: boolean }[], + ): Promise; + + getUnsyncedCoValueIDs(): Promise; + + stopTrackingSyncState(id: RawCoID): Promise; } export interface DBTransactionInterfaceSync { @@ -170,4 +201,12 @@ export interface DBClientInterfaceSync { ): Pick[]; transaction(callback: (tx: DBTransactionInterfaceSync) => unknown): unknown; + + trackCoValuesSyncState( + updates: { id: RawCoID; peerId: PeerID; synced: boolean }[], + ): void; + + getUnsyncedCoValueIDs(): RawCoID[]; + + stopTrackingSyncState(id: RawCoID): void; } diff --git a/packages/cojson/src/sync.ts b/packages/cojson/src/sync.ts index 01d317ea1b..de3a50083c 100644 --- a/packages/cojson/src/sync.ts +++ b/packages/cojson/src/sync.ts @@ -2,6 +2,7 @@ import { md5 } from "@noble/hashes/legacy"; import { Histogram, ValueType, metrics } from "@opentelemetry/api"; import { PeerState } from "./PeerState.js"; import { SyncStateManager } from "./SyncStateManager.js"; +import { UnsyncedCoValuesTracker } from "./UnsyncedCoValuesTracker.js"; import { getContenDebugInfo, getNewTransactionsFromContentMessage, @@ -23,6 +24,7 @@ import { knownStateFrom, KnownStateSessions, } from "./knownState.js"; +import { StorageAPI } from "./storage/index.js"; export type SyncMessage = | LoadMessage @@ -63,6 +65,15 @@ export type DoneMessage = { id: RawCoID; }; +/** + * Determines when network sync is enabled. + * - "always": sync is enabled for both Anonymous Authentication and Authenticated Account + * - "signedUp": sync is enabled when the user is authenticated + * - "never": sync is disabled, content stays local + * Can be dynamically modified to control sync behavior at runtime. + */ +export type SyncWhen = "always" | "signedUp" | "never"; + export type PeerID = string; export type DisconnectedError = "Disconnected"; @@ -121,6 +132,7 @@ export class SyncManager { constructor(local: LocalNode) { this.local = local; this.syncState = new SyncStateManager(this); + this.unsyncedTracker = new UnsyncedCoValuesTracker(); this.transactionsSizeHistogram = metrics .getMeter("cojson") @@ -132,6 +144,7 @@ export class SyncManager { } syncState: SyncStateManager; + unsyncedTracker: UnsyncedCoValuesTracker; disableTransactionVerification() { this.skipVerify = true; @@ -154,6 +167,10 @@ export class SyncManager { : serverPeers; } + getPersistentServerPeers(id: RawCoID): PeerState[] { + return this.getServerPeers(id).filter((peer) => peer.persistent); + } + handleSyncMessage(msg: SyncMessage, peer: PeerState) { if (!isRawCoID(msg.id)) { const errorType = msg.id ? "invalid" : "undefined"; @@ -259,7 +276,88 @@ export class SyncManager { } } + async resumeUnsyncedCoValues(): Promise { + if (!this.local.storage) { + // No storage available, skip resumption + return; + } + + await new Promise((resolve, reject) => { + // Load all persisted unsynced CoValues from storage + this.local.storage?.getUnsyncedCoValueIDs((unsyncedCoValueIDs) => { + const coValuesToLoad = unsyncedCoValueIDs.filter( + (coValueId) => !this.local.hasCoValue(coValueId), + ); + if (coValuesToLoad.length === 0) { + resolve(); + return; + } + + const BATCH_SIZE = 10; + let processed = 0; + + const processBatch = async () => { + const batch = coValuesToLoad.slice(processed, processed + BATCH_SIZE); + + await Promise.all( + batch.map( + async (coValueId) => + new Promise((resolve) => { + try { + // Clear previous tracking (as it may include outdated peers) + this.local.storage?.stopTrackingSyncState(coValueId); + + // Resume tracking sync state for this CoValue + // This will add it back to the tracker and set up subscriptions + this.trackSyncState(coValueId); + + // Load the CoValue from storage (this will trigger sync if peers are connected) + const coValue = this.local.getCoValue(coValueId); + coValue.loadFromStorage((found) => { + if (!found) { + // CoValue could not be loaded from storage, stop tracking + this.unsyncedTracker.removeAll(coValueId); + } + resolve(); + }); + } catch (error) { + // Handle errors gracefully - log but don't fail the entire resumption + logger.warn( + `Failed to resume sync for CoValue ${coValueId}:`, + { + err: error, + coValueId, + }, + ); + this.unsyncedTracker.removeAll(coValueId); + resolve(); + } + }), + ), + ); + + processed += batch.length; + + if (processed < coValuesToLoad.length) { + processBatch().catch(reject); + } else { + resolve(); + } + }; + + processBatch().catch(reject); + }); + }); + } + startPeerReconciliation(peer: PeerState) { + if (peer.role === "server" && peer.persistent) { + // Resume syncing unsynced CoValues asynchronously + this.resumeUnsyncedCoValues().catch((error) => { + logger.warn("Failed to resume unsynced CoValues:", error); + }); + } + const coValuesOrderedByDependency: CoValueCore[] = []; const seen = new Set(); @@ -732,6 +830,9 @@ export class SyncManager { if (from !== "storage" && hasNewContent) { this.storeContent(validNewContent); + if (from === "import") { + this.trackSyncState(coValue.id); + } } for (const peer of this.getPeers(coValue.id)) { @@ -787,6 +888,8 @@ export class SyncManager { this.storeContent(content); + this.trackSyncState(coValue.id); + const contentKnownState = knownStateFromContent(content); for (const peer of this.getPeers(coValue.id)) { @@ -811,6 +914,37 @@ export class SyncManager { } } + private trackSyncState(coValueId: RawCoID): void { + const peers = this.getPersistentServerPeers(coValueId); + + const isSyncRequired = this.local.syncWhen !== "never"; + if (isSyncRequired && peers.length === 0) { + this.unsyncedTracker.add(coValueId); + return; + } + + for (const peer of peers) { + if (this.syncState.isSynced(peer, coValueId)) { + continue; + } + const alreadyTracked = this.unsyncedTracker.add(coValueId, peer.id); + if (alreadyTracked) { + continue; + } + + const unsubscribe = this.syncState.subscribeToPeerUpdates( + peer.id, + coValueId, + (_knownState, syncState) => { + if (syncState.uploaded) { + this.unsyncedTracker.remove(coValueId, peer.id); + unsubscribe(); + } + }, + ); + } + } + private storeContent(content: NewContentMessage) { const storage = this.local.storage; @@ -860,8 +994,9 @@ export class SyncManager { return new Promise((resolve, reject) => { const unsubscribe = this.syncState.subscribeToPeerUpdates( peerId, - (knownState, syncState) => { - if (syncState.uploaded && knownState.id === id) { + id, + (_knownState, syncState) => { + if (syncState.uploaded) { resolve(true); unsubscribe?.(); clearTimeout(timeoutId); @@ -916,10 +1051,23 @@ export class SyncManager { ); } - gracefulShutdown() { + setStorage(storage: StorageAPI) { + this.unsyncedTracker.setStorage(storage); + } + + removeStorage() { + this.unsyncedTracker.removeStorage(); + } + + /** + * Closes all the peer connections and ensures the list of unsynced coValues is persisted to storage. + * @returns Promise of the current pending store operation, if any. + */ + gracefulShutdown(): Promise | undefined { for (const peer of Object.values(this.peers)) { peer.gracefulShutdown(); } + return this.unsyncedTracker.forcePersist(); } } diff --git a/packages/cojson/src/tests/SyncStateManager.test.ts b/packages/cojson/src/tests/SyncStateManager.test.ts index f4cba8bfa3..8a3efa446e 100644 --- a/packages/cojson/src/tests/SyncStateManager.test.ts +++ b/packages/cojson/src/tests/SyncStateManager.test.ts @@ -78,10 +78,12 @@ describe("SyncStateManager", () => { const updateToStorageSpy: PeerSyncStateListenerCallback = vi.fn(); const unsubscribe1 = subscriptionManager.subscribeToPeerUpdates( peerState.id, + map.core.id, updateToJazzCloudSpy, ); const unsubscribe2 = subscriptionManager.subscribeToPeerUpdates( serverPeer.id, + group.core.id, updateToStorageSpy, ); @@ -141,6 +143,7 @@ describe("SyncStateManager", () => { const unsubscribe1 = subscriptionManager.subscribeToUpdates(anyUpdateSpy); const unsubscribe2 = subscriptionManager.subscribeToPeerUpdates( peerState.id, + map.core.id, anyUpdateSpy, ); diff --git a/packages/cojson/src/tests/coValueCore.loadFromStorage.test.ts b/packages/cojson/src/tests/coValueCore.loadFromStorage.test.ts index 4f96b2e3dd..7a099fe9cb 100644 --- a/packages/cojson/src/tests/coValueCore.loadFromStorage.test.ts +++ b/packages/cojson/src/tests/coValueCore.loadFromStorage.test.ts @@ -1,5 +1,6 @@ import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { RawCoID } from "../ids"; +import { PeerID } from "../sync"; import { StorageAPI } from "../storage/types"; import { createTestMetricReader, @@ -36,6 +37,13 @@ function createMockStorage( store?: (data: any, correctionCallback: any) => void; getKnownState?: (id: RawCoID) => any; waitForSync?: (id: string, coValue: any) => Promise; + trackCoValuesSyncState?: ( + operations: Array<{ id: RawCoID; peerId: PeerID; synced: boolean }>, + ) => void; + getUnsyncedCoValueIDs?: ( + callback: (unsyncedCoValueIDs: RawCoID[]) => void, + ) => void; + stopTrackingSyncState?: (id: RawCoID) => void; close?: () => Promise | undefined; } = {}, ): StorageAPI { @@ -44,6 +52,9 @@ function createMockStorage( store: opts.store || vi.fn(), getKnownState: opts.getKnownState || vi.fn(), waitForSync: opts.waitForSync || vi.fn().mockResolvedValue(undefined), + trackCoValuesSyncState: opts.trackCoValuesSyncState || vi.fn(), + getUnsyncedCoValueIDs: opts.getUnsyncedCoValueIDs || vi.fn(), + stopTrackingSyncState: opts.stopTrackingSyncState || vi.fn(), close: opts.close || vi.fn().mockResolvedValue(undefined), }; } diff --git a/packages/cojson/src/tests/sync.tracking.test.ts b/packages/cojson/src/tests/sync.tracking.test.ts new file mode 100644 index 0000000000..62d6978673 --- /dev/null +++ b/packages/cojson/src/tests/sync.tracking.test.ts @@ -0,0 +1,396 @@ +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { setSyncStateTrackingBatchDelay } from "../UnsyncedCoValuesTracker"; +import { + blockMessageTypeOnOutgoingPeer, + SyncMessagesLog, + TEST_NODE_CONFIG, + setupTestNode, + waitFor, +} from "./testUtils"; + +let jazzCloud: ReturnType; + +beforeEach(async () => { + // We want to simulate a real world communication that happens asynchronously + TEST_NODE_CONFIG.withAsyncPeers = true; + + SyncMessagesLog.clear(); + jazzCloud = setupTestNode({ isSyncServer: true }); + + setSyncStateTrackingBatchDelay(0); +}); + +afterEach(() => { + setSyncStateTrackingBatchDelay(1000); +}); + +describe("coValue sync state tracking", () => { + test("coValues with unsynced local changes are tracked as unsynced", async () => { + const { node: client } = setupTestNode({ connected: true }); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for local transaction to trigger sync + await new Promise((resolve) => queueMicrotask(resolve)); + + const unsyncedTracker = client.syncManager.unsyncedTracker; + expect(unsyncedTracker.has(map.id)).toBe(true); + }); + + test("coValue is marked as synced when all persistent server peers have received the content", async () => { + const { node: client } = setupTestNode({ connected: true }); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for local transaction to trigger sync + await new Promise((resolve) => queueMicrotask(resolve)); + + const unsyncedTracker = client.syncManager.unsyncedTracker; + expect(unsyncedTracker.has(map.id)).toBe(true); + + const serverPeer = + client.syncManager.peers[jazzCloud.node.currentSessionID]!; + await waitFor(() => + client.syncManager.syncState.isSynced(serverPeer, map.id), + ); + expect(unsyncedTracker.has(map.id)).toBe(false); + }); + + test("coValues are tracked as unsynced even if there are no persistent server peers", async () => { + const { node: client } = setupTestNode({ connected: false }); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + await new Promise((resolve) => queueMicrotask(resolve)); + + const unsyncedTracker = client.syncManager.unsyncedTracker; + expect(unsyncedTracker.has(map.id)).toBe(true); + }); + + test("only tracks sync state for persistent servers peers", async () => { + const { node: client, connectToSyncServer } = setupTestNode({ + connected: true, + }); + + // Add a second server peer that is NOT persistent + const server2 = setupTestNode({ isSyncServer: true }); + const { peer: server2PeerOnClient, peerState: server2PeerStateOnClient } = + connectToSyncServer({ + syncServer: server2.node, + syncServerName: "server2", + persistent: false, + }); + + // Do not deliver new content messages to the second server peer + blockMessageTypeOnOutgoingPeer(server2PeerOnClient, "content", {}); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + await new Promise((resolve) => queueMicrotask(resolve)); + + const unsyncedTracker = client.syncManager.unsyncedTracker; + expect(unsyncedTracker.has(map.id)).toBe(true); + + const serverPeer = + client.syncManager.peers[jazzCloud.node.currentSessionID]!; + await waitFor(() => + client.syncManager.syncState.isSynced(serverPeer, map.id), + ); + + expect( + client.syncManager.syncState.isSynced(server2PeerStateOnClient, map.id), + ).toBe(false); + expect(unsyncedTracker.has(map.id)).toBe(false); + }); + + test("coValues are not tracked as unsynced if sync is disabled", async () => { + const { node: client } = setupTestNode({ + connected: false, + syncWhen: "never", + }); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + await new Promise((resolve) => queueMicrotask(resolve)); + + const unsyncedTracker = client.syncManager.unsyncedTracker; + expect(unsyncedTracker.has(map.id)).toBe(false); + }); + + test("already synced coValues are not tracked as unsynced when trackSyncState is called", async () => { + const { node: client } = setupTestNode({ connected: true }); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + await new Promise((resolve) => queueMicrotask(resolve)); + + const unsyncedTracker = client.syncManager.unsyncedTracker; + expect(unsyncedTracker.has(map.id)).toBe(true); + + const serverPeer = + client.syncManager.peers[jazzCloud.node.currentSessionID]!; + await waitFor(() => + client.syncManager.syncState.isSynced(serverPeer, map.id), + ); + expect(unsyncedTracker.has(map.id)).toBe(false); + + // @ts-expect-error trackSyncState is private + client.syncManager.trackSyncState(map.id); + expect(unsyncedTracker.has(map.id)).toBe(false); + }); + + test("imported coValue content is tracked as unsynced", async () => { + const { node: client } = setupTestNode({ connected: true }); + const { node: client2 } = setupTestNode({ connected: false }); + + const group = client2.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Export the content from client2 to client + const groupContent = group.core.newContentSince()![0]!; + const mapContent = map.core.newContentSince()![0]!; + client.syncManager.handleNewContent(groupContent, "import"); + client.syncManager.handleNewContent(mapContent, "import"); + + const unsyncedTracker = client.syncManager.unsyncedTracker; + + // The imported coValue should be tracked as unsynced since it hasn't been synced to the server yet + expect(unsyncedTracker.has(group.id)).toBe(true); + expect(unsyncedTracker.has(map.id)).toBe(true); + + // Wait for the map to sync + const serverPeer = + client.syncManager.peers[jazzCloud.node.currentSessionID]!; + await waitFor(() => + client.syncManager.syncState.isSynced(serverPeer, map.id), + ); + expect(unsyncedTracker.has(map.id)).toBe(false); + }); +}); + +describe("sync state persistence", () => { + test("unsynced coValues are asynchronously persisted to storage", async () => { + const { node: client, addStorage } = setupTestNode({ connected: false }); + addStorage(); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for the unsynced coValues to be persisted to storage + await new Promise((resolve) => setTimeout(resolve, 100)); + + const unsyncedCoValueIDs = await new Promise((resolve) => + client.storage?.getUnsyncedCoValueIDs(resolve), + ); + expect(unsyncedCoValueIDs).toHaveLength(2); + expect(unsyncedCoValueIDs).toContain(map.id); + expect(unsyncedCoValueIDs).toContain(group.id); + }); + + test("synced coValues are removed from storage", async () => { + const { node: client, addStorage } = setupTestNode({ connected: true }); + addStorage(); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait enough time for the coValue to be synced + await new Promise((resolve) => setTimeout(resolve, 100)); + + const unsyncedCoValueIDs = await new Promise((resolve) => + client.storage?.getUnsyncedCoValueIDs(resolve), + ); + expect(unsyncedCoValueIDs).toHaveLength(0); + expect(client.syncManager.unsyncedTracker.has(map.id)).toBe(false); + }); + + test("unsynced coValues are persisted to storage when the node is shutdown", async () => { + const { node: client, addStorage } = setupTestNode({ connected: false }); + addStorage(); + + const group = client.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for local transaction to trigger sync + await new Promise((resolve) => queueMicrotask(resolve)); + + await client.gracefulShutdown(); + + const unsyncedCoValueIDs = await new Promise((resolve) => + client.storage?.getUnsyncedCoValueIDs(resolve), + ); + expect(unsyncedCoValueIDs).toHaveLength(2); + expect(unsyncedCoValueIDs).toContain(map.id); + expect(unsyncedCoValueIDs).toContain(group.id); + }); +}); + +describe("sync resumption", () => { + test("unsynced coValues are resumed when the node is restarted", async () => { + const client = setupTestNode({ connected: false }); + const { storage } = client.addStorage(); + + const getUnsyncedCoValueIDsFromStorage = async () => + new Promise((resolve) => + client.node.storage?.getUnsyncedCoValueIDs(resolve), + ); + + const group = client.node.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for the unsynced coValues to be persisted to storage + await new Promise((resolve) => setTimeout(resolve, 100)); + + const unsyncedTracker = client.node.syncManager.unsyncedTracker; + expect(unsyncedTracker.has(map.id)).toBe(true); + expect(await getUnsyncedCoValueIDsFromStorage()).toHaveLength(2); + + client.restart(); + client.addStorage({ storage }); + const { peerState: serverPeerState } = client.connectToSyncServer(); + + // Wait for sync to resume & complete + await waitFor( + async () => (await getUnsyncedCoValueIDsFromStorage()).length === 0, + ); + expect( + client.node.syncManager.syncState.isSynced(serverPeerState, map.id), + ).toBe(true); + }); + + test("lots of unsynced coValues are resumed in batches when the node is restarted", async () => { + const client = setupTestNode({ connected: false }); + const { storage } = client.addStorage(); + + const getUnsyncedCoValueIDsFromStorage = async () => + new Promise((resolve) => + client.node.storage?.getUnsyncedCoValueIDs(resolve), + ); + + const group = client.node.createGroup(); + const maps = Array.from({ length: 100 }, () => { + const map = group.createMap(); + map.set("key", "value"); + return map; + }); + + // Wait for the unsynced coValues to be persisted to storage + await new Promise((resolve) => setTimeout(resolve, 100)); + + const unsyncedTracker = client.node.syncManager.unsyncedTracker; + for (const map of maps) { + expect(unsyncedTracker.has(map.id)).toBe(true); + } + expect(await getUnsyncedCoValueIDsFromStorage()).toHaveLength(101); + + client.restart(); + client.addStorage({ storage }); + const { peerState: serverPeerState } = client.connectToSyncServer(); + + // Wait for sync to resume & complete + await waitFor( + async () => (await getUnsyncedCoValueIDsFromStorage()).length === 0, + ); + for (const map of maps) { + expect( + client.node.syncManager.syncState.isSynced(serverPeerState, map.id), + ).toBe(true); + } + }); + + test("old peer entries are removed from storage when restarting with new peers", async () => { + const client = setupTestNode(); + const { peer: serverPeer } = client.connectToSyncServer({ + persistent: true, + }); + const { storage } = client.addStorage(); + + // Do not deliver new content messages to the sync server + blockMessageTypeOnOutgoingPeer(serverPeer, "content", {}); + + const getUnsyncedCoValueIDsFromStorage = async () => + new Promise((resolve) => + client.node.storage?.getUnsyncedCoValueIDs(resolve), + ); + + const group = client.node.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for the unsynced coValues to be persisted to storage + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(await getUnsyncedCoValueIDsFromStorage()).toHaveLength(2); + + client.restart(); + client.addStorage({ storage }); + const newSyncServer = setupTestNode({ isSyncServer: true }); + const { peerState: newServerPeerState } = client.connectToSyncServer({ + syncServer: newSyncServer.node, + persistent: true, + }); + + // Wait for sync to resume & complete + await waitFor( + async () => (await getUnsyncedCoValueIDsFromStorage()).length === 0, + ); + expect( + client.node.syncManager.syncState.isSynced(newServerPeerState, map.id), + ).toBe(true); + }); + + test("sync resumption is skipped when adding a peer that is not a persistent server", async () => { + const client = setupTestNode({ connected: false }); + const { storage } = client.addStorage(); + + const getUnsyncedCoValueIDsFromStorage = async () => + new Promise((resolve) => + client.node.storage?.getUnsyncedCoValueIDs(resolve), + ); + + const group = client.node.createGroup(); + const map = group.createMap(); + map.set("key", "value"); + + // Wait for the unsynced coValues to be persisted to storage + await new Promise((resolve) => setTimeout(resolve, 100)); + + let unsyncedCoValueIDs = await getUnsyncedCoValueIDsFromStorage(); + expect(unsyncedCoValueIDs).toHaveLength(2); + expect(unsyncedCoValueIDs).toContain(map.id); + expect(unsyncedCoValueIDs).toContain(group.id); + + client.restart(); + client.addStorage({ storage }); + const newPeer = setupTestNode({ isSyncServer: true }); + client.connectToSyncServer({ + syncServer: newPeer.node, + persistent: false, + }); + + // Wait to confirm sync is not resumed + await new Promise((resolve) => setTimeout(resolve, 100)); + + unsyncedCoValueIDs = await getUnsyncedCoValueIDsFromStorage(); + expect(unsyncedCoValueIDs).toHaveLength(2); + expect(unsyncedCoValueIDs).toContain(map.id); + expect(unsyncedCoValueIDs).toContain(group.id); + }); +}); diff --git a/packages/cojson/src/tests/testUtils.ts b/packages/cojson/src/tests/testUtils.ts index 75d9e33366..1a855f6c5d 100644 --- a/packages/cojson/src/tests/testUtils.ts +++ b/packages/cojson/src/tests/testUtils.ts @@ -22,7 +22,7 @@ import { import type { SessionID } from "../ids.js"; import { LocalNode } from "../localNode.js"; import { connectedPeers } from "../streamUtils.js"; -import type { Peer, SyncMessage } from "../sync.js"; +import type { Peer, SyncMessage, SyncWhen } from "../sync.js"; import { expectGroup } from "../typeUtils/expectGroup.js"; import { toSimplifiedMessages } from "./messagesTestUtils.js"; import { createAsyncStorage, createSyncStorage } from "./testStorage.js"; @@ -449,13 +449,14 @@ export function setupTestNode( isSyncServer?: boolean; connected?: boolean; secret?: AgentSecret; + syncWhen?: SyncWhen; } = {}, ) { const [admin, session] = opts.secret ? agentAndSessionIDFromSecret(opts.secret) : randomAgentAndSessionID(); - let node = new LocalNode(admin.agentSecret, session, Crypto); + let node = new LocalNode(admin.agentSecret, session, Crypto, opts.syncWhen); if (opts.isSyncServer) { syncServer.current = node; @@ -527,7 +528,12 @@ export function setupTestNode( addAsyncStorage, restart: () => { node.gracefulShutdown(); - ctx.node = node = new LocalNode(admin.agentSecret, session, Crypto); + ctx.node = node = new LocalNode( + admin.agentSecret, + session, + Crypto, + opts.syncWhen, + ); if (opts.isSyncServer) { syncServer.current = node; diff --git a/packages/jazz-tools/src/browser/createBrowserContext.ts b/packages/jazz-tools/src/browser/createBrowserContext.ts index 59cec8f5f8..22f28c48c3 100644 --- a/packages/jazz-tools/src/browser/createBrowserContext.ts +++ b/packages/jazz-tools/src/browser/createBrowserContext.ts @@ -64,6 +64,7 @@ async function setupPeers(options: BaseBrowserContextOptions) { connected: () => false, toggleNetwork: () => {}, peers, + syncWhen: options.sync.when, storage, setNode: () => {}, crypto, @@ -114,6 +115,7 @@ async function setupPeers(options: BaseBrowserContextOptions) { return wsPeer.connected; }, peers, + syncWhen: options.sync.when, storage, setNode, crypto, @@ -126,6 +128,7 @@ export async function createJazzBrowserGuestContext( const { toggleNetwork, peers, + syncWhen, setNode, crypto, storage, @@ -136,6 +139,7 @@ export async function createJazzBrowserGuestContext( const context = await createAnonymousJazzContext({ crypto, peers, + syncWhen, storage, }); @@ -178,6 +182,7 @@ export async function createJazzBrowserContext< const { toggleNetwork, peers, + syncWhen, setNode, crypto, storage, @@ -207,6 +212,7 @@ export async function createJazzBrowserContext< credentials: options.credentials, newAccountProps: options.newAccountProps, peers, + syncWhen, storage, crypto, defaultProfileName: options.defaultProfileName, diff --git a/packages/jazz-tools/src/react-native-core/platform.ts b/packages/jazz-tools/src/react-native-core/platform.ts index 547a5be428..17b1c84cde 100644 --- a/packages/jazz-tools/src/react-native-core/platform.ts +++ b/packages/jazz-tools/src/react-native-core/platform.ts @@ -57,6 +57,7 @@ async function setupPeers(options: BaseReactNativeContextOptions) { addConnectionListener: () => () => {}, connected: () => false, peers, + syncWhen: options.sync.when, setNode: () => {}, crypto, storage, @@ -105,6 +106,7 @@ async function setupPeers(options: BaseReactNativeContextOptions) { }, connected: () => wsPeer.connected, peers, + syncWhen: options.sync.when, setNode, crypto, storage, @@ -117,6 +119,7 @@ export async function createJazzReactNativeGuestContext( const { toggleNetwork, peers, + syncWhen, setNode, crypto, storage, @@ -127,6 +130,7 @@ export async function createJazzReactNativeGuestContext( const context = createAnonymousJazzContext({ crypto, peers, + syncWhen, storage, }); @@ -169,6 +173,7 @@ export async function createJazzReactNativeContext< const { toggleNetwork, peers, + syncWhen, setNode, crypto, storage, @@ -203,6 +208,7 @@ export async function createJazzReactNativeContext< credentials: options.credentials, newAccountProps: options.newAccountProps, peers, + syncWhen, crypto, defaultProfileName: options.defaultProfileName, AccountSchema: options.AccountSchema, diff --git a/packages/jazz-tools/src/tools/implementation/createContext.ts b/packages/jazz-tools/src/tools/implementation/createContext.ts index b4eb10a5e3..0749709f25 100644 --- a/packages/jazz-tools/src/tools/implementation/createContext.ts +++ b/packages/jazz-tools/src/tools/implementation/createContext.ts @@ -18,6 +18,7 @@ import { type ID, type InstanceOfSchema, coValueClassFromCoValueClassOrSchema, + type SyncWhen, } from "../internal.js"; import { AuthCredentials, NewAccountProps } from "../types.js"; import { activeAccountContext } from "./activeAccountContext.js"; @@ -110,6 +111,7 @@ export async function createJazzContextFromExistingCredentials< >({ credentials, peers, + syncWhen, crypto, storage, AccountSchema: PropsAccountSchema, @@ -119,6 +121,7 @@ export async function createJazzContextFromExistingCredentials< }: { credentials: Credentials; peers: Peer[]; + syncWhen?: SyncWhen; crypto: CryptoProvider; AccountSchema?: S; sessionProvider: SessionProvider; @@ -140,9 +143,10 @@ export async function createJazzContextFromExistingCredentials< const node = await LocalNode.withLoadedAccount({ accountID: credentials.accountID as unknown as CoID, accountSecret: credentials.secret, - sessionID: sessionID, - peers: peers, - crypto: crypto, + sessionID, + peers, + syncWhen, + crypto, storage, migration: async (rawAccount, _node, creationProps) => { const account = AccountClass.fromRaw(rawAccount) as InstanceOfSchema; @@ -167,7 +171,7 @@ export async function createJazzContextFromExistingCredentials< sessionDone(); }, logOut: async () => { - node.gracefulShutdown(); + await node.gracefulShutdown(); sessionDone(); await onLogOut?.(); }, @@ -182,6 +186,7 @@ export async function createJazzContextForNewAccount< creationProps, initialAgentSecret, peers, + syncWhen, crypto, AccountSchema: PropsAccountSchema, onLogOut, @@ -191,6 +196,7 @@ export async function createJazzContextForNewAccount< creationProps: { name: string }; initialAgentSecret?: AgentSecret; peers: Peer[]; + syncWhen?: SyncWhen; crypto: CryptoProvider; AccountSchema?: S; onLogOut?: () => Promise; @@ -206,6 +212,7 @@ export async function createJazzContextForNewAccount< const { node } = await LocalNode.withNewlyCreatedAccount({ creationProps, peers, + syncWhen, crypto, initialAgentSecret, storage, @@ -233,7 +240,7 @@ export async function createJazzContextForNewAccount< sessionDone(); }, logOut: async () => { - node.gracefulShutdown(); + await node.gracefulShutdown(); await onLogOut?.(); }, }; @@ -247,6 +254,7 @@ export async function createJazzContext< credentials?: AuthCredentials; newAccountProps?: NewAccountProps; peers: Peer[]; + syncWhen?: SyncWhen; crypto: CryptoProvider; defaultProfileName?: string; AccountSchema?: S; @@ -271,6 +279,7 @@ export async function createJazzContext< secret: credentials.accountSecret, }, peers: options.peers, + syncWhen: options.syncWhen, crypto, AccountSchema: options.AccountSchema, sessionProvider: options.sessionProvider, @@ -295,6 +304,7 @@ export async function createJazzContext< creationProps, initialAgentSecret, peers: options.peers, + syncWhen: options.syncWhen, crypto, AccountSchema: options.AccountSchema, sessionProvider: options.sessionProvider, @@ -322,10 +332,12 @@ export async function createJazzContext< export function createAnonymousJazzContext({ peers, + syncWhen, crypto, storage, }: { peers: Peer[]; + syncWhen?: SyncWhen; crypto: CryptoProvider; storage?: StorageAPI; }): JazzContextWithAgent { @@ -335,6 +347,7 @@ export function createAnonymousJazzContext({ agentSecret, crypto.newRandomSessionID(crypto.getAgentID(agentSecret)), crypto, + syncWhen, ); for (const peer of peers) { diff --git a/packages/jazz-tools/src/tools/tests/createContext.test.ts b/packages/jazz-tools/src/tools/tests/createContext.test.ts index 2c5e4a4852..85830e50dd 100644 --- a/packages/jazz-tools/src/tools/tests/createContext.test.ts +++ b/packages/jazz-tools/src/tools/tests/createContext.test.ts @@ -24,6 +24,7 @@ import { setupJazzTestSync, } from "../testing"; import { assertLoaded, loadCoValueOrFail } from "./utils"; +import { createAsyncStorage } from "./testStorage"; const Crypto = await WasmCrypto.create(); let randomSessionProvider = new MockSessionProvider(); @@ -162,10 +163,42 @@ describe("createContext methods", () => { asActiveAccount: true, }); - context.logOut(); + await context.logOut(); expect(onLogOut).toHaveBeenCalled(); }); + test("waits for all storage operations to complete before logging out", async () => { + const account = await createJazzTestAccount({ + isCurrentActiveAccount: true, + }); + + const context = await createJazzContextFromExistingCredentials({ + credentials: { + accountID: account.$jazz.id, + secret: account.$jazz.localNode.getCurrentAgent().agentSecret, + }, + peers: [getPeerConnectedToTestSyncServer()], + crypto: Crypto, + sessionProvider: randomSessionProvider, + storage: await createAsyncStorage({}), + asActiveAccount: true, + }); + + // Create a coValue (and a group as its owner) + const coValue = co.plainText().create("test"); + + // Wait for local transaction to trigger sync + await new Promise((resolve) => queueMicrotask(resolve)); + + await context.logOut(); + + const unsyncedCoValueIDs = await new Promise((resolve) => + context.node.storage!.getUnsyncedCoValueIDs(resolve), + ); + expect(unsyncedCoValueIDs).toHaveLength(2); + expect(unsyncedCoValueIDs).toContain(coValue.$jazz.id); + }); + test("connects to provided peers", async () => { const account = await createJazzTestAccount({ isCurrentActiveAccount: true, diff --git a/packages/jazz-tools/src/tools/types.ts b/packages/jazz-tools/src/tools/types.ts index acb68fe16f..2bf37698ea 100644 --- a/packages/jazz-tools/src/tools/types.ts +++ b/packages/jazz-tools/src/tools/types.ts @@ -1,6 +1,8 @@ import type { AgentSecret, LocalNode } from "cojson"; import type { Account, AnonymousJazzAgent, ID } from "./internal.js"; +export type { SyncWhen } from "cojson"; + export type AuthCredentials = { accountID: ID; secretSeed?: Uint8Array; diff --git a/tests/stress-test/src/2_main.tsx b/tests/stress-test/src/2_main.tsx index c71db2ecc4..485fe49f93 100644 --- a/tests/stress-test/src/2_main.tsx +++ b/tests/stress-test/src/2_main.tsx @@ -170,6 +170,14 @@ function HomeScreen() { e.currentTarget.style.borderColor = "#e1e5e9"; }} > + { + e.stopPropagation(); + me.root.projects.$jazz.remove( + (p) => p.$jazz.id === project.$jazz.id, + ); + }} + />
); } + +function DeleteButton({ + onClick, +}: { + onClick: (e: React.MouseEvent) => void; +}) { + return ( + + ); +}