Skip to content

Commit 7b49661

Browse files
feat: refresh watched queries on schema changes (#377)
Co-authored-by: Steven Ontong <[email protected]>
1 parent 1753453 commit 7b49661

File tree

29 files changed

+670
-406
lines changed

29 files changed

+670
-406
lines changed

.changeset/itchy-years-drop.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Updated watch functions to recalculate depedendent tables if schema is updated.

.changeset/short-owls-play.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/op-sqlite': minor
3+
'@powersync/react-native': minor
4+
'@powersync/web': minor
5+
---
6+
7+
Added `refreshSchema()` which will cause all connections to be aware of a schema change.

.changeset/ten-birds-camp.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/vue': patch
3+
---
4+
5+
Queries will recalculate dependent tables if schema is updated.

.changeset/tender-llamas-shop.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/tanstack-react-query': patch
3+
'@powersync/react': patch
4+
---
5+
6+
Queries will recalculate dependent tables if schema is updated.

demos/react-supabase-todolist-optional-sync/src/app/views/layout.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export default function ViewsLayout({ children }: { children: React.ReactNode })
5858
beforeNavigate: async () => {
5959
// If user is logged in, sign out and stay on the current page
6060
if (supabase?.currentSession) {
61-
await supabase?.client.auth.signOut();
61+
await supabase?.logout();
6262
await powerSync.disconnectAndClear();
6363
setSyncEnabled(powerSync.database.name, false);
6464

demos/react-supabase-todolist-optional-sync/src/library/powersync/SupabaseConnector.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ export class SupabaseConnector extends BaseObserver<SupabaseConnectorListener> i
9393
this.updateSession(session);
9494
}
9595

96+
async logout() {
97+
await this.client.auth.signOut();
98+
this.updateSession(null);
99+
}
100+
96101
async fetchCredentials() {
97102
const {
98103
data: { session },

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
StreamingSyncImplementation,
2929
StreamingSyncImplementationListener
3030
} from './sync/stream/AbstractStreamingSyncImplementation.js';
31+
import { runOnSchemaChange } from './runOnSchemaChange.js';
3132

3233
export interface DisconnectAndClearOptions {
3334
/** When set to false, data in local-only tables is preserved. */
@@ -103,6 +104,7 @@ export interface WatchOnChangeHandler {
103104

104105
export interface PowerSyncDBListener extends StreamingSyncImplementationListener {
105106
initialized: () => void;
107+
schemaChanged: (schema: Schema) => void;
106108
}
107109

108110
export interface PowerSyncCloseOptions {
@@ -360,7 +362,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
360362
this.options.logger?.warn('Schema validation failed. Unexpected behaviour could occur', ex);
361363
}
362364
this._schema = schema;
365+
363366
await this.database.execute('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
367+
await this.database.refreshSchema();
368+
this.iterateListeners(async (cb) => cb.schemaChanged?.(schema));
364369
}
365370

366371
/**
@@ -758,10 +763,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
758763
throw new Error('onResult is required');
759764
}
760765

761-
(async () => {
766+
const watchQuery = async (abortSignal: AbortSignal) => {
762767
try {
763768
const resolvedTables = await this.resolveTables(sql, parameters, options);
764-
765769
// Fetch initial data
766770
const result = await this.executeReadOnly(sql, parameters);
767771
onResult(result);
@@ -780,13 +784,17 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
780784
},
781785
{
782786
...(options ?? {}),
783-
tables: resolvedTables
787+
tables: resolvedTables,
788+
// Override the abort signal since we intercept it
789+
signal: abortSignal
784790
}
785791
);
786792
} catch (error) {
787793
onError?.(error);
788794
}
789-
})();
795+
};
796+
797+
runOnSchemaChange(watchQuery, this, options);
790798
}
791799

792800
/**
@@ -796,19 +804,20 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
796804
*/
797805
watchWithAsyncGenerator(sql: string, parameters?: any[], options?: SQLWatchOptions): AsyncIterable<QueryResult> {
798806
return new EventIterator<QueryResult>((eventOptions) => {
799-
(async () => {
800-
const resolvedTables = await this.resolveTables(sql, parameters, options);
807+
const handler: WatchHandler = {
808+
onResult: (result) => {
809+
eventOptions.push(result);
810+
},
811+
onError: (error) => {
812+
eventOptions.fail(error);
813+
}
814+
};
801815

802-
// Fetch initial data
803-
eventOptions.push(await this.executeReadOnly(sql, parameters));
816+
this.watchWithCallback(sql, parameters, handler, options);
804817

805-
for await (const event of this.onChangeWithAsyncGenerator({
806-
...(options ?? {}),
807-
tables: resolvedTables
808-
})) {
809-
eventOptions.push(await this.executeReadOnly(sql, parameters));
810-
}
811-
})();
818+
options?.signal?.addEventListener('abort', () => {
819+
eventOptions.stop();
820+
});
812821
});
813822
}
814823

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { AbstractPowerSyncDatabase, SQLWatchOptions } from './AbstractPowerSyncDatabase.js';
2+
3+
export function runOnSchemaChange(
4+
callback: (signal: AbortSignal) => void,
5+
db: AbstractPowerSyncDatabase,
6+
options?: SQLWatchOptions
7+
): void {
8+
const triggerWatchedQuery = () => {
9+
const abortController = new AbortController();
10+
let disposeSchemaListener: (() => void) | null = null;
11+
const stopWatching = () => {
12+
abortController.abort('Abort triggered');
13+
disposeSchemaListener?.();
14+
disposeSchemaListener = null;
15+
// Stop listening to upstream abort for this watch
16+
options?.signal?.removeEventListener('abort', stopWatching);
17+
};
18+
19+
options?.signal?.addEventListener('abort', stopWatching);
20+
disposeSchemaListener = db.registerListener({
21+
schemaChanged: async () => {
22+
stopWatching();
23+
// Re trigger the watched query (recursively), setTimeout ensures that we don't modify the list of listeners while iterating through them
24+
setTimeout(() => triggerWatchedQuery(), 0);
25+
}
26+
});
27+
callback(abortController.signal);
28+
};
29+
30+
triggerWatchedQuery();
31+
}

packages/common/src/db/DBAdapter.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ export interface DBAdapter extends BaseObserverInterface<DBAdapterListener>, DBG
101101
readTransaction: <T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions) => Promise<T>;
102102
writeLock: <T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions) => Promise<T>;
103103
writeTransaction: <T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions) => Promise<T>;
104+
/**
105+
* This method refreshes the schema information across all connections. This is for advanced use cases, and should generally not be needed.
106+
*/
107+
refreshSchema: () => Promise<void>;
104108
}
105109

106110
export function isBatchedUpdateNotification(

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export * from './client/SQLOpenFactory.js';
44
export * from './client/connection/PowerSyncBackendConnector.js';
55
export * from './client/connection/PowerSyncCredentials.js';
66
export * from './client/sync/bucket/BucketStorageAdapter.js';
7+
export { runOnSchemaChange } from './client/runOnSchemaChange.js';
78
export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js';
89
export * from './client/sync/bucket/SqliteBucketStorage.js';
910
export * from './client/sync/bucket/CrudBatch.js';

packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,8 @@ export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
7979
}
8080
return result as T;
8181
}
82+
83+
async refreshSchema() {
84+
await this.get("PRAGMA table_info('sqlite_master')");
85+
}
8286
}

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,4 +271,13 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
271271
throw ex;
272272
}
273273
}
274+
275+
async refreshSchema(): Promise<void> {
276+
await this.initialized;
277+
await this.writeConnection!.refreshSchema();
278+
279+
for (let readConnection of this.readConnections) {
280+
await readConnection.connection.refreshSchema();
281+
}
282+
}
274283
}

