Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 46 additions & 37 deletions src/comlink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ type PendingListenersMap = Map<
string,
(value: WireValue | PromiseLike<WireValue>) => void
>;
type EndpointWithPendingListeners = {
endpoint: Endpoint;
pendingListeners: PendingListenersMap;
};

/**
* Internal transfer handler to handle thrown exceptions.
Expand Down Expand Up @@ -396,7 +400,7 @@ function closeEndPoint(endpoint: Endpoint) {
}

export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
const pendingListeners : PendingListenersMap = new Map();
const pendingListeners: PendingListenersMap = new Map();

ep.addEventListener("message", function handleMessage(ev: Event) {
const { data } = ev as MessageEvent;
Expand All @@ -415,7 +419,7 @@ export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
}
});

return createProxy<T>(ep, pendingListeners, [], target) as any;
return createProxy<T>({ endpoint: ep, pendingListeners }, [], target) as any;
}

function throwIfProxyReleased(isReleased: boolean) {
Expand All @@ -424,11 +428,11 @@ function throwIfProxyReleased(isReleased: boolean) {
}
}

function releaseEndpoint(ep: Endpoint) {
return requestResponseMessage(ep, new Map(), {
function releaseEndpoint(epWithPendingListeners: EndpointWithPendingListeners) {
return requestResponseMessage(epWithPendingListeners, {
type: MessageType.RELEASE,
}).then(() => {
closeEndPoint(ep);
closeEndPoint(epWithPendingListeners.endpoint);
});
}

Expand All @@ -441,24 +445,32 @@ interface FinalizationRegistry<T> {
): void;
unregister(unregisterToken: object): void;
}
declare var FinalizationRegistry: FinalizationRegistry<Endpoint>;

const proxyCounter = new WeakMap<Endpoint, number>();
declare var FinalizationRegistry: FinalizationRegistry<EndpointWithPendingListeners>;

const proxyCounter = new WeakMap<EndpointWithPendingListeners, number>();
const proxyFinalizers =
"FinalizationRegistry" in globalThis &&
new FinalizationRegistry((ep: Endpoint) => {
const newCount = (proxyCounter.get(ep) || 0) - 1;
proxyCounter.set(ep, newCount);
if (newCount === 0) {
releaseEndpoint(ep);
new FinalizationRegistry(
(epWithPendingListeners: EndpointWithPendingListeners) => {
const newCount = (proxyCounter.get(epWithPendingListeners) || 0) - 1;
proxyCounter.set(epWithPendingListeners, newCount);
if (newCount === 0) {
releaseEndpoint(epWithPendingListeners).finally(() => {
epWithPendingListeners.pendingListeners.clear();
});
}
}
});
);

function registerProxy(proxy: object, ep: Endpoint) {
const newCount = (proxyCounter.get(ep) || 0) + 1;
proxyCounter.set(ep, newCount);
function registerProxy(
proxy: object,
epWithPendingListeners: EndpointWithPendingListeners
) {
const newCount = (proxyCounter.get(epWithPendingListeners) || 0) + 1;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const newCount = (proxyCounter.get(epWithPendingListeners) || 0) + 1;
const newCount = (proxyCounter.get(epWithPendingListeners) ?? 0) + 1;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm leaving this as is as the logical OR operator works with much older browser versions. The nullish coalescing operator was introduced much later.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw - that is handled during the transpilation step of ts -> js. When you specify a specific ES version as the target it will produce the code for that ES version so you can write your ts using modern styles.

proxyCounter.set(epWithPendingListeners, newCount);
if (proxyFinalizers) {
proxyFinalizers.register(proxy, ep, proxy);
proxyFinalizers.register(proxy, epWithPendingListeners, proxy);
}
}

Expand All @@ -469,13 +481,12 @@ function unregisterProxy(proxy: object) {
}

function createProxy<T>(
ep: Endpoint,
pendingListeners: PendingListenersMap,
epWithPendingListeners: EndpointWithPendingListeners,
path: (string | number | symbol)[] = [],
target: object = function () {}
): Remote<T> {
let isProxyReleased = false;
const propProxyCache : Map<(string | symbol), Remote<unknown>> = new Map();
const propProxyCache: Map<string | symbol, Remote<unknown>> = new Map();
const proxy = new Proxy(target, {
get(_target, prop) {
throwIfProxyReleased(isProxyReleased);
Expand All @@ -486,16 +497,17 @@ function createProxy<T>(
}
propProxyCache.clear();
unregisterProxy(proxy);
releaseEndpoint(ep);
pendingListeners.clear();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the issue that clearing the listeners here is premature because the releaseEndpoint call needs to access the listeners to resolve? If that's the case it looks like releaseEndpoint itself returns a promise so you could do a .finally(() => pendingListeners.clear()) here instead of passing it to releaseEndpoint?

Or is it important to pass pendingListeners so it can be passed to requestResponseMessage ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is it important to pass pendingListeners so it can be passed to requestResponseMessage ?

Yes, that's the important bit. If pendingListeners are not passed, the resolver function is not found and the requestResponseMessage promise will never settle.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why do we allow pendingListeners to be undefined? In what situation is that ok?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was necessary when releasing the endpoint when the proxy is garbage collected. However, thinking about this more, we should also have access to the pendingListeners map as it is tightly coupled to the endpoint now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a commit that changes the endpoint and pendingListeners to be passed around in a single object (EndpointWithPendingListeners). There is probably a better name for that type/object...

releaseEndpoint(epWithPendingListeners).finally(() => {
epWithPendingListeners.pendingListeners.clear();
});
isProxyReleased = true;
};
}
if (prop === "then") {
if (path.length === 0) {
return { then: () => proxy };
}
const r = requestResponseMessage(ep, pendingListeners, {
const r = requestResponseMessage(epWithPendingListeners, {
type: MessageType.GET,
path: path.map((p) => p.toString()),
}).then(fromWireValue);
Expand All @@ -507,7 +519,7 @@ function createProxy<T>(
return cachedProxy;
}

const propProxy = createProxy(ep, pendingListeners, [...path, prop]);
const propProxy = createProxy(epWithPendingListeners, [...path, prop]);
propProxyCache.set(prop, propProxy);
return propProxy;
},
Expand All @@ -517,8 +529,7 @@ function createProxy<T>(
// boolean. To show good will, we return true asynchronously ¯\_(ツ)_/¯
const [value, transferables] = toWireValue(rawValue);
return requestResponseMessage(
ep,
pendingListeners,
epWithPendingListeners,
{
type: MessageType.SET,
path: [...path, prop].map((p) => p.toString()),
Expand All @@ -531,18 +542,17 @@ function createProxy<T>(
throwIfProxyReleased(isProxyReleased);
const last = path[path.length - 1];
if ((last as any) === createEndpoint) {
return requestResponseMessage(ep, pendingListeners, {
return requestResponseMessage(epWithPendingListeners, {
type: MessageType.ENDPOINT,
}).then(fromWireValue);
}
// We just pretend that `bind()` didn’t happen.
if (last === "bind") {
return createProxy(ep, pendingListeners, path.slice(0, -1));
return createProxy(epWithPendingListeners, path.slice(0, -1));
}
const [argumentList, transferables] = processArguments(rawArgumentList);
return requestResponseMessage(
ep,
pendingListeners,
epWithPendingListeners,
{
type: MessageType.APPLY,
path: path.map((p) => p.toString()),
Expand All @@ -555,8 +565,7 @@ function createProxy<T>(
throwIfProxyReleased(isProxyReleased);
const [argumentList, transferables] = processArguments(rawArgumentList);
return requestResponseMessage(
ep,
pendingListeners,
epWithPendingListeners,
{
type: MessageType.CONSTRUCT,
path: path.map((p) => p.toString()),
Expand All @@ -566,7 +575,7 @@ function createProxy<T>(
).then(fromWireValue);
},
});
registerProxy(proxy, ep);
registerProxy(proxy, epWithPendingListeners);
return proxy as any;
}

Expand Down Expand Up @@ -635,18 +644,18 @@ function fromWireValue(value: WireValue): any {
}

function requestResponseMessage(
ep: Endpoint,
pendingListeners: PendingListenersMap,
epWithPendingListeners: EndpointWithPendingListeners,
msg: Message,
transfers?: Transferable[]
): Promise<WireValue> {
const ep = epWithPendingListeners.endpoint;
const pendingListeners = epWithPendingListeners.pendingListeners;
return new Promise((resolve) => {
const id = Math.trunc(Math.random() * Number.MAX_SAFE_INTEGER).toString();
pendingListeners.set(id, resolve);
if (ep.start) {
ep.start();
}
ep.postMessage({ id, ...msg }, transfers);
});
});
}

12 changes: 12 additions & 0 deletions tests/node/main.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ describe("node", () => {
const otherProxy = Comlink.wrap(otherEp);
expect(await otherProxy(20, 1)).to.equal(21);
});

it("releaseProxy closes MessagePort created by createEndpoint", async function () {
const proxy = Comlink.wrap(nodeEndpoint(this.worker));
const otherEp = await proxy[Comlink.createEndpoint]();
const otherProxy = Comlink.wrap(otherEp);
expect(await otherProxy(20, 1)).to.equal(21);

await new Promise((resolve) => {
otherEp.close = resolve; // Resolve the promise when the MessagePort is closed.
otherProxy[Comlink.releaseProxy](); // Release the proxy, which should close the MessagePort.
});
});
});

describe("Comlink across workers (wrapped object)", function () {
Expand Down
12 changes: 12 additions & 0 deletions tests/worker.comlink.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,16 @@ describe("Comlink across workers", function () {
const otherProxy = Comlink.wrap(otherEp);
expect(await otherProxy(20, 1)).to.equal(21);
});

it("releaseProxy closes MessagePort created by createEndpoint", async function () {
const proxy = Comlink.wrap(this.worker);
const otherEp = await proxy[Comlink.createEndpoint]();
const otherProxy = Comlink.wrap(otherEp);
expect(await otherProxy(20, 1)).to.equal(21);

await new Promise((resolve) => {
otherEp.close = resolve; // Resolve the promise when the MessagePort is closed.
otherProxy[Comlink.releaseProxy](); // Release the proxy, which should close the MessagePort.
});
});
});