-
Notifications
You must be signed in to change notification settings - Fork 103
feat: resume interrupted CoValues sync on restart #3320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 38 commits
Commits
Show all changes
40 commits
Select commit
Hold shift + click to select a range
98cb206
docs: spec for CoValue sync status tracking & sync resumption
nrainhart 9f255fd
docs: address comments from Guido
nrainhart c19ea7f
docs: update tasks
nrainhart 043d024
feat: add sync state tracking methods to StorageAPI
nrainhart f8a19ca
feat: add `UnsyncedCoValuesTracker`
nrainhart 12beaee
feat: track sync state in SyncManager
nrainhart 3ee8b54
docs: rename "sync status" to "sync state"
nrainhart 9bcd6b8
feat: resume sync after app restart
nrainhart e23b587
perf: batch DB writes when tracking sync state
nrainhart 8551e00
perf: add coValueId to SyncStateManager.syncState.subscribeToPeerUpdates
nrainhart 6e3ac57
perf: simplify sync state batch to avoid unnecessary DB writes
nrainhart 6b747ee
docs: update spec with batched sync state persistence
nrainhart a2ff3f1
perf: avoid duplicate sync state subscriptions
nrainhart e5997a1
chore: add delete button to stress test app projects
nrainhart 05936ea
feat: track unsynced CoValues when there are no peers
nrainhart f8b9858
perf: only load unsynced CoValues that aren't already in memory
nrainhart ceda0ed
perf: promisify IDB transactions & run batched sync state tracking in…
nrainhart 5ee9d8c
test: CoValue sync state tracking
nrainhart f9fac60
test: sync state persistence
nrainhart 854038b
test: resume sync on restart
nrainhart a430c83
feat: avoid tracking sync state for coValues modified by other peers
nrainhart aa3a96a
fix: permanently unsynced coValues if coValue is loaded before tracking
nrainhart 46d0993
test: support debugging browser mode tests
nrainhart 60c39d6
test: IndexedDB storage
nrainhart b260724
test: batched sync resumption
nrainhart 83f84ca
chore: add changeset
nrainhart 33d0c0f
fix: remove unused peer id index
nrainhart b534b13
fix: remove unnecessary subscription when there are no peers
nrainhart 4a94050
feat: resume sync only when adding persistent server peers
nrainhart f386707
refactor: storage as field instead of getter
nrainhart 9dfd57d
fix: only load CoValues from storage when resuming sync
nrainhart 92ba14d
perf(idb): avoid locking all tables when only updating unsyncedCoValu…
nrainhart b93c151
fix: add synchronous sync check before tracking sync state
nrainhart ae63881
feat: track sync state for imported coValue content
nrainhart d0cafc4
feat: reduce delay for persisting sync state to 200ms
nrainhart 0572c0f
feat: persist sync state when shutting down LocalNode
nrainhart 7565421
feat: wait for graceful shutdown on logout
nrainhart bde3fee
fix: remove in-memory tracking of resumed CoValue sync if not found i…
nrainhart 01cbaeb
Update changeset
nrainhart 633321e
fix: always close storage on node shutdown
nrainhart File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| --- | ||
| "cojson-core-napi-linux-arm-gnueabihf": patch | ||
| "cojson-core-napi-linux-arm64-musl": patch | ||
| "cojson-core-napi-linux-arm64-gnu": patch | ||
| "cojson-core-napi-linux-x64-musl": patch | ||
| "cojson-core-napi-linux-x64-gnu": patch | ||
| "cojson-core-napi-darwin-arm64": patch | ||
| "cojson-core-napi-darwin-x64": patch | ||
| "cojson-storage-do-sqlite": patch | ||
| "cojson-storage-indexeddb": patch | ||
| "cojson-storage-sqlite": patch | ||
| "cojson-transport-ws": patch | ||
| "svelte-passkey-auth": patch | ||
| "community-jazz-vue": patch | ||
| "jazz-react-passkey-auth-starter": patch | ||
| "cojson-core-napi": patch | ||
| "cojson-core-wasm": patch | ||
| "durable-object": patch | ||
| "jazz-sveltekit": patch | ||
| "cojson-core-rn": patch | ||
| "jazz-webhook": patch | ||
| "chat-svelte": patch | ||
| "jazz-tools": patch | ||
| "jazz-run": patch | ||
| "cojson": patch | ||
| --- | ||
|
|
||
| Resume interrupted CoValue sync on app restart (without requiring CoValues to be manually reloaded) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,338 @@ | ||
| # Design: Track Unsynced CoValues & Resume Sync | ||
|
|
||
| ## Overview | ||
|
|
||
| This design implements automatic tracking of CoValues with unsynced changes and provides mechanisms to resume syncing them on app restart. The solution integrates with the existing sync infrastructure and provides reactive APIs for monitoring sync state. | ||
|
|
||
| The core idea is to maintain a set of CoValue IDs that have pending changes not yet fully synced to all persistent server peers. This set is persisted across app restarts and used to: | ||
| 1. Automatically resume syncing on startup | ||
| 2. Provide efficient sync state queries | ||
| 3. Enable reactive sync state subscriptions | ||
|
|
||
| ## Architecture / Components | ||
|
|
||
| ### 1. Extended StorageAPI Interface | ||
|
|
||
| Extend the existing `StorageAPI` interface to include methods for tracking unsynced CoValues. | ||
|
|
||
| **Location:** `packages/cojson/src/storage/types.ts` | ||
|
|
||
| **New Methods:** | ||
| ```typescript | ||
| export interface StorageAPI { | ||
| // ... existing methods ... | ||
|
|
||
| trackCoValuesSyncState(updates: { id: RawCoID; peerId: PeerID; synced: boolean }[]): void; | ||
| getUnsyncedCoValueIDs(callback: (data: RawCoID[]) => void); | ||
| stopTrackingSyncState(id: RawCoID): void; | ||
| } | ||
| ``` | ||
|
|
||
| **Implementation Strategy:** | ||
| - Storage implementations will add these methods to persist the set of unsynced CoValue IDs | ||
| - For IndexedDB: Add a new object store `"unsyncedCoValues"` | ||
| - For SQLite: Add a table `unsynced_covalues` | ||
|
|
||
| ### 2. UnsyncedCoValuesTracker | ||
|
|
||
| A new class that manages the set of unsynced CoValue IDs. | ||
|
|
||
| **Location:** `packages/cojson/src/sync/UnsyncedCoValuesTracker.ts` | ||
|
|
||
| **Responsibilities:** | ||
| - Maintain an in-memory Set of unsynced CoValue IDs (including peers pending sync) | ||
| - Persist the set to storage using `StorageAPI.trackCoValuesSyncState` if available. Persistence is batched and performed asynchronously, to avoid the cost of N extra storage writes per update (where N is the number of peers). | ||
| - Load persisted unsynced CoValues on initialization using `StorageAPI.getUnsyncedCoValueIDs` + `LocalNode.loadCoValueCore` if storage is available | ||
| - Notify listeners when the set changes | ||
|
|
||
| **Key Methods:** | ||
| - `constructor(getStorage: () => StorageAPI | undefined)`: Initialize with storage getter for persistence | ||
| - `add(id: RawCoID, peerId: PeerID)`: Add a CoValue to the unsynced set (queues for batched persistence) | ||
| - `remove(id: RawCoID, peerId: PeerID)`: Remove a CoValue from the unsynced set (queues for batched persistence) | ||
| - `getAll()`: Returns all unsynced CoValue IDs | ||
| - `isAllSynced()`: Check if all CoValues are synced (O(1), returns `size() === 0`) | ||
| - `private flush()`: Flush all pending persistence operations in a batch | ||
| - `subscribe(id: RawCoID, listener: (synced: boolean) => void)`: Subscribe to changes in whether a CoValue is synced | ||
| - `subscribe(listener: (synced: boolean) => void)`: Subscribe to changes in whether all CoValues are synced | ||
|
|
||
| ### 3. Integration with SyncManager | ||
|
|
||
| **Location:** `packages/cojson/src/sync.ts` | ||
|
|
||
| **Changes:** | ||
| - Add `unsyncedTracker: UnsyncedCoValuesTracker` property to `SyncManager` | ||
| - Initialize tracker in `SyncManager` constructor: `new UnsyncedCoValuesTracker(local.storage, this)` | ||
| - Update `syncContent()` method to keep track of unsynced CoValues created locally: | ||
| ```ts | ||
| syncContent(content: NewContentMessage) { | ||
| const coValue = this.local.getCoValue(content.id); | ||
|
|
||
| this.storeContent(content); | ||
|
|
||
| this.trackSyncState(coValue.id); | ||
|
|
||
| // ... | ||
| } | ||
|
|
||
| trackSyncState(coValueId: RawCoID): void { | ||
| for (const peer of this.getPersistentServerPeers()) { | ||
| this.unsyncedTracker.add(coValueId, peer.id); | ||
|
|
||
| const unsubscribe = this.syncState.subscribeToPeerUpdates( | ||
| coValueId, | ||
| peer.id, | ||
| (_knownState, syncState) => { | ||
| if (syncState.uploaded) { | ||
| this.unsyncedTracker.remove(coValueId, peer.id); | ||
| unsubscribe(); | ||
| } | ||
| }, | ||
| ); | ||
| } | ||
| } | ||
| ``` | ||
| - Update `handleNewContent` method to keep track of unsynced CoValues received from other peers: | ||
| ```ts | ||
| handleNewContent( | ||
| msg: NewContentMessage, | ||
| from: PeerState | "storage" | "import", | ||
| ) { | ||
| const coValue = this.local.getCoValue(msg.id); | ||
| const peer = from === "storage" || from === "import" ? undefined : from; | ||
| const sourceRole = | ||
| from === "storage" | ||
| ? "storage" | ||
| : from === "import" | ||
| ? "import" | ||
| : peer?.role; | ||
|
|
||
| // ... | ||
|
|
||
| if (from !== "storage" && hasNewContent) { | ||
| this.storeContent(validNewContent); | ||
| } | ||
|
|
||
| if (sourceRole === "client" && hasNewContent) { | ||
| this.trackSyncState(coValue.id); | ||
| } | ||
|
|
||
| // ... | ||
| } | ||
| ``` | ||
| - Add method `async resumeUnsyncedCoValues()` to load and resume syncing unsynced CoValues. This happens asynchronously and doesn't block initialization | ||
| - Call `resumeUnsyncedCoValues` as part of `SyncManager.startPeerReconciliation`. | ||
|
|
||
| **`resumeUnsyncedCoValues()` implementation:** | ||
|
|
||
| ```typescript | ||
| async resumeUnsyncedCoValues(): Promise<void> { | ||
| if (!this.local.storage) { | ||
| // No storage available, skip resumption | ||
| return; | ||
| } | ||
|
|
||
| await new Promise<void>((resolve, reject) => { | ||
| // Load all persisted unsynced CoValues from storage | ||
| this.local.storage!.getUnsyncedCoValueIDs((unsyncedCoValueIDs) => { | ||
| if (unsyncedCoValueIDs.length === 0) { | ||
| resolve(); | ||
| return; | ||
| } | ||
|
|
||
| const BATCH_SIZE = 10; | ||
| let processed = 0; | ||
|
|
||
| const processBatch = async () => { | ||
| const batch = unsyncedCoValueIDs.slice( | ||
| processed, | ||
| processed + BATCH_SIZE, | ||
| ); | ||
|
|
||
| await Promise.all( | ||
| batch.map(async (coValueId) => { | ||
| try { | ||
| // Load the CoValue from storage (this will trigger sync if peers are connected) | ||
| const coValue = await this.local.loadCoValueCore(coValueId); | ||
|
|
||
| if (coValue.isAvailable()) { | ||
| // CoValue was successfully loaded. Resume tracking sync state for this CoValue | ||
| // This will add it back to the tracker and set up subscriptions | ||
| this.trackSyncState(coValueId); | ||
| } else { | ||
| // CoValue not found in storage. Remove all peer entries for this CoValue | ||
| this.local.storage!.stopTrackingSyncState(coValueId); | ||
| } | ||
| } catch (error) { | ||
| // Handle errors gracefully - log but don't fail the entire resumption | ||
| logger.warn( | ||
| `Failed to resume sync for CoValue ${coValueId}:`, | ||
| error, | ||
| ); | ||
| this.local.storage!.stopTrackingSyncState(coValueId); | ||
| } | ||
| }), | ||
| ); | ||
|
|
||
| processed += batch.length; | ||
|
|
||
| if (processed < unsyncedCoValueIDs.length) { | ||
| // Process next batch asynchronously to avoid blocking | ||
| setTimeout(processBatch, 0); | ||
| } else { | ||
| resolve(); | ||
| } | ||
| }; | ||
|
|
||
| processBatch().catch(reject); | ||
| }); | ||
| }); | ||
| } | ||
| ``` | ||
|
|
||
| ### 4. Sync Status Subscriptions | ||
|
|
||
| **Location:** `packages/cojson/src/coValueCore/coValueCore.ts` and `packages/cojson/src/sync.ts` | ||
|
|
||
| **CoValueCore.subscribeToSyncState:** | ||
| - Subscribe to changes in whether this specific CoValue is synced | ||
| - Uses `syncManager.unsyncedTracker.subscribe(coValueId)` to get notified when the CoValue's sync state changes | ||
| - Calls listener immediately with current state on subscription | ||
|
|
||
| **SyncManager.subscribeToSyncState:** | ||
| - Subscribe to changes in whether all CoValues are synced | ||
| - Uses `syncManager.unsyncedTracker.subscribe()` to get notified when the unsynced set changes | ||
| - Calls listener immediately with current state on subscription (check `unsyncedTracker.isAllSynced()`) | ||
|
|
||
| ### 5. Refactored waitForSync | ||
|
|
||
| **Location:** `packages/cojson/src/coValueCore/coValueCore.ts` and `packages/cojson/src/sync.ts` | ||
|
|
||
| **Changes:** | ||
| - Replace the current implementation of `CoValueCore.waitForSync()` based on CoValue subscription with a simple call to `syncManager.unsyncedTracker.subscribe(coValueId)`. | ||
|
|
||
| ## Data Model | ||
|
|
||
| ### UnsyncedCoValuesTracker In-Memory Representation | ||
|
|
||
| The tracker maintains an in-memory data structure that maps each unsynced CoValue to the set of peers it's unsynced to: | ||
|
|
||
| ```typescript | ||
| class UnsyncedCoValuesTracker { | ||
| // Map from CoValue ID to Set of Peer IDs that the CoValue is unsynced to | ||
| private unsynced: Map<RawCoID, Set<PeerID>> = new Map(); | ||
| private coValueListeners: Map<RawCoID, (synced: boolean) => void> = new Map(); | ||
| private globalListeners: (synced: boolean) => void = new Set(); | ||
| private storage?: StorageAPI; | ||
| } | ||
| ``` | ||
|
|
||
| ### Storage Layout | ||
|
|
||
| Storage persists unsynced CoValue-to-peer relationships as individual rows, with one row per (CoValue ID, Peer ID) pair. | ||
|
|
||
| **IndexedDB Schema:** | ||
| - Object store: `"unsyncedCoValues"` | ||
| - Key: `[coValueId, peerId]` (composite key) | ||
| - Value: `{ coValueId: RawCoID, peerId: PeerID }` | ||
| - Indexes: | ||
| - Index on `coValueId` for efficient queries by CoValue | ||
| - Index on `peerId` for efficient queries by peer (optional, for cleanup) | ||
|
|
||
| **SQLite Schema:** | ||
| ```sql | ||
| CREATE TABLE unsynced_covalues ( | ||
| co_value_id TEXT NOT NULL, | ||
| peer_id TEXT NOT NULL, | ||
| PRIMARY KEY (co_value_id, peer_id) | ||
| ); | ||
|
|
||
| CREATE INDEX idx_unsynced_covalues_co_value_id ON unsynced_covalues(co_value_id); | ||
| ``` | ||
|
|
||
| **Storage Operations:** | ||
| - `trackCoValuesSyncState(updates)`: | ||
| - Takes an array of operations `{ id: RawCoID, peerId: PeerID, synced: boolean }[]` | ||
| - Executes all operations in a single transaction | ||
| - For each operation: | ||
| - If `synced === true`: DELETE row where `co_value_id = id AND peer_id = peerId` | ||
| - If `synced === false`: INSERT OR REPLACE row with `(id, peerId)` | ||
| - `getUnsyncedCoValueIDs(callback)`: | ||
| - Query all distinct `co_value_id` values (SELECT DISTINCT co_value_id) | ||
| - Return array of unique CoValue IDs that have at least one unsynced peer | ||
|
|
||
| **Example Storage Data:** | ||
| ``` | ||
| co_value_id | peer_id | ||
| ---------------|---------- | ||
| co_abc123 | peer1 | ||
| co_abc123 | peer2 | ||
| co_def456 | peer1 | ||
| ``` | ||
|
|
||
| This represents: | ||
| - `co_abc123` is unsynced to `peer1` and `peer2` | ||
| - `co_def456` is unsynced to `peer1` | ||
|
|
||
| **Loading on Startup:** | ||
| 1. Call `getUnsyncedCoValueIDs()` to get all CoValue IDs with unsynced peers | ||
| 2. For each CoValue ID, query all peer IDs: `SELECT peer_id WHERE co_value_id = ?` | ||
| 3. Reconstruct the in-memory Map structure | ||
| 4. Load each CoValue and resume syncing | ||
|
|
||
| ## Error Handling / Testing Strategy | ||
|
|
||
| ### Error Handling | ||
|
|
||
| 1. **Storage Errors:** | ||
| - If persistence fails, log error but continue with in-memory tracking | ||
| - On load failure, start with empty set (graceful degradation) | ||
| - Don't block LocalNode initialization if persistence fails | ||
|
|
||
| 2. **Missing CoValues:** | ||
| - Handle gracefully when trying to load non-existent CoValues | ||
|
|
||
| 3. **Peer Connection Issues:** | ||
| - Continue tracking even when peers are disconnected | ||
| - Resume syncing when peers reconnect | ||
|
|
||
| 4. **Race Conditions:** | ||
| - Use atomic operations for add/remove from Set | ||
| - Ensure persistence operations don't interfere with tracking updates | ||
|
|
||
| ### Testing Strategy | ||
|
|
||
| 1. **Unit Tests:** | ||
| - Test `UnsyncedCoValuesTracker` operations | ||
| - Test persistence and loading | ||
| - Test subscription notifications | ||
| - Test sync state determination logic | ||
|
|
||
| 2. **Integration Tests:** | ||
| - Test integration with `SyncManager` and `SyncStateManager` | ||
| - Test that CoValues are tracked when they become unsynced | ||
| - Test that CoValues are removed when they become synced | ||
| - Test resumption on LocalNode initialization | ||
| - Test subscription APIs (`subscribeToSyncState`) | ||
| - Test refactored `waitForSync` | ||
|
|
||
| 3. **E2E Tests:** | ||
| - Test offline/online scenario with partial CoValue loading | ||
| - Test that unsynced CoValues are synced after app restart | ||
|
|
||
| 4. **Performance Tests:** | ||
| - Test with large numbers of unsynced CoValues | ||
| - Test persistence/loading performance | ||
| - Test subscription performance with many listeners | ||
| - Test polling performance in `waitForSync` | ||
|
|
||
| 5. **Platform Tests:** | ||
| - Test on web (IndexedDB storage) | ||
| - Test on Node.js (SQLite storage) | ||
|
|
||
| ### Edge Cases | ||
|
|
||
| 1. **Very large unsynced set** - Should handle efficiently, without a noticeable impact on app startup | ||
| 2. **Rapid sync state changes** - Should debounce/throttle persistence | ||
| 3. **Multiple sessions sharing storage** - IndexedDB is shared across tabs, but this shouldn't be a problem as we don't need to keep the in-memory unsynced CoValue list in sync with storage (we only use storage for persistence | ||
| across app restarts) | ||
| 4. **Storage unavailable** - Should fall back to in-memory only |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.