Skip to content

Commit 7ecc4cc

Browse files
committed
sqlite web: Export message port to database
1 parent 2a4778b commit 7ecc4cc

File tree

4 files changed

+126
-6
lines changed

4 files changed

+126
-6
lines changed

sqlite3_web/lib/src/client.dart

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ final class RemoteDatabase implements Database {
5353
}
5454
}
5555

56+
@override
57+
Future<void> get closed {
58+
return connection.closed;
59+
}
60+
5661
@override
5762
Future<void> dispose() async {
5863
_isClosed = true;
@@ -125,6 +130,16 @@ final class RemoteDatabase implements Database {
125130
final result = await select('pragma user_version;');
126131
return result.single[0] as int;
127132
}
133+
134+
@override
135+
Future<SqliteWebEndpoint> additionalConnection() async {
136+
final response = await connection.sendRequest(
137+
OpenAdditonalConnection(requestId: 0, databaseId: databaseId),
138+
MessageType.endpointResponse,
139+
);
140+
final endpoint = response.endpoint;
141+
return (endpoint.port, endpoint.lockName!);
142+
}
128143
}
129144

130145
final class WorkerConnection extends ProtocolChannel {
@@ -301,6 +316,19 @@ final class DatabaseClient implements WebSqlite {
301316
);
302317
}
303318

319+
Future<Database> connectToExisting(SqliteWebEndpoint endpoint) async {
320+
final channel = WorkerConnection(
321+
WebEndpoint(port: endpoint.$1, lockName: endpoint.$2).connect());
322+
323+
return RemoteDatabase(
324+
connection: channel,
325+
// The database id for this pre-existing connection is always zero.
326+
// It gets assigned by the worker handling the OpenAdditonalConnection
327+
// request.
328+
databaseId: 0,
329+
);
330+
}
331+
304332
@override
305333
Future<Database> connect(
306334
String name, StorageMode type, AccessMode access) async {

sqlite3_web/lib/src/database.dart

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import 'dart:js_interop';
22

33
import 'package:sqlite3/wasm.dart';
4+
import 'package:web/web.dart' hide FileSystem;
45

56
import 'types.dart';
67
import 'client.dart';
@@ -29,6 +30,16 @@ abstract base class DatabaseController {
2930
ClientConnection connection, JSAny? request);
3031
}
3132

33+
/// An endpoint that can be used, by any running JavaScript context in the same
34+
/// website, to connect to an existing [Database].
35+
///
36+
/// These endpoints are created by calling [Database.additionalConnection] and
37+
/// consist of a [MessagePort] and a [String] internally identifying the
38+
/// connection. Both objects can be transferred over send ports towards another
39+
/// worker or context. That context can then use [WebSqlite.connectToPort] to
40+
/// connect to the port already opened.
41+
typedef SqliteWebEndpoint = (MessagePort, String);
42+
3243
/// Abstraction over a database either available locally or in a remote worker.
3344
abstract class Database {
3445
FileSystem get fileSystem;
@@ -39,6 +50,15 @@ abstract class Database {
3950
/// stream is active.
4051
Stream<SqliteUpdate> get updates;
4152

53+
/// A future that resolves when the database is closed.
54+
///
55+
/// Typically, databases are closed because [dispose] is called. For databases
56+
/// opened with [WebSqlite.connectToPort] however, it's possible that the
57+
/// original worker hosting the database gets closed without this [Database]
58+
/// instance being explicitly [dispose]d. In those cases, monitoring [closed]
59+
/// is useful to react to databases closing.
60+
Future<void> get closed;
61+
4262
/// Closes this database and instructs the worker to release associated
4363
/// resources.
4464
///
@@ -67,6 +87,12 @@ abstract class Database {
6787
/// Custom requests are handled by implementing `handleCustomRequest` in your
6888
/// `WorkerDatabase` subclass.
6989
Future<JSAny?> customRequest(JSAny? request);
90+
91+
/// Creates a [MessagePort] (a transferrable object that can be sent to
92+
/// another JavaScript context like a worker) that can be used with
93+
/// [WebSqlite.connectToPort] to open another instance of this database
94+
/// remotely.
95+
Future<SqliteWebEndpoint> additionalConnection();
7096
}
7197

7298
/// A connection from a client from the perspective of a worker.
@@ -157,4 +183,9 @@ abstract class WebSqlite {
157183
}) {
158184
return DatabaseClient(worker, wasmModule);
159185
}
186+
187+
static Future<Database> connectToPort(SqliteWebEndpoint endpoint) {
188+
final client = DatabaseClient(Uri.base, Uri.base);
189+
return client.connectToExisting(endpoint);
190+
}
160191
}

