Skip to content

Commit 9009590

Browse files
authored
Unsubscribe from worker observables when master unsubscribes (#263)
* Proxy `.unsubscribe()` calls to worker observables * Fix tiny-worker message handling bug
1 parent 944bc8c commit 9009590

File tree

4 files changed

+61
-12
lines changed

4 files changed

+61
-12
lines changed

src/master/invocation-proxy.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
Worker as WorkerType
1818
} from "../types/master"
1919
import {
20+
MasterJobCancelMessage,
2021
MasterJobRunMessage,
2122
MasterMessageType,
2223
WorkerJobErrorMessage,
@@ -71,8 +72,19 @@ function createObservableForJob<ResultType>(worker: WorkerType, jobUID: number):
7172
worker.removeEventListener("message", messageHandler)
7273
}
7374
}) as EventListener
75+
7476
worker.addEventListener("message", messageHandler)
75-
return () => worker.removeEventListener("message", messageHandler)
77+
78+
return () => {
79+
if (asyncType === "observable" || !asyncType) {
80+
const cancelMessage: MasterJobCancelMessage = {
81+
type: MasterMessageType.cancel,
82+
uid: jobUID
83+
}
84+
worker.postMessage(cancelMessage)
85+
}
86+
worker.removeEventListener("message", messageHandler)
87+
}
7688
})
7789
}
7890

src/types/messages.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,23 @@ export interface SerializedError {
99
// Messages sent by master:
1010

1111
export enum MasterMessageType {
12+
cancel = "cancel",
1213
run = "run"
1314
}
1415

16+
export type MasterJobCancelMessage = {
17+
type: MasterMessageType.cancel,
18+
uid: number
19+
}
20+
1521
export type MasterJobRunMessage = {
1622
type: MasterMessageType.run,
1723
uid: number,
1824
method?: string,
1925
args: any[]
2026
}
2127

22-
export type MasterSentMessage = MasterJobRunMessage
28+
export type MasterSentMessage = MasterJobCancelMessage | MasterJobRunMessage
2329

2430
////////////////////////////
2531
// Messages sent by worker:

src/worker/implementation.tiny-worker.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,22 @@ const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function
2424
self.postMessage(data)
2525
}
2626

27+
let muxingHandlerSetUp = false
28+
const messageHandlers = new Set<(data: any) => void>()
29+
2730
const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(onMessage) {
28-
const messageHandler = (messageEvent: MessageEvent) => {
29-
onMessage(messageEvent.data)
30-
}
31-
const unsubscribe = () => {
32-
self.removeEventListener("message", messageHandler as EventListener)
31+
if (!muxingHandlerSetUp) {
32+
// We have one multiplexing message handler as tiny-worker's
33+
// addEventListener() only allows you to set a single message handler
34+
self.addEventListener("message", ((event: MessageEvent) => {
35+
messageHandlers.forEach(handler => handler(event.data))
36+
}) as EventListener)
37+
muxingHandlerSetUp = true
3338
}
34-
self.addEventListener("message", messageHandler as EventListener)
39+
40+
messageHandlers.add(onMessage)
41+
42+
const unsubscribe = () => messageHandlers.delete(onMessage)
3543
return unsubscribe
3644
}
3745

src/worker/index.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import isSomeObservable from "is-observable"
2-
import { Observable } from "observable-fns"
2+
import { Observable, Subscription } from "observable-fns"
33
import { deserialize, serialize } from "../common"
44
import { isTransferDescriptor, TransferDescriptor } from "../transferable"
55
import {
6+
MasterJobCancelMessage,
67
MasterJobRunMessage,
78
MasterMessageType,
89
SerializedError,
@@ -24,6 +25,9 @@ export const isWorkerRuntime = Implementation.isWorkerRuntime
2425

2526
let exposeCalled = false
2627

28+
const activeSubscriptions = new Map<number, Subscription<any>>()
29+
30+
const isMasterJobCancelMessage = (thing: any): thing is MasterJobCancelMessage => thing && thing.type === MasterMessageType.cancel
2731
const isMasterJobRunMessage = (thing: any): thing is MasterJobRunMessage => thing && thing.type === MasterMessageType.run
2832

2933
/**
@@ -124,11 +128,18 @@ async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) {
124128
postJobStartMessage(jobUID, resultType)
125129

126130
if (isObservable(syncResult)) {
127-
syncResult.subscribe(
131+
const subscription = syncResult.subscribe(
128132
value => postJobResultMessage(jobUID, false, serialize(value)),
129-
error => postJobErrorMessage(jobUID, serialize(error) as any),
130-
() => postJobResultMessage(jobUID, true)
133+
error => {
134+
postJobErrorMessage(jobUID, serialize(error) as any)
135+
activeSubscriptions.delete(jobUID)
136+
},
137+
() => {
138+
postJobResultMessage(jobUID, true)
139+
activeSubscriptions.delete(jobUID)
140+
}
131141
)
142+
activeSubscriptions.set(jobUID, subscription)
132143
} else {
133144
try {
134145
const result = await syncResult
@@ -174,6 +185,18 @@ export function expose(exposed: WorkerFunction | WorkerModule<any>) {
174185
} else {
175186
throw Error(`Invalid argument passed to expose(). Expected a function or an object, got: ${exposed}`)
176187
}
188+
189+
Implementation.subscribeToMasterMessages(messageData => {
190+
if (isMasterJobCancelMessage(messageData)) {
191+
const jobUID = messageData.uid
192+
const subscription = activeSubscriptions.get(jobUID)
193+
194+
if (subscription) {
195+
subscription.unsubscribe()
196+
activeSubscriptions.delete(jobUID)
197+
}
198+
}
199+
})
177200
}
178201

179202
if (typeof self !== "undefined" && typeof self.addEventListener === "function" && Implementation.isWorkerRuntime()) {

0 commit comments

Comments
 (0)