Skip to content

Commit 58fd059

Browse files
[Fix] Stale credentials in mulitple tabs (#257)
1 parent 10a03d0 commit 58fd059

File tree

3 files changed

+122
-87
lines changed

3 files changed

+122
-87
lines changed

.changeset/breezy-crews-fry.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/web': patch
3+
---
4+
5+
Fix an issue where the shared sync manager would not discard stale credentials

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

+11-8
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1+
import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common';
12
import * as Comlink from 'comlink';
2-
import {
3-
WebStreamingSyncImplementation,
4-
WebStreamingSyncImplementationOptions
5-
} from './WebStreamingSyncImplementation';
3+
import { openWorkerDatabasePort } from '../../worker/db/open-worker-database';
4+
import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider';
65
import {
76
ManualSharedSyncPayload,
87
SharedSyncClientEvent,
98
SharedSyncImplementation
109
} from '../../worker/sync/SharedSyncImplementation';
11-
import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider';
12-
import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common';
13-
import { openWorkerDatabasePort } from '../../worker/db/open-worker-database';
10+
import {
11+
WebStreamingSyncImplementation,
12+
WebStreamingSyncImplementationOptions
13+
} from './WebStreamingSyncImplementation';
1414

1515
/**
1616
* The shared worker will trigger methods on this side of the message port
@@ -144,6 +144,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
144144
*/
145145
async connect(options?: PowerSyncConnectionOptions): Promise<void> {
146146
await this.waitForReady();
147+
// This is needed since a new tab won't have any reference to the
148+
// shared worker sync implementation since that is only created on the first call to `connect`.
149+
await this.disconnect();
147150
return this.syncManager.connect(options);
148151
}
149152

@@ -170,9 +173,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
170173
};
171174

172175
this.messagePort.postMessage(closeMessagePayload);
173-
174176
// Release the proxy
175177
this.syncManager[Comlink.releaseProxy]();
178+
this.messagePort.close();
176179
}
177180

