From 14ecef049308f77e5edafee2f1c315a55fafe717 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Mon, 17 Jun 2019 14:31:38 +0200 Subject: [PATCH 01/11] Implement parametric slot feature Introducing the concept of parametric slot: - A new "param" argument on `trigger` and `.on` allows us to send data through a slot for a specific parameter with `slot(, )`, and reciprocally to listen to a specific parameter with `slot.on(, )`. Lazy callbacks are now called for each parameter separately: connect is called with param as argument the first time someone connects somewhere with `.on(, ...)` and same thing for disconnect. - is this first parameter is skipped, then a default parameter is used. This help us when the slot has no need for such a parameter and for retro-compatibility concerns. --- README.md | 35 ++++-- src/Constants.ts | 1 + src/Handler.ts | 2 +- src/Message.ts | 46 ++++++-- src/Slot.ts | 245 +++++++++++++++++++++++++++------------- src/Transport.ts | 236 ++++++++++++++++++++++++--------------- src/index.ts | 1 + test/Event.test.ts | 9 +- test/Slot.test.ts | 246 +++++++++++++++++++++++++---------------- test/TestChannel.ts | 2 + test/Transport.test.ts | 70 ++++++------ 11 files changed, 580 insertions(+), 313 deletions(-) create mode 100644 src/Constants.ts diff --git a/README.md b/README.md index 4bc73cd..4bcbcca 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,15 @@ Once connected, the clients can start by using the slots on the event bus // firstModule.ts import EventBus from './firstModule.EventBus.ts' +// Slots can be called with a parameter, here 'michel' +EventBus.say('michel', 'Hello') + +// Or one can rely on the default parameter: here DEFAULT_PARAMETER +// is implicitely used. +EventBus.say('Hello') + // Triggering an event always returns a promise -EventBus.sayHello('michel').then(() => { +EventBus.say('michel', 'Hello').then(() => { ... }) @@ -88,12 +95,14 @@ EventBus.ping() // secondModule.ts import EventBus from './secondModule.EventBus.ts' -EventBus.ping().on(() => { +// Add a listener on the default parameter +EventBus.ping.on(() => { console.log('pong') }) -EventBus.sayHello.on(name => { - console.log(`${name} said hello!`) +// Or listen to a specific parameter +EventBus.say.on('michel', (words) => { + console.log('michel said', words) }) // Event subscribers can respond to the event synchronously (by returning a value) @@ -130,21 +139,27 @@ Remote or local clients are considered equally. If a client was already connecte at the time when `lazy` is called, the "connect" callback is called immediately. ```typescript -const connect = () => { - console.log('Someone somewhere has begun listening to the slot with `.on`.') +const connect = (param) => { + console.log(`Someone somewhere has begun listening to the slot with .on on ${param}.`) } -const disconnect = () => { - console.log('No one is listening to the slot anymore.') +const disconnect = (param) => { + console.log(`No one is listening to the slot anymore on ${param}.`) } const disconnectLazy = EventBus.ping.lazy(connect, disconnect) const unsubscribe = EventBus.ping().on(() => { }) -// console output: 'Someone somewhere has begun listening to the slot with `.on`.' +// console output: 'Someone somewhere has begun listening to the slot with .on on $_DEFAULT_$.' + +unsubscribe() +// console output: 'No one is listening to the slot anymore on $_DEFAULT_$.' + +const unsubscribe = EventBus.ping().on('parameter', () => { }) +// console output: 'Someone somewhere has begun listening to the slot with .on on parameter.' unsubscribe() -// console output: 'No one is listening to the slot anymore.' +// console output: 'No one is listening to the slot anymore on parameter.' // Remove the callbacks. // "disconnect" is called one last time if there were subscribers left on the slot. diff --git a/src/Constants.ts b/src/Constants.ts new file mode 100644 index 0000000..cca35da --- /dev/null +++ b/src/Constants.ts @@ -0,0 +1 @@ +export const DEFAULT_PARAM = '$_DEFAULT_$' diff --git a/src/Handler.ts b/src/Handler.ts index 74d7c7e..448fab7 100644 --- a/src/Handler.ts +++ b/src/Handler.ts @@ -38,7 +38,7 @@ function callOneHandler(h: Handler, data: any): Promise { * of all these handlers is discarded. */ export function callHandlers(data: any, handlers: Handler[]): Promise { - if (handlers.length === 0) { + if (!handlers || handlers.length === 0) { // No one is listening return new Promise(resolve => { /* NOOP, this promise will never resolve */ }) } else if (handlers.length === 1) { diff --git a/src/Message.ts b/src/Message.ts index 2a0bf0b..c2115b2 100644 --- a/src/Message.ts +++ b/src/Message.ts @@ -1,14 +1,44 @@ -export type TransportRequest = { type: 'request', slotName: string, id: string, data: any } -export type TransportResponse = { type: 'response', slotName: string, id: string, data: any } -export type TransportError = { type: 'error', slotName: string, id: string, message: string, stack?: string } -export type TransportRegistrationMessage = { type: 'handler_registered', slotName: string } -export type TransportUnregistrationMessage = { type: 'handler_unregistered', slotName: string } +export type TransportRequest = { + type: 'request', + slotName: string, + id: string, + data: any, + param: string +} + +export type TransportResponse = { + type: 'response', + slotName: string, + id: string, + data: any, + param: string +} + +export type TransportError = { + id: string, + message: string, + param: string, + slotName: string, + stack?: string, + type: 'error' +} + +export type TransportRegistrationMessage = { + type: 'handler_registered', + slotName: string, + param: string +} +export type TransportUnregistrationMessage = { + type: 'handler_unregistered', + slotName: string, + param: string +} export type TransportMessage = - TransportRegistrationMessage - | TransportUnregistrationMessage + TransportError + | TransportRegistrationMessage | TransportRequest | TransportResponse - | TransportError + | TransportUnregistrationMessage export function isTransportMessage(m: { type: string }): m is TransportMessage { switch (m.type) { diff --git a/src/Slot.ts b/src/Slot.ts index 70555b3..ddc9bf8 100644 --- a/src/Slot.ts +++ b/src/Slot.ts @@ -1,14 +1,40 @@ import { Transport } from './Transport' import { Handler, callHandlers } from './Handler' +import { DEFAULT_PARAM } from './Constants' const signalNotConnected = () => { throw new Error('Slot not connected') } const FAKE_SLOT: any = () => signalNotConnected() FAKE_SLOT.on = signalNotConnected -export type SimpleCallback = () => void +export type LazyCallback = (param: string) => void export type Unsubscribe = () => void +// Key to store local handlers in the `handlers` map +const LOCAL_TRANSPORT = 'LOCAL_TRANSPORT' + +// Type to store handlers, by transport, by param +type TransportHandlers = { [param: string]: Handler[] } +type Handlers = { [handlerKey: string]: TransportHandlers } + +// Find handlers for given param accross transports +const getParamHandlers = (param: string, handlers: Handlers): Handler[] => + Object.keys(handlers).reduce((paramHandlers, transportKey) => { + return paramHandlers.concat(handlers[transportKey][param] || []) + }, [] as Handler[]) + +// Find all params with registered callbacks +const findAllUsedParams = (handlers: Handlers): string[] => + Object.keys(handlers).reduce((params, transportKey) => { + const transportHandlers = handlers[transportKey] + const registeredParams = Object.keys(transportHandlers).filter( + param => (transportHandlers[param] || []).length > 0 + ) + const paramsMaybeDuplicate = [...params, ...registeredParams] + const paramsUniq = [...new Set(paramsMaybeDuplicate)] + return paramsUniq + }, [] as string[]) + /** * Represents an event shared by two modules. * @@ -18,13 +44,23 @@ export type Unsubscribe = () => void * The slot can also be subscribed to, by using the `on` property. */ export interface Slot { - - // Make the Slot callable: this is how an event is triggered // TODO: Find a solution to make it possible to omit the requestData as // optional only when explicitly typed as such by the client. - (requestData: RequestData): Promise - on: (handler: Handler) => Unsubscribe - lazy: (connect: () => void, disconnect: () => void) => Unsubscribe + + // Trigger the slot with explicit param + (param: string, requestData: RequestData): Promise, + + // Trigger the slot with default param + (requestData: RequestData): Promise, + + // Listen to events sent through the slot on explicit param + on(param: string, handler: Handler): Unsubscribe, + + // Listen to events sent through the slot on default param + on(handler: Handler): Unsubscribe + + // Provide the slot with lazy callbacks + lazy(connect: LazyCallback, disconnect: LazyCallback): Unsubscribe, } /** @@ -36,88 +72,108 @@ export function slot(): Slot(slotName: string, transports: Transport[]): Slot { +export function connectSlot( + slotName: string, + transports: Transport[] +): Slot { - // These will be all the handlers for this slot (eg. all the callbacks registered with `Slot.on()`) - const handlers = [] as Handler[] + /* + * ======================== + * Internals + * ======================== + */ - // For each transport we create a Promise that will be fulfilled only - // when the far-end has registered a handler. - // This prevents `triggers` from firing *before* any far-end is listening. - let remoteHandlersConnected = [] as Promise[] + // These will be all the handlers for this slot, for each transport, for each param + const handlers: Handlers = Object.assign( + // local handlers are stored under LOCAL_TRANSPORT key + { [LOCAL_TRANSPORT]: {} }, - // Lazy - const lazyConnectCallbacks: SimpleCallback[] = [] - const lazyDisonnectCallbacks: SimpleCallback[] = [] + // transport handlers are stored by index + transports.map(_t => ({})) + ) - const callLazyConnectCallbacks = () => lazyConnectCallbacks.forEach(c => c()) - const callLazyDisonnectCallbacks = () => lazyDisonnectCallbacks.forEach(c => c()) + // Lazy callbacks + const lazyConnectCallbacks: LazyCallback[] = [] + const lazyDisonnectCallbacks: LazyCallback[] = [] - // Signal to all transports that we will accept handlers for this slotName - transports.forEach(t => { + const callLazyConnectCallbacks = (param: string) => + lazyConnectCallbacks.forEach(c => c(param)) - // Variable holds the promise's `resolve` function. A little hack - // allowing us to have the notion of "deferred" promise fulfillment. - let onHandlerRegistered: Function - let remoteHandlerPromise: Promise + const callLazyDisonnectCallbacks = (param: string) => + lazyDisonnectCallbacks.forEach(c => c(param)) - const awaitHandlerRegistration = () => { - // We store a reference to this promise to be resolved once the far-end has responded. - remoteHandlerPromise = new Promise(resolve => onHandlerRegistered = resolve) - remoteHandlersConnected.push(remoteHandlerPromise) + // Signal to all transports that we will accept handlers for this slotName + transports.forEach((transport, transportKey) => { + + const remoteHandlerRegistered = ( + param: string, + handler: Handler + ) => { + const paramHandlers = handlers[transportKey][param] || [] + handlers[transportKey][param] = paramHandlers.concat(handler) + if (getParamHandlers(param, handlers).length === 1) callLazyConnectCallbacks(param) } - awaitHandlerRegistration() + const remoteHandlerUnregistered = ( + param: string, + handler: Handler + ) => { + const paramHandlers = handlers[transportKey][param] || [] + const handlerIndex = paramHandlers.indexOf(handler) + if (handlerIndex > -1) handlers[transportKey][param].splice(handlerIndex, 1) + if (getParamHandlers(param, handlers).length === 0) callLazyDisonnectCallbacks(param) + } - t.onRemoteHandlerRegistered(slotName, (handler: Handler) => { - handlers.push(handler) - if (handlers.length === 1) callLazyConnectCallbacks() + transport.addRemoteHandlerRegistrationCallback(slotName, remoteHandlerRegistered) + transport.addRemoteHandlerUnregistrationCallback(slotName, remoteHandlerUnregistered) + }) - // We signal that the transport is ready for this slot by resolving the - // promise stored in `remoteHandlersConnected`. - onHandlerRegistered() - }) + /* + * ======================== + * API + * ======================== + */ - t.onRemoteHandlerUnregistered(slotName, handler => { - const handlerIndex = handlers.indexOf(handler) - handlers.splice(handlerIndex, 1) - if (handlers.length === 0) callLazyDisonnectCallbacks() + /* + * Sends data through the slot. + */ - // When the channel disconnects we also need to remove the - // promise blocking the trigger. - remoteHandlersConnected.splice(remoteHandlersConnected.indexOf(remoteHandlerPromise), 1) + // Signature for Slot() using default param + function trigger(data: any): Promise - // And also insert a new promise that will be re-fulfilled when - // remote handlers are re-registered. - awaitHandlerRegistration() - }) - }) + // Signature for Slot(, ) + function trigger(param: string, data: any): Promise + // Combined signatures + function trigger(firstArg: string | any, secondArg?: any): Promise { + const data: any = secondArg || firstArg + const param: string = secondArg ? firstArg : DEFAULT_PARAM + const allHandlers = getParamHandlers(param, handlers) + return callHandlers(data, allHandlers) + } - // Called when a client triggers (calls) the slot - // Before calling the handler, we also check that all the declared transports - // are ready to answer to the request. - // If no transports were declared, call directly the handlers. - const trigger: any = (data: any) => (transports.length) ? - Promise.all(remoteHandlersConnected).then(() => callHandlers(data, handlers)) : - callHandlers(data, handlers) + /* + * Allows a client to be notified when a first + * client connects to the slot with `.on`, and when the + * last client disconnects from it. + */ - trigger.lazy = ( - firstClientConnectCallback: SimpleCallback, - lastClientDisconnectCallback: SimpleCallback - ): Unsubscribe => { + function lazy( + firstClientConnectCallback: LazyCallback, + lastClientDisconnectCallback: LazyCallback + ): Unsubscribe { lazyConnectCallbacks.push(firstClientConnectCallback) lazyDisonnectCallbacks.push(lastClientDisconnectCallback) - // Call connect callback - if (handlers.length > 0) firstClientConnectCallback() + // Call connect callback immediately if handlers were already registered + findAllUsedParams(handlers).forEach(firstClientConnectCallback) return () => { // Call disconnect callback - if (handlers.length > 0) lastClientDisconnectCallback() + findAllUsedParams(handlers).forEach(lastClientDisconnectCallback) - // Stop lazy connect and disconnect process + // Stop lazy connect and disconnect processes const connectIx = lazyConnectCallbacks.indexOf(firstClientConnectCallback) if (connectIx > -1) lazyConnectCallbacks.splice(connectIx, 1) @@ -126,34 +182,67 @@ export function connectSlot(slotName: string, transports: Trans } } - // Called when a client subscribes to the slot (with `Slot.on()`) - trigger.on = (handler: Handler): Unsubscribe => { + /* + * Allows a client to be notified when someone + * sends data through the slot. + */ + + // Signature for Slot.on() using default param + function on( + handler: Handler + ): Unsubscribe + + // Signature for Slot.on(, ) + function on( + param: string, + handler: Handler + ): Unsubscribe + + // Combined signatures + function on( + paramOrHandler: string | Handler, + handlerIfParam?: Handler + ): Unsubscribe { + + // Get param and handler from arguments, depending if param was passed or not + let param = "" + let handler: Handler = () => new Promise(r => r()) + + if (typeof paramOrHandler === 'string') { + param = paramOrHandler + handler = handlerIfParam || handler + } + else { + param = DEFAULT_PARAM + handler = paramOrHandler + } // Register a remote handler with all of our remote transports - transports.forEach(t => t.registerHandler(slotName, handler)) + transports.forEach(t => t.registerHandler(slotName, param, handler)) // Store this handler - handlers.push(handler) + handlers[LOCAL_TRANSPORT][param] = + (handlers[LOCAL_TRANSPORT][param] || []).concat(handler) - // Call lazy connect callbacks if needed - if (handlers.length === 1) callLazyConnectCallbacks() + // Call lazy connect callbacks if there is at least one handler + const paramHandlers = getParamHandlers(param, handlers) + if (paramHandlers.length === 1) callLazyConnectCallbacks(param) // Return the unsubscription function return () => { // Unregister remote handler with all of our remote transports - transports.forEach(t => t.unregisterHandler(slotName, handler)) + transports.forEach(t => t.unregisterHandler(slotName, param, handler)) - const ix = handlers.indexOf(handler) - if (ix !== -1) { - handlers.splice(ix, 1) - } + const localParamHandlers = handlers[LOCAL_TRANSPORT][param] || [] + const ix = localParamHandlers.indexOf(handler) + if (ix !== -1) handlers[LOCAL_TRANSPORT][param].splice(ix, 1) - // Call lazy disconnect callbacks if needed - if (!handlers.length) callLazyDisonnectCallbacks() + // Call lazy disconnect callbacks if there are no handlers anymore + const paramHandlers = getParamHandlers(param, handlers) + if (paramHandlers.length === 0) callLazyDisonnectCallbacks(param) } - } - return trigger as Slot + return Object.assign(trigger, { on, lazy }) } diff --git a/src/Transport.ts b/src/Transport.ts index 2c6ce53..418fb0f 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -1,12 +1,12 @@ import { Handler, callHandlers } from './Handler' import { Channel } from './Channel' import { + TransportError, + TransportMessage, TransportRegistrationMessage, TransportUnregistrationMessage, - TransportError, - TransportRequest, TransportResponse, - TransportMessage + TransportRequest } from './Message' let _ID = 0 @@ -34,34 +34,51 @@ export type PendingRequests = { } } +type RemoteHandlerCallback = + (param: string, handler: Handler) => void + export class Transport { - private _localHandlers: { [slotName: string]: Handler[] } = {} - private _localHandlerRegistrations: TransportRegistrationMessage[] = [] + private _localHandlers: { + [slotName: string]: { + [param: string]: Handler[] + } + } = {} + + private _localHandlerRegistrations: { + [param: string]: TransportRegistrationMessage[] + } = {} /** * Handlers created by the Transport. When an event is triggered locally, * these handlers will make a request to the far end to handle this event, * and store a PendingRequest */ - private _remoteHandlers: { [slotName: string]: Handler } = {} + private _remoteHandlers: { + [slotName: string]: { + [param: string]: Handler + } + } = {} /** * Callbacks provided by each slot allowing to register remote handlers * created by the Transport */ - private _remoteHandlerRegistrationCallbacks: { [slotName: string]: (newHandler: Handler) => void } = {} + private _remoteHandlerRegistrationCallbacks: + { [slotName: string]: RemoteHandlerCallback } = {} /** * Callbacks provided by each slot allowing to unregister the remote handlers * created by the Transport, typically when the remote connection is closed. */ - private _remoteHandlerDeletionCallbacks: { [slotName: string]: (newHandler: Handler) => void } = {} + private _remoteHandlerDeletionCallbacks: + { [slotName: string]: RemoteHandlerCallback } = {} /** * Requests that have been sent to the far end, but have yet to be fulfilled */ - private _pendingRequests: PendingRequests = {} + private _pendingRequests: { [param: string]: PendingRequests } = {} + private _channelReady = false constructor(private _channel: Channel) { @@ -73,8 +90,10 @@ export class Transport { return this._responseReceived(message) case 'handler_registered': return this._registerRemoteHandler(message) + case 'handler_registered': + return this._registerRemoteHandler(message) case 'handler_unregistered': - return this._unregisterRemoteHandler(message.slotName) + return this._unregisterRemoteHandler(message) case 'error': return this._errorReceived(message) default: @@ -85,8 +104,10 @@ export class Transport { this._channelReady = true // When the far end connects, signal which local handlers are set - this._localHandlerRegistrations.forEach(msg => { - this._channel.send(msg) + Object.keys(this._localHandlerRegistrations).forEach(param => { + this._localHandlerRegistrations[param].forEach(msg => { + this._channel.send(msg) + }) }) }) this._channel.onDisconnect(() => { @@ -108,32 +129,33 @@ export class Transport { * and send either a response or an error mirroring the request id, * depending on the status of the resulting promise */ - private _requestReceived({ slotName, data, id }: TransportRequest): void { + private _requestReceived({ slotName, data, id, param }: TransportRequest): void { // Get local handlers - const handlers = this._localHandlers[slotName] - if (!handlers) { - return - } + const slotHandlers = this._localHandlers[slotName] + if (!slotHandlers) return + + const handlers = slotHandlers[param] + if (!handlers) return // Call local handlers with the request data callHandlers(data, handlers) - // If the resulting promise is fulfilled, send a response to the far end .then(response => this._channel.send({ type: 'response', slotName, id, - data: response - }) - ) + data: response, + param + })) // If the resulting promise is rejected, send an error to the far end .catch((error: Error) => this._channel.send({ - type: 'error', - slotName, id, message: `${error}`, - stack: error.stack || '' + param, + slotName, + stack: error.stack || '', + type: 'error' })) } @@ -141,26 +163,28 @@ export class Transport { * When a response is received from the far end, resolve the pending promise * with the received data */ - private _responseReceived({ slotName, data, id }: TransportResponse): void { - if (!this._pendingRequests[slotName][id]) { + private _responseReceived({ slotName, data, id, param }: TransportResponse): void { + const slotRequests = this._pendingRequests[slotName] + if (!slotRequests || !slotRequests[param] || !slotRequests[param][id]) { return } - this._pendingRequests[slotName][id].resolve(data) - delete this._pendingRequests[slotName][id] + + slotRequests[param][id].resolve(data) + delete slotRequests[param][id] } /** * When an error is received from the far end, reject the pending promise * with the received data */ - private _errorReceived({ slotName, id, message, stack }: TransportError): void { - if (!this._pendingRequests[slotName][id]) { - return - } - const error = new Error(`${message} on ${slotName}`) - error.stack = stack || error.stack - this._pendingRequests[slotName][id].reject(error) - delete this._pendingRequests[slotName][id] + private _errorReceived({ slotName, id, message, stack, param }: TransportError): void { + const slotRequests = this._pendingRequests[slotName] + if (!slotRequests || !slotRequests[param] || !slotRequests[param][id]) return + + const error = new Error(`${message} on ${slotName} with param ${param}`) + error.stack = stack || error.stack // TODO: stack is not mutable on edge, check? + this._pendingRequests[slotName][param][id].reject(error) + delete this._pendingRequests[slotName][param][id] } /** @@ -170,14 +194,15 @@ export class Transport { * and rejection function * */ - private _registerRemoteHandler({ slotName }: TransportMessage): void { + private _registerRemoteHandler({ slotName, param }: TransportRegistrationMessage): void { + const addHandler = this._remoteHandlerRegistrationCallbacks[slotName] - if (!addHandler) { - return - } - if (this._remoteHandlers[slotName]) { - return - } + if (!addHandler) return + + const slotHandlers = this._remoteHandlers[slotName] + + if (slotHandlers && slotHandlers[param]) return + const remoteHandler = (requestData: any) => new Promise((resolve, reject) => { // If the channel is not ready, reject immediately // TODO think of a better (buffering...) solution in the future @@ -190,71 +215,90 @@ export class Transport { if (!this._pendingRequests[slotName]) { this._pendingRequests[slotName] = {} } + if (!this._pendingRequests[slotName][param]) { + this._pendingRequests[slotName][param] = {} + } const id = getId() - this._pendingRequests[slotName][id] = { resolve, reject } + this._pendingRequests[slotName][param][id] = { resolve, reject } // Send a request to the far end this._channel.send({ type: 'request', id, slotName, + param, data: requestData }) // Handle request timeout if needed setTimeout(() => { - if (this._pendingRequests[slotName][id]) { - this._pendingRequests[slotName][id].reject(new Error(`${ERRORS.TIMED_OUT} on ${slotName}`)) - delete this._pendingRequests[slotName][id] + const request = this._pendingRequests[slotName][param][id] + if (request) { + const error = new Error(`${ERRORS.TIMED_OUT} on ${slotName} with param ${param}`) + request.reject(error) + delete this._pendingRequests[slotName][param][id] } }, this._channel.timeout) }) - this._remoteHandlers[slotName] = remoteHandler - addHandler(remoteHandler) + + this._remoteHandlers[slotName] = this._remoteHandlers[slotName] || {} + this._remoteHandlers[slotName][param] = remoteHandler + + addHandler(param, remoteHandler) } - private _unregisterRemoteHandler(slotName: string): void { + private _unregisterRemoteHandler( + { slotName, param }: { slotName: string, param: string } + ): void { const unregisterRemoteHandler = this._remoteHandlerDeletionCallbacks[slotName] - const remoteHandler = this._remoteHandlers[slotName] + const slotHandlers = this._remoteHandlers[slotName] + if (!slotHandlers) return + + const remoteHandler = slotHandlers[param] if (remoteHandler && unregisterRemoteHandler) { - unregisterRemoteHandler(remoteHandler) - delete this._remoteHandlers[slotName] + unregisterRemoteHandler(param, remoteHandler) + delete this._remoteHandlers[slotName][param] } } private _unregisterAllRemoteHandlers(): void { Object.keys(this._remoteHandlerDeletionCallbacks) .forEach(slotName => { - this._unregisterRemoteHandler(slotName) + const slotHandlers = this._remoteHandlers[slotName] + if (!slotHandlers) return + + const params = Object.keys(slotHandlers).filter(param => + (slotHandlers[param] || []).length > 0 + ) + + params.forEach(param => this._unregisterRemoteHandler({ slotName, param })) }) } private _rejectAllPendingRequests(e: Error): void { Object.keys(this._pendingRequests).forEach(slotName => { - Object.keys(this._pendingRequests[slotName]).forEach(id => { - this._pendingRequests[slotName][id].reject(e) + Object.keys(this._pendingRequests[slotName]).forEach(param => { + Object.keys(this._pendingRequests[slotName][param]).forEach(id => { + this._pendingRequests[slotName][param][id].reject(e) + }) }) this._pendingRequests[slotName] = {} }) } - /** - * Called on slot creation. - * The provided callbacks will be used when remote handlers are registered, - * to add a corresponding local handler. - */ - public onRemoteHandlerRegistered(slotName: string, addLocalHandler: (h: Handler) => void): void { + public addRemoteHandlerRegistrationCallback( + slotName: string, + addLocalHandler: (p: string, h: Handler) => void + ): void { if (!this._remoteHandlerRegistrationCallbacks[slotName]) { this._remoteHandlerRegistrationCallbacks[slotName] = addLocalHandler } } - /** - * Called on slot creation. - * The provided callbacks will be used when the far end disconnects, - * to remove the handlers we had registered. - */ - public onRemoteHandlerUnregistered(slotName: string, removeHandler: (h: Handler) => void): void { + public addRemoteHandlerUnregistrationCallback( + slotName: string, + removeHandler: (p: string, h: Handler) => void + ): void { if (!this._remoteHandlerDeletionCallbacks[slotName]) { this._remoteHandlerDeletionCallbacks[slotName] = removeHandler } @@ -264,21 +308,34 @@ export class Transport { * Called when a local handler is registered, to send a `handler_registered` * message to the far end. */ - public registerHandler(slotName: string, handler: Handler): void { + public registerHandler( + slotName: string, + param: string, + handler: Handler + ): void { if (!this._localHandlers[slotName]) { - this._localHandlers[slotName] = [] + this._localHandlers[slotName] = {} } - this._localHandlers[slotName].push(handler) + if (!this._localHandlers[slotName][param]) { + this._localHandlers[slotName][param] = [] + } + this._localHandlers[slotName][param].push(handler) + /** * We notify the far end when adding the first handler only, as they * only need to know if at least one handler is connected. */ - if (this._localHandlers[slotName].length === 1) { + if (this._localHandlers[slotName][param].length === 1) { const registrationMessage: TransportRegistrationMessage = { type: 'handler_registered', + param, slotName } - this._localHandlerRegistrations.push(registrationMessage) + this._localHandlerRegistrations[param] = + this._localHandlerRegistrations[param] || [] + + this._localHandlerRegistrations[param].push(registrationMessage) + if (this._channelReady) { this._channel.send(registrationMessage) } @@ -289,23 +346,24 @@ export class Transport { * Called when a local handler is unregistered, to send a `handler_unregistered` * message to the far end. */ - public unregisterHandler(slotName: string, handler: Handler): void { - if (this._localHandlers[slotName]) { - const ix = this._localHandlers[slotName].indexOf(handler) + public unregisterHandler( + slotName: string, + param: string, + handler: Handler + ): void { + const slotLocalHandlers = this._localHandlers[slotName] + if (slotLocalHandlers && slotLocalHandlers[param]) { + const ix = slotLocalHandlers[param].indexOf(handler) if (ix > -1) { - this._localHandlers[slotName].splice(ix, 1) - /** - * We notify the far end when removing the last handler only, as they - * only need to know if at least one handler is connected. - */ - if (this._localHandlers[slotName].length === 0) { - const unregistrationMessage: TransportUnregistrationMessage = { - type: 'handler_unregistered', - slotName - } - if (this._channelReady) { - this._channel.send(unregistrationMessage) - } + slotLocalHandlers[param].splice(ix, 1) + // TODO: shouldn't we send the message only when the last handler is removed? + const unregistrationMessage: TransportUnregistrationMessage = { + type: 'handler_unregistered', + param, + slotName + } + if (this._channelReady) { + this._channel.send(unregistrationMessage) } } } diff --git a/src/index.ts b/src/index.ts index 8e1b7a1..c3a7167 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,3 +4,4 @@ export { Channel } from './Channel' export { GenericChannel } from './Channels/GenericChannel' export { ChunkedChannel } from './Channels/ChunkedChannel' export { TransportMessage } from './Message' +export { DEFAULT_PARAM } from './Constants' diff --git a/test/Event.test.ts b/test/Event.test.ts index 30230b5..326a8a4 100644 --- a/test/Event.test.ts +++ b/test/Event.test.ts @@ -2,8 +2,9 @@ import 'should' import {slot} from './../src/Slot' import {combineEvents, createEventBus} from './../src/Events' -import {TransportMessage} from './../src/Message' +// import {TransportMessage} from './../src/Message' import {TestChannel} from './TestChannel' +import { DEFAULT_PARAM } from './../src/Constants' import * as sinon from 'sinon' describe('combineEvents()', () => { @@ -39,6 +40,8 @@ describe('createEventBus()', () => { numberToString: slot() } + const param = DEFAULT_PARAM + it('should correctly create an event bus with no channels', async () => { // Attempting to use the events without having @@ -68,6 +71,7 @@ describe('createEventBus()', () => { eventBus.numberToString.on(num => num.toString()) channel.sendSpy.calledWith({ type: 'handler_registered', + param, slotName: 'numberToString' }).should.be.True() @@ -76,6 +80,7 @@ describe('createEventBus()', () => { channel.fakeReceive({ type: 'request', slotName: 'numberToString', + param, id: '0', data: 5 }) @@ -84,9 +89,9 @@ describe('createEventBus()', () => { channel.sendSpy.calledWith({ type: 'response', slotName: 'numberToString', + param, id: '0', data: '5' }).should.be.True() }) - }) diff --git a/test/Slot.test.ts b/test/Slot.test.ts index 299c359..813428f 100644 --- a/test/Slot.test.ts +++ b/test/Slot.test.ts @@ -4,6 +4,7 @@ import { spy } from 'sinon' import { connectSlot } from './../src/Slot' import { TestChannel } from './TestChannel' import { Transport } from './../src/Transport' +import { DEFAULT_PARAM } from './../src/Constants' const makeTestTransport = () => { const channel = new TestChannel() @@ -12,101 +13,126 @@ const makeTestTransport = () => { return { channel, transport } } -describe('connectSlot()', () => { +describe('connectSlot', () => { + + context('without parameter', () => { + + describe('trigger', () => { + it('should use default parameter', async () => { + const numberToString = connectSlot('numberToString', []) + numberToString.on(DEFAULT_PARAM, num => `${num.toString()}`) + + const res = await numberToString(56) + res.should.eql('56') + }) + }) + + describe('on', () => { + it('should use default parameter', async () => { + const numberToString = connectSlot('numberToString', []) + numberToString.on(num => `${num.toString()}`) + + const res = await numberToString(DEFAULT_PARAM, 56) + res.should.eql('56') + }) + }) + }) context('with no transports', () => { - it('should call the local handler if there is only one', () => { + it('should call a single local handler registered for a parameter', async () => { const numberToString = connectSlot('numberToString', []) - numberToString.on(num => num.toString()) - return numberToString(56) - .then(res => res.should.eql('56')) + numberToString.on('a', num => `a${num.toString()}`) + + const res = await numberToString('a', 56) + res.should.eql('a56') }) it('should call all handlers if there is more than one', async () => { const broadcastBool = connectSlot('broadcastBool', []) const results: string[] = [] - broadcastBool.on(b => { - results.push(`1:${b}`) - }) - broadcastBool.on(b => { - results.push(`2:${b}`) - }) - broadcastBool.on(b => { - results.push(`3:${b}`) - }) - await broadcastBool(true) - results.should.eql([ - '1:true', - '2:true', - '3:true' - ]) + + broadcastBool.on('a', b => { results.push(`1:${b}`) }) + broadcastBool.on('a', b => { results.push(`2:${b}`) }) + broadcastBool.on('a', b => { results.push(`3:${b}`) }) + + // Should not be called: different parameter + broadcastBool.on('b', b => { results.push(`4:${b}`) }) + + await broadcastBool('a', true) + + results.should.eql([ '1:true', '2:true', '3:true' ]) + }) + + it('should allow subscribing to multiple parameters', async () => { + const broadcastBool = connectSlot('broadcastBool', []) + let value = 0 + + broadcastBool.on('add', n => { value += n }) + broadcastBool.on('remove', n => { value -= n }) + + await broadcastBool('add', 3) + value.should.eql(3) + + await broadcastBool('remove', 2) + value.should.eql(1) }) it('should allow to unregister handlers', async () => { - const broadcastNumber = connectSlot('broadcastNumber', []) - let results: number[] = [] - const runTest = (n: number) => { - results = [] - return broadcastNumber(n) - } - - // Add two handlers and save references to - // the unsub function of the second - broadcastNumber.on(n => { - results.push(n - 1) - }) - const unreg = broadcastNumber.on(n => { - results.push(n - 2) - }) + const broadcastBool = connectSlot('broadcastBool', []) + let value = 0 + + const unsub = broadcastBool.on('add', n => { value += n }) + broadcastBool.on('add', n => { value += n }) - // Trigger the event once - await runTest(5) + await broadcastBool('add', 3) + value.should.eql(6) // 2 * 3 - // both handlers should have been called - results.should.eql([4, 3]) + unsub() - // unregister the second handler, then trigger - // the event a second time: only the second handler - // should be called - unreg() - await runTest(56) - results.should.eql([55]) + await broadcastBool('add', 3) + value.should.eql(9) // 6 + 1 * 3 }) - it('should call lazy connect and disconnect', () => { + it('should call lazy connect and disconnect with parameter', () => { const broadcastBool = connectSlot('broadcastBool', []) + const param = 'param' + const connect = spy() const disconnect = spy() broadcastBool.lazy(connect, disconnect) - const unsubscribe = broadcastBool.on(() => {}) + const unsubscribe = broadcastBool.on(param, () => {}) - if (!connect.called) throw new Error('connect should have been called') - if (disconnect.called) throw new Error('disconnect should not have been called') + if (!connect.calledWith(param)) + throw new Error('connect should have been called with parameter') + + if (disconnect.called) + throw new Error('disconnect should not have been called with parameter') unsubscribe() - if (!disconnect.called) throw new Error('disconnect should have been called') + if (!disconnect.calledWith(param)) + throw new Error('disconnect should have been called with parameter') }) - }) context('with local and remote handlers', () => { - it('should call both local handlers and remote handlers', async () => { + // TODO: feature to be dicussed + it.skip('should call both local handlers and remote handlers', async () => { const {channel, transport} = makeTestTransport() const broadcastBool = connectSlot('broadcastBool', [transport]) let localCalled = false - broadcastBool.on(b => { localCalled = true }) + broadcastBool.on(_b => { localCalled = true }) const triggerPromise = broadcastBool(true) // Handlers should not be called until a remote handler is registered await Promise.resolve() localCalled.should.be.False() - channel.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) + channel.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool', param: DEFAULT_PARAM}) // setTimeout(0) to yield control to ts-event-bus internals, // so that the call to handlers can be processed @@ -126,16 +152,23 @@ describe('connectSlot()', () => { type: 'response', id: request.id, slotName: 'broadcastBool', - data: null + data: null, + param: DEFAULT_PARAM }) await triggerPromise }) describe('lazy', () => { it('should call connect and disconnect', () => { + const param = 'param' + const { channel: channel1, transport: transport1 } = makeTestTransport() const { channel: channel2, transport: transport2 } = makeTestTransport() - const broadcastBool = connectSlot('broadcastBool', [transport1, transport2]) + + const broadcastBool = connectSlot( + 'broadcastBool', + [transport1, transport2] + ) const connect = spy() const disconnect = spy() @@ -143,89 +176,114 @@ describe('connectSlot()', () => { broadcastBool.lazy(connect, disconnect) // Simulate two remote connextions to the slot - channel1.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) - channel2.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_registered', + slotName: 'broadcastBool', + param + }) + + channel2.fakeReceive({ + type: 'handler_registered', + slotName: 'broadcastBool', + param + }) // Connect should have been called once - if (!connect.calledOnce) throw new Error('connect should have been called once') + if (!connect.calledOnceWith(param)) + throw new Error('connect should have been called once with param') // Disconnect should not have been called - if (disconnect.called) throw new Error('disconnect should not have been called') + if (disconnect.called) + throw new Error('disconnect should not have been called') // Disconnect first remote client - channel1.fakeReceive({ type: 'handler_unregistered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_unregistered', + slotName: 'broadcastBool', + param + }) // Disconnect should not have been called - if (disconnect.called) throw new Error('disconnect should not have been called') + if (disconnect.called) + throw new Error('disconnect should not have been called') // Disconnect second remote client - channel2.fakeReceive({ type: 'handler_unregistered', slotName: 'broadcastBool'}) + channel2.fakeReceive({ + type: 'handler_unregistered', + slotName: 'broadcastBool', + param + }) // Disconnect should have been called once - if (!disconnect.calledOnce) throw new Error('disconnect should have been called once') + if (!disconnect.calledOnceWith(param)) + throw new Error('disconnect should have been called once with param') }) it('should support multiple lazy calls', () => { const { channel: channel1, transport: transport1 } = makeTestTransport() + const param = 'param' + const connect1 = spy() const disconnect1 = spy() const connect2 = spy() const disconnect2 = spy() - const broadcastBool = connectSlot('broadcastBool', [transport1]) + const broadcastBool = connectSlot( + 'broadcastBool', + [transport1] + ) broadcastBool.lazy(connect1, disconnect1) broadcastBool.lazy(connect2, disconnect2) - channel1.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_registered', + slotName: 'broadcastBool', + param + }) // Connects should have been called once - if (!connect1.calledOnce) throw new Error('connect1 should have been called once') - if (!connect2.calledOnce) throw new Error('connect1 should have been called once') + if (!connect1.calledOnceWith(param)) + throw new Error('connect1 should have been called once with param') - channel1.fakeReceive({ type: 'handler_unregistered', slotName: 'broadcastBool'}) + if (!connect2.calledOnceWith(param)) + throw new Error('connect2 should have been called once with param') + + channel1.fakeReceive({ + type: 'handler_unregistered', + slotName: 'broadcastBool', + param + }) // Disonnects should have been called once - if (!disconnect1.calledOnce) throw new Error('connect1 should have been called once') - if (!disconnect2.calledOnce) throw new Error('connect1 should have been called once') + if (!disconnect1.calledOnceWith(param)) + throw new Error('disconnect1 should have been called once with param') + + if (!disconnect2.calledOnceWith(param)) + throw new Error('disconnect2 should have been called once with param') }) it('should call connect if transport was registered before lazy was called', () => { + const param = 'param' const { channel: channel1, transport: transport1 } = makeTestTransport() const broadcastBool = connectSlot('broadcastBool', [transport1]) const connect = spy() // Register remote handler *before* calling lazy - channel1.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_registered', + slotName: 'broadcastBool', + param + }) broadcastBool.lazy(connect, () => {}) // Connect should have been called once - if (!connect.calledOnce) throw new Error('connect should have been called once') - }) - - it('should call disconnect on unsubscribe when remote client is connected', () => { - const { channel: channel1, transport: transport1 } = makeTestTransport() - const broadcastBool = connectSlot('broadcastBool', [transport1]) - - const disconnect = spy() - - // Register remote handler - channel1.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) - - // Connect lazy - const unsubscribe = broadcastBool.lazy(() => {}, disconnect) - - // Disonnect should not have been called - if (disconnect.called) throw new Error('disconnect should not have been called') - - unsubscribe() - - // Disconnect should have been called once - if (!disconnect.calledOnce) throw new Error('disconnect should have been called once') + if (!connect.calledOnceWith(param)) + throw new Error('connect should have been called once with param') }) }) }) diff --git a/test/TestChannel.ts b/test/TestChannel.ts index aa5b44f..e0d81e6 100644 --- a/test/TestChannel.ts +++ b/test/TestChannel.ts @@ -2,6 +2,7 @@ import * as sinon from 'sinon' import { ChunkedChannel } from './../src/Channels/ChunkedChannel' import { GenericChannel } from './../src/Channels/GenericChannel' import { TransportMessage } from './../src/Message' +import { DEFAULT_PARAM } from './../src/Constants' export class TestChannel extends GenericChannel { @@ -27,6 +28,7 @@ export class TestChannel extends GenericChannel { this._messageReceived({ type: 'error', slotName: 'test', + param: DEFAULT_PARAM, id: '1', message: 'error' }) diff --git a/test/Transport.test.ts b/test/Transport.test.ts index ee884ac..e49968a 100644 --- a/test/Transport.test.ts +++ b/test/Transport.test.ts @@ -5,8 +5,8 @@ import { TransportMessage } from './../src/Message' import { TestChannel } from './TestChannel' import * as sinon from 'sinon' import { SinonSpy } from 'sinon' -import { createEventBus } from '../src/Events' -import { slot } from '../src/Slot' + +const param = 'param' describe('Transport', () => { @@ -15,7 +15,7 @@ describe('Transport', () => { const channel = new TestChannel() const stubMethods: Array = ['onConnect', 'onDisconnect', 'onData', 'onError'] stubMethods.forEach(method => sinon.stub(channel, method)) - const transport = new Transport(channel) + new Transport(channel) stubMethods.forEach(methodName => it(`should subscribe to its channel\'s ${methodName} method`, () => { const method = channel[methodName] as any as sinon.SinonSpy @@ -43,9 +43,10 @@ describe('Transport', () => { it('should send a handler_registered message for each slot when a local handler is registered', () => { Object.keys(slots).forEach(slotName => { - transport.registerHandler(slotName, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][0]) channel.sendSpy.calledWith({ type: 'handler_registered', + param, slotName }).should.be.True() }) @@ -53,10 +54,10 @@ describe('Transport', () => { it('should not send a handler_registered message when an additional local handler is registered', () => { const slotName = 'getCarrotStock' - transport.registerHandler(slotName, slots[slotName][0]) - transport.registerHandler(slotName, slots[slotName][1]) + transport.registerHandler(slotName, param, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][1]) channel.sendSpy - .withArgs({ type: 'handler_registered', slotName }) + .withArgs({ type: 'handler_registered', slotName, param }) .calledOnce // should have been called exactly once .should.be.True() }) @@ -68,12 +69,13 @@ describe('Transport', () => { const handler = slots[slotName][0] // Register handler on slot - transport.registerHandler(slotName, handler) + transport.registerHandler(slotName, param, handler) const request: TransportMessage = { type: 'request', slotName, id: '5', + param, data: { height: 5, constitution: 'strong' @@ -88,6 +90,7 @@ describe('Transport', () => { slotName, type: 'response', id: '5', + param, data: { color: 'blue' } @@ -99,13 +102,14 @@ describe('Transport', () => { const slotName = 'buildCelery' // Register one handler on slot - transport.registerHandler(slotName, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][0]) // Unregister it - transport.unregisterHandler(slotName, slots[slotName][0]) + transport.unregisterHandler(slotName, param, slots[slotName][0]) channel.sendSpy.calledWith({ type: 'handler_unregistered', + param, slotName }).should.be.True() }) @@ -116,15 +120,16 @@ describe('Transport', () => { const handler = slots[slotName][0] // Register one handler on slot - transport.registerHandler(slotName, handler) + transport.registerHandler(slotName, param, handler) // Unregister it - transport.unregisterHandler(slotName, slots[slotName][0]) + transport.unregisterHandler(slotName, param, slots[slotName][0]) const request: TransportMessage = { type: 'request', slotName, id: '5', + param, data: { height: 5, constitution: 'strong' @@ -140,11 +145,11 @@ describe('Transport', () => { const slotName = 'getCarrotStock' // Register two handlers on slot - transport.registerHandler(slotName, slots[slotName][0]) - transport.registerHandler(slotName, slots[slotName][1]) + transport.registerHandler(slotName, param, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][1]) // Unregister one handler only - transport.unregisterHandler(slotName, slots[slotName][0]) + transport.unregisterHandler(slotName, param, slots[slotName][0]) channel.sendSpy.calledWith({ type: 'handler_unregistered', @@ -163,16 +168,16 @@ describe('Transport', () => { beforeEach(() => { addLocalHandler = sinon.spy() removeLocalHandler = sinon.spy() - transport.onRemoteHandlerRegistered(slotName, addLocalHandler) - channel.fakeReceive({ type: 'handler_registered', slotName }) - localHandler = addLocalHandler.lastCall.args[0] + transport.addRemoteHandlerRegistrationCallback(slotName, addLocalHandler) + channel.fakeReceive({ type: 'handler_registered', slotName, param }) + localHandler = addLocalHandler.lastCall.args[1] }) it('should add a local handler when a remote handler registration is received', () => { addLocalHandler.called.should.be.True() }) - it('should resolve a local pending request when a response is received', () => { + it('should resolve a local pending request when a response is received', async () => { const requestData = { carrotType: 'red' } const pendingPromise = localHandler(requestData) const request = channel.sendSpy.lastCall.args[0] @@ -186,38 +191,41 @@ describe('Transport', () => { type: 'response', id: request.id, slotName, + param, data: responseData }) - return pendingPromise - .then((response: number) => response.should.eql(responseData)) + const response = await pendingPromise + response.should.eql(responseData) }) - it('should reject a local pending request when an error is received', () => { + it('should reject a local pending request when an error is received', async () => { const pendingPromise = localHandler({ carrotType: 'blue' }) - const { id } = channel.sendSpy.lastCall.args[0] + const { id, param } = channel.sendSpy.lastCall.args[0] channel.fakeReceive({ type: 'error', id, + param, slotName, message: 'all out of blue' }) - return pendingPromise - .then(() => { - throw new Error('Promise should have been rejected') - }) - .catch(err => `${err}`.should.eql('Error: all out of blue on getCarrotStock')) + try { + await pendingPromise + throw new Error('Promise should have been rejected') + } + catch (err) { + `${err}`.should.eql('Error: all out of blue on getCarrotStock with param param') + } }) it('should remove a local handler when a remote handler unregistration is received', () => { - transport.onRemoteHandlerUnregistered(slotName, removeLocalHandler) + transport.addRemoteHandlerUnregistrationCallback(slotName, removeLocalHandler) channel.fakeReceive({ type: 'handler_unregistered', + param, slotName }) removeLocalHandler.called.should.be.True() }) }) - }) - }) From f5edbac6b3f1092ceabe16a8195b1ddc5002e179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Tue, 16 Jul 2019 18:21:02 +0200 Subject: [PATCH 02/11] Fix combineEvents and throw for duplicated events. The generic `C[]` was forcing every eventDeclaration to have this same C type, which is not how we intend to use combineEvents. Therefore I am reverting the change, and updating the test so that the case is detected. I am also adding the proposed safeguard and throwing when encountering duplicate events declaration in combineEvents. --- src/Events.ts | 31 +++++++++++++++++++++++++++++-- test/Event.test.ts | 31 +++++++++++++++++++++---------- 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/src/Events.ts b/src/Events.ts index 067bd61..19f95fd 100644 --- a/src/Events.ts +++ b/src/Events.ts @@ -6,8 +6,35 @@ export interface EventDeclaration { [slotName: string]: Slot } -export function combineEvents(...args: C[]): { [P in keyof C]: C[P] } { - // TODO: throw if event buses have duplicate events +export function combineEvents< + C1 extends EventDeclaration, C2 extends EventDeclaration, C3 extends EventDeclaration, + C4 extends EventDeclaration, C5 extends EventDeclaration, C6 extends EventDeclaration, + C7 extends EventDeclaration, C8 extends EventDeclaration, C9 extends EventDeclaration, + C10 extends EventDeclaration, C11 extends EventDeclaration, C12 extends EventDeclaration, + C13 extends EventDeclaration, C14 extends EventDeclaration, C15 extends EventDeclaration, + C16 extends EventDeclaration, C17 extends EventDeclaration, C18 extends EventDeclaration, + C19 extends EventDeclaration, C20 extends EventDeclaration, C21 extends EventDeclaration, + C22 extends EventDeclaration, C23 extends EventDeclaration, C24 extends EventDeclaration + >( + _c1: C1, _c2: C2, _c3?: C3, _c4?: C4, _c5?: C5, _c6?: C6, _c7?: C7, _c8?: C8, + _c9?: C9, _c10?: C10, _c11?: C11, _c12?: C12, _c13?: C13, _c14?: C14, _c15?: C15, + _c16?: C16, _c17?: C17, _c18?: C18, _c19?: C19, _c20?: C20, _c21?: C21, _c22?: C22, + _c23?: C23, _c24?: C24 + ): C1 & C2 & C3 & C4 & C5 & C6 & C7 & C8 & C9 & C10 & C11 & C12 & C13 & C14 & C15 & C16 + & C17 & C18 & C19 & C20 & C21 & C22 & C23 & C24 { + + const args = Array.from(arguments) + + const keys = args.reduce((keys, arg) => { + return [...keys, ...Object.keys(arg)] + }, []) + + const uniqKeys = [...new Set(keys)] + + if (keys.length > uniqKeys.length) { + throw new Error('ts-event-bus: duplicate slots encountered in combineEvents.') + } + return Object.assign({}, ...args) } diff --git a/test/Event.test.ts b/test/Event.test.ts index 326a8a4..2a53613 100644 --- a/test/Event.test.ts +++ b/test/Event.test.ts @@ -10,17 +10,20 @@ import * as sinon from 'sinon' describe('combineEvents()', () => { it('should correctly combine several EventDeclarations', () => { + const helloEvents = { + hello: slot<{ name: string }>() + } + const howAreEvents = { + how: slot<{ mode: 'simple' | 'advanced'}>(), + are: slot<{ tense: number }>() + } + const youEvents = { + you: slot<{ reflective: boolean }>() + } const combined = combineEvents( - { - hello: slot<{ name: string }>() - }, - { - how: slot<{ mode: 'simple' | 'advanced'}>(), - are: slot<{ tense: number }>() - }, - { - you: slot<{ reflective: boolean }>() - } + helloEvents, + howAreEvents, + youEvents ) Object.keys(combined).should.eql(['hello', 'how', 'are', 'you']) @@ -32,6 +35,14 @@ describe('combineEvents()', () => { // combined.you({ reflective: 5 }) }) + it('should throw with duplicate slot declarations', () => { + const helloEvents = { + hello: slot<{ name: string }>() + } + const failing = () => combineEvents(helloEvents, helloEvents) + failing.should.throw(/duplicate slots/) + }) + }) describe('createEventBus()', () => { From e3d13206cdd423be7159c190aa824cbb3532fbf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Tue, 16 Jul 2019 18:27:43 +0200 Subject: [PATCH 03/11] Fix vulnerabilities: mixin-deep, lodash, set-value --- package.json | 4 +++- yarn.lock | 47 +++++++++++++++-------------------------------- 2 files changed, 18 insertions(+), 33 deletions(-) diff --git a/package.json b/package.json index 491e759..58b263f 100644 --- a/package.json +++ b/package.json @@ -68,7 +68,9 @@ ], "resolutions": { "js-yaml": ">=3.13.1", - "lodash": ">=4.17.11" + "lodash": ">=4.17.12", + "mixin-deep": ">=1.3.2 <2.0.0 || >=2.0.1", + "set-value": ">=2.0.1 <3.0.0 || >=3.0.1" }, "license": "Apache-2.0" } diff --git a/yarn.lock b/yarn.lock index a0b6ae2..eb38450 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1530,10 +1530,6 @@ flush-write-stream@^1.0.0: inherits "^2.0.1" readable-stream "^2.0.4" -for-in@^1.0.2: - version "1.0.2" - resolved "https://registry.yarnpkg.com/for-in/-/for-in-1.0.2.tgz#81068d295a8142ec0ac726c6e2200c30fb6d5e80" - fragment-cache@^0.2.1: version "0.2.1" resolved "https://registry.yarnpkg.com/fragment-cache/-/fragment-cache-0.2.1.tgz#4290fad27f13e89be7f33799c6bc5a0abfff0d19" @@ -1955,7 +1951,7 @@ is-number@^3.0.0: dependencies: kind-of "^3.0.2" -is-plain-object@^2.0.1, is-plain-object@^2.0.3, is-plain-object@^2.0.4: +is-plain-object@^2.0.4: version "2.0.4" resolved "https://registry.yarnpkg.com/is-plain-object/-/is-plain-object-2.0.4.tgz#2c163b3fafb1b606d9d17928f05c2a1c38e07677" dependencies: @@ -2095,10 +2091,10 @@ lodash.get@^4.4.2: version "4.4.2" resolved "https://registry.yarnpkg.com/lodash.get/-/lodash.get-4.4.2.tgz#2d177f652fa31e939b4438d5341499dfa3825e99" -lodash@>=4.17.11, lodash@^4.17.4: - version "4.17.11" - resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.11.tgz#b39ea6229ef607ecd89e2c8df12536891cac9b8d" - integrity sha512-cQKh8igo5QUhZ7lg38DYWAxMvjSAKG0A8wGSVimP07SIUEK2UO+arSRKbRZWtelMtN5V0Hkwh5ryOto/SshYIg== +lodash@>=4.17.12, lodash@^4.17.4: + version "4.17.14" + resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.14.tgz#9ce487ae66c96254fe20b599f21b6816028078ba" + integrity sha512-mmKYbW3GLuJeX+iGP+Y7Gp1AiGHGbXHCOh/jZmrawMmsE7MS4znI3RL2FsjbqOyMayHInjOeykW7PEajUk1/xw== lolex@^2.2.0, lolex@^2.3.2: version "2.3.2" @@ -2245,12 +2241,10 @@ mississippi@^2.0.0: stream-each "^1.1.0" through2 "^2.0.0" -mixin-deep@^1.2.0: - version "1.3.1" - resolved "https://registry.yarnpkg.com/mixin-deep/-/mixin-deep-1.3.1.tgz#a49e7268dce1a0d9698e45326c5626df3543d0fe" - dependencies: - for-in "^1.0.2" - is-extendable "^1.0.1" +"mixin-deep@>=1.3.2 <2.0.0 || >=2.0.1", mixin-deep@^1.2.0: + version "2.0.1" + resolved "https://registry.yarnpkg.com/mixin-deep/-/mixin-deep-2.0.1.tgz#9a6946bef4a368401b784970ae3caaaa6bab02fa" + integrity sha512-imbHQNRglyaplMmjBLL3V5R6Bfq5oM+ivds3SKgc6oRtzErEnBUUc5No11Z2pilkUvl42gJvi285xTNswcKCMA== mkdirp@0.5.1, mkdirp@^0.5.0, mkdirp@^0.5.1, mkdirp@~0.5.0: version "0.5.1" @@ -2969,23 +2963,12 @@ set-blocking@^2.0.0, set-blocking@~2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/set-blocking/-/set-blocking-2.0.0.tgz#045f9782d011ae9a6803ddd382b24392b3d890f7" -set-value@^0.4.3: - version "0.4.3" - resolved "https://registry.yarnpkg.com/set-value/-/set-value-0.4.3.tgz#7db08f9d3d22dc7f78e53af3c3bf4666ecdfccf1" - dependencies: - extend-shallow "^2.0.1" - is-extendable "^0.1.1" - is-plain-object "^2.0.1" - to-object-path "^0.3.0" - -set-value@^2.0.0: - version "2.0.0" - resolved "https://registry.yarnpkg.com/set-value/-/set-value-2.0.0.tgz#71ae4a88f0feefbbf52d1ea604f3fb315ebb6274" +"set-value@>=2.0.1 <3.0.0 || >=3.0.1", set-value@^0.4.3, set-value@^2.0.0: + version "3.0.1" + resolved "https://registry.yarnpkg.com/set-value/-/set-value-3.0.1.tgz#52c82af7653ba69eb1db92e81f5cdb32739b9e95" + integrity sha512-w6n3GUPYAWQj4ZyHWzD7K2FnFXHx9OTwJYbWg+6nXjG8sCLfs9DGv+KlqglKIIJx+ks7MlFuwFW2RBPb+8V+xg== dependencies: - extend-shallow "^2.0.1" - is-extendable "^0.1.1" - is-plain-object "^2.0.3" - split-string "^3.0.1" + is-plain-object "^2.0.4" setimmediate@^1.0.4: version "1.0.5" @@ -3160,7 +3143,7 @@ spdx-license-ids@^3.0.0: resolved "https://registry.yarnpkg.com/spdx-license-ids/-/spdx-license-ids-3.0.4.tgz#75ecd1a88de8c184ef015eafb51b5b48bfd11bb1" integrity sha512-7j8LYJLeY/Yb6ACbQ7F76qy5jHkp0U6jgBfJsk97bwWlVUnUWsAgpyaCvo17h0/RQGnQ036tVDomiwoI4pDkQA== -split-string@^3.0.1, split-string@^3.0.2: +split-string@^3.0.2: version "3.1.0" resolved "https://registry.yarnpkg.com/split-string/-/split-string-3.1.0.tgz#7cb09dda3a86585705c64b39a6466038682e8fe2" dependencies: From d649078ed387821a7e0c17213c0f748bb3c38f1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Thu, 18 Jul 2019 11:38:01 +0200 Subject: [PATCH 04/11] Put back buffering mechanism and implement noBuffer config option We put back the mechanism that allows to wait for transport to have registered handlers before actually trigger the slots. In the meantime, we add a config option on the slot to bypass this buffering feature when it is not needed. --- README.md | 14 ++++++ src/Events.ts | 3 +- src/Slot.ts | 109 +++++++++++++++++++++++++++++++++++++++------- test/Slot.test.ts | 56 ++++++++++++++++++++---- 4 files changed, 157 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 4bcbcca..0a44bb7 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,20 @@ unsubscribe() disconnectLazy() ``` +### Buffering + +When the eventBus is created with channels, slots will wait for all transports to have +registered callbacks before triggering. + +This buffering mechanism can be disabled at the slot level with the `noBuffer` config option: + +```typescript +const MyEvents = { + willWait: slot(), + wontWait: slot({ noBuffer: true }), +} +``` + ### Syntactic sugar You can combine events from different sources. diff --git a/src/Events.ts b/src/Events.ts index 19f95fd..3da65c9 100644 --- a/src/Events.ts +++ b/src/Events.ts @@ -42,7 +42,8 @@ export function createEventBus(args: { events: C, ch const transports = (args.channels || []).map(c => new Transport(c)) return Object.keys(args.events) .reduce((conn: C, slotName) => { - conn[slotName] = connectSlot(slotName, transports as Transport[]) + const config = args.events[slotName].config + conn[slotName] = connectSlot(slotName, transports as Transport[], config) return conn }, {} as any) } diff --git a/src/Slot.ts b/src/Slot.ts index ddc9bf8..307dc9d 100644 --- a/src/Slot.ts +++ b/src/Slot.ts @@ -35,6 +35,12 @@ const findAllUsedParams = (handlers: Handlers): string[] => return paramsUniq }, [] as string[]) +interface SlotConfig { + // This option will prevent the slot from buffering the + // requests if no remote handlers are set for some transports. + noBuffer?: boolean +} + /** * Represents an event shared by two modules. * @@ -61,6 +67,9 @@ export interface Slot { // Provide the slot with lazy callbacks lazy(connect: LazyCallback, disconnect: LazyCallback): Unsubscribe, + + // Retreive slot configuration + config?: SlotConfig } /** @@ -68,13 +77,16 @@ export interface Slot { * It returns a fake slot, that will throw if triggered or subscribed to. * Slots need to be connected in order to be functional. */ -export function slot(): Slot { - return FAKE_SLOT +export function slot( + config: SlotConfig = { noBuffer: false } +): Slot { + return Object.assign(FAKE_SLOT, config) } export function connectSlot( slotName: string, - transports: Transport[] + transports: Transport[], + config: SlotConfig = {} ): Slot { /* @@ -84,14 +96,47 @@ export function connectSlot( */ // These will be all the handlers for this slot, for each transport, for each param - const handlers: Handlers = Object.assign( - // local handlers are stored under LOCAL_TRANSPORT key - { [LOCAL_TRANSPORT]: {} }, - - // transport handlers are stored by index - transports.map(_t => ({})) + const handlers: Handlers = transports.reduce( + (acc, _t, ix) => ({ ...acc, [ix]: {} }), + { [LOCAL_TRANSPORT]: {} } ) + // For each transport we create a Promise that will be fulfilled only + // when the far-end has registered a handler. + // This prevents `triggers` from firing *before* any far-end is listening. + interface HandlerConnected { + registered: Promise + onRegister: () => void + } + + interface RemoteHandlersConnected { + [transportKey: string]: { + [param: string]: HandlerConnected + } + } + + const remoteHandlersConnected: RemoteHandlersConnected = + transports.reduce((acc, _t, transportKey) => + ({ ...acc, [transportKey]: {} }), + {} + ) + + const awaitHandlerRegistration = ( + transportKey: string, + param: string + ) => { + let onHandlerRegistered = () => { } + + const remoteHandlerRegistered = new Promise( + resolve => onHandlerRegistered = resolve + ) + + remoteHandlersConnected[transportKey][param] = { + registered: remoteHandlerRegistered, + onRegister: onHandlerRegistered + } + } + // Lazy callbacks const lazyConnectCallbacks: LazyCallback[] = [] const lazyDisonnectCallbacks: LazyCallback[] = [] @@ -106,22 +151,33 @@ export function connectSlot( transports.forEach((transport, transportKey) => { const remoteHandlerRegistered = ( - param: string, + param = DEFAULT_PARAM, handler: Handler ) => { + // Store handler const paramHandlers = handlers[transportKey][param] || [] handlers[transportKey][param] = paramHandlers.concat(handler) + + // Call lazy callbacks if needed if (getParamHandlers(param, handlers).length === 1) callLazyConnectCallbacks(param) + + // Release potential buffered events + if (!remoteHandlersConnected[transportKey][param]) { + awaitHandlerRegistration(String(transportKey), param) + } + + remoteHandlersConnected[transportKey][param].onRegister() } const remoteHandlerUnregistered = ( - param: string, + param = DEFAULT_PARAM, handler: Handler ) => { const paramHandlers = handlers[transportKey][param] || [] const handlerIndex = paramHandlers.indexOf(handler) if (handlerIndex > -1) handlers[transportKey][param].splice(handlerIndex, 1) if (getParamHandlers(param, handlers).length === 0) callLazyDisonnectCallbacks(param) + awaitHandlerRegistration(String(transportKey), param) } transport.addRemoteHandlerRegistrationCallback(slotName, remoteHandlerRegistered) @@ -145,11 +201,34 @@ export function connectSlot( function trigger(param: string, data: any): Promise // Combined signatures - function trigger(firstArg: string | any, secondArg?: any): Promise { + function trigger(firstArg: string | any, secondArg?: any) { const data: any = secondArg || firstArg const param: string = secondArg ? firstArg : DEFAULT_PARAM - const allHandlers = getParamHandlers(param, handlers) - return callHandlers(data, allHandlers) + + if (config.noBuffer || transports.length === 0) { + const allParamHandlers = getParamHandlers(param, handlers) + return callHandlers(data, allParamHandlers) + } + + else { + transports.forEach((_t, transportKey) => { + if (!remoteHandlersConnected[transportKey][param]) { + awaitHandlerRegistration(String(transportKey), param) + } + }) + + const transportPromises: Promise[] = transports.reduce( + (acc, _t, transportKey) => [ + ...acc, + remoteHandlersConnected[transportKey][param].registered + ], [] + ) + + return Promise.all(transportPromises).then(() => { + const allParamHandlers = getParamHandlers(param, handlers) + return callHandlers(data, allParamHandlers) + }) + } } /* @@ -244,5 +323,5 @@ export function connectSlot( } } - return Object.assign(trigger, { on, lazy }) + return Object.assign(trigger, { on, lazy, config }) } diff --git a/test/Slot.test.ts b/test/Slot.test.ts index 813428f..ad8ba3e 100644 --- a/test/Slot.test.ts +++ b/test/Slot.test.ts @@ -5,6 +5,7 @@ import { connectSlot } from './../src/Slot' import { TestChannel } from './TestChannel' import { Transport } from './../src/Transport' import { DEFAULT_PARAM } from './../src/Constants' +import { TransportRegistrationMessage } from './../src/Message' const makeTestTransport = () => { const channel = new TestChannel() @@ -121,9 +122,8 @@ describe('connectSlot', () => { context('with local and remote handlers', () => { - // TODO: feature to be dicussed - it.skip('should call both local handlers and remote handlers', async () => { - const {channel, transport} = makeTestTransport() + it('should call both local handlers and remote handlers', async () => { + const { channel, transport } = makeTestTransport() const broadcastBool = connectSlot('broadcastBool', [transport]) let localCalled = false broadcastBool.on(_b => { localCalled = true }) @@ -132,7 +132,12 @@ describe('connectSlot', () => { // Handlers should not be called until a remote handler is registered await Promise.resolve() localCalled.should.be.False() - channel.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool', param: DEFAULT_PARAM}) + + channel.fakeReceive({ + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered' + }) // setTimeout(0) to yield control to ts-event-bus internals, // so that the call to handlers can be processed @@ -141,23 +146,56 @@ describe('connectSlot', () => { // Once a remote handler is registered, both local and remote should be called localCalled.should.be.True() const request = channel.sendSpy.lastCall.args[0] + request.should.match({ - type: 'request', + data: true, + param: DEFAULT_PARAM, slotName: 'broadcastBool', - data: true + type: 'request' }) // triggerPromise should resolve once a remote response is received channel.fakeReceive({ - type: 'response', + data: null, id: request.id, + param: DEFAULT_PARAM, slotName: 'broadcastBool', - data: null, - param: DEFAULT_PARAM + type: 'response' }) await triggerPromise }) + describe('noBuffer', () => { + it('should call local handlers even if no remote handler is registered', async () => { + const { channel, transport } = makeTestTransport() + const broadcastBool = connectSlot( + 'broadcastBool', + [transport], + { noBuffer: true } + ) + let localCalled = false + broadcastBool.on(_b => { localCalled = true }) + broadcastBool(true) + + // We should have called the trigger + localCalled.should.be.True() + + const registrationMessage: TransportRegistrationMessage = { + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered' + } + + channel.fakeReceive(registrationMessage) + await new Promise(resolve => setTimeout(resolve, 0)) + + // Remote should not have been called, as it was not registered + // at the time of the trigger. + const request = channel.sendSpy.lastCall.args[0] + request.should.match(registrationMessage) + }) + }) + describe('lazy', () => { it('should call connect and disconnect', () => { const param = 'param' From 30648fa617b13ad88762a6d0cc6ef7574b736e82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Thu, 18 Jul 2019 12:07:08 +0200 Subject: [PATCH 05/11] Renaming FAKE_SLOT -> notConnectedSlot for more clarity --- src/Slot.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Slot.ts b/src/Slot.ts index 307dc9d..3988d54 100644 --- a/src/Slot.ts +++ b/src/Slot.ts @@ -4,8 +4,8 @@ import { DEFAULT_PARAM } from './Constants' const signalNotConnected = () => { throw new Error('Slot not connected') } -const FAKE_SLOT: any = () => signalNotConnected() -FAKE_SLOT.on = signalNotConnected +const notConnectedSlot: any = () => signalNotConnected() +notConnectedSlot.on = signalNotConnected export type LazyCallback = (param: string) => void export type Unsubscribe = () => void @@ -80,7 +80,7 @@ export interface Slot { export function slot( config: SlotConfig = { noBuffer: false } ): Slot { - return Object.assign(FAKE_SLOT, config) + return Object.assign(notConnectedSlot, config) } export function connectSlot( From 0a145b2da5a2cd1a72b9dd9fe5062b802e5e2c20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Thu, 18 Jul 2019 18:08:24 +0200 Subject: [PATCH 06/11] Fix trigger optional param detection --- src/Slot.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Slot.ts b/src/Slot.ts index 3988d54..9340dec 100644 --- a/src/Slot.ts +++ b/src/Slot.ts @@ -202,8 +202,9 @@ export function connectSlot( // Combined signatures function trigger(firstArg: string | any, secondArg?: any) { - const data: any = secondArg || firstArg - const param: string = secondArg ? firstArg : DEFAULT_PARAM + const paramUsed = arguments.length === 2 + const data: any = paramUsed ? secondArg : firstArg + const param: string = paramUsed ? firstArg : DEFAULT_PARAM if (config.noBuffer || transports.length === 0) { const allParamHandlers = getParamHandlers(param, handlers) From 566c09b73e0c7b31d22f3e340b012d034ac97f77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Mon, 22 Jul 2019 11:29:54 +0200 Subject: [PATCH 07/11] Send unregistration message when removing the last local handler only --- src/Transport.ts | 21 +++++++++++++-------- test/Transport.test.ts | 11 +++++++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/Transport.ts b/src/Transport.ts index 418fb0f..94d6fa8 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -356,14 +356,19 @@ export class Transport { const ix = slotLocalHandlers[param].indexOf(handler) if (ix > -1) { slotLocalHandlers[param].splice(ix, 1) - // TODO: shouldn't we send the message only when the last handler is removed? - const unregistrationMessage: TransportUnregistrationMessage = { - type: 'handler_unregistered', - param, - slotName - } - if (this._channelReady) { - this._channel.send(unregistrationMessage) + /** + * We notify the far end when removing the last handler only, as they + * only need to know if at least one handler is connected. + */ + if (slotLocalHandlers[param].length === 0) { + const unregistrationMessage: TransportUnregistrationMessage = { + type: 'handler_unregistered', + param, + slotName + } + if (this._channelReady) { + this._channel.send(unregistrationMessage) + } } } } diff --git a/test/Transport.test.ts b/test/Transport.test.ts index e49968a..ca99614 100644 --- a/test/Transport.test.ts +++ b/test/Transport.test.ts @@ -62,6 +62,17 @@ describe('Transport', () => { .should.be.True() }) + it('should not send a handler_unregistered message when an additional local handler is unregistered', () => { + const slotName = 'getCarrotStock' + transport.registerHandler(slotName, param, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][1]) + transport.unregisterHandler(slotName, param, slots[slotName][1]) + channel.sendSpy + .withArgs({ type: 'handler_unregistered', slotName, param }) + .called + .should.be.False() + }) + it('should call the appropriate handler when a request is received', async () => { From 776f0ca0d490021b0b50a3ac4c8442e53bf6a88e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Mon, 22 Jul 2019 11:37:36 +0200 Subject: [PATCH 08/11] Remove useless TODO check --- src/Transport.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Transport.ts b/src/Transport.ts index 94d6fa8..fc55889 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -182,7 +182,7 @@ export class Transport { if (!slotRequests || !slotRequests[param] || !slotRequests[param][id]) return const error = new Error(`${message} on ${slotName} with param ${param}`) - error.stack = stack || error.stack // TODO: stack is not mutable on edge, check? + error.stack = stack || error.stack this._pendingRequests[slotName][param][id].reject(error) delete this._pendingRequests[slotName][param][id] } From debdd1ece6fabe274a8193d44cfa2e6d69e514cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Tue, 6 Aug 2019 15:05:53 +0200 Subject: [PATCH 09/11] Fix unregisterAllRemoteHandlers syntax and test, remove duplicate onData case --- src/Transport.ts | 7 +------ test/Transport.test.ts | 11 +++++++++++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/Transport.ts b/src/Transport.ts index fc55889..64e86aa 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -90,8 +90,6 @@ export class Transport { return this._responseReceived(message) case 'handler_registered': return this._registerRemoteHandler(message) - case 'handler_registered': - return this._registerRemoteHandler(message) case 'handler_unregistered': return this._unregisterRemoteHandler(message) case 'error': @@ -267,10 +265,7 @@ export class Transport { const slotHandlers = this._remoteHandlers[slotName] if (!slotHandlers) return - const params = Object.keys(slotHandlers).filter(param => - (slotHandlers[param] || []).length > 0 - ) - + const params = Object.keys(slotHandlers).filter(param => slotHandlers[param]) params.forEach(param => this._unregisterRemoteHandler({ slotName, param })) }) } diff --git a/test/Transport.test.ts b/test/Transport.test.ts index ca99614..b51f20b 100644 --- a/test/Transport.test.ts +++ b/test/Transport.test.ts @@ -237,6 +237,17 @@ describe('Transport', () => { }) removeLocalHandler.called.should.be.True() }) + + it('should unregister all remote handlers when channel gets disconnected', () => { + // Add remote handler unregistration callback + transport.addRemoteHandlerUnregistrationCallback(slotName, removeLocalHandler) + + // Disconnect channel + channel.callDisconnected() + + // Callback should have been called + removeLocalHandler.called.should.be.True() + }) }) }) }) From 11e22527f4a318962e43ca2032d55e6e8c5a8185 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Thu, 22 Aug 2019 16:11:28 +0200 Subject: [PATCH 10/11] Add `lazy` to `notConnectedSlot` as `signalNotConnected` --- src/Slot.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Slot.ts b/src/Slot.ts index 9340dec..d809ec0 100644 --- a/src/Slot.ts +++ b/src/Slot.ts @@ -4,8 +4,13 @@ import { DEFAULT_PARAM } from './Constants' const signalNotConnected = () => { throw new Error('Slot not connected') } -const notConnectedSlot: any = () => signalNotConnected() -notConnectedSlot.on = signalNotConnected +const notConnectedSlot: Slot = Object.assign( + () => signalNotConnected(), + { + on: signalNotConnected, + lazy: signalNotConnected + } +) export type LazyCallback = (param: string) => void export type Unsubscribe = () => void From 144293e5b81a089cabb562fc91b6a5c67158a7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Herv=C3=A9?= Date: Fri, 23 Aug 2019 11:38:48 +0200 Subject: [PATCH 11/11] Improve pendingRequests and localHandlers access --- src/Transport.ts | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/Transport.ts b/src/Transport.ts index 64e86aa..9825ba4 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -210,13 +210,10 @@ export class Transport { // Keep a reference to the pending promise's // resolution and rejection callbacks - if (!this._pendingRequests[slotName]) { - this._pendingRequests[slotName] = {} - } - if (!this._pendingRequests[slotName][param]) { - this._pendingRequests[slotName][param] = {} - } const id = getId() + + this._pendingRequests[slotName] = this._pendingRequests[slotName] || {} + this._pendingRequests[slotName][param] = this._pendingRequests[slotName][param] || {} this._pendingRequests[slotName][param][id] = { resolve, reject } // Send a request to the far end @@ -230,7 +227,9 @@ export class Transport { // Handle request timeout if needed setTimeout(() => { - const request = this._pendingRequests[slotName][param][id] + const slotHandlers = this._pendingRequests[slotName] || {} + const paramHandlers = slotHandlers[param] || {} + const request = paramHandlers[id] if (request) { const error = new Error(`${ERRORS.TIMED_OUT} on ${slotName} with param ${param}`) request.reject(error) @@ -308,12 +307,9 @@ export class Transport { param: string, handler: Handler ): void { - if (!this._localHandlers[slotName]) { - this._localHandlers[slotName] = {} - } - if (!this._localHandlers[slotName][param]) { - this._localHandlers[slotName][param] = [] - } + + this._localHandlers[slotName] = this._localHandlers[slotName] || {} + this._localHandlers[slotName][param] = this._localHandlers[slotName][param] || [] this._localHandlers[slotName][param].push(handler) /**