packages/react-native/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
},
3131
"homepage": "https://docs.powersync.com/",
3232
"peerDependencies": {
33-
"@journeyapps/react-native-quick-sqlite": "^2.0.0",
34-
"@powersync/common": "workspace:^1.20.2",
33+
"@journeyapps/react-native-quick-sqlite": "^2.1.0",
34+
"@powersync/common": "workspace:^1.20.1",
3535
"react": "*",
3636
"react-native": "*"
3737
},
@@ -46,7 +46,7 @@
4646
},
4747
"devDependencies": {
4848
"@craftzdog/react-native-buffer": "^6.0.5",
49-
"@journeyapps/react-native-quick-sqlite": "^2.0.0",
49+
"@journeyapps/react-native-quick-sqlite": "^2.1.0",
5050
"@rollup/plugin-alias": "^5.1.0",
5151
"@rollup/plugin-commonjs": "^25.0.7",
5252
"@rollup/plugin-inject": "^5.0.5",

packages/react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,8 @@ export class RNQSDBAdapter extends BaseObserver<DBAdapterListener> implements DB
124124
}
125125
};
126126
}
127+
128+
async refreshSchema() {
129+
await this.baseDB.refreshSchema();
130+
}
127131
}

packages/react/src/WatchedQuery.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
import { AbstractPowerSyncDatabase, BaseListener, BaseObserver, CompilableQuery, Disposable } from '@powersync/common';
1+
import {
2+
AbstractPowerSyncDatabase,
3+
BaseListener,
4+
BaseObserver,
5+
CompilableQuery,
6+
Disposable,
7+
runOnSchemaChange
8+
} from '@powersync/common';
29
import { AdditionalOptions } from './hooks/useQuery';
310

