Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 115 additions & 82 deletions src/comlink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,94 +298,127 @@ function isAllowedOrigin(
return false;
}

// Track exposed objects per endpoint to prevent multiple event listeners
const exposedObjectsMap = new WeakMap<Endpoint, Set<any>>();
const endpointListeners = new WeakMap<Endpoint, (ev: MessageEvent) => void>();

export function expose(
obj: any,
ep: Endpoint = globalThis as any,
allowedOrigins: (string | RegExp)[] = ["*"]
) {
ep.addEventListener("message", function callback(ev: MessageEvent) {
if (!ev || !ev.data) {
return;
}
if (!isAllowedOrigin(allowedOrigins, ev.origin)) {
console.warn(`Invalid origin '${ev.origin}' for comlink proxy`);
return;
}
const { id, type, path } = {
path: [] as string[],
...(ev.data as Message),
};
const argumentList = (ev.data.argumentList || []).map(fromWireValue);
let returnValue;
try {
const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj);
const rawValue = path.reduce((obj, prop) => obj[prop], obj);
switch (type) {
case MessageType.GET:
{
returnValue = rawValue;
}
break;
case MessageType.SET:
{
parent[path.slice(-1)[0]] = fromWireValue(ev.data.value);
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = rawValue.apply(parent, argumentList);
}
break;
case MessageType.CONSTRUCT:
{
const value = new rawValue(...argumentList);
returnValue = proxy(value);
}
break;
case MessageType.ENDPOINT:
{
const { port1, port2 } = new MessageChannel();
expose(obj, port2);
returnValue = transfer(port1, [port1]);
}
break;
case MessageType.RELEASE:
{
returnValue = undefined;
}
break;
default:
return;
// Get or create array of exposed objects for this endpoint
let exposedObjects = exposedObjectsMap.get(ep);
if (!exposedObjects) {
exposedObjects = new Set<any>();
exposedObjectsMap.set(ep, exposedObjects);
}

// Add this object to the array (latest object takes precedence)
exposedObjects.add(obj);

// Only add the event listener if this is the first exposed object on this endpoint
if (endpointListeners.has(ep)) {
console.warn("Endpoint has already been added for this object");
} else {
const callback = function (ev: MessageEvent) {
if (!ev || !ev.data) {
return;
}
} catch (value) {
returnValue = { value, [throwMarker]: 0 };
}
Promise.resolve(returnValue)
.catch((value) => {
return { value, [throwMarker]: 0 };
})
.then((returnValue) => {
const [wireValue, transferables] = toWireValue(returnValue);
ep.postMessage({ ...wireValue, id }, transferables);
if (type === MessageType.RELEASE) {
// detach and deactive after sending release response above.
ep.removeEventListener("message", callback as any);
closeEndPoint(ep);
if (finalizer in obj && typeof obj[finalizer] === "function") {
obj[finalizer]();
}
if (!isAllowedOrigin(allowedOrigins, ev.origin)) {
console.warn(`Invalid origin '${ev.origin}' for comlink proxy`);
return;
}
const { id, type, path } = {
path: [] as string[],
...(ev.data as Message),
};
const argumentList = (ev.data.argumentList || []).map(fromWireValue);
let returnValue;

// Get the most recently exposed object (maintains backward compatibility)
const currentExposedObjects = exposedObjectsMap.get(ep);
const targetObj =
currentExposedObjects && currentExposedObjects.size > 0
? Array.from(currentExposedObjects)[currentExposedObjects.size - 1]
: obj;

try {
const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], targetObj);
const rawValue = path.reduce((obj, prop) => obj[prop], targetObj);
switch (type) {
case MessageType.GET:
{
returnValue = rawValue;
}
break;
case MessageType.SET:
{
parent[path.slice(-1)[0]] = fromWireValue(ev.data.value);
returnValue = true;
}
break;
case MessageType.APPLY:
{
returnValue = rawValue.apply(parent, argumentList);
}
break;
case MessageType.CONSTRUCT:
{
const value = new rawValue(...argumentList);
returnValue = proxy(value);
}
break;
case MessageType.ENDPOINT:
{
const { port1, port2 } = new MessageChannel();
expose(targetObj, port2);
returnValue = transfer(port1, [port1]);
}
break;
case MessageType.RELEASE:
{
returnValue = undefined;
}
break;
default:
return;
}
})
.catch((error) => {
// Send Serialization Error To Caller
const [wireValue, transferables] = toWireValue({
value: new TypeError("Unserializable return value"),
[throwMarker]: 0,
} catch (value) {
returnValue = { value, [throwMarker]: 0 };
}
Promise.resolve(returnValue)
.catch((value) => {
return { value, [throwMarker]: 0 };
})
.then((returnValue) => {
const [wireValue, transferables] = toWireValue(returnValue);
ep.postMessage({ ...wireValue, id }, transferables);
if (type === MessageType.RELEASE) {
// detach and deactivate after sending release response above.
ep.removeEventListener("message", callback as any);
endpointListeners.delete(ep);
exposedObjectsMap.delete(ep);
closeEndPoint(ep);
if (finalizer in targetObj && typeof targetObj[finalizer] === "function") {
targetObj[finalizer]();
}
}
})
.catch((error) => {
// Send Serialization Error To Caller
const [wireValue, transferables] = toWireValue({
value: new TypeError("Unserializable return value"),
[throwMarker]: 0,
});
ep.postMessage({ ...wireValue, id }, transferables);
});
ep.postMessage({ ...wireValue, id }, transferables);
});
} as any);
};

ep.addEventListener("message", callback as any);
endpointListeners.set(ep, callback);
}

if (ep.start) {
ep.start();
}
Expand All @@ -400,7 +433,7 @@ function closeEndPoint(endpoint: Endpoint) {
}

export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
const pendingListeners : PendingListenersMap = new Map();
const pendingListeners: PendingListenersMap = new Map();

ep.addEventListener("message", function handleMessage(ev: Event) {
const { data } = ev as MessageEvent;
Expand Down Expand Up @@ -643,7 +676,7 @@ function requestResponseMessage(
ep.start();
}
ep.postMessage({ id, ...msg }, transfers);
});
});
}

