From 44c03ba436829eeba48484ff35b23af7fbfce14b Mon Sep 17 00:00:00 2001 From: Joe Cellucci Date: Fri, 29 Aug 2025 09:35:59 -0500 Subject: [PATCH 1/3] add fix for exposing multiple workers in the same context --- src/comlink.ts | 197 +++++++++++++++++++++++++++++-------------------- 1 file changed, 115 insertions(+), 82 deletions(-) diff --git a/src/comlink.ts b/src/comlink.ts index 27c13694..68ef5e5f 100644 --- a/src/comlink.ts +++ b/src/comlink.ts @@ -298,94 +298,127 @@ function isAllowedOrigin( return false; } +// Track exposed objects per endpoint to prevent multiple event listeners +const exposedObjectsMap = new WeakMap>(); +const endpointListeners = new WeakMap void>(); + export function expose( obj: any, ep: Endpoint = globalThis as any, allowedOrigins: (string | RegExp)[] = ["*"] ) { - ep.addEventListener("message", function callback(ev: MessageEvent) { - if (!ev || !ev.data) { - return; - } - if (!isAllowedOrigin(allowedOrigins, ev.origin)) { - console.warn(`Invalid origin '${ev.origin}' for comlink proxy`); - return; - } - const { id, type, path } = { - path: [] as string[], - ...(ev.data as Message), - }; - const argumentList = (ev.data.argumentList || []).map(fromWireValue); - let returnValue; - try { - const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj); - const rawValue = path.reduce((obj, prop) => obj[prop], obj); - switch (type) { - case MessageType.GET: - { - returnValue = rawValue; - } - break; - case MessageType.SET: - { - parent[path.slice(-1)[0]] = fromWireValue(ev.data.value); - returnValue = true; - } - break; - case MessageType.APPLY: - { - returnValue = rawValue.apply(parent, argumentList); - } - break; - case MessageType.CONSTRUCT: - { - const value = new rawValue(...argumentList); - returnValue = proxy(value); - } - break; - case MessageType.ENDPOINT: - { - const { port1, port2 } = new MessageChannel(); - expose(obj, port2); - returnValue = transfer(port1, [port1]); - } - break; - case MessageType.RELEASE: - { - returnValue = undefined; - } - break; - default: - return; + // Get or create array of exposed objects for this endpoint + let exposedObjects = exposedObjectsMap.get(ep); + if (!exposedObjects) { + exposedObjects = new Set(); + exposedObjectsMap.set(ep, exposedObjects); + } + + // Add this object to the array (latest object takes precedence) + exposedObjects.add(obj); + + // Only add the event listener if this is the first exposed object on this endpoint + if (!endpointListeners.has(ep)) { + const callback = function (ev: MessageEvent) { + if (!ev || !ev.data) { + return; } - } catch (value) { - returnValue = { value, [throwMarker]: 0 }; - } - Promise.resolve(returnValue) - .catch((value) => { - return { value, [throwMarker]: 0 }; - }) - .then((returnValue) => { - const [wireValue, transferables] = toWireValue(returnValue); - ep.postMessage({ ...wireValue, id }, transferables); - if (type === MessageType.RELEASE) { - // detach and deactive after sending release response above. - ep.removeEventListener("message", callback as any); - closeEndPoint(ep); - if (finalizer in obj && typeof obj[finalizer] === "function") { - obj[finalizer](); - } + if (!isAllowedOrigin(allowedOrigins, ev.origin)) { + console.warn(`Invalid origin '${ev.origin}' for comlink proxy`); + return; + } + const { id, type, path } = { + path: [] as string[], + ...(ev.data as Message), + }; + const argumentList = (ev.data.argumentList || []).map(fromWireValue); + let returnValue; + + // Get the most recently exposed object (maintains backward compatibility) + const currentExposedObjects = exposedObjectsMap.get(ep); + const targetObj = + currentExposedObjects && currentExposedObjects.size > 0 + ? Array.from(currentExposedObjects)[currentExposedObjects.size - 1] + : obj; + + try { + const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], targetObj); + const rawValue = path.reduce((obj, prop) => obj[prop], targetObj); + switch (type) { + case MessageType.GET: + { + returnValue = rawValue; + } + break; + case MessageType.SET: + { + parent[path.slice(-1)[0]] = fromWireValue(ev.data.value); + returnValue = true; + } + break; + case MessageType.APPLY: + { + returnValue = rawValue.apply(parent, argumentList); + } + break; + case MessageType.CONSTRUCT: + { + const value = new rawValue(...argumentList); + returnValue = proxy(value); + } + break; + case MessageType.ENDPOINT: + { + const { port1, port2 } = new MessageChannel(); + expose(targetObj, port2); + returnValue = transfer(port1, [port1]); + } + break; + case MessageType.RELEASE: + { + returnValue = undefined; + } + break; + default: + return; } - }) - .catch((error) => { - // Send Serialization Error To Caller - const [wireValue, transferables] = toWireValue({ - value: new TypeError("Unserializable return value"), - [throwMarker]: 0, + } catch (value) { + returnValue = { value, [throwMarker]: 0 }; + } + Promise.resolve(returnValue) + .catch((value) => { + return { value, [throwMarker]: 0 }; + }) + .then((returnValue) => { + const [wireValue, transferables] = toWireValue(returnValue); + ep.postMessage({ ...wireValue, id }, transferables); + if (type === MessageType.RELEASE) { + // detach and deactivate after sending release response above. + ep.removeEventListener("message", callback as any); + endpointListeners.delete(ep); + exposedObjectsMap.delete(ep); + closeEndPoint(ep); + if (finalizer in targetObj && typeof targetObj[finalizer] === "function") { + targetObj[finalizer](); + } + } + }) + .catch((error) => { + // Send Serialization Error To Caller + const [wireValue, transferables] = toWireValue({ + value: new TypeError("Unserializable return value"), + [throwMarker]: 0, + }); + ep.postMessage({ ...wireValue, id }, transferables); }); - ep.postMessage({ ...wireValue, id }, transferables); - }); - } as any); + }; + + ep.addEventListener("message", callback as any); + endpointListeners.set(ep, callback); + } else { + console.log("No callback found for endpoint"); + } + if (ep.start) { ep.start(); } @@ -400,7 +433,7 @@ function closeEndPoint(endpoint: Endpoint) { } export function wrap(ep: Endpoint, target?: any): Remote { - const pendingListeners : PendingListenersMap = new Map(); + const pendingListeners: PendingListenersMap = new Map(); ep.addEventListener("message", function handleMessage(ev: Event) { const { data } = ev as MessageEvent; @@ -643,7 +676,7 @@ function requestResponseMessage( ep.start(); } ep.postMessage({ id, ...msg }, transfers); -}); + }); } function generateUUID(): string { From 9ab0338c854bd7ff1ad2da0b6a8395dca57164d2 Mon Sep 17 00:00:00 2001 From: Joseph Cellucci Date: Fri, 29 Aug 2025 09:35:23 -0500 Subject: [PATCH 2/3] add tests --- tests/fixtures/multiple-expose-worker.js | 35 +++++ tests/fixtures/multiple-worker1.js | 30 +++++ tests/fixtures/multiple-worker2.js | 30 +++++ tests/multiple-workers.comlink.test.js | 164 +++++++++++++++++++++++ 4 files changed, 259 insertions(+) create mode 100644 tests/fixtures/multiple-expose-worker.js create mode 100644 tests/fixtures/multiple-worker1.js create mode 100644 tests/fixtures/multiple-worker2.js create mode 100644 tests/multiple-workers.comlink.test.js diff --git a/tests/fixtures/multiple-expose-worker.js b/tests/fixtures/multiple-expose-worker.js new file mode 100644 index 00000000..af851912 --- /dev/null +++ b/tests/fixtures/multiple-expose-worker.js @@ -0,0 +1,35 @@ +/** + * Test worker that exposes multiple objects to test the "last exposed takes precedence" behavior + */ + +importScripts("/base/dist/umd/comlink.js"); + +const object1 = { + type: "object1", + value: 100, + + getType() { + return this.type; + }, + + getValue() { + return this.value; + }, +}; + +const object2 = { + type: "object2", + value: 200, + + getType() { + return this.type; + }, + + getValue() { + return this.value; + }, +}; + +// Expose both objects - the second one should take precedence +Comlink.expose(object1); +Comlink.expose(object2); diff --git a/tests/fixtures/multiple-worker1.js b/tests/fixtures/multiple-worker1.js new file mode 100644 index 00000000..43cb8a66 --- /dev/null +++ b/tests/fixtures/multiple-worker1.js @@ -0,0 +1,30 @@ +/** + * Test worker 1 - has its own state and identity + */ + +importScripts("/base/dist/umd/comlink.js"); + +const worker1Object = { + workerId: "worker1", + counter: 0, + + getWorkerId() { + return this.workerId; + }, + + getCounter() { + return this.counter; + }, + + incrementCounter() { + this.counter++; + return this.counter; + }, + + setCounter(value) { + this.counter = value; + return this.counter; + }, +}; + +Comlink.expose(worker1Object); diff --git a/tests/fixtures/multiple-worker2.js b/tests/fixtures/multiple-worker2.js new file mode 100644 index 00000000..9d6473bc --- /dev/null +++ b/tests/fixtures/multiple-worker2.js @@ -0,0 +1,30 @@ +/** + * Test worker 2 - has its own state and identity + */ + +importScripts("/base/dist/umd/comlink.js"); + +const worker2Object = { + workerId: "worker2", + counter: 0, + + getWorkerId() { + return this.workerId; + }, + + getCounter() { + return this.counter; + }, + + incrementCounter() { + this.counter++; + return this.counter; + }, + + setCounter(value) { + this.counter = value; + return this.counter; + }, +}; + +Comlink.expose(worker2Object); diff --git a/tests/multiple-workers.comlink.test.js b/tests/multiple-workers.comlink.test.js new file mode 100644 index 00000000..0f08190e --- /dev/null +++ b/tests/multiple-workers.comlink.test.js @@ -0,0 +1,164 @@ +/** + * Copyright 2017 Google Inc. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Comlink from "/base/dist/esm/comlink.mjs"; + +describe("Comlink multiple workers isolation", function () { + let workers = []; + + afterEach(function () { + // Clean up all workers + workers.forEach(worker => worker.terminate()); + workers = []; + }); + + it("ensures messages are not received by wrong workers", async function () { + // Create two separate workers with different exposed objects + const worker1 = new Worker("/base/tests/fixtures/multiple-worker1.js"); + const worker2 = new Worker("/base/tests/fixtures/multiple-worker2.js"); + workers.push(worker1, worker2); + + // Wait a bit for workers to initialize + await new Promise(resolve => setTimeout(resolve, 100)); + + // Wrap both workers + const proxy1 = Comlink.wrap(worker1); + const proxy2 = Comlink.wrap(worker2); + + // Test that each worker responds correctly to its own methods + expect(await proxy1.getWorkerId()).to.equal("worker1"); + expect(await proxy2.getWorkerId()).to.equal("worker2"); + + // Test that calling methods on one worker doesn't affect the other + await proxy1.incrementCounter(); + await proxy1.incrementCounter(); + expect(await proxy1.getCounter()).to.equal(2); + expect(await proxy2.getCounter()).to.equal(0); // Worker2's counter should be unaffected + + await proxy2.incrementCounter(); + expect(await proxy1.getCounter()).to.equal(2); // Worker1's counter should be unaffected + expect(await proxy2.getCounter()).to.equal(1); + }); + + it("handles multiple exposed objects on same endpoint correctly", async function () { + const worker = new Worker("/base/tests/fixtures/multiple-expose-worker.js"); + workers.push(worker); + + // Wait for worker to initialize + await new Promise(resolve => setTimeout(resolve, 100)); + + const proxy = Comlink.wrap(worker); + + // The worker exposes multiple objects, but only the last one should be active + // (based on the implementation where last exposed object takes precedence) + expect(await proxy.getType()).to.equal("object2"); + expect(await proxy.getValue()).to.equal(200); + }); + + it("prevents cross-worker message pollution", async function () { + // Create multiple workers + const worker1 = new Worker("/base/tests/fixtures/multiple-worker1.js"); + const worker2 = new Worker("/base/tests/fixtures/multiple-worker2.js"); + const worker3 = new Worker("/base/tests/fixtures/multiple-worker1.js"); // Same type as worker1 + workers.push(worker1, worker2, worker3); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const proxy1 = Comlink.wrap(worker1); + const proxy2 = Comlink.wrap(worker2); + const proxy3 = Comlink.wrap(worker3); + + // Verify each worker has correct identity + expect(await proxy1.getWorkerId()).to.equal("worker1"); + expect(await proxy2.getWorkerId()).to.equal("worker2"); + expect(await proxy3.getWorkerId()).to.equal("worker1"); + + // Perform operations that could potentially cross-contaminate if message isolation fails + await Promise.all([ + proxy1.incrementCounter(), + proxy2.incrementCounter(), + proxy3.incrementCounter(), + ]); + + await Promise.all([ + proxy1.incrementCounter(), + proxy2.incrementCounter(), + ]); + + // Verify each worker maintained its own state + expect(await proxy1.getCounter()).to.equal(2); + expect(await proxy2.getCounter()).to.equal(2); + expect(await proxy3.getCounter()).to.equal(1); + }); + + it("handles rapid concurrent operations without message mixing", async function () { + const worker1 = new Worker("/base/tests/fixtures/multiple-worker1.js"); + const worker2 = new Worker("/base/tests/fixtures/multiple-worker2.js"); + workers.push(worker1, worker2); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const proxy1 = Comlink.wrap(worker1); + const proxy2 = Comlink.wrap(worker2); + + // Perform many rapid operations that could cause message mixing if not properly isolated + const operations = []; + for (let i = 0; i < 50; i++) { + operations.push(proxy1.incrementCounter()); + operations.push(proxy2.incrementCounter()); + } + + await Promise.all(operations); + + // Each worker should have incremented exactly 50 times + expect(await proxy1.getCounter()).to.equal(50); + expect(await proxy2.getCounter()).to.equal(50); + }); + + it("maintains correct endpoint listeners after worker communication", async function () { + const worker1 = new Worker("/base/tests/fixtures/multiple-worker1.js"); + const worker2 = new Worker("/base/tests/fixtures/multiple-worker2.js"); + workers.push(worker1, worker2); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const proxy1 = Comlink.wrap(worker1); + const proxy2 = Comlink.wrap(worker2); + + // Create new endpoints to test endpoint isolation + const endpoint1 = await proxy1[Comlink.createEndpoint](); + const endpoint2 = await proxy2[Comlink.createEndpoint](); + + const endpointProxy1 = Comlink.wrap(endpoint1); + const endpointProxy2 = Comlink.wrap(endpoint2); + + // Test that endpoints maintain proper isolation + expect(await endpointProxy1.getWorkerId()).to.equal("worker1"); + expect(await endpointProxy2.getWorkerId()).to.equal("worker2"); + + // Test operations through endpoints + await endpointProxy1.incrementCounter(); + await endpointProxy2.incrementCounter(); + + expect(await endpointProxy1.getCounter()).to.equal(1); + expect(await endpointProxy2.getCounter()).to.equal(1); + + // Verify original proxies still work correctly + expect(await proxy1.getCounter()).to.equal(1); // Same state as endpoint + expect(await proxy2.getCounter()).to.equal(1); // Same state as endpoint + + // Clean up endpoints + endpointProxy1[Comlink.releaseProxy](); + endpointProxy2[Comlink.releaseProxy](); + }); +}); From 603e0ff6c10058b357d8856606d6870fb8c5c677 Mon Sep 17 00:00:00 2001 From: Joseph Cellucci Date: Fri, 29 Aug 2025 09:52:47 -0500 Subject: [PATCH 3/3] reverse control and fix log message --- src/comlink.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/comlink.ts b/src/comlink.ts index 68ef5e5f..05a8087e 100644 --- a/src/comlink.ts +++ b/src/comlink.ts @@ -318,7 +318,9 @@ export function expose( exposedObjects.add(obj); // Only add the event listener if this is the first exposed object on this endpoint - if (!endpointListeners.has(ep)) { + if (endpointListeners.has(ep)) { + console.warn("Endpoint has already been added for this object"); + } else { const callback = function (ev: MessageEvent) { if (!ev || !ev.data) { return; @@ -415,8 +417,6 @@ export function expose( ep.addEventListener("message", callback as any); endpointListeners.set(ep, callback); - } else { - console.log("No callback found for endpoint"); } if (ep.start) {