diff --git a/src/master/implementation.browser.ts b/src/master/implementation.browser.ts index 87846b97..f6104905 100644 --- a/src/master/implementation.browser.ts +++ b/src/master/implementation.browser.ts @@ -1,6 +1,6 @@ // tslint:disable max-classes-per-file -import { ImplementationExport, ThreadsWorkerOptions } from "../types/master" +import { ImplementationExport, ThreadsWorkerOptions, TransferList } from "../types/master" import { getBundleURL } from "./get-bundle-url.browser" export const defaultPoolSize = typeof navigator !== "undefined" && navigator.hardwareConcurrency @@ -18,16 +18,6 @@ function createSourceBlobURL(code: string): string { } function selectWorkerImplementation(): ImplementationExport { - if (typeof Worker === "undefined") { - // Might happen on Safari, for instance - // The idea is to only fail if the constructor is actually used - return class NoWebWorker { - constructor() { - throw Error("No web worker implementation available. You might have tried to spawn a worker within a worker in a browser that doesn't support workers in workers.") - } - } as any - } - class WebWorker extends Worker { constructor(url: string | URL, options?: ThreadsWorkerOptions) { if (typeof url === "string" && options && options._baseURL) { @@ -49,6 +39,20 @@ function selectWorkerImplementation(): ImplementationExport { } } + function getWorkerImpl() { + if (typeof Worker === "undefined") { + // Might happen on Safari, for instance + // The idea is to only fail if the constructor is actually used + return class NoWebWorker { + constructor() { + throw Error("No web worker implementation available. You might have tried to spawn a worker within a worker in a browser that doesn't support workers in workers.") + } + } as any + } + + return WebWorker + } + class BlobWorker extends WebWorker { constructor(blob: Blob, options?: ThreadsWorkerOptions) { const url = window.URL.createObjectURL(blob) @@ -61,9 +65,46 @@ function selectWorkerImplementation(): ImplementationExport { } } + class SharedWebWorker extends SharedWorker { + constructor(url: string | URL, options?: ThreadsWorkerOptions) { + if (typeof url === "string" && options && options._baseURL) { + url = new URL(url, options._baseURL) + } else if (typeof url === "string" && !isAbsoluteURL(url) && getBundleURL().match(/^file:\/\//i)) { + url = new URL(url, getBundleURL().replace(/\/[^\/]+$/, "/")) + if (options?.CORSWorkaround ?? true) { + url = createSourceBlobURL(`importScripts(${JSON.stringify(url)});`) + } + } + if (typeof url === "string" && isAbsoluteURL(url)) { + // Create source code blob loading JS file via `importScripts()` + // to circumvent worker CORS restrictions + if (options?.CORSWorkaround ?? true) { + url = createSourceBlobURL(`importScripts(${JSON.stringify(url)});`) + } + } + super(url.toString(), options) + this.port.start() + } + } + + function getSharedWorkerImpl() { + if (typeof SharedWorker === "undefined") { + // Might happen on Safari, for instance + // The idea is to only fail if the constructor is actually used + return class NoSharedWebWorker { + constructor() { + throw Error("No shared web worker implementation available. Maybe your browser doesn't support shared web workers.") + } + } as any + } + + return SharedWebWorker + } + return { blob: BlobWorker, - default: WebWorker + default: getWorkerImpl(), + shared: getSharedWorkerImpl() } } @@ -78,5 +119,5 @@ export function getWorkerImplementation(): ImplementationExport { export function isWorkerRuntime() { const isWindowContext = typeof self !== "undefined" && typeof Window !== "undefined" && self instanceof Window - return typeof self !== "undefined" && self.postMessage && !isWindowContext ? true : false + return typeof self !== "undefined" && typeof self.postMessage === "function" && !isWindowContext ? true : false } diff --git a/src/master/implementation.node.ts b/src/master/implementation.node.ts index bf5a6ddb..8a13d7db 100644 --- a/src/master/implementation.node.ts +++ b/src/master/implementation.node.ts @@ -79,6 +79,12 @@ function rebaseScriptPath(scriptPath: string, ignoreRegex: RegExp) { return rebasedScriptPath } +class NoSharedWebWorker { + constructor() { + throw Error("Shared workers are not supported in node.") + } +} + function resolveScriptPath(scriptPath: string, baseURL?: string | undefined) { const makeRelative = (filePath: string) => { // eval() hack is also webpack-related @@ -164,7 +170,8 @@ function initWorkerThreadsWorker(): ImplementationExport { return { blob: BlobWorker as any, - default: Worker as any + default: Worker as any, + shared: NoSharedWebWorker as any } } @@ -245,7 +252,8 @@ function initTinyWorker(): ImplementationExport { return { blob: BlobWorker as any, - default: Worker as any + default: Worker as any, + shared: NoSharedWebWorker as any } } @@ -273,7 +281,7 @@ export function getWorkerImplementation(): ImplementationExport { export function isWorkerRuntime() { if (isTinyWorker) { - return typeof self !== "undefined" && self.postMessage ? true : false + return typeof self !== "undefined" && typeof self.postMessage === "function" ? true : false } else { // Webpack hack const isMainThread = typeof __non_webpack_require__ === "function" diff --git a/src/master/implementation.ts b/src/master/implementation.ts index b6839478..45933433 100644 --- a/src/master/implementation.ts +++ b/src/master/implementation.ts @@ -9,7 +9,7 @@ import * as BrowserImplementation from "./implementation.browser" import * as NodeImplementation from "./implementation.node" -const runningInNode = typeof process !== 'undefined' && process.arch !== 'browser' && 'pid' in process +const runningInNode = typeof process !== 'undefined' && 'pid' in process const implementation = runningInNode ? NodeImplementation : BrowserImplementation /** Default size of pools. Depending on the platform the value might vary from device to device. */ diff --git a/src/master/index.ts b/src/master/index.ts index ed1b2da1..afe47ee3 100644 --- a/src/master/index.ts +++ b/src/master/index.ts @@ -1,6 +1,6 @@ // tslint:disable no-duplicate-imports import type { BlobWorker as BlobWorkerClass } from "../types/master" -import { Worker as WorkerType } from "../types/master" +import { Worker as WorkerType, SharedWorker as SharedWorkerType } from "../types/master" import { getWorkerImplementation, isWorkerRuntime } from "./implementation" export { FunctionThread, ModuleThread } from "../types/master" @@ -11,9 +11,13 @@ export { isWorkerRuntime } export type BlobWorker = typeof BlobWorkerClass export type Worker = WorkerType +export type SharedWorker = SharedWorkerType /** Separate class to spawn workers from source code blobs or strings. */ export const BlobWorker = getWorkerImplementation().blob /** Worker implementation. Either web worker or a node.js Worker class. */ export const Worker = getWorkerImplementation().default + +/** SharedWorker implementation. Can only be a shared web worker class. */ +export const SharedWorker = getWorkerImplementation().shared \ No newline at end of file diff --git a/src/master/invocation-proxy.ts b/src/master/invocation-proxy.ts index 231eb1d2..66429341 100644 --- a/src/master/invocation-proxy.ts +++ b/src/master/invocation-proxy.ts @@ -14,7 +14,8 @@ import { ModuleMethods, ModuleProxy, ProxyableFunction, - Worker as WorkerType + Worker as WorkerType, + SharedWorker as SharedWorkerType } from "../types/master" import { MasterJobCancelMessage, @@ -26,6 +27,7 @@ import { WorkerMessageType } from "../types/messages" +type TWorker = WorkerType | SharedWorkerType const debugMessages = DebugLogger("threads:master:messages") let nextJobUID = 1 @@ -36,7 +38,7 @@ const isJobErrorMessage = (data: any): data is WorkerJobErrorMessage => data && const isJobResultMessage = (data: any): data is WorkerJobResultMessage => data && data.type === WorkerMessageType.result const isJobStartMessage = (data: any): data is WorkerJobStartMessage => data && data.type === WorkerMessageType.running -function createObservableForJob(worker: WorkerType, jobUID: number): Observable { +function createObservableForJob(worker: WorkerType | SharedWorkerType, jobUID: number): Observable { return new Observable(observer => { let asyncType: "observable" | "promise" | undefined @@ -81,7 +83,8 @@ function createObservableForJob(worker: WorkerType, jobUID: number): type: MasterMessageType.cancel, uid: jobUID } - worker.postMessage(cancelMessage) + if('port' in worker) worker.port.postMessage(cancelMessage); + else worker.postMessage(cancelMessage) } worker.removeEventListener("message", messageHandler) } @@ -115,7 +118,7 @@ function prepareArguments(rawArgs: any[]): { args: any[], transferables: Transfe } } -export function createProxyFunction(worker: WorkerType, method?: string) { +export function createProxyFunction(worker: TWorker, method?: string) { return ((...rawArgs: Args) => { const uid = nextJobUID++ const { args, transferables } = prepareArguments(rawArgs) @@ -129,7 +132,8 @@ export function createProxyFunction(worker: Work debugMessages("Sending command to run function to worker:", runMessage) try { - worker.postMessage(runMessage, transferables) + if('port' in worker) worker.port.postMessage(runMessage, transferables) + else worker.postMessage(runMessage, transferables) } catch (error) { return ObservablePromise.from(Promise.reject(error)) } @@ -139,7 +143,7 @@ export function createProxyFunction(worker: Work } export function createProxyModule( - worker: WorkerType, + worker: TWorker, methodNames: string[] ): ModuleProxy { const proxy: any = {} diff --git a/src/master/register.ts b/src/master/register.ts index 2f37034b..c30eafa9 100644 --- a/src/master/register.ts +++ b/src/master/register.ts @@ -1,9 +1,11 @@ -import { Worker as WorkerImplementation } from "./index" +import { Worker as WorkerImplementation, SharedWorker as SharedWorkerImplementation } from "./index" declare const window: any if (typeof global !== "undefined") { - (global as any).Worker = WorkerImplementation + (global as any).Worker = WorkerImplementation; + (global as any).SharedWorker = SharedWorkerImplementation; } else if (typeof window !== "undefined") { - (window as any).Worker = WorkerImplementation + (window as any).Worker = WorkerImplementation; + (window as any).SharedWorker = SharedWorkerImplementation; } diff --git a/src/master/spawn.ts b/src/master/spawn.ts index 485cf5d1..d1441c10 100644 --- a/src/master/spawn.ts +++ b/src/master/spawn.ts @@ -9,6 +9,7 @@ import { PrivateThreadProps, StripAsync, Worker as WorkerType, + SharedWorker as SharedWorkerType, WorkerEvent, WorkerEventType, WorkerInternalErrorEvent, @@ -19,6 +20,7 @@ import { WorkerInitMessage, WorkerUncaughtErrorMessage } from "../types/messages import { WorkerFunction, WorkerModule } from "../types/worker" import { createProxyFunction, createProxyModule } from "./invocation-proxy" +type TWorker = WorkerType | SharedWorkerType type ArbitraryWorkerInterface = WorkerFunction & WorkerModule & { somekeythatisneverusedinproductioncode123: "magicmarker123" } type ArbitraryThreadType = FunctionThread & ModuleThread @@ -58,7 +60,7 @@ async function withTimeout(promise: Promise, timeoutInMs: number, errorMes return result } -function receiveInitMessage(worker: WorkerType): Promise { +function receiveInitMessage(worker: TWorker): Promise { return new Promise((resolve, reject) => { const messageHandler = ((event: MessageEvent) => { debugMessages("Message from worker before finishing initialization:", event.data) @@ -74,7 +76,7 @@ function receiveInitMessage(worker: WorkerType): Promise { }) } -function createEventObservable(worker: WorkerType, workerTermination: Promise): Observable { +function createEventObservable(worker: TWorker, workerTermination: Promise): Observable { return new Observable(observer => { const messageHandler = ((messageEvent: MessageEvent) => { const workerEvent: WorkerMessageEvent = { @@ -106,23 +108,27 @@ function createEventObservable(worker: WorkerType, workerTermination: Promise, terminate: () => Promise } { +function createTerminator(worker: TWorker): { termination: Promise, terminate: () => Promise } { const [termination, resolver] = createPromiseWithResolver() const terminate = async () => { debugThreadUtils("Terminating worker") // Newer versions of worker_threads workers return a promise - await worker.terminate() + if('port' in worker) { + // TODO: send termination message to shared worker. + worker.port.close() + } + else await worker.terminate() resolver() } return { terminate, termination } } -function setPrivateThreadProps(raw: T, worker: WorkerType, workerEvents: Observable, terminate: () => Promise): T & PrivateThreadProps { +function setPrivateThreadProps(raw: T, worker: TWorker, workerEvents: Observable, terminate: () => Promise): T & PrivateThreadProps { const workerErrors = workerEvents .filter(event => event.type === WorkerEventType.internalError) .map(errorEvent => (errorEvent as WorkerInternalErrorEvent).error) - // tslint:disable-next-line prefer-object-spread + // tslint:disable-next-line:prefer-object-spread return Object.assign(raw, { [$errors]: workerErrors, [$events]: workerEvents, @@ -141,7 +147,7 @@ function setPrivateThreadProps(raw: T, worker: WorkerType, workerEvents: Obse * @param [options.timeout] Init message timeout. Default: 10000 or set by environment variable. */ export async function spawn = ArbitraryWorkerInterface>( - worker: WorkerType, + worker: TWorker, options?: { timeout?: number } ): Promise> { debugSpawn("Initializing new thread") diff --git a/src/observable-promise.ts b/src/observable-promise.ts index f4cd5f24..05cd5a07 100644 --- a/src/observable-promise.ts +++ b/src/observable-promise.ts @@ -44,7 +44,9 @@ export class ObservablePromise extends Observable implements Promise { private rejection: Error | undefined private state: "fulfilled" | "pending" | "rejected" = "pending" - public readonly [Symbol.toStringTag]: "[object ObservablePromise]" + get [Symbol.toStringTag]() { + return "[object ObservablePromise]"; + } constructor(init: Initializer) { super((originalObserver: SubscriptionObserver) => { diff --git a/src/types/master.ts b/src/types/master.ts index 604e7fa9..b2c9bde4 100644 --- a/src/types/master.ts +++ b/src/types/master.ts @@ -51,7 +51,7 @@ export interface PrivateThreadProps { [$errors]: Observable [$events]: Observable [$terminate]: () => Promise - [$worker]: Worker + [$worker]: Worker | SharedWorker } export type FunctionThread = ProxyableFunction & PrivateThreadProps @@ -79,6 +79,12 @@ export interface Worker extends EventTarget { /** In nodejs 10+ return type is Promise while with tiny-worker and in browser return type is void */ terminate(callback?: (error?: Error, exitCode?: number) => void): void | Promise } + +/** SharedWorker instance. Either a web worker or a node.js Worker provided by `worker_threads` or `tiny-worker`. */ +export interface SharedWorker extends EventTarget { + port: MessagePort +} + export interface ThreadsWorkerOptions extends WorkerOptions { /** Prefix for the path passed to the Worker constructor. Web worker only. */ _baseURL?: string @@ -111,9 +117,15 @@ export declare class BlobWorker extends WorkerImplementation { public static fromText(source: string, options?: ThreadsWorkerOptions): WorkerImplementation } +export declare class SharedWorkerImplementation extends EventTarget implements SharedWorker { + constructor(path: string, options?: ThreadsWorkerOptions) + port: MessagePort +} + export interface ImplementationExport { blob: typeof BlobWorker default: typeof WorkerImplementation + shared: typeof SharedWorkerImplementation } /** Event as emitted by worker thread. Subscribe to using `Thread.events(thread)`. */ diff --git a/src/types/worker.ts b/src/types/worker.ts index 3e1790e4..6b050853 100644 --- a/src/types/worker.ts +++ b/src/types/worker.ts @@ -2,8 +2,8 @@ type UnsubscribeFn = () => void export interface AbstractedWorkerAPI { isWorkerRuntime(): boolean - postMessageToMaster(message: any, transferList?: Transferable[]): void - subscribeToMasterMessages(onMessage: (data: any) => void): UnsubscribeFn + postMessageToMaster(context: any, message: any, transferList?: Transferable[]): void + subscribeToMasterMessages(context: any, onMessage: (context: any, data: any) => void): UnsubscribeFn } export type WorkerFunction = ((...args: any[]) => any) | (() => any) diff --git a/src/worker/implementation.browser.ts b/src/worker/implementation.browser.ts index 1988ad9c..57781981 100644 --- a/src/worker/implementation.browser.ts +++ b/src/worker/implementation.browser.ts @@ -13,21 +13,21 @@ declare const self: WorkerGlobalScope const isWorkerRuntime: AbstractedWorkerAPI["isWorkerRuntime"] = function isWorkerRuntime() { const isWindowContext = typeof self !== "undefined" && typeof Window !== "undefined" && self instanceof Window - return typeof self !== "undefined" && self.postMessage && !isWindowContext ? true : false + return typeof self !== "undefined" && typeof self.postMessage === "function" && !isWindowContext ? true : false } -const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function postMessageToMaster(data, transferList?) { - self.postMessage(data, transferList) +const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function postMessageToMaster(context, data, transferList?) { + context.postMessage(data, transferList) } -const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(onMessage) { +const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(context, onMessage) { const messageHandler = (messageEvent: MessageEvent) => { - onMessage(messageEvent.data) + onMessage(context, messageEvent.data) } const unsubscribe = () => { - self.removeEventListener("message", messageHandler as EventListener) + context.removeEventListener("message", messageHandler as EventListener) } - self.addEventListener("message", messageHandler as EventListener) + context.addEventListener("message", messageHandler as EventListener) return unsubscribe } diff --git a/src/worker/implementation.tiny-worker.ts b/src/worker/implementation.tiny-worker.ts index 97c18b11..949f0d2c 100644 --- a/src/worker/implementation.tiny-worker.ts +++ b/src/worker/implementation.tiny-worker.ts @@ -16,23 +16,23 @@ if (typeof self === "undefined") { } const isWorkerRuntime: AbstractedWorkerAPI["isWorkerRuntime"] = function isWorkerRuntime() { - return typeof self !== "undefined" && self.postMessage ? true : false + return typeof self !== "undefined" && typeof self.postMessage === "function" ? true : false } -const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function postMessageToMaster(data) { +const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function postMessageToMaster(context, data) { // TODO: Warn that Transferables are not supported on first attempt to use feature - self.postMessage(data) + context.postMessage(data) } let muxingHandlerSetUp = false -const messageHandlers = new Set<(data: any) => void>() +const messageHandlers = new Set<(context: any, data: any) => void>() -const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(onMessage) { +const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(context, onMessage) { if (!muxingHandlerSetUp) { // We have one multiplexing message handler as tiny-worker's // addEventListener() only allows you to set a single message handler self.addEventListener("message", ((event: MessageEvent) => { - messageHandlers.forEach(handler => handler(event.data)) + messageHandlers.forEach(handler => handler(context, event.data)) }) as EventListener) muxingHandlerSetUp = true } diff --git a/src/worker/implementation.worker_threads.ts b/src/worker/implementation.worker_threads.ts index 8bf79510..5251e838 100644 --- a/src/worker/implementation.worker_threads.ts +++ b/src/worker/implementation.worker_threads.ts @@ -14,18 +14,18 @@ const isWorkerRuntime: AbstractedWorkerAPI["isWorkerRuntime"] = function isWorke return !WorkerThreads().isMainThread } -const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function postMessageToMaster(data, transferList) { +const postMessageToMaster: AbstractedWorkerAPI["postMessageToMaster"] = function postMessageToMaster(context, data, transferList) { assertMessagePort(WorkerThreads().parentPort).postMessage(data, transferList as any) } -const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(onMessage) { +const subscribeToMasterMessages: AbstractedWorkerAPI["subscribeToMasterMessages"] = function subscribeToMasterMessages(context, onMessage) { const parentPort = WorkerThreads().parentPort if (!parentPort) { throw Error("Invariant violation: MessagePort to parent is not available.") } const messageHandler = (message: any) => { - onMessage(message) + onMessage(context, message) } const unsubscribe = () => { assertMessagePort(parentPort).off("message", messageHandler) diff --git a/src/worker/index.ts b/src/worker/index.ts index 79bbf0be..1973bdf4 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -46,17 +46,17 @@ function deconstructTransfer(thing: any) { : { payload: thing, transferables: undefined } } -function postFunctionInitMessage() { +function postFunctionInitMessage(context: any) { const initMessage: WorkerInitMessage = { type: WorkerMessageType.init, exposed: { type: "function" } } - Implementation.postMessageToMaster(initMessage) + Implementation.postMessageToMaster(context, initMessage) } -function postModuleInitMessage(methodNames: string[]) { +function postModuleInitMessage(context: any, methodNames: string[]) { const initMessage: WorkerInitMessage = { type: WorkerMessageType.init, exposed: { @@ -64,20 +64,20 @@ function postModuleInitMessage(methodNames: string[]) { methods: methodNames } } - Implementation.postMessageToMaster(initMessage) + Implementation.postMessageToMaster(context, initMessage) } -function postJobErrorMessage(uid: number, rawError: Error | TransferDescriptor) { +function postJobErrorMessage(context: any, uid: number, rawError: Error | TransferDescriptor) { const { payload: error, transferables } = deconstructTransfer(rawError) const errorMessage: WorkerJobErrorMessage = { type: WorkerMessageType.error, uid, error: serialize(error) as any as SerializedError } - Implementation.postMessageToMaster(errorMessage, transferables) + Implementation.postMessageToMaster(context, errorMessage, transferables) } -function postJobResultMessage(uid: number, completed: boolean, resultValue?: any) { +function postJobResultMessage(context: any, uid: number, completed: boolean, resultValue?: any) { const { payload, transferables } = deconstructTransfer(resultValue) const resultMessage: WorkerJobResultMessage = { type: WorkerMessageType.result, @@ -85,25 +85,25 @@ function postJobResultMessage(uid: number, completed: boolean, resultValue?: any complete: completed ? true : undefined, payload } - Implementation.postMessageToMaster(resultMessage, transferables) + Implementation.postMessageToMaster(context, resultMessage, transferables) } -function postJobStartMessage(uid: number, resultType: WorkerJobStartMessage["resultType"]) { +function postJobStartMessage(context: any, uid: number, resultType: WorkerJobStartMessage["resultType"]) { const startMessage: WorkerJobStartMessage = { type: WorkerMessageType.running, uid, resultType } - Implementation.postMessageToMaster(startMessage) + Implementation.postMessageToMaster(context, startMessage) } -function postUncaughtErrorMessage(error: Error) { +function postUncaughtErrorMessage(context: any, error: Error) { try { const errorMessage: WorkerUncaughtErrorMessage = { type: WorkerMessageType.uncaughtError, error: serialize(error) as any as SerializedError } - Implementation.postMessageToMaster(errorMessage) + Implementation.postMessageToMaster(context, errorMessage) } catch (subError) { // tslint:disable-next-line no-console console.error( @@ -115,27 +115,27 @@ function postUncaughtErrorMessage(error: Error) { } } -async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) { +async function runFunction(context: any, jobUID: number, fn: WorkerFunction, args: any[]) { let syncResult: any try { syncResult = fn(...args) } catch (error) { - return postJobErrorMessage(jobUID, error) + return postJobErrorMessage(context, jobUID, error) } const resultType = isObservable(syncResult) ? "observable" : "promise" - postJobStartMessage(jobUID, resultType) + postJobStartMessage(context, jobUID, resultType) if (isObservable(syncResult)) { const subscription = syncResult.subscribe( - value => postJobResultMessage(jobUID, false, serialize(value)), + value => postJobResultMessage(context, jobUID, false, serialize(value)), error => { - postJobErrorMessage(jobUID, serialize(error) as any) + postJobErrorMessage(context, jobUID, serialize(error) as any) activeSubscriptions.delete(jobUID) }, () => { - postJobResultMessage(jobUID, true) + postJobResultMessage(context, jobUID, true) activeSubscriptions.delete(jobUID) } ) @@ -143,9 +143,9 @@ async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) { } else { try { const result = await syncResult - postJobResultMessage(jobUID, true, serialize(result)) + postJobResultMessage(context, jobUID, true, serialize(result)) } catch (error) { - postJobErrorMessage(jobUID, serialize(error) as any) + postJobErrorMessage(context, jobUID, serialize(error) as any) } } } @@ -166,49 +166,68 @@ export function expose(exposed: WorkerFunction | WorkerModule) { } exposeCalled = true - if (typeof exposed === "function") { - Implementation.subscribeToMasterMessages(messageData => { - if (isMasterJobRunMessage(messageData) && !messageData.method) { - runFunction(messageData.uid, exposed, messageData.args.map(deserialize)) - } - }) - postFunctionInitMessage() - } else if (typeof exposed === "object" && exposed) { - Implementation.subscribeToMasterMessages(messageData => { - if (isMasterJobRunMessage(messageData) && messageData.method) { - runFunction(messageData.uid, exposed[messageData.method], messageData.args.map(deserialize)) + const innerExpose = (workerContext: any) => { + if (typeof exposed === "function") { + Implementation.subscribeToMasterMessages(workerContext, (context, messageData) => { + if (isMasterJobRunMessage(messageData) && !messageData.method) { + runFunction(context, messageData.uid, exposed, messageData.args.map(deserialize)) + } + }) + postFunctionInitMessage(workerContext) + } else if (typeof exposed === "object" && exposed) { + Implementation.subscribeToMasterMessages(workerContext, (context, messageData) => { + if (isMasterJobRunMessage(messageData) && messageData.method) { + runFunction(context, messageData.uid, exposed[messageData.method], messageData.args.map(deserialize)) + } + }) + + const methodNames = Object.keys(exposed).filter(key => typeof exposed[key] === "function") + postModuleInitMessage(workerContext, methodNames) + } else { + throw Error(`Invalid argument passed to expose(). Expected a function or an object, got: ${exposed}`) + } + + Implementation.subscribeToMasterMessages(workerContext, (context, messageData) => { + if (isMasterJobCancelMessage(messageData)) { + const jobUID = messageData.uid + const subscription = activeSubscriptions.get(jobUID) + + if (subscription) { + subscription.unsubscribe() + activeSubscriptions.delete(jobUID) + } } }) - - const methodNames = Object.keys(exposed).filter(key => typeof exposed[key] === "function") - postModuleInitMessage(methodNames) - } else { - throw Error(`Invalid argument passed to expose(). Expected a function or an object, got: ${exposed}`) } - Implementation.subscribeToMasterMessages(messageData => { - if (isMasterJobCancelMessage(messageData)) { - const jobUID = messageData.uid - const subscription = activeSubscriptions.get(jobUID) + // If it's a SharedWorker + // TODO: make a better check, make sure it works properly, etc + if (typeof (self as any).onconnect === "function") { + (self as any).onconnect = (e: any) => { + const port = e.ports[0] + port.start() - if (subscription) { - subscription.unsubscribe() - activeSubscriptions.delete(jobUID) - } + innerExpose(port) + + // TODO: it somehow needs to handle the port closing, but apparently that isn't so simple... + // see spawn.ts: + // TODO: send termination message to shared worker. } - }) + } + else innerExpose(self) } +// TODO: will this work with SharedWorker? if (typeof self !== "undefined" && typeof self.addEventListener === "function" && Implementation.isWorkerRuntime()) { self.addEventListener("error", event => { // Post with some delay, so the master had some time to subscribe to messages - setTimeout(() => postUncaughtErrorMessage(event.error || event), 250) + setTimeout(() => postUncaughtErrorMessage(self, event.error || event), 250) }) self.addEventListener("unhandledrejection", event => { const error = (event as any).reason if (error && typeof (error as any).message === "string") { // Post with some delay, so the master had some time to subscribe to messages - setTimeout(() => postUncaughtErrorMessage(error), 250) + setTimeout(() => postUncaughtErrorMessage(self, error), 250) } }) } @@ -216,12 +235,12 @@ if (typeof self !== "undefined" && typeof self.addEventListener === "function" & if (typeof process !== "undefined" && typeof process.on === "function" && Implementation.isWorkerRuntime()) { process.on("uncaughtException", (error) => { // Post with some delay, so the master had some time to subscribe to messages - setTimeout(() => postUncaughtErrorMessage(error), 250) + setTimeout(() => postUncaughtErrorMessage(self, error), 250) }) process.on("unhandledRejection", (error) => { if (error && typeof (error as any).message === "string") { // Post with some delay, so the master had some time to subscribe to messages - setTimeout(() => postUncaughtErrorMessage(error as any), 250) + setTimeout(() => postUncaughtErrorMessage(self, error as any), 250) } }) } diff --git a/test/spawn.chromium.mocha.ts b/test/spawn.chromium.mocha.ts index 6a5cb350..16842d6b 100644 --- a/test/spawn.chromium.mocha.ts +++ b/test/spawn.chromium.mocha.ts @@ -43,4 +43,10 @@ describe("threads in browser", function() { expect(await increment()).to.equal(3) await Thread.terminate(increment) }) + + it("can spawn and terminate a shared webworker", async function() { + const helloWorld = await spawn<() => string>(new SharedWorker("./workers/hello-world.js")) + expect(await helloWorld()).to.equal("Hello World") + await Thread.terminate(helloWorld) + }) })