411
export class Query<T> {
@@ -117,7 +124,7 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> implements
117124
this.setError(error);
118125
};
119126

120-
(async () => {
127+
const watchQuery = async (abortSignal: AbortSignal) => {
121128
await this.fetchTables();
122129
await this.fetchData();
123130

@@ -131,12 +138,13 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> implements
131138
},
132139
{
133140
...this.options,
134-
signal: this.controller.signal,
141+
signal: abortSignal,
135142
tables: this.tables
136143
}
137144
);
138145
}
139-
})();
146+
};
147+
runOnSchemaChange(watchQuery, this.db, { signal: this.controller.signal });
140148
}
141149

142150
private setData(results: any[]) {

packages/react/src/hooks/useQuery.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,18 @@ export const useQuery = <T = any>(
118118
};
119119

120120
React.useEffect(() => {
121-
(async () => {
121+
const updateData = async () => {
122122
await fetchTables();
123123
await fetchData();
124-
})();
124+
};
125+
126+
updateData();
127+
128+
const l = powerSync.registerListener({
129+
schemaChanged: updateData
130+
});
131+
132+
return () => l?.();
125133
}, [powerSync, memoizedParams, sqlStatement]);
126134

127135
React.useEffect(() => {

packages/react/tests/useQuery.test.tsx

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ import { useQuery } from '../src/hooks/useQuery';
77

88
const mockPowerSync = {
99
currentStatus: { status: 'initial' },
10-
registerListener: vi.fn(() => ({
11-
statusChanged: vi.fn(() => 'updated')
12-
})),
10+
registerListener: vi.fn(() => {}),
1311
resolveTables: vi.fn(() => ['table1', 'table2']),
1412
onChangeWithCallback: vi.fn(),
1513
getAll: vi.fn(() => Promise.resolve(['list1', 'list2']))
@@ -92,9 +90,7 @@ describe('useQuery', () => {
9290
it('should set error when error occurs and runQueryOnce flag is set', async () => {
9391
const mockPowerSyncError = {
9492
currentStatus: { status: 'initial' },
95-
registerListener: vi.fn(() => ({
96-
statusChanged: vi.fn(() => 'updated')
97-
})),
93+
registerListener: vi.fn(() => {}),
9894
onChangeWithCallback: vi.fn(),
9995
resolveTables: vi.fn(() => ['table1', 'table2']),
10096
getAll: vi.fn(() => {
@@ -119,9 +115,7 @@ describe('useQuery', () => {
119115
it('should set error when error occurs', async () => {
120116
const mockPowerSyncError = {
121117
currentStatus: { status: 'initial' },
122-
registerListener: vi.fn(() => ({
123-
statusChanged: vi.fn(() => 'updated')
124-
})),
118+
registerListener: vi.fn(() => {}),
125119
onChangeWithCallback: vi.fn(),
126120
resolveTables: vi.fn(() => ['table1', 'table2']),
127121
getAll: vi.fn(() => {

packages/react/tests/useStatus.test.tsx

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@ describe('useStatus', () => {
3333
it.skip('should update the status when the listener is called', () => {
3434
const mockPowerSyncInTest = {
3535
currentStatus: { status: 'initial' },
36-
registerListener: () => ({
37-
statusChanged: () => 'updated'
38-
})
36+
registerListener: vi.fn(() => {})
3937
};
4038

4139
const wrapper = ({ children }) => (

packages/react/tests/useSuspenseQuery.test.tsx

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ const defaultQueryResult = ['list1', 'list2'];
1111
const createMockPowerSync = () => {
1212
return {
1313
currentStatus: { status: 'initial' },
14-
registerListener: vi.fn(() => ({
15-
statusChanged: vi.fn(() => 'updated')
16-
})),
14+
registerListener: vi.fn(() => {}),
1715
resolveTables: vi.fn(() => ['table1', 'table2']),
1816
onChangeWithCallback: vi.fn(),
1917
getAll: vi.fn(() => Promise.resolve(defaultQueryResult)) as Mock<any, any>

packages/tanstack-react-query/src/hooks/useQuery.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,20 @@ function useQueryCore<
102102
};
103103

104104
React.useEffect(() => {
105-
if (!query) return;
105+
if (!query) return () => {};
106106

107107
(async () => {
108108
await fetchTables();
109109
})();
110+
111+
const l = powerSync.registerListener({
112+
schemaChanged: async () => {
113+
await fetchTables();
114+
queryClient.invalidateQueries({ queryKey: options.queryKey });
115+
}
116+
});
117+
118+
return () => l?.();
110119
}, [powerSync, sqlStatement, stringifiedParams]);
111120

112121
const queryFn = React.useCallback(async () => {

0 commit comments

Comments
 (0)