Skip to content

Commit f40ecf9

Browse files
feat: Add Proxy support for @powersync/node (#573)
Co-authored-by: stevensJourney <[email protected]>
1 parent e71dc94 commit f40ecf9

File tree

10 files changed

+159
-89
lines changed

10 files changed

+159
-89
lines changed

.changeset/hip-lamps-draw.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/node': minor
3+
---
4+
5+
Introduced support for specifying proxy environment variables for the connection methods. For HTTP it supports `HTTP_PROXY` or `HTTPS_PROXY`, and for WebSockets it supports `WS_PROXY` and `WSS_PROXY`.

.changeset/thick-lies-invent.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Added `fetchOptions` to AbstractRemoteOptions. Allows consumers to include fields such as `dispatcher` (e.g. for proxy support) to the fetch invocations.
6+
Also ensuring all options provided to `connect()` are passed onwards, allows packages to have their own option definitions for `connect()` and the abstract `generateSyncStreamImplementation()`.

demos/example-node/src/main.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import repl_factory from 'node:repl';
21
import { once } from 'node:events';
2+
import repl_factory from 'node:repl';
33

44
import { PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
55
import { default as Logger } from 'js-logger';
6-
import { AppSchema, DemoConnector } from './powersync.js';
76
import { exit } from 'node:process';
7+
import { AppSchema, DemoConnector } from './powersync.js';
88

99
const main = async () => {
1010
const logger = Logger.get('PowerSyncDemo');

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
417417
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
418418
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
419419
return {
420+
...options,
420421
retryDelayMs:
421422
options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
422423
crudUploadThrottleMs:
@@ -436,12 +437,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
436437
throw new Error('Cannot connect using a closed client');
437438
}
438439

439-
const { retryDelayMs, crudUploadThrottleMs } = this.resolvedConnectionOptions(options);
440+
const resolvedConnectOptions = this.resolvedConnectionOptions(options);
440441

441-
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
442-
retryDelayMs,
443-
crudUploadThrottleMs
444-
});
442+
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, resolvedConnectOptions);
445443
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
446444
statusChanged: (status) => {
447445
this.currentStatus = new SyncStatus({
@@ -555,7 +553,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
555553
* This method does include transaction ids in the result, but does not group
556554
* data by transaction. One batch may contain data from multiple transactions,
557555
* and a single transaction may be split over multiple batches.
558-
*
556+
*
559557
* @param limit Maximum number of CRUD entries to include in the batch
560558
* @returns A batch of CRUD operations to upload, or null if there are none
561559
*/
@@ -594,7 +592,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
594592
*
595593
* Unlike {@link getCrudBatch}, this only returns data from a single transaction at a time.
596594
* All data for the transaction is loaded into memory.
597-
*
595+
*
598596
* @returns A transaction of CRUD operations to upload, or null if there are none
599597
*/
600598
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
@@ -633,7 +631,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
633631
* Get an unique client id for this database.
634632
*
635633
* The id is not reset when the database is cleared, only when the database is deleted.
636-
*
634+
*
637635
* @returns A unique identifier for the database instance
638636
*/
639637
async getClientId(): Promise<string> {
@@ -661,7 +659,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
661659
/**
662660
* Execute a SQL write (INSERT/UPDATE/DELETE) query
663661
* and optionally return results.
664-
*
662+
*
665663
* @param sql The SQL query to execute
666664
* @param parameters Optional array of parameters to bind to the query
667665
* @returns The query result as an object with structured key-value pairs
@@ -674,7 +672,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
674672
/**
675673
* Execute a SQL write (INSERT/UPDATE/DELETE) query directly on the database without any PowerSync processing.
676674
* This bypasses certain PowerSync abstractions and is useful for accessing the raw database results.
677-
*
675+
*
678676
* @param sql The SQL query to execute
679677
* @param parameters Optional array of parameters to bind to the query
680678
* @returns The raw query result from the underlying database as a nested array of raw values, where each row is
@@ -689,7 +687,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
689687
* Execute a write query (INSERT/UPDATE/DELETE) multiple times with each parameter set
690688
* and optionally return results.
691689
* This is faster than executing separately with each parameter set.
692-
*
690+
*
693691
* @param sql The SQL query to execute
694692
* @param parameters Optional 2D array of parameter sets, where each inner array is a set of parameters for one execution
695693
* @returns The query result
@@ -701,7 +699,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
701699

702700
/**
703701
* Execute a read-only query and return results.
704-
*
702+
*
705703
* @param sql The SQL query to execute
706704
* @param parameters Optional array of parameters to bind to the query
707705
* @returns An array of results
@@ -713,7 +711,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
713711

714712
/**
715713
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
716-
*
714+
*
717715
* @param sql The SQL query to execute
718716
* @param parameters Optional array of parameters to bind to the query
719717
* @returns The first result if found, or null if no results are returned
@@ -725,7 +723,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
725723

726724
/**
727725
* Execute a read-only query and return the first result, error if the ResultSet is empty.
728-
*
726+
*
729727
* @param sql The SQL query to execute
730728
* @param parameters Optional array of parameters to bind to the query
731729
* @returns The first result matching the query
@@ -761,7 +759,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
761759
* Open a read-only transaction.
762760
* Read transactions can run concurrently to a write transaction.
763761
* Changes from any write transaction are not visible to read transactions started before it.
764-
*
762+
*
765763
* @param callback Function to execute within the transaction
766764
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
767765
* @returns The result of the callback
@@ -786,7 +784,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
786784
* Open a read-write transaction.
787785
* This takes a global lock - only one write transaction can execute against the database at a time.
788786
* Statements within the transaction must be done on the provided {@link Transaction} interface.
789-
*
787+
*
790788
* @param callback Function to execute within the transaction
791789
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
792790
* @returns The result of the callback
@@ -865,7 +863,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
865863
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
866864
*
867865
* Note that the `onChange` callback member of the handler is required.
868-
*
866+
*
869867
* @param sql The SQL query to execute
870868
* @param parameters Optional array of parameters to bind to the query
871869
* @param handler Callbacks for handling results and errors
@@ -915,7 +913,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
915913
* Execute a read query every time the source tables are modified.
916914
* Use {@link SQLWatchOptions.throttleMs} to specify the minimum interval between queries.
917915
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
918-
*
916+
*
919917
* @param sql The SQL query to execute
920918
* @param parameters Optional array of parameters to bind to the query
921919
* @param options Options for configuring watch behavior
@@ -944,7 +942,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
944942
* Resolves the list of tables that are used in a SQL query.
945943
* If tables are specified in the options, those are used directly.
946944
* Otherwise, analyzes the query using EXPLAIN to determine which tables are accessed.
947-
*
945+
*
948946
* @param sql The SQL query to analyze
949947
* @param parameters Optional parameters for the SQL query
950948
* @param options Optional watch options that may contain explicit table list
@@ -1077,7 +1075,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
10771075
*
10781076
* This is preferred over {@link watchWithAsyncGenerator} when multiple queries need to be performed
10791077
* together when data is changed.
1080-
*
1078+
*
10811079
* Note: do not declare this as `async *onChange` as it will not work in React Native.
10821080
*
10831081
* @param options Options for configuring watch behavior

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,23 @@ export type AbstractRemoteOptions = {
8585
* Binding should be done before passing here.
8686
*/
8787
fetchImplementation: FetchImplementation | FetchImplementationProvider;
88+
89+
/**
90+
* Optional options to pass directly to all `fetch` calls.
91+
*
92+
* This can include fields such as `dispatcher` (e.g. for proxy support),
93+
* `cache`, or any other fetch-compatible options.
94+
*/
95+
fetchOptions?: {};
8896
};
8997

9098
export const DEFAULT_REMOTE_OPTIONS: AbstractRemoteOptions = {
9199
socketUrlTransformer: (url) =>
92100
url.replace(/^https?:\/\//, function (match) {
93101
return match === 'https://' ? 'wss://' : 'ws://';
94102
}),
95-
fetchImplementation: new FetchImplementationProvider()
103+
fetchImplementation: new FetchImplementationProvider(),
104+
fetchOptions: {}
96105
};
97106

98107
export abstract class AbstractRemote {
@@ -231,6 +240,10 @@ export abstract class AbstractRemote {
231240
*/
232241
abstract getBSON(): Promise<BSONImplementation>;
233242

243+
protected createSocket(url: string): WebSocket {
244+
return new WebSocket(url);
245+
}
246+
234247
/**
235248
* Connects to the sync/stream websocket endpoint
236249
*/
@@ -249,7 +262,8 @@ export abstract class AbstractRemote {
249262

250263
const connector = new RSocketConnector({
251264
transport: new WebsocketClientTransport({
252-
url: this.options.socketUrlTransformer(request.url)
265+
url: this.options.socketUrlTransformer(request.url),
266+
wsCreator: (url) => this.createSocket(url)
253267
}),
254268
setup: {
255269
keepAlive: KEEP_ALIVE_MS,
@@ -421,6 +435,7 @@ export abstract class AbstractRemote {
421435
body: JSON.stringify(data),
422436
signal: controller.signal,
423437
cache: 'no-store',
438+
...(this.options.fetchOptions ?? {}),
424439
...options.fetchOptions
425440
}).catch((ex) => {
426441
if (ex.name == 'AbortError') {

packages/node/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,22 @@ contains everything you need to know to get started implementing PowerSync in yo
5656

5757
A simple example using `@powersync/node` is available in the [`demos/example-node/`](../demos/example-node) directory.
5858

59+
# Proxy Support
60+
61+
This SDK supports HTTP, HTTPS, and WebSocket proxies via environment variables.
62+
63+
## HTTP Connection Method
64+
65+
Internally we probe the http environment variables and apply it to fetch requests ([undici](https://www.npmjs.com/package/undici/v/5.6.0))
66+
67+
- Set the `HTTPS_PROXY` or `HTTP_PROXY` environment variable to automatically route HTTP requests through a proxy.
68+
69+
## WEB Socket Connection Method
70+
71+
Internally the [proxy-agent](https://www.npmjs.com/package/proxy-agent) dependency for WebSocket proxies, which has its own internal code for automatically picking up the appropriate environment variables:
72+
73+
- Set the `WS_PROXY` or `WSS_PROXY` environment variable to route the webocket connections through a proxy.
74+
5975
# Found a bug or need help?
6076

6177
- Join our [Discord server](https://discord.gg/powersync) where you can browse topics from our community, ask questions, share feedback, or just say hello :)

packages/node/package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,15 @@
5151
"@powersync/common": "workspace:*",
5252
"async-lock": "^1.4.0",
5353
"bson": "^6.6.0",
54-
"comlink": "^4.4.2"
54+
"comlink": "^4.4.2",
55+
"proxy-agent": "^6.5.0",
56+
"undici": "^7.8.0",
57+
"ws": "^8.18.1"
5558
},
5659
"devDependencies": {
60+
"@powersync/drizzle-driver": "workspace:*",
5761
"@types/async-lock": "^1.4.0",
5862
"drizzle-orm": "^0.35.2",
59-
"@powersync/drizzle-driver": "workspace:*",
6063
"rollup": "4.14.3",
6164
"typescript": "^5.5.3",
6265
"vitest": "^3.0.5"

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import {
22
AbstractPowerSyncDatabase,
33
AbstractStreamingSyncImplementation,
4+
AdditionalConnectionOptions,
45
BucketStorageAdapter,
56
DBAdapter,
67
PowerSyncBackendConnector,
8+
PowerSyncConnectionOptions,
79
PowerSyncDatabaseOptions,
810
PowerSyncDatabaseOptionsWithSettings,
11+
RequiredAdditionalConnectionOptions,
912
SqliteBucketStorage,
1013
SQLOpenFactory
1114
} from '@powersync/common';
@@ -15,11 +18,22 @@ import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyn
1518

1619
import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
1720
import { NodeSQLOpenOptions } from './options.js';
21+
import { Dispatcher } from 'undici';
1822

1923
export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
2024
database: DBAdapter | SQLOpenFactory | NodeSQLOpenOptions;
2125
};
2226

27+
export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & {
28+
/**
29+
* Optional custom dispatcher for HTTP connections (e.g. using undici).
30+
* Only used when the connection method is SyncStreamConnectionMethod.HTTP
31+
*/
32+
dispatcher?: Dispatcher;
33+
};
34+
35+
export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAdditionalConnectionOptions;
36+
2337
/**
2438
* A PowerSync database which provides SQLite functionality
2539
* which is automatically synced.
@@ -54,10 +68,18 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
5468
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
5569
}
5670

71+
connect(
72+
connector: PowerSyncBackendConnector,
73+
options?: PowerSyncConnectionOptions & { dispatcher?: Dispatcher }
74+
): Promise<void> {
75+
return super.connect(connector, options);
76+
}
77+
5778
protected generateSyncStreamImplementation(
58-
connector: PowerSyncBackendConnector
79+
connector: PowerSyncBackendConnector,
80+
options: NodeAdditionalConnectionOptions
5981
): AbstractStreamingSyncImplementation {
60-
const remote = new NodeRemote(connector);
82+
const remote = new NodeRemote(connector, this.options.logger, { dispatcher: options.dispatcher });
6183

6284
return new NodeStreamingSyncImplementation({
6385
adapter: this.bucketStorageAdapter,

packages/node/src/sync/stream/NodeRemote.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import {
1212
RemoteConnector
1313
} from '@powersync/common';
1414
import { BSON } from 'bson';
15+
import Agent from 'proxy-agent';
16+
import { EnvHttpProxyAgent, Dispatcher } from 'undici';
17+
import { WebSocket } from 'ws';
1518

1619
export const STREAMING_POST_TIMEOUT_MS = 30_000;
1720

@@ -21,18 +24,38 @@ class NodeFetchProvider extends FetchImplementationProvider {
2124
}
2225
}
2326

27+
export type NodeRemoteOptions = AbstractRemoteOptions & {
28+
dispatcher?: Dispatcher;
29+
};
30+
2431
export class NodeRemote extends AbstractRemote {
2532
constructor(
2633
protected connector: RemoteConnector,
2734
protected logger: ILogger = DEFAULT_REMOTE_LOGGER,
28-
options?: Partial<AbstractRemoteOptions>
35+
options?: Partial<NodeRemoteOptions>
2936
) {
37+
// EnvHttpProxyAgent automatically uses relevant env vars for HTTP
38+
const dispatcher = options?.dispatcher ?? new EnvHttpProxyAgent();
39+
3040
super(connector, logger, {
3141
...(options ?? {}),
32-
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider()
42+
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider(),
43+
fetchOptions: {
44+
dispatcher
45+
}
3346
});
3447
}
3548

49+
protected createSocket(url: string): globalThis.WebSocket {
50+
return new WebSocket(url, {
51+
// Automatically uses relevant env vars for web sockets
52+
agent: new Agent.ProxyAgent(),
53+
headers: {
54+
'User-Agent': this.getUserAgent()
55+
}
56+
}) as any as globalThis.WebSocket; // This is compatible in Node environments
57+
}
58+
3659
getUserAgent(): string {
3760
return [
3861
super.getUserAgent(),

0 commit comments

Comments
 (0)