Skip to content

Commit b7a150a

Browse files
fix: Drizzle Read Connection Concurrency (#765)
Co-authored-by: Ralf Kistner <[email protected]>
1 parent 14521c0 commit b7a150a

File tree

12 files changed

+928
-134
lines changed

12 files changed

+928
-134
lines changed

.changeset/hot-socks-love.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/drizzle-driver': minor
3+
'@powersync/node': minor
4+
---
5+
6+
Add support for concurrent read queries with Drizzle.

packages/drizzle-driver/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@
5050
"@powersync/web": "workspace:*",
5151
"@journeyapps/wa-sqlite": "^1.3.2",
5252
"@types/node": "^20.17.6",
53-
"drizzle-orm": "^0.35.2",
53+
"drizzle-orm": "^0.44.7",
5454
"vite": "^6.1.0",
5555
"vite-plugin-top-level-await": "^1.4.4",
5656
"vite-plugin-wasm": "^3.3.0"
5757
}
58-
}
58+
}

packages/drizzle-driver/src/sqlite/PowerSyncSQLiteBaseSession.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { LockContext, QueryResult } from '@powersync/common';
1+
import type { QueryResult } from '@powersync/common';
2+
import type { WithCacheConfig } from 'drizzle-orm/cache/core/types';
23
import { entityKind } from 'drizzle-orm/entity';
34
import type { Logger } from 'drizzle-orm/logger';
45
import { NoopLogger } from 'drizzle-orm/logger';
@@ -7,13 +8,13 @@ import { type Query } from 'drizzle-orm/sql/sql';
78
import type { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
89
import type { SelectedFieldsOrdered } from 'drizzle-orm/sqlite-core/query-builders/select.types';
910
import {
10-
type PreparedQueryConfig as PreparedQueryConfigBase,
11-
type SQLiteExecuteMethod,
1211
SQLiteSession,
1312
SQLiteTransaction,
13+
type PreparedQueryConfig as PreparedQueryConfigBase,
14+
type SQLiteExecuteMethod,
1415
type SQLiteTransactionConfig
1516
} from 'drizzle-orm/sqlite-core/session';
16-
import { PowerSyncSQLitePreparedQuery } from './PowerSyncSQLitePreparedQuery.js';
17+
import { PowerSyncSQLitePreparedQuery, type ContextProvider } from './PowerSyncSQLitePreparedQuery.js';
1718

1819
export interface PowerSyncSQLiteSessionOptions {
1920
logger?: Logger;
@@ -39,7 +40,7 @@ export class PowerSyncSQLiteBaseSession<
3940
protected logger: Logger;
4041

4142
constructor(
42-
protected db: LockContext,
43+
protected contextProvider: ContextProvider,
4344
protected dialect: SQLiteAsyncDialect,
4445
protected schema: RelationalSchemaConfig<TSchema> | undefined,
4546
protected options: PowerSyncSQLiteSessionOptions = {}
@@ -53,16 +54,24 @@ export class PowerSyncSQLiteBaseSession<
5354
fields: SelectedFieldsOrdered | undefined,
5455
executeMethod: SQLiteExecuteMethod,
5556
isResponseInArrayMode: boolean,
56-
customResultMapper?: (rows: unknown[][], mapColumnValue?: (value: unknown) => unknown) => unknown
57+
customResultMapper?: (rows: unknown[][], mapColumnValue?: (value: unknown) => unknown) => unknown,
58+
queryMetadata?: {
59+
type: 'select' | 'update' | 'delete' | 'insert';
60+
tables: string[];
61+
},
62+
cacheConfig?: WithCacheConfig
5763
): PowerSyncSQLitePreparedQuery<T> {
5864
return new PowerSyncSQLitePreparedQuery(
59-
this.db,
65+
this.contextProvider,
6066
query,
6167
this.logger,
6268
fields,
6369
executeMethod,
6470
isResponseInArrayMode,
65-
customResultMapper
71+
customResultMapper,
72+
undefined, // cache not supported yet
73+
queryMetadata,
74+
cacheConfig
6675
);
6776
}
6877

packages/drizzle-driver/src/sqlite/PowerSyncSQLiteDatabase.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@ import {
1111
createTableRelationsHelpers,
1212
extractTablesRelationalConfig,
1313
ExtractTablesWithRelations,
14+
TableRelationalConfig,
1415
type RelationalSchemaConfig,
1516
type TablesRelationalConfig
1617
} from 'drizzle-orm/relations';
17-
import { SQLiteTransaction } from 'drizzle-orm/sqlite-core';
18+
import { SQLiteSession, SQLiteTable, SQLiteTransaction } from 'drizzle-orm/sqlite-core';
1819
import { BaseSQLiteDatabase } from 'drizzle-orm/sqlite-core/db';
1920
import { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
21+
import { RelationalQueryBuilder } from 'drizzle-orm/sqlite-core/query-builders/query';
2022
import type { DrizzleConfig } from 'drizzle-orm/utils';
2123
import { toCompilableQuery } from './../utils/compilableQuery.js';
22-
import { PowerSyncSQLiteTransactionConfig } from './PowerSyncSQLiteBaseSession.js';
24+
import { PowerSyncSQLiteBaseSession, PowerSyncSQLiteTransactionConfig } from './PowerSyncSQLiteBaseSession.js';
2325
import { PowerSyncSQLiteSession } from './PowerSyncSQLiteSession.js';
2426

2527
export type DrizzleQuery<T> = { toSQL(): Query; execute(): Promise<T | T[]> };
@@ -54,6 +56,41 @@ export class PowerSyncSQLiteDatabase<
5456

5557
super('async', dialect, session as any, schema as any);
5658
this.db = db;
59+
60+
/**
61+
* A hack in order to use read locks for `db.query.users.findMany()` etc queries.
62+
* We don't currently get queryMetadata for these queries, so we can't use the regular session.
63+
* This session always uses read locks.
64+
*/
65+
const querySession = new PowerSyncSQLiteBaseSession(
66+
{
67+
useReadContext: (callback) => db.readLock(callback),
68+
useWriteContext: (callback) => db.readLock(callback)
69+
},
70+
dialect,
71+
schema,
72+
{
73+
logger
74+
}
75+
);
76+
if (this._.schema) {
77+
// https://github.com/drizzle-team/drizzle-orm/blob/ad4ddd444d066b339ffd5765cb6ec3bf49380189/drizzle-orm/src/sqlite-core/db.ts#L72
78+
const query = this.query as {
79+
[K in keyof TSchema]: RelationalQueryBuilder<'async', any, any, any>;
80+
};
81+
for (const [tableName, columns] of Object.entries(this._.schema)) {
82+
query[tableName as keyof TSchema] = new RelationalQueryBuilder(
83+
'async',
84+
schema!.fullSchema,
85+
this._.schema,
86+
this._.tableNamesMap,
87+
schema!.fullSchema[tableName] as SQLiteTable,
88+
columns as TableRelationalConfig,
89+
dialect,
90+
querySession as SQLiteSession<'async', any, any, any>
91+
);
92+
}
93+
}
5794
}
5895

5996
transaction<T>(

packages/drizzle-driver/src/sqlite/PowerSyncSQLitePreparedQuery.ts

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,37 @@
1-
import { LockContext, QueryResult } from '@powersync/common';
2-
import { Column, DriverValueDecoder, getTableName, SQL } from 'drizzle-orm';
1+
import type { LockContext, QueryResult } from '@powersync/common';
2+
import { Column, DriverValueDecoder, SQL, getTableName } from 'drizzle-orm';
3+
import type { Cache } from 'drizzle-orm/cache/core';
4+
import type { WithCacheConfig } from 'drizzle-orm/cache/core/types';
35
import { entityKind, is } from 'drizzle-orm/entity';
46
import type { Logger } from 'drizzle-orm/logger';
57
import { fillPlaceholders, type Query } from 'drizzle-orm/sql/sql';
68
import { SQLiteColumn } from 'drizzle-orm/sqlite-core';
79
import type { SelectedFieldsOrdered } from 'drizzle-orm/sqlite-core/query-builders/select.types';
810
import {
11+
SQLitePreparedQuery,
912
type PreparedQueryConfig as PreparedQueryConfigBase,
10-
type SQLiteExecuteMethod,
11-
SQLitePreparedQuery
13+
type SQLiteExecuteMethod
1214
} from 'drizzle-orm/sqlite-core/session';
1315

1416
type PreparedQueryConfig = Omit<PreparedQueryConfigBase, 'statement' | 'run'>;
1517

18+
/**
19+
* Callback which uses a LockContext for database operations.
20+
*/
21+
export type LockCallback<T> = (ctx: LockContext) => Promise<T>;
22+
23+
/**
24+
* Provider for specific database contexts.
25+
* Handlers are provided a context to the provided callback.
26+
* This does not necessarily need to acquire a database lock for each call.
27+
* Calls might use the same lock context for multiple operations.
28+
* The read/write context may relate to a single read OR write context.
29+
*/
30+
export type ContextProvider = {
31+
useReadContext: <T>(fn: LockCallback<T>) => Promise<T>;
32+
useWriteContext: <T>(fn: LockCallback<T>) => Promise<T>;
33+
};
34+
1635
export class PowerSyncSQLitePreparedQuery<
1736
T extends PreparedQueryConfig = PreparedQueryConfig
1837
> extends SQLitePreparedQuery<{
@@ -25,36 +44,48 @@ export class PowerSyncSQLitePreparedQuery<
2544
}> {
2645
static readonly [entityKind]: string = 'PowerSyncSQLitePreparedQuery';
2746

47+
private readOnly = false;
48+
2849
constructor(
29-
private db: LockContext,
50+
private contextProvider: ContextProvider,
3051
query: Query,
3152
private logger: Logger,
3253
private fields: SelectedFieldsOrdered | undefined,
3354
executeMethod: SQLiteExecuteMethod,
3455
private _isResponseInArrayMode: boolean,
35-
private customResultMapper?: (rows: unknown[][]) => unknown
56+
private customResultMapper?: (rows: unknown[][]) => unknown,
57+
cache?: Cache | undefined,
58+
queryMetadata?:
59+
| {
60+
type: 'select' | 'update' | 'delete' | 'insert';
61+
tables: string[];
62+
}
63+
| undefined,
64+
cacheConfig?: WithCacheConfig | undefined
3665
) {
37-
super('async', executeMethod, query);
66+
super('async', executeMethod, query, cache, queryMetadata, cacheConfig);
67+
this.readOnly = queryMetadata?.type == 'select';
3868
}
3969

4070
async run(placeholderValues?: Record<string, unknown>): Promise<QueryResult> {
4171
const params = fillPlaceholders(this.query.params, placeholderValues ?? {});
4272
this.logger.logQuery(this.query.sql, params);
43-
const rs = await this.db.execute(this.query.sql, params);
44-
return rs;
73+
return this.useContext(async (ctx) => {
74+
return await ctx.execute(this.query.sql, params);
75+
});
4576
}
4677

4778
async all(placeholderValues?: Record<string, unknown>): Promise<T['all']> {
4879
const { fields, query, logger, customResultMapper } = this;
4980
if (!fields && !customResultMapper) {
5081
const params = fillPlaceholders(query.params, placeholderValues ?? {});
5182
logger.logQuery(query.sql, params);
52-
const rs = await this.db.execute(this.query.sql, params);
53-
return rs.rows?._array ?? [];
83+
return await this.useContext(async (ctx) => {
84+
return await ctx.getAll(this.query.sql, params);
85+
});
5486
}
5587

5688
const rows = (await this.values(placeholderValues)) as unknown[][];
57-
5889
if (customResultMapper) {
5990
const mapped = customResultMapper(rows) as T['all'];
6091
return mapped;
@@ -69,7 +100,9 @@ export class PowerSyncSQLitePreparedQuery<
69100
const { fields, customResultMapper } = this;
70101
const joinsNotNullableMap = (this as any).joinsNotNullableMap;
71102
if (!fields && !customResultMapper) {
72-
return this.db.get(this.query.sql, params);
103+
return this.useContext(async (ctx) => {
104+
return await ctx.get(this.query.sql, params);
105+
});
73106
}
74107

75108
const rows = (await this.values(placeholderValues)) as unknown[][];
@@ -90,12 +123,22 @@ export class PowerSyncSQLitePreparedQuery<
90123
const params = fillPlaceholders(this.query.params, placeholderValues ?? {});
91124
this.logger.logQuery(this.query.sql, params);
92125

93-
return await this.db.executeRaw(this.query.sql, params);
126+
return await this.useContext(async (ctx) => {
127+
return await ctx.executeRaw(this.query.sql, params);
128+
});
94129
}
95130

96131
isResponseInArrayMode(): boolean {
97132
return this._isResponseInArrayMode;
98133
}
134+
135+
protected useContext<T>(callback: LockCallback<T>): Promise<T> {
136+
if (this.readOnly) {
137+
return this.contextProvider.useReadContext(callback);
138+
} else {
139+
return this.contextProvider.useWriteContext(callback);
140+
}
141+
}
99142
}
100143

101144
/**

packages/drizzle-driver/src/sqlite/PowerSyncSQLiteSession.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { AbstractPowerSyncDatabase, DBAdapter } from '@powersync/common';
1+
import { AbstractPowerSyncDatabase, LockContext } from '@powersync/common';
22
import { entityKind } from 'drizzle-orm/entity';
33
import type { RelationalSchemaConfig, TablesRelationalConfig } from 'drizzle-orm/relations';
44
import type { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
@@ -21,7 +21,16 @@ export class PowerSyncSQLiteSession<
2121
schema: RelationalSchemaConfig<TSchema> | undefined,
2222
options: PowerSyncSQLiteSessionOptions = {}
2323
) {
24-
super(db, dialect, schema, options);
24+
super(
25+
// Top level operations use the respective locks.
26+
{
27+
useReadContext: (callback) => db.readLock(callback),
28+
useWriteContext: (callback) => db.writeLock(callback)
29+
},
30+
dialect,
31+
schema,
32+
options
33+
);
2534
this.client = db;
2635
}
2736

@@ -39,14 +48,23 @@ export class PowerSyncSQLiteSession<
3948
}
4049

4150
protected async internalTransaction<T>(
42-
connection: DBAdapter,
51+
connection: LockContext,
4352
fn: (tx: PowerSyncSQLiteTransaction<TFullSchema, TSchema>) => T,
4453
config: PowerSyncSQLiteTransactionConfig = {}
4554
): Promise<T> {
4655
const tx = new PowerSyncSQLiteTransaction<TFullSchema, TSchema>(
4756
'async',
4857
(this as any).dialect,
49-
new PowerSyncSQLiteBaseSession(connection, this.dialect, this.schema, this.options),
58+
new PowerSyncSQLiteBaseSession(
59+
{
60+
// We already have a fixed context here. We need to use it for both "read" and "write" operations.
61+
useReadContext: (callback) => callback(connection),
62+
useWriteContext: (callback) => callback(connection)
63+
},
64+
this.dialect,
65+
this.schema,
66+
this.options
67+
),
5068
this.schema
5169
);
5270

0 commit comments

Comments
 (0)