function generateUUID(): string {
Expand Down
35 changes: 35 additions & 0 deletions tests/fixtures/multiple-expose-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Test worker that exposes multiple objects to test the "last exposed takes precedence" behavior
*/

importScripts("/base/dist/umd/comlink.js");

const object1 = {
type: "object1",
value: 100,

getType() {
return this.type;
},

getValue() {
return this.value;
},
};

const object2 = {
type: "object2",
value: 200,

getType() {
return this.type;
},

getValue() {
return this.value;
},
};

// Expose both objects - the second one should take precedence
Comlink.expose(object1);
Comlink.expose(object2);
30 changes: 30 additions & 0 deletions tests/fixtures/multiple-worker1.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Test worker 1 - has its own state and identity
*/

importScripts("/base/dist/umd/comlink.js");

const worker1Object = {
workerId: "worker1",
counter: 0,

getWorkerId() {
return this.workerId;
},

getCounter() {
return this.counter;
},

incrementCounter() {
this.counter++;
return this.counter;
},

setCounter(value) {
this.counter = value;
return this.counter;
},
};

Comlink.expose(worker1Object);
30 changes: 30 additions & 0 deletions tests/fixtures/multiple-worker2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Test worker 2 - has its own state and identity
*/

importScripts("/base/dist/umd/comlink.js");

const worker2Object = {
workerId: "worker2",
counter: 0,

getWorkerId() {
return this.workerId;
},

getCounter() {
return this.counter;
},

incrementCounter() {
this.counter++;
return this.counter;
},

setCounter(value) {
this.counter = value;
return this.counter;
},
};

Comlink.expose(worker2Object);
Loading