178181
async waitForReady() {

packages/web/src/worker/sync/SharedSyncImplementation.ts

+106-79
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,24 @@
1-
import * as Comlink from 'comlink';
2-
import Logger, { type ILogger } from 'js-logger';
31
import {
42
type AbstractStreamingSyncImplementation,
5-
type StreamingSyncImplementation,
63
type LockOptions,
4+
type PowerSyncConnectionOptions,
5+
type StreamingSyncImplementation,
76
type StreamingSyncImplementationListener,
87
type SyncStatusOptions,
9-
type PowerSyncConnectionOptions,
8+
AbortOperation,
109
BaseObserver,
10+
DBAdapter,
1111
SqliteBucketStorage,
12-
SyncStatus,
13-
AbortOperation
12+
SyncStatus
1413
} from '@powersync/common';
14+
import { Mutex } from 'async-mutex';
15+
import * as Comlink from 'comlink';
16+
import Logger, { type ILogger } from 'js-logger';
17+
import { WebRemote } from '../../db/sync/WebRemote';
1518
import {
1619
WebStreamingSyncImplementation,
1720
WebStreamingSyncImplementationOptions
1821
} from '../../db/sync/WebStreamingSyncImplementation';
19-
import { Mutex } from 'async-mutex';
20-
import { WebRemote } from '../../db/sync/WebRemote';
2122

2223
import { WASQLiteDBAdapter } from '../../db/adapters/wa-sqlite/WASQLiteDBAdapter';
2324
import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProvider';
@@ -66,20 +67,28 @@ export class SharedSyncImplementation
6667
implements StreamingSyncImplementation
6768
{
6869
protected ports: WrappedSyncPort[];
69-
protected syncStreamClient?: AbstractStreamingSyncImplementation;
70+
protected syncStreamClient: AbstractStreamingSyncImplementation | null;
7071

7172
protected isInitialized: Promise<void>;
7273
protected statusListener?: () => void;
7374

7475
protected fetchCredentialsController?: RemoteOperationAbortController;
7576
protected uploadDataController?: RemoteOperationAbortController;
7677

78+
protected dbAdapter: DBAdapter | null;
79+
protected syncParams: SharedSyncInitOptions | null;
80+
protected logger: ILogger;
81+
7782
syncStatus: SyncStatus;
7883
broadCastLogger: ILogger;
7984

8085
constructor() {
8186
super();
8287
this.ports = [];
88+
this.dbAdapter = null;
89+
this.syncParams = null;
90+
this.syncStreamClient = null;
91+
this.logger = Logger.get('shared-sync');
8392

8493
this.isInitialized = new Promise((resolve) => {
8594
const callback = this.registerListener({
@@ -115,82 +124,29 @@ export class SharedSyncImplementation
115124
* Configures the DBAdapter connection and a streaming sync client.
116125
*/
117126
async init(dbWorkerPort: MessagePort, params: SharedSyncInitOptions) {
118-
if (this.syncStreamClient) {
127+
if (this.dbAdapter) {
119128
// Cannot modify already existing sync implementation
120129
return;
121130
}
122131

123-
const logger = params.streamOptions?.flags?.broadcastLogs ? this.broadCastLogger : Logger.get('shared-sync');
132+
this.dbAdapter = new WASQLiteDBAdapter({
133+
dbFilename: params.dbName,
134+
workerPort: dbWorkerPort,
135+
flags: { enableMultiTabs: true, useWebWorker: true },
136+
logger: this.logger
137+
});
138+
139+
this.syncParams = params;
140+
141+
if (params.streamOptions?.flags?.broadcastLogs) {
142+
this.logger = this.broadCastLogger;
143+
}
124144

125145
self.onerror = (event) => {
126146
// Share any uncaught events on the broadcast logger
127-
logger.error('Uncaught exception in PowerSync shared sync worker', event);
147+
this.logger.error('Uncaught exception in PowerSync shared sync worker', event);
128148
};
129149

130-
this.syncStreamClient = new WebStreamingSyncImplementation({
131-
adapter: new SqliteBucketStorage(
132-
new WASQLiteDBAdapter({
133-
dbFilename: params.dbName,
134-
workerPort: dbWorkerPort,
135-
flags: { enableMultiTabs: true, useWebWorker: true },
136-
logger
137-
}),
138-
new Mutex(),
139-
logger
140-
),
141-
remote: new WebRemote({
142-
fetchCredentials: async () => {
143-
const lastPort = this.ports[this.ports.length - 1];
144-
return new Promise(async (resolve, reject) => {
145-
const abortController = new AbortController();
146-
this.fetchCredentialsController = {
147-
controller: abortController,
148-
activePort: lastPort
149-
};
150-
151-
abortController.signal.onabort = reject;
152-
try {
153-
resolve(await lastPort.clientProvider.fetchCredentials());
154-
} catch (ex) {
155-
reject(ex);
156-
} finally {
157-
this.fetchCredentialsController = undefined;
158-
}
159-
});
160-
}
161-
}),
162-
uploadCrud: async () => {
163-
const lastPort = this.ports[this.ports.length - 1];
164-
165-
return new Promise(async (resolve, reject) => {
166-
const abortController = new AbortController();
167-
this.uploadDataController = {
168-
controller: abortController,
169-
activePort: lastPort
170-
};
171-
172-
// Resolving will make it retry
173-
abortController.signal.onabort = () => resolve();
174-
try {
175-
resolve(await lastPort.clientProvider.uploadCrud());
176-
} catch (ex) {
177-
reject(ex);
178-
} finally {
179-
this.uploadDataController = undefined;
180-
}
181-
});
182-
},
183-
...params.streamOptions,
184-
// Logger cannot be transferred just yet
185-
logger
186-
});
187-
188-
this.syncStreamClient.registerListener({
189-
statusChanged: (status) => {
190-
this.updateAllStatuses(status.toJSON());
191-
}
192-
});
193-
194150
this.iterateListeners((l) => l.initialized?.());
195151
}
196152

@@ -209,13 +165,27 @@ export class SharedSyncImplementation
209165
async connect(options?: PowerSyncConnectionOptions) {
210166
await this.waitForReady();
211167
// This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized
212-
return navigator.locks.request('shared-sync-connect', () => this.syncStreamClient?.connect(options));
168+
return navigator.locks.request('shared-sync-connect', async () => {
169+
this.syncStreamClient = this.generateStreamingImplementation();
170+
171+
this.syncStreamClient.registerListener({
172+
statusChanged: (status) => {
173+
this.updateAllStatuses(status.toJSON());
174+
}
175+
});
176+
177+
await this.syncStreamClient.connect(options);
178+
});
213179
}
214180

215181
async disconnect() {
216182
await this.waitForReady();
217183
// This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized
218-
return navigator.locks.request('shared-sync-connect', () => this.syncStreamClient?.disconnect());
184+
return navigator.locks.request('shared-sync-connect', async () => {
185+
await this.syncStreamClient?.disconnect();
186+
await this.syncStreamClient?.dispose();
187+
this.syncStreamClient = null;
188+
});
219189
}
220190

221191
/**
@@ -281,6 +251,62 @@ export class SharedSyncImplementation
281251
return this.syncStreamClient!.getWriteCheckpoint();
282252
}
283253

254+
protected generateStreamingImplementation() {
255+
// This should only be called after initialization has completed
256+
const syncParams = this.syncParams!;
257+
258+
// Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs.
259+
return new WebStreamingSyncImplementation({
260+
adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger),
261+
remote: new WebRemote({
262+
fetchCredentials: async () => {
263+
const lastPort = this.ports[this.ports.length - 1];
264+
return new Promise(async (resolve, reject) => {
265+
const abortController = new AbortController();
266+
this.fetchCredentialsController = {
267+
controller: abortController,
268+
activePort: lastPort
269+
};
270+
271+
abortController.signal.onabort = reject;
272+
try {
273+
console.log('calling the last port client provider for credentials');
274+
resolve(await lastPort.clientProvider.fetchCredentials());
275+
} catch (ex) {
276+
reject(ex);
277+
} finally {
278+
this.fetchCredentialsController = undefined;
279+
}
280+
});
281+
}
282+
}),
283+
uploadCrud: async () => {
284+
const lastPort = this.ports[this.ports.length - 1];
285+
286+
return new Promise(async (resolve, reject) => {
287+
const abortController = new AbortController();
288+
this.uploadDataController = {
289+
controller: abortController,
290+
activePort: lastPort
291+
};
292+
293+
// Resolving will make it retry
294+
abortController.signal.onabort = () => resolve();
295+
try {
296+
resolve(await lastPort.clientProvider.uploadCrud());
297+
} catch (ex) {
298+
reject(ex);
299+
} finally {
300+
this.uploadDataController = undefined;
301+
}
302+
});
303+
},
304+
...syncParams.streamOptions,
305+
// Logger cannot be transferred just yet
306+
logger: this.logger
307+
});
308+
}
309+
284310
/**
285311
* A method to update the all shared statuses for each
286312
* client.
@@ -296,7 +322,8 @@ export class SharedSyncImplementation
296322
*/
297323
private _testUpdateAllStatuses(status: SyncStatusOptions) {
298324
if (!this.syncStreamClient) {
299-
console.warn('no stream client has been initialized yet');
325+
// This is just for testing purposes
326+
this.syncStreamClient = this.generateStreamingImplementation();
300327
}
301328

302329
// Only assigning, don't call listeners for this test

0 commit comments

Comments
 (0)