Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/comlink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ function closeEndPoint(endpoint: Endpoint) {
}

export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
const pendingListeners : PendingListenersMap = new Map();
const pendingListeners: PendingListenersMap = new Map();

ep.addEventListener("message", function handleMessage(ev: Event) {
const { data } = ev as MessageEvent;
Expand Down Expand Up @@ -424,11 +424,12 @@ function throwIfProxyReleased(isReleased: boolean) {
}
}

function releaseEndpoint(ep: Endpoint) {
return requestResponseMessage(ep, new Map(), {
function releaseEndpoint(ep: Endpoint, pendingListeners?: PendingListenersMap) {
return requestResponseMessage(ep, pendingListeners || new Map(), {
type: MessageType.RELEASE,
}).then(() => {
closeEndPoint(ep);
pendingListeners?.clear();
});
}

Expand Down Expand Up @@ -486,8 +487,7 @@ function createProxy<T>(
}
propProxyCache.clear();
unregisterProxy(proxy);
releaseEndpoint(ep);
pendingListeners.clear();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the issue that clearing the listeners here is premature because the releaseEndpoint call needs to access the listeners to resolve? If that's the case it looks like releaseEndpoint itself returns a promise so you could do a .finally(() => pendingListeners.clear()) here instead of passing it to releaseEndpoint?

Or is it important to pass pendingListeners so it can be passed to requestResponseMessage ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is it important to pass pendingListeners so it can be passed to requestResponseMessage ?

Yes, that's the important bit. If pendingListeners are not passed, the resolver function is not found and the requestResponseMessage promise will never settle.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why do we allow pendingListeners to be undefined? In what situation is that ok?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was necessary when releasing the endpoint when the proxy is garbage collected. However, thinking about this more, we should also have access to the pendingListeners map as it is tightly coupled to the endpoint now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a commit that changes the endpoint and pendingListeners to be passed around in a single object (EndpointWithPendingListeners). There is probably a better name for that type/object...

releaseEndpoint(ep, pendingListeners);
isProxyReleased = true;
};
}
Expand Down Expand Up @@ -647,6 +647,6 @@ function requestResponseMessage(
ep.start();
}
ep.postMessage({ id, ...msg }, transfers);
});
});
}

12 changes: 12 additions & 0 deletions tests/node/main.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ describe("node", () => {
const otherProxy = Comlink.wrap(otherEp);
expect(await otherProxy(20, 1)).to.equal(21);
});

it("releaseProxy closes MessagePort created by createEndpoint", async function () {
const proxy = Comlink.wrap(nodeEndpoint(this.worker));
const otherEp = await proxy[Comlink.createEndpoint]();
const otherProxy = Comlink.wrap(otherEp);
expect(await otherProxy(20, 1)).to.equal(21);

await new Promise((resolve) => {
otherEp.close = resolve; // Resolve the promise when the MessagePort is closed.
otherProxy[Comlink.releaseProxy](); // Release the proxy, which should close the MessagePort.
});
});
});

describe("Comlink across workers (wrapped object)", function () {
Expand Down
12 changes: 12 additions & 0 deletions tests/worker.comlink.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,16 @@ describe("Comlink across workers", function () {
const otherProxy = Comlink.wrap(otherEp);
expect(await otherProxy(20, 1)).to.equal(21);
});

it("releaseProxy closes MessagePort created by createEndpoint", async function () {
const proxy = Comlink.wrap(this.worker);
const otherEp = await proxy[Comlink.createEndpoint]();
const otherProxy = Comlink.wrap(otherEp);
expect(await otherProxy(20, 1)).to.equal(21);

await new Promise((resolve) => {
otherEp.close = resolve; // Resolve the promise when the MessagePort is closed.
otherProxy[Comlink.releaseProxy](); // Release the proxy, which should close the MessagePort.
});
});
});