sqlite3_web/lib/src/protocol.dart

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ enum MessageType<T extends Message> {
2828
simpleSuccessResponse<SimpleSuccessResponse>(),
2929
rowsResponse<RowsResponse>(),
3030
errorResponse<ErrorResponse>(),
31+
endpointResponse<EndpointResponse>(),
3132
closeDatabase<CloseDatabase>(),
33+
openAdditionalConnection<OpenAdditonalConnection>(),
3234
notifyUpdate<UpdateNotification>(),
3335
;
3436

@@ -86,9 +88,12 @@ sealed class Message {
8688
MessageType.fileSystemAccess => FileSystemAccess.deserialize(object),
8789
MessageType.connect => ConnectRequest.deserialize(object),
8890
MessageType.closeDatabase => CloseDatabase.deserialize(object),
91+
MessageType.openAdditionalConnection =>
92+
OpenAdditonalConnection.deserialize(object),
8993
MessageType.updateRequest => UpdateStreamRequest.deserialize(object),
9094
MessageType.simpleSuccessResponse =>
9195
SimpleSuccessResponse.deserialize(object),
96+
MessageType.endpointResponse => EndpointResponse.deserialize(object),
9297
MessageType.rowsResponse => RowsResponse.deserialize(object),
9398
MessageType.errorResponse => ErrorResponse.deserialize(object),
9499
MessageType.notifyUpdate => UpdateNotification.deserialize(object),
@@ -428,7 +433,7 @@ final class RunQuery extends Request {
428433
}
429434
}
430435

431-
class CloseDatabase extends Request {
436+
final class CloseDatabase extends Request {
432437
CloseDatabase({required super.requestId, required super.databaseId});
433438

434439
factory CloseDatabase.deserialize(JSObject object) {
@@ -440,6 +445,23 @@ class CloseDatabase extends Request {
440445
MessageType<Message> get type => MessageType.closeDatabase;
441446
}
442447

448+
final class OpenAdditonalConnection extends Request {
449+
OpenAdditonalConnection({
450+
required super.requestId,
451+
super.databaseId,
452+
});
453+
454+
factory OpenAdditonalConnection.deserialize(JSObject object) {
455+
return OpenAdditonalConnection(
456+
requestId: object.requestId,
457+
databaseId: object.databaseId,
458+
);
459+
}
460+
461+
@override
462+
MessageType<Message> get type => MessageType.openAdditionalConnection;
463+
}
464+
443465
final class SimpleSuccessResponse extends Response {
444466
final JSAny? response;
445467

@@ -462,6 +484,29 @@ final class SimpleSuccessResponse extends Response {
462484
}
463485
}
464486

487+
final class EndpointResponse extends Response {
488+
final WebEndpoint endpoint;
489+
490+
EndpointResponse({required super.requestId, required this.endpoint});
491+
492+
factory EndpointResponse.deserialize(JSObject object) {
493+
return EndpointResponse(
494+
requestId: object.requestId,
495+
endpoint: object[_UniqueFieldNames.responseData] as WebEndpoint,
496+
);
497+
}
498+
499+
@override
500+
MessageType<Message> get type => MessageType.endpointResponse;
501+
502+
@override
503+
void serialize(JSObject object, List<JSObject> transferred) {
504+
super.serialize(object, transferred);
505+
object[_UniqueFieldNames.responseData] = endpoint;
506+
transferred.add(endpoint.port);
507+
}
508+
}
509+
465510
final class RowsResponse extends Response {
466511
final ResultSet resultSet;
467512

sqlite3_web/lib/src/worker.dart

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,11 @@ final class Shared extends WorkerEnvironment {
106106
/// A database opened by a client.
107107
final class _ConnectionDatabase {
108108
final DatabaseState database;
109+
final int id;
110+
109111
StreamSubscription<SqliteUpdate>? updates;
110112

111-
_ConnectionDatabase(this.database);
113+
_ConnectionDatabase(this.database, [int? id]) : id = id ?? database.id;
112114

113115
Future<void> close() async {
114116
updates?.cancel();
@@ -173,17 +175,19 @@ final class _ClientConnection extends ProtocolChannel
173175
case OpenRequest():
174176
await _runner.loadWasmModule(request.wasmUri);
175177
DatabaseState? database;
178+
_ConnectionDatabase? connectionDatabase;
176179

177180
try {
178181
database =
179182
_runner.findDatabase(request.databaseName, request.storageMode);
180183
await database.opened;
181-
_openedDatabases.add(_ConnectionDatabase(database));
184+
connectionDatabase = _ConnectionDatabase(database);
185+
_openedDatabases.add(connectionDatabase);
182186
return SimpleSuccessResponse(
183187
response: database.id.toJS, requestId: request.requestId);
184188
} catch (e) {
185189
if (database != null) {
186-
_openedDatabases.remove(database.id);
190+
_openedDatabases.remove(connectionDatabase);
187191
await database.decrementRefCount();
188192
}
189193

@@ -220,6 +224,16 @@ final class _ClientConnection extends ProtocolChannel
220224
}
221225
return SimpleSuccessResponse(
222226
response: null, requestId: request.requestId);
227+
case OpenAdditonalConnection():
228+
final database = _databaseFor(request)!.database;
229+
database.refCount++;
230+
final (endpoint, channel) = await createChannel();
231+
232+
final client = _runner._accept(channel);
233+
client._openedDatabases.add(_ConnectionDatabase(database, 0));
234+
235+
return EndpointResponse(
236+
requestId: request.requestId, endpoint: endpoint);
223237
case CloseDatabase():
224238
_openedDatabases.remove(database!);
225239
await database.close();
@@ -247,7 +261,7 @@ final class _ClientConnection extends ProtocolChannel
247261

248262
_ConnectionDatabase? _databaseFor(Request request) {
249263
if (request.databaseId case final id?) {
250-
return _openedDatabases.firstWhere((e) => e.database.id == id);
264+
return _openedDatabases.firstWhere((e) => e.id == id);
251265
} else {
252266
return null;
253267
}
@@ -381,11 +395,13 @@ final class WorkerRunner {
381395
}
382396
}
383397

384-
void _accept(StreamChannel<Message> channel) {
398+
_ClientConnection _accept(StreamChannel<Message> channel) {
385399
final connection = _ClientConnection(
386400
runner: this, channel: channel, id: _nextConnectionId++);
387401
_connections.add(connection);
388402
connection.closed.whenComplete(() => _connections.remove(connection));
403+
404+
return connection;
389405
}
390406

391407
Future<CompatibilityResult> checkCompatibility(CompatibilityCheck check) {

0 commit comments

Comments
 (0)