diff --git a/README.md b/README.md index 4bc73cd..0a44bb7 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,15 @@ Once connected, the clients can start by using the slots on the event bus // firstModule.ts import EventBus from './firstModule.EventBus.ts' +// Slots can be called with a parameter, here 'michel' +EventBus.say('michel', 'Hello') + +// Or one can rely on the default parameter: here DEFAULT_PARAMETER +// is implicitely used. +EventBus.say('Hello') + // Triggering an event always returns a promise -EventBus.sayHello('michel').then(() => { +EventBus.say('michel', 'Hello').then(() => { ... }) @@ -88,12 +95,14 @@ EventBus.ping() // secondModule.ts import EventBus from './secondModule.EventBus.ts' -EventBus.ping().on(() => { +// Add a listener on the default parameter +EventBus.ping.on(() => { console.log('pong') }) -EventBus.sayHello.on(name => { - console.log(`${name} said hello!`) +// Or listen to a specific parameter +EventBus.say.on('michel', (words) => { + console.log('michel said', words) }) // Event subscribers can respond to the event synchronously (by returning a value) @@ -130,27 +139,47 @@ Remote or local clients are considered equally. If a client was already connecte at the time when `lazy` is called, the "connect" callback is called immediately. ```typescript -const connect = () => { - console.log('Someone somewhere has begun listening to the slot with `.on`.') +const connect = (param) => { + console.log(`Someone somewhere has begun listening to the slot with .on on ${param}.`) } -const disconnect = () => { - console.log('No one is listening to the slot anymore.') +const disconnect = (param) => { + console.log(`No one is listening to the slot anymore on ${param}.`) } const disconnectLazy = EventBus.ping.lazy(connect, disconnect) const unsubscribe = EventBus.ping().on(() => { }) -// console output: 'Someone somewhere has begun listening to the slot with `.on`.' +// console output: 'Someone somewhere has begun listening to the slot with .on on $_DEFAULT_$.' unsubscribe() -// console output: 'No one is listening to the slot anymore.' +// console output: 'No one is listening to the slot anymore on $_DEFAULT_$.' + +const unsubscribe = EventBus.ping().on('parameter', () => { }) +// console output: 'Someone somewhere has begun listening to the slot with .on on parameter.' + +unsubscribe() +// console output: 'No one is listening to the slot anymore on parameter.' // Remove the callbacks. // "disconnect" is called one last time if there were subscribers left on the slot. disconnectLazy() ``` +### Buffering + +When the eventBus is created with channels, slots will wait for all transports to have +registered callbacks before triggering. + +This buffering mechanism can be disabled at the slot level with the `noBuffer` config option: + +```typescript +const MyEvents = { + willWait: slot(), + wontWait: slot({ noBuffer: true }), +} +``` + ### Syntactic sugar You can combine events from different sources. diff --git a/package.json b/package.json index 491e759..58b263f 100644 --- a/package.json +++ b/package.json @@ -68,7 +68,9 @@ ], "resolutions": { "js-yaml": ">=3.13.1", - "lodash": ">=4.17.11" + "lodash": ">=4.17.12", + "mixin-deep": ">=1.3.2 <2.0.0 || >=2.0.1", + "set-value": ">=2.0.1 <3.0.0 || >=3.0.1" }, "license": "Apache-2.0" } diff --git a/src/Constants.ts b/src/Constants.ts new file mode 100644 index 0000000..cca35da --- /dev/null +++ b/src/Constants.ts @@ -0,0 +1 @@ +export const DEFAULT_PARAM = '$_DEFAULT_$' diff --git a/src/Events.ts b/src/Events.ts index 067bd61..3da65c9 100644 --- a/src/Events.ts +++ b/src/Events.ts @@ -6,8 +6,35 @@ export interface EventDeclaration { [slotName: string]: Slot } -export function combineEvents(...args: C[]): { [P in keyof C]: C[P] } { - // TODO: throw if event buses have duplicate events +export function combineEvents< + C1 extends EventDeclaration, C2 extends EventDeclaration, C3 extends EventDeclaration, + C4 extends EventDeclaration, C5 extends EventDeclaration, C6 extends EventDeclaration, + C7 extends EventDeclaration, C8 extends EventDeclaration, C9 extends EventDeclaration, + C10 extends EventDeclaration, C11 extends EventDeclaration, C12 extends EventDeclaration, + C13 extends EventDeclaration, C14 extends EventDeclaration, C15 extends EventDeclaration, + C16 extends EventDeclaration, C17 extends EventDeclaration, C18 extends EventDeclaration, + C19 extends EventDeclaration, C20 extends EventDeclaration, C21 extends EventDeclaration, + C22 extends EventDeclaration, C23 extends EventDeclaration, C24 extends EventDeclaration + >( + _c1: C1, _c2: C2, _c3?: C3, _c4?: C4, _c5?: C5, _c6?: C6, _c7?: C7, _c8?: C8, + _c9?: C9, _c10?: C10, _c11?: C11, _c12?: C12, _c13?: C13, _c14?: C14, _c15?: C15, + _c16?: C16, _c17?: C17, _c18?: C18, _c19?: C19, _c20?: C20, _c21?: C21, _c22?: C22, + _c23?: C23, _c24?: C24 + ): C1 & C2 & C3 & C4 & C5 & C6 & C7 & C8 & C9 & C10 & C11 & C12 & C13 & C14 & C15 & C16 + & C17 & C18 & C19 & C20 & C21 & C22 & C23 & C24 { + + const args = Array.from(arguments) + + const keys = args.reduce((keys, arg) => { + return [...keys, ...Object.keys(arg)] + }, []) + + const uniqKeys = [...new Set(keys)] + + if (keys.length > uniqKeys.length) { + throw new Error('ts-event-bus: duplicate slots encountered in combineEvents.') + } + return Object.assign({}, ...args) } @@ -15,7 +42,8 @@ export function createEventBus(args: { events: C, ch const transports = (args.channels || []).map(c => new Transport(c)) return Object.keys(args.events) .reduce((conn: C, slotName) => { - conn[slotName] = connectSlot(slotName, transports as Transport[]) + const config = args.events[slotName].config + conn[slotName] = connectSlot(slotName, transports as Transport[], config) return conn }, {} as any) } diff --git a/src/Handler.ts b/src/Handler.ts index 74d7c7e..448fab7 100644 --- a/src/Handler.ts +++ b/src/Handler.ts @@ -38,7 +38,7 @@ function callOneHandler(h: Handler, data: any): Promise { * of all these handlers is discarded. */ export function callHandlers(data: any, handlers: Handler[]): Promise { - if (handlers.length === 0) { + if (!handlers || handlers.length === 0) { // No one is listening return new Promise(resolve => { /* NOOP, this promise will never resolve */ }) } else if (handlers.length === 1) { diff --git a/src/Message.ts b/src/Message.ts index 2a0bf0b..c2115b2 100644 --- a/src/Message.ts +++ b/src/Message.ts @@ -1,14 +1,44 @@ -export type TransportRequest = { type: 'request', slotName: string, id: string, data: any } -export type TransportResponse = { type: 'response', slotName: string, id: string, data: any } -export type TransportError = { type: 'error', slotName: string, id: string, message: string, stack?: string } -export type TransportRegistrationMessage = { type: 'handler_registered', slotName: string } -export type TransportUnregistrationMessage = { type: 'handler_unregistered', slotName: string } +export type TransportRequest = { + type: 'request', + slotName: string, + id: string, + data: any, + param: string +} + +export type TransportResponse = { + type: 'response', + slotName: string, + id: string, + data: any, + param: string +} + +export type TransportError = { + id: string, + message: string, + param: string, + slotName: string, + stack?: string, + type: 'error' +} + +export type TransportRegistrationMessage = { + type: 'handler_registered', + slotName: string, + param: string +} +export type TransportUnregistrationMessage = { + type: 'handler_unregistered', + slotName: string, + param: string +} export type TransportMessage = - TransportRegistrationMessage - | TransportUnregistrationMessage + TransportError + | TransportRegistrationMessage | TransportRequest | TransportResponse - | TransportError + | TransportUnregistrationMessage export function isTransportMessage(m: { type: string }): m is TransportMessage { switch (m.type) { diff --git a/src/Slot.ts b/src/Slot.ts index 70555b3..d809ec0 100644 --- a/src/Slot.ts +++ b/src/Slot.ts @@ -1,14 +1,51 @@ import { Transport } from './Transport' import { Handler, callHandlers } from './Handler' +import { DEFAULT_PARAM } from './Constants' const signalNotConnected = () => { throw new Error('Slot not connected') } -const FAKE_SLOT: any = () => signalNotConnected() -FAKE_SLOT.on = signalNotConnected +const notConnectedSlot: Slot = Object.assign( + () => signalNotConnected(), + { + on: signalNotConnected, + lazy: signalNotConnected + } +) -export type SimpleCallback = () => void +export type LazyCallback = (param: string) => void export type Unsubscribe = () => void +// Key to store local handlers in the `handlers` map +const LOCAL_TRANSPORT = 'LOCAL_TRANSPORT' + +// Type to store handlers, by transport, by param +type TransportHandlers = { [param: string]: Handler[] } +type Handlers = { [handlerKey: string]: TransportHandlers } + +// Find handlers for given param accross transports +const getParamHandlers = (param: string, handlers: Handlers): Handler[] => + Object.keys(handlers).reduce((paramHandlers, transportKey) => { + return paramHandlers.concat(handlers[transportKey][param] || []) + }, [] as Handler[]) + +// Find all params with registered callbacks +const findAllUsedParams = (handlers: Handlers): string[] => + Object.keys(handlers).reduce((params, transportKey) => { + const transportHandlers = handlers[transportKey] + const registeredParams = Object.keys(transportHandlers).filter( + param => (transportHandlers[param] || []).length > 0 + ) + const paramsMaybeDuplicate = [...params, ...registeredParams] + const paramsUniq = [...new Set(paramsMaybeDuplicate)] + return paramsUniq + }, [] as string[]) + +interface SlotConfig { + // This option will prevent the slot from buffering the + // requests if no remote handlers are set for some transports. + noBuffer?: boolean +} + /** * Represents an event shared by two modules. * @@ -18,13 +55,26 @@ export type Unsubscribe = () => void * The slot can also be subscribed to, by using the `on` property. */ export interface Slot { - - // Make the Slot callable: this is how an event is triggered // TODO: Find a solution to make it possible to omit the requestData as // optional only when explicitly typed as such by the client. - (requestData: RequestData): Promise - on: (handler: Handler) => Unsubscribe - lazy: (connect: () => void, disconnect: () => void) => Unsubscribe + + // Trigger the slot with explicit param + (param: string, requestData: RequestData): Promise, + + // Trigger the slot with default param + (requestData: RequestData): Promise, + + // Listen to events sent through the slot on explicit param + on(param: string, handler: Handler): Unsubscribe, + + // Listen to events sent through the slot on default param + on(handler: Handler): Unsubscribe + + // Provide the slot with lazy callbacks + lazy(connect: LazyCallback, disconnect: LazyCallback): Unsubscribe, + + // Retreive slot configuration + config?: SlotConfig } /** @@ -32,92 +82,183 @@ export interface Slot { * It returns a fake slot, that will throw if triggered or subscribed to. * Slots need to be connected in order to be functional. */ -export function slot(): Slot { - return FAKE_SLOT +export function slot( + config: SlotConfig = { noBuffer: false } +): Slot { + return Object.assign(notConnectedSlot, config) } -export function connectSlot(slotName: string, transports: Transport[]): Slot { +export function connectSlot( + slotName: string, + transports: Transport[], + config: SlotConfig = {} +): Slot { - // These will be all the handlers for this slot (eg. all the callbacks registered with `Slot.on()`) - const handlers = [] as Handler[] + /* + * ======================== + * Internals + * ======================== + */ + + // These will be all the handlers for this slot, for each transport, for each param + const handlers: Handlers = transports.reduce( + (acc, _t, ix) => ({ ...acc, [ix]: {} }), + { [LOCAL_TRANSPORT]: {} } + ) // For each transport we create a Promise that will be fulfilled only // when the far-end has registered a handler. // This prevents `triggers` from firing *before* any far-end is listening. - let remoteHandlersConnected = [] as Promise[] + interface HandlerConnected { + registered: Promise + onRegister: () => void + } - // Lazy - const lazyConnectCallbacks: SimpleCallback[] = [] - const lazyDisonnectCallbacks: SimpleCallback[] = [] + interface RemoteHandlersConnected { + [transportKey: string]: { + [param: string]: HandlerConnected + } + } - const callLazyConnectCallbacks = () => lazyConnectCallbacks.forEach(c => c()) - const callLazyDisonnectCallbacks = () => lazyDisonnectCallbacks.forEach(c => c()) + const remoteHandlersConnected: RemoteHandlersConnected = + transports.reduce((acc, _t, transportKey) => + ({ ...acc, [transportKey]: {} }), + {} + ) + + const awaitHandlerRegistration = ( + transportKey: string, + param: string + ) => { + let onHandlerRegistered = () => { } + + const remoteHandlerRegistered = new Promise( + resolve => onHandlerRegistered = resolve + ) + + remoteHandlersConnected[transportKey][param] = { + registered: remoteHandlerRegistered, + onRegister: onHandlerRegistered + } + } + + // Lazy callbacks + const lazyConnectCallbacks: LazyCallback[] = [] + const lazyDisonnectCallbacks: LazyCallback[] = [] + + const callLazyConnectCallbacks = (param: string) => + lazyConnectCallbacks.forEach(c => c(param)) + + const callLazyDisonnectCallbacks = (param: string) => + lazyDisonnectCallbacks.forEach(c => c(param)) // Signal to all transports that we will accept handlers for this slotName - transports.forEach(t => { + transports.forEach((transport, transportKey) => { + + const remoteHandlerRegistered = ( + param = DEFAULT_PARAM, + handler: Handler + ) => { + // Store handler + const paramHandlers = handlers[transportKey][param] || [] + handlers[transportKey][param] = paramHandlers.concat(handler) + + // Call lazy callbacks if needed + if (getParamHandlers(param, handlers).length === 1) callLazyConnectCallbacks(param) + + // Release potential buffered events + if (!remoteHandlersConnected[transportKey][param]) { + awaitHandlerRegistration(String(transportKey), param) + } - // Variable holds the promise's `resolve` function. A little hack - // allowing us to have the notion of "deferred" promise fulfillment. - let onHandlerRegistered: Function - let remoteHandlerPromise: Promise + remoteHandlersConnected[transportKey][param].onRegister() + } - const awaitHandlerRegistration = () => { - // We store a reference to this promise to be resolved once the far-end has responded. - remoteHandlerPromise = new Promise(resolve => onHandlerRegistered = resolve) - remoteHandlersConnected.push(remoteHandlerPromise) + const remoteHandlerUnregistered = ( + param = DEFAULT_PARAM, + handler: Handler + ) => { + const paramHandlers = handlers[transportKey][param] || [] + const handlerIndex = paramHandlers.indexOf(handler) + if (handlerIndex > -1) handlers[transportKey][param].splice(handlerIndex, 1) + if (getParamHandlers(param, handlers).length === 0) callLazyDisonnectCallbacks(param) + awaitHandlerRegistration(String(transportKey), param) } - awaitHandlerRegistration() + transport.addRemoteHandlerRegistrationCallback(slotName, remoteHandlerRegistered) + transport.addRemoteHandlerUnregistrationCallback(slotName, remoteHandlerUnregistered) + }) - t.onRemoteHandlerRegistered(slotName, (handler: Handler) => { - handlers.push(handler) - if (handlers.length === 1) callLazyConnectCallbacks() + /* + * ======================== + * API + * ======================== + */ - // We signal that the transport is ready for this slot by resolving the - // promise stored in `remoteHandlersConnected`. - onHandlerRegistered() - }) + /* + * Sends data through the slot. + */ - t.onRemoteHandlerUnregistered(slotName, handler => { - const handlerIndex = handlers.indexOf(handler) - handlers.splice(handlerIndex, 1) - if (handlers.length === 0) callLazyDisonnectCallbacks() + // Signature for Slot() using default param + function trigger(data: any): Promise - // When the channel disconnects we also need to remove the - // promise blocking the trigger. - remoteHandlersConnected.splice(remoteHandlersConnected.indexOf(remoteHandlerPromise), 1) + // Signature for Slot(, ) + function trigger(param: string, data: any): Promise - // And also insert a new promise that will be re-fulfilled when - // remote handlers are re-registered. - awaitHandlerRegistration() - }) - }) + // Combined signatures + function trigger(firstArg: string | any, secondArg?: any) { + const paramUsed = arguments.length === 2 + const data: any = paramUsed ? secondArg : firstArg + const param: string = paramUsed ? firstArg : DEFAULT_PARAM + if (config.noBuffer || transports.length === 0) { + const allParamHandlers = getParamHandlers(param, handlers) + return callHandlers(data, allParamHandlers) + } - // Called when a client triggers (calls) the slot - // Before calling the handler, we also check that all the declared transports - // are ready to answer to the request. - // If no transports were declared, call directly the handlers. - const trigger: any = (data: any) => (transports.length) ? - Promise.all(remoteHandlersConnected).then(() => callHandlers(data, handlers)) : - callHandlers(data, handlers) + else { + transports.forEach((_t, transportKey) => { + if (!remoteHandlersConnected[transportKey][param]) { + awaitHandlerRegistration(String(transportKey), param) + } + }) + + const transportPromises: Promise[] = transports.reduce( + (acc, _t, transportKey) => [ + ...acc, + remoteHandlersConnected[transportKey][param].registered + ], [] + ) + + return Promise.all(transportPromises).then(() => { + const allParamHandlers = getParamHandlers(param, handlers) + return callHandlers(data, allParamHandlers) + }) + } + } - trigger.lazy = ( - firstClientConnectCallback: SimpleCallback, - lastClientDisconnectCallback: SimpleCallback - ): Unsubscribe => { + /* + * Allows a client to be notified when a first + * client connects to the slot with `.on`, and when the + * last client disconnects from it. + */ + + function lazy( + firstClientConnectCallback: LazyCallback, + lastClientDisconnectCallback: LazyCallback + ): Unsubscribe { lazyConnectCallbacks.push(firstClientConnectCallback) lazyDisonnectCallbacks.push(lastClientDisconnectCallback) - // Call connect callback - if (handlers.length > 0) firstClientConnectCallback() + // Call connect callback immediately if handlers were already registered + findAllUsedParams(handlers).forEach(firstClientConnectCallback) return () => { // Call disconnect callback - if (handlers.length > 0) lastClientDisconnectCallback() + findAllUsedParams(handlers).forEach(lastClientDisconnectCallback) - // Stop lazy connect and disconnect process + // Stop lazy connect and disconnect processes const connectIx = lazyConnectCallbacks.indexOf(firstClientConnectCallback) if (connectIx > -1) lazyConnectCallbacks.splice(connectIx, 1) @@ -126,34 +267,67 @@ export function connectSlot(slotName: string, transports: Trans } } - // Called when a client subscribes to the slot (with `Slot.on()`) - trigger.on = (handler: Handler): Unsubscribe => { + /* + * Allows a client to be notified when someone + * sends data through the slot. + */ + + // Signature for Slot.on() using default param + function on( + handler: Handler + ): Unsubscribe + + // Signature for Slot.on(, ) + function on( + param: string, + handler: Handler + ): Unsubscribe + + // Combined signatures + function on( + paramOrHandler: string | Handler, + handlerIfParam?: Handler + ): Unsubscribe { + + // Get param and handler from arguments, depending if param was passed or not + let param = "" + let handler: Handler = () => new Promise(r => r()) + + if (typeof paramOrHandler === 'string') { + param = paramOrHandler + handler = handlerIfParam || handler + } + else { + param = DEFAULT_PARAM + handler = paramOrHandler + } // Register a remote handler with all of our remote transports - transports.forEach(t => t.registerHandler(slotName, handler)) + transports.forEach(t => t.registerHandler(slotName, param, handler)) // Store this handler - handlers.push(handler) + handlers[LOCAL_TRANSPORT][param] = + (handlers[LOCAL_TRANSPORT][param] || []).concat(handler) - // Call lazy connect callbacks if needed - if (handlers.length === 1) callLazyConnectCallbacks() + // Call lazy connect callbacks if there is at least one handler + const paramHandlers = getParamHandlers(param, handlers) + if (paramHandlers.length === 1) callLazyConnectCallbacks(param) // Return the unsubscription function return () => { // Unregister remote handler with all of our remote transports - transports.forEach(t => t.unregisterHandler(slotName, handler)) + transports.forEach(t => t.unregisterHandler(slotName, param, handler)) - const ix = handlers.indexOf(handler) - if (ix !== -1) { - handlers.splice(ix, 1) - } + const localParamHandlers = handlers[LOCAL_TRANSPORT][param] || [] + const ix = localParamHandlers.indexOf(handler) + if (ix !== -1) handlers[LOCAL_TRANSPORT][param].splice(ix, 1) - // Call lazy disconnect callbacks if needed - if (!handlers.length) callLazyDisonnectCallbacks() + // Call lazy disconnect callbacks if there are no handlers anymore + const paramHandlers = getParamHandlers(param, handlers) + if (paramHandlers.length === 0) callLazyDisonnectCallbacks(param) } - } - return trigger as Slot + return Object.assign(trigger, { on, lazy, config }) } diff --git a/src/Transport.ts b/src/Transport.ts index 2c6ce53..9825ba4 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -1,12 +1,12 @@ import { Handler, callHandlers } from './Handler' import { Channel } from './Channel' import { + TransportError, + TransportMessage, TransportRegistrationMessage, TransportUnregistrationMessage, - TransportError, - TransportRequest, TransportResponse, - TransportMessage + TransportRequest } from './Message' let _ID = 0 @@ -34,34 +34,51 @@ export type PendingRequests = { } } +type RemoteHandlerCallback = + (param: string, handler: Handler) => void + export class Transport { - private _localHandlers: { [slotName: string]: Handler[] } = {} - private _localHandlerRegistrations: TransportRegistrationMessage[] = [] + private _localHandlers: { + [slotName: string]: { + [param: string]: Handler[] + } + } = {} + + private _localHandlerRegistrations: { + [param: string]: TransportRegistrationMessage[] + } = {} /** * Handlers created by the Transport. When an event is triggered locally, * these handlers will make a request to the far end to handle this event, * and store a PendingRequest */ - private _remoteHandlers: { [slotName: string]: Handler } = {} + private _remoteHandlers: { + [slotName: string]: { + [param: string]: Handler + } + } = {} /** * Callbacks provided by each slot allowing to register remote handlers * created by the Transport */ - private _remoteHandlerRegistrationCallbacks: { [slotName: string]: (newHandler: Handler) => void } = {} + private _remoteHandlerRegistrationCallbacks: + { [slotName: string]: RemoteHandlerCallback } = {} /** * Callbacks provided by each slot allowing to unregister the remote handlers * created by the Transport, typically when the remote connection is closed. */ - private _remoteHandlerDeletionCallbacks: { [slotName: string]: (newHandler: Handler) => void } = {} + private _remoteHandlerDeletionCallbacks: + { [slotName: string]: RemoteHandlerCallback } = {} /** * Requests that have been sent to the far end, but have yet to be fulfilled */ - private _pendingRequests: PendingRequests = {} + private _pendingRequests: { [param: string]: PendingRequests } = {} + private _channelReady = false constructor(private _channel: Channel) { @@ -74,7 +91,7 @@ export class Transport { case 'handler_registered': return this._registerRemoteHandler(message) case 'handler_unregistered': - return this._unregisterRemoteHandler(message.slotName) + return this._unregisterRemoteHandler(message) case 'error': return this._errorReceived(message) default: @@ -85,8 +102,10 @@ export class Transport { this._channelReady = true // When the far end connects, signal which local handlers are set - this._localHandlerRegistrations.forEach(msg => { - this._channel.send(msg) + Object.keys(this._localHandlerRegistrations).forEach(param => { + this._localHandlerRegistrations[param].forEach(msg => { + this._channel.send(msg) + }) }) }) this._channel.onDisconnect(() => { @@ -108,32 +127,33 @@ export class Transport { * and send either a response or an error mirroring the request id, * depending on the status of the resulting promise */ - private _requestReceived({ slotName, data, id }: TransportRequest): void { + private _requestReceived({ slotName, data, id, param }: TransportRequest): void { // Get local handlers - const handlers = this._localHandlers[slotName] - if (!handlers) { - return - } + const slotHandlers = this._localHandlers[slotName] + if (!slotHandlers) return + + const handlers = slotHandlers[param] + if (!handlers) return // Call local handlers with the request data callHandlers(data, handlers) - // If the resulting promise is fulfilled, send a response to the far end .then(response => this._channel.send({ type: 'response', slotName, id, - data: response - }) - ) + data: response, + param + })) // If the resulting promise is rejected, send an error to the far end .catch((error: Error) => this._channel.send({ - type: 'error', - slotName, id, message: `${error}`, - stack: error.stack || '' + param, + slotName, + stack: error.stack || '', + type: 'error' })) } @@ -141,26 +161,28 @@ export class Transport { * When a response is received from the far end, resolve the pending promise * with the received data */ - private _responseReceived({ slotName, data, id }: TransportResponse): void { - if (!this._pendingRequests[slotName][id]) { + private _responseReceived({ slotName, data, id, param }: TransportResponse): void { + const slotRequests = this._pendingRequests[slotName] + if (!slotRequests || !slotRequests[param] || !slotRequests[param][id]) { return } - this._pendingRequests[slotName][id].resolve(data) - delete this._pendingRequests[slotName][id] + + slotRequests[param][id].resolve(data) + delete slotRequests[param][id] } /** * When an error is received from the far end, reject the pending promise * with the received data */ - private _errorReceived({ slotName, id, message, stack }: TransportError): void { - if (!this._pendingRequests[slotName][id]) { - return - } - const error = new Error(`${message} on ${slotName}`) + private _errorReceived({ slotName, id, message, stack, param }: TransportError): void { + const slotRequests = this._pendingRequests[slotName] + if (!slotRequests || !slotRequests[param] || !slotRequests[param][id]) return + + const error = new Error(`${message} on ${slotName} with param ${param}`) error.stack = stack || error.stack - this._pendingRequests[slotName][id].reject(error) - delete this._pendingRequests[slotName][id] + this._pendingRequests[slotName][param][id].reject(error) + delete this._pendingRequests[slotName][param][id] } /** @@ -170,14 +192,15 @@ export class Transport { * and rejection function * */ - private _registerRemoteHandler({ slotName }: TransportMessage): void { + private _registerRemoteHandler({ slotName, param }: TransportRegistrationMessage): void { + const addHandler = this._remoteHandlerRegistrationCallbacks[slotName] - if (!addHandler) { - return - } - if (this._remoteHandlers[slotName]) { - return - } + if (!addHandler) return + + const slotHandlers = this._remoteHandlers[slotName] + + if (slotHandlers && slotHandlers[param]) return + const remoteHandler = (requestData: any) => new Promise((resolve, reject) => { // If the channel is not ready, reject immediately // TODO think of a better (buffering...) solution in the future @@ -187,74 +210,89 @@ export class Transport { // Keep a reference to the pending promise's // resolution and rejection callbacks - if (!this._pendingRequests[slotName]) { - this._pendingRequests[slotName] = {} - } const id = getId() - this._pendingRequests[slotName][id] = { resolve, reject } + + this._pendingRequests[slotName] = this._pendingRequests[slotName] || {} + this._pendingRequests[slotName][param] = this._pendingRequests[slotName][param] || {} + this._pendingRequests[slotName][param][id] = { resolve, reject } // Send a request to the far end this._channel.send({ type: 'request', id, slotName, + param, data: requestData }) // Handle request timeout if needed setTimeout(() => { - if (this._pendingRequests[slotName][id]) { - this._pendingRequests[slotName][id].reject(new Error(`${ERRORS.TIMED_OUT} on ${slotName}`)) - delete this._pendingRequests[slotName][id] + const slotHandlers = this._pendingRequests[slotName] || {} + const paramHandlers = slotHandlers[param] || {} + const request = paramHandlers[id] + if (request) { + const error = new Error(`${ERRORS.TIMED_OUT} on ${slotName} with param ${param}`) + request.reject(error) + delete this._pendingRequests[slotName][param][id] } }, this._channel.timeout) }) - this._remoteHandlers[slotName] = remoteHandler - addHandler(remoteHandler) + + this._remoteHandlers[slotName] = this._remoteHandlers[slotName] || {} + this._remoteHandlers[slotName][param] = remoteHandler + + addHandler(param, remoteHandler) } - private _unregisterRemoteHandler(slotName: string): void { + private _unregisterRemoteHandler( + { slotName, param }: { slotName: string, param: string } + ): void { const unregisterRemoteHandler = this._remoteHandlerDeletionCallbacks[slotName] - const remoteHandler = this._remoteHandlers[slotName] + const slotHandlers = this._remoteHandlers[slotName] + if (!slotHandlers) return + + const remoteHandler = slotHandlers[param] if (remoteHandler && unregisterRemoteHandler) { - unregisterRemoteHandler(remoteHandler) - delete this._remoteHandlers[slotName] + unregisterRemoteHandler(param, remoteHandler) + delete this._remoteHandlers[slotName][param] } } private _unregisterAllRemoteHandlers(): void { Object.keys(this._remoteHandlerDeletionCallbacks) .forEach(slotName => { - this._unregisterRemoteHandler(slotName) + const slotHandlers = this._remoteHandlers[slotName] + if (!slotHandlers) return + + const params = Object.keys(slotHandlers).filter(param => slotHandlers[param]) + params.forEach(param => this._unregisterRemoteHandler({ slotName, param })) }) } private _rejectAllPendingRequests(e: Error): void { Object.keys(this._pendingRequests).forEach(slotName => { - Object.keys(this._pendingRequests[slotName]).forEach(id => { - this._pendingRequests[slotName][id].reject(e) + Object.keys(this._pendingRequests[slotName]).forEach(param => { + Object.keys(this._pendingRequests[slotName][param]).forEach(id => { + this._pendingRequests[slotName][param][id].reject(e) + }) }) this._pendingRequests[slotName] = {} }) } - /** - * Called on slot creation. - * The provided callbacks will be used when remote handlers are registered, - * to add a corresponding local handler. - */ - public onRemoteHandlerRegistered(slotName: string, addLocalHandler: (h: Handler) => void): void { + public addRemoteHandlerRegistrationCallback( + slotName: string, + addLocalHandler: (p: string, h: Handler) => void + ): void { if (!this._remoteHandlerRegistrationCallbacks[slotName]) { this._remoteHandlerRegistrationCallbacks[slotName] = addLocalHandler } } - /** - * Called on slot creation. - * The provided callbacks will be used when the far end disconnects, - * to remove the handlers we had registered. - */ - public onRemoteHandlerUnregistered(slotName: string, removeHandler: (h: Handler) => void): void { + public addRemoteHandlerUnregistrationCallback( + slotName: string, + removeHandler: (p: string, h: Handler) => void + ): void { if (!this._remoteHandlerDeletionCallbacks[slotName]) { this._remoteHandlerDeletionCallbacks[slotName] = removeHandler } @@ -264,21 +302,31 @@ export class Transport { * Called when a local handler is registered, to send a `handler_registered` * message to the far end. */ - public registerHandler(slotName: string, handler: Handler): void { - if (!this._localHandlers[slotName]) { - this._localHandlers[slotName] = [] - } - this._localHandlers[slotName].push(handler) + public registerHandler( + slotName: string, + param: string, + handler: Handler + ): void { + + this._localHandlers[slotName] = this._localHandlers[slotName] || {} + this._localHandlers[slotName][param] = this._localHandlers[slotName][param] || [] + this._localHandlers[slotName][param].push(handler) + /** * We notify the far end when adding the first handler only, as they * only need to know if at least one handler is connected. */ - if (this._localHandlers[slotName].length === 1) { + if (this._localHandlers[slotName][param].length === 1) { const registrationMessage: TransportRegistrationMessage = { type: 'handler_registered', + param, slotName } - this._localHandlerRegistrations.push(registrationMessage) + this._localHandlerRegistrations[param] = + this._localHandlerRegistrations[param] || [] + + this._localHandlerRegistrations[param].push(registrationMessage) + if (this._channelReady) { this._channel.send(registrationMessage) } @@ -289,18 +337,24 @@ export class Transport { * Called when a local handler is unregistered, to send a `handler_unregistered` * message to the far end. */ - public unregisterHandler(slotName: string, handler: Handler): void { - if (this._localHandlers[slotName]) { - const ix = this._localHandlers[slotName].indexOf(handler) + public unregisterHandler( + slotName: string, + param: string, + handler: Handler + ): void { + const slotLocalHandlers = this._localHandlers[slotName] + if (slotLocalHandlers && slotLocalHandlers[param]) { + const ix = slotLocalHandlers[param].indexOf(handler) if (ix > -1) { - this._localHandlers[slotName].splice(ix, 1) + slotLocalHandlers[param].splice(ix, 1) /** * We notify the far end when removing the last handler only, as they * only need to know if at least one handler is connected. */ - if (this._localHandlers[slotName].length === 0) { + if (slotLocalHandlers[param].length === 0) { const unregistrationMessage: TransportUnregistrationMessage = { type: 'handler_unregistered', + param, slotName } if (this._channelReady) { diff --git a/src/index.ts b/src/index.ts index 8e1b7a1..c3a7167 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,3 +4,4 @@ export { Channel } from './Channel' export { GenericChannel } from './Channels/GenericChannel' export { ChunkedChannel } from './Channels/ChunkedChannel' export { TransportMessage } from './Message' +export { DEFAULT_PARAM } from './Constants' diff --git a/test/Event.test.ts b/test/Event.test.ts index 30230b5..2a53613 100644 --- a/test/Event.test.ts +++ b/test/Event.test.ts @@ -2,24 +2,28 @@ import 'should' import {slot} from './../src/Slot' import {combineEvents, createEventBus} from './../src/Events' -import {TransportMessage} from './../src/Message' +// import {TransportMessage} from './../src/Message' import {TestChannel} from './TestChannel' +import { DEFAULT_PARAM } from './../src/Constants' import * as sinon from 'sinon' describe('combineEvents()', () => { it('should correctly combine several EventDeclarations', () => { + const helloEvents = { + hello: slot<{ name: string }>() + } + const howAreEvents = { + how: slot<{ mode: 'simple' | 'advanced'}>(), + are: slot<{ tense: number }>() + } + const youEvents = { + you: slot<{ reflective: boolean }>() + } const combined = combineEvents( - { - hello: slot<{ name: string }>() - }, - { - how: slot<{ mode: 'simple' | 'advanced'}>(), - are: slot<{ tense: number }>() - }, - { - you: slot<{ reflective: boolean }>() - } + helloEvents, + howAreEvents, + youEvents ) Object.keys(combined).should.eql(['hello', 'how', 'are', 'you']) @@ -31,6 +35,14 @@ describe('combineEvents()', () => { // combined.you({ reflective: 5 }) }) + it('should throw with duplicate slot declarations', () => { + const helloEvents = { + hello: slot<{ name: string }>() + } + const failing = () => combineEvents(helloEvents, helloEvents) + failing.should.throw(/duplicate slots/) + }) + }) describe('createEventBus()', () => { @@ -39,6 +51,8 @@ describe('createEventBus()', () => { numberToString: slot() } + const param = DEFAULT_PARAM + it('should correctly create an event bus with no channels', async () => { // Attempting to use the events without having @@ -68,6 +82,7 @@ describe('createEventBus()', () => { eventBus.numberToString.on(num => num.toString()) channel.sendSpy.calledWith({ type: 'handler_registered', + param, slotName: 'numberToString' }).should.be.True() @@ -76,6 +91,7 @@ describe('createEventBus()', () => { channel.fakeReceive({ type: 'request', slotName: 'numberToString', + param, id: '0', data: 5 }) @@ -84,9 +100,9 @@ describe('createEventBus()', () => { channel.sendSpy.calledWith({ type: 'response', slotName: 'numberToString', + param, id: '0', data: '5' }).should.be.True() }) - }) diff --git a/test/Slot.test.ts b/test/Slot.test.ts index 299c359..ad8ba3e 100644 --- a/test/Slot.test.ts +++ b/test/Slot.test.ts @@ -4,6 +4,8 @@ import { spy } from 'sinon' import { connectSlot } from './../src/Slot' import { TestChannel } from './TestChannel' import { Transport } from './../src/Transport' +import { DEFAULT_PARAM } from './../src/Constants' +import { TransportRegistrationMessage } from './../src/Message' const makeTestTransport = () => { const channel = new TestChannel() @@ -12,101 +14,130 @@ const makeTestTransport = () => { return { channel, transport } } -describe('connectSlot()', () => { +describe('connectSlot', () => { + + context('without parameter', () => { + + describe('trigger', () => { + it('should use default parameter', async () => { + const numberToString = connectSlot('numberToString', []) + numberToString.on(DEFAULT_PARAM, num => `${num.toString()}`) + + const res = await numberToString(56) + res.should.eql('56') + }) + }) + + describe('on', () => { + it('should use default parameter', async () => { + const numberToString = connectSlot('numberToString', []) + numberToString.on(num => `${num.toString()}`) + + const res = await numberToString(DEFAULT_PARAM, 56) + res.should.eql('56') + }) + }) + }) context('with no transports', () => { - it('should call the local handler if there is only one', () => { + it('should call a single local handler registered for a parameter', async () => { const numberToString = connectSlot('numberToString', []) - numberToString.on(num => num.toString()) - return numberToString(56) - .then(res => res.should.eql('56')) + numberToString.on('a', num => `a${num.toString()}`) + + const res = await numberToString('a', 56) + res.should.eql('a56') }) it('should call all handlers if there is more than one', async () => { const broadcastBool = connectSlot('broadcastBool', []) const results: string[] = [] - broadcastBool.on(b => { - results.push(`1:${b}`) - }) - broadcastBool.on(b => { - results.push(`2:${b}`) - }) - broadcastBool.on(b => { - results.push(`3:${b}`) - }) - await broadcastBool(true) - results.should.eql([ - '1:true', - '2:true', - '3:true' - ]) + + broadcastBool.on('a', b => { results.push(`1:${b}`) }) + broadcastBool.on('a', b => { results.push(`2:${b}`) }) + broadcastBool.on('a', b => { results.push(`3:${b}`) }) + + // Should not be called: different parameter + broadcastBool.on('b', b => { results.push(`4:${b}`) }) + + await broadcastBool('a', true) + + results.should.eql([ '1:true', '2:true', '3:true' ]) + }) + + it('should allow subscribing to multiple parameters', async () => { + const broadcastBool = connectSlot('broadcastBool', []) + let value = 0 + + broadcastBool.on('add', n => { value += n }) + broadcastBool.on('remove', n => { value -= n }) + + await broadcastBool('add', 3) + value.should.eql(3) + + await broadcastBool('remove', 2) + value.should.eql(1) }) it('should allow to unregister handlers', async () => { - const broadcastNumber = connectSlot('broadcastNumber', []) - let results: number[] = [] - const runTest = (n: number) => { - results = [] - return broadcastNumber(n) - } - - // Add two handlers and save references to - // the unsub function of the second - broadcastNumber.on(n => { - results.push(n - 1) - }) - const unreg = broadcastNumber.on(n => { - results.push(n - 2) - }) + const broadcastBool = connectSlot('broadcastBool', []) + let value = 0 - // Trigger the event once - await runTest(5) + const unsub = broadcastBool.on('add', n => { value += n }) + broadcastBool.on('add', n => { value += n }) - // both handlers should have been called - results.should.eql([4, 3]) + await broadcastBool('add', 3) + value.should.eql(6) // 2 * 3 - // unregister the second handler, then trigger - // the event a second time: only the second handler - // should be called - unreg() - await runTest(56) - results.should.eql([55]) + unsub() + + await broadcastBool('add', 3) + value.should.eql(9) // 6 + 1 * 3 }) - it('should call lazy connect and disconnect', () => { + it('should call lazy connect and disconnect with parameter', () => { const broadcastBool = connectSlot('broadcastBool', []) + const param = 'param' + const connect = spy() const disconnect = spy() broadcastBool.lazy(connect, disconnect) - const unsubscribe = broadcastBool.on(() => {}) + const unsubscribe = broadcastBool.on(param, () => {}) + + if (!connect.calledWith(param)) + throw new Error('connect should have been called with parameter') - if (!connect.called) throw new Error('connect should have been called') - if (disconnect.called) throw new Error('disconnect should not have been called') + if (disconnect.called) + throw new Error('disconnect should not have been called with parameter') unsubscribe() - if (!disconnect.called) throw new Error('disconnect should have been called') + if (!disconnect.calledWith(param)) + throw new Error('disconnect should have been called with parameter') }) - }) context('with local and remote handlers', () => { it('should call both local handlers and remote handlers', async () => { - const {channel, transport} = makeTestTransport() + const { channel, transport } = makeTestTransport() const broadcastBool = connectSlot('broadcastBool', [transport]) let localCalled = false - broadcastBool.on(b => { localCalled = true }) + broadcastBool.on(_b => { localCalled = true }) const triggerPromise = broadcastBool(true) // Handlers should not be called until a remote handler is registered await Promise.resolve() localCalled.should.be.False() - channel.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) + + channel.fakeReceive({ + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered' + }) // setTimeout(0) to yield control to ts-event-bus internals, // so that the call to handlers can be processed @@ -115,27 +146,67 @@ describe('connectSlot()', () => { // Once a remote handler is registered, both local and remote should be called localCalled.should.be.True() const request = channel.sendSpy.lastCall.args[0] + request.should.match({ - type: 'request', + data: true, + param: DEFAULT_PARAM, slotName: 'broadcastBool', - data: true + type: 'request' }) // triggerPromise should resolve once a remote response is received channel.fakeReceive({ - type: 'response', + data: null, id: request.id, + param: DEFAULT_PARAM, slotName: 'broadcastBool', - data: null + type: 'response' }) await triggerPromise }) + describe('noBuffer', () => { + it('should call local handlers even if no remote handler is registered', async () => { + const { channel, transport } = makeTestTransport() + const broadcastBool = connectSlot( + 'broadcastBool', + [transport], + { noBuffer: true } + ) + let localCalled = false + broadcastBool.on(_b => { localCalled = true }) + broadcastBool(true) + + // We should have called the trigger + localCalled.should.be.True() + + const registrationMessage: TransportRegistrationMessage = { + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered' + } + + channel.fakeReceive(registrationMessage) + await new Promise(resolve => setTimeout(resolve, 0)) + + // Remote should not have been called, as it was not registered + // at the time of the trigger. + const request = channel.sendSpy.lastCall.args[0] + request.should.match(registrationMessage) + }) + }) + describe('lazy', () => { it('should call connect and disconnect', () => { + const param = 'param' + const { channel: channel1, transport: transport1 } = makeTestTransport() const { channel: channel2, transport: transport2 } = makeTestTransport() - const broadcastBool = connectSlot('broadcastBool', [transport1, transport2]) + + const broadcastBool = connectSlot( + 'broadcastBool', + [transport1, transport2] + ) const connect = spy() const disconnect = spy() @@ -143,89 +214,114 @@ describe('connectSlot()', () => { broadcastBool.lazy(connect, disconnect) // Simulate two remote connextions to the slot - channel1.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) - channel2.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_registered', + slotName: 'broadcastBool', + param + }) + + channel2.fakeReceive({ + type: 'handler_registered', + slotName: 'broadcastBool', + param + }) // Connect should have been called once - if (!connect.calledOnce) throw new Error('connect should have been called once') + if (!connect.calledOnceWith(param)) + throw new Error('connect should have been called once with param') // Disconnect should not have been called - if (disconnect.called) throw new Error('disconnect should not have been called') + if (disconnect.called) + throw new Error('disconnect should not have been called') // Disconnect first remote client - channel1.fakeReceive({ type: 'handler_unregistered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_unregistered', + slotName: 'broadcastBool', + param + }) // Disconnect should not have been called - if (disconnect.called) throw new Error('disconnect should not have been called') + if (disconnect.called) + throw new Error('disconnect should not have been called') // Disconnect second remote client - channel2.fakeReceive({ type: 'handler_unregistered', slotName: 'broadcastBool'}) + channel2.fakeReceive({ + type: 'handler_unregistered', + slotName: 'broadcastBool', + param + }) // Disconnect should have been called once - if (!disconnect.calledOnce) throw new Error('disconnect should have been called once') + if (!disconnect.calledOnceWith(param)) + throw new Error('disconnect should have been called once with param') }) it('should support multiple lazy calls', () => { const { channel: channel1, transport: transport1 } = makeTestTransport() + const param = 'param' + const connect1 = spy() const disconnect1 = spy() const connect2 = spy() const disconnect2 = spy() - const broadcastBool = connectSlot('broadcastBool', [transport1]) + const broadcastBool = connectSlot( + 'broadcastBool', + [transport1] + ) broadcastBool.lazy(connect1, disconnect1) broadcastBool.lazy(connect2, disconnect2) - channel1.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_registered', + slotName: 'broadcastBool', + param + }) // Connects should have been called once - if (!connect1.calledOnce) throw new Error('connect1 should have been called once') - if (!connect2.calledOnce) throw new Error('connect1 should have been called once') + if (!connect1.calledOnceWith(param)) + throw new Error('connect1 should have been called once with param') + + if (!connect2.calledOnceWith(param)) + throw new Error('connect2 should have been called once with param') - channel1.fakeReceive({ type: 'handler_unregistered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_unregistered', + slotName: 'broadcastBool', + param + }) // Disonnects should have been called once - if (!disconnect1.calledOnce) throw new Error('connect1 should have been called once') - if (!disconnect2.calledOnce) throw new Error('connect1 should have been called once') + if (!disconnect1.calledOnceWith(param)) + throw new Error('disconnect1 should have been called once with param') + + if (!disconnect2.calledOnceWith(param)) + throw new Error('disconnect2 should have been called once with param') }) it('should call connect if transport was registered before lazy was called', () => { + const param = 'param' const { channel: channel1, transport: transport1 } = makeTestTransport() const broadcastBool = connectSlot('broadcastBool', [transport1]) const connect = spy() // Register remote handler *before* calling lazy - channel1.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) + channel1.fakeReceive({ + type: 'handler_registered', + slotName: 'broadcastBool', + param + }) broadcastBool.lazy(connect, () => {}) // Connect should have been called once - if (!connect.calledOnce) throw new Error('connect should have been called once') - }) - - it('should call disconnect on unsubscribe when remote client is connected', () => { - const { channel: channel1, transport: transport1 } = makeTestTransport() - const broadcastBool = connectSlot('broadcastBool', [transport1]) - - const disconnect = spy() - - // Register remote handler - channel1.fakeReceive({ type: 'handler_registered', slotName: 'broadcastBool'}) - - // Connect lazy - const unsubscribe = broadcastBool.lazy(() => {}, disconnect) - - // Disonnect should not have been called - if (disconnect.called) throw new Error('disconnect should not have been called') - - unsubscribe() - - // Disconnect should have been called once - if (!disconnect.calledOnce) throw new Error('disconnect should have been called once') + if (!connect.calledOnceWith(param)) + throw new Error('connect should have been called once with param') }) }) }) diff --git a/test/TestChannel.ts b/test/TestChannel.ts index aa5b44f..e0d81e6 100644 --- a/test/TestChannel.ts +++ b/test/TestChannel.ts @@ -2,6 +2,7 @@ import * as sinon from 'sinon' import { ChunkedChannel } from './../src/Channels/ChunkedChannel' import { GenericChannel } from './../src/Channels/GenericChannel' import { TransportMessage } from './../src/Message' +import { DEFAULT_PARAM } from './../src/Constants' export class TestChannel extends GenericChannel { @@ -27,6 +28,7 @@ export class TestChannel extends GenericChannel { this._messageReceived({ type: 'error', slotName: 'test', + param: DEFAULT_PARAM, id: '1', message: 'error' }) diff --git a/test/Transport.test.ts b/test/Transport.test.ts index ee884ac..b51f20b 100644 --- a/test/Transport.test.ts +++ b/test/Transport.test.ts @@ -5,8 +5,8 @@ import { TransportMessage } from './../src/Message' import { TestChannel } from './TestChannel' import * as sinon from 'sinon' import { SinonSpy } from 'sinon' -import { createEventBus } from '../src/Events' -import { slot } from '../src/Slot' + +const param = 'param' describe('Transport', () => { @@ -15,7 +15,7 @@ describe('Transport', () => { const channel = new TestChannel() const stubMethods: Array = ['onConnect', 'onDisconnect', 'onData', 'onError'] stubMethods.forEach(method => sinon.stub(channel, method)) - const transport = new Transport(channel) + new Transport(channel) stubMethods.forEach(methodName => it(`should subscribe to its channel\'s ${methodName} method`, () => { const method = channel[methodName] as any as sinon.SinonSpy @@ -43,9 +43,10 @@ describe('Transport', () => { it('should send a handler_registered message for each slot when a local handler is registered', () => { Object.keys(slots).forEach(slotName => { - transport.registerHandler(slotName, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][0]) channel.sendSpy.calledWith({ type: 'handler_registered', + param, slotName }).should.be.True() }) @@ -53,14 +54,25 @@ describe('Transport', () => { it('should not send a handler_registered message when an additional local handler is registered', () => { const slotName = 'getCarrotStock' - transport.registerHandler(slotName, slots[slotName][0]) - transport.registerHandler(slotName, slots[slotName][1]) + transport.registerHandler(slotName, param, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][1]) channel.sendSpy - .withArgs({ type: 'handler_registered', slotName }) + .withArgs({ type: 'handler_registered', slotName, param }) .calledOnce // should have been called exactly once .should.be.True() }) + it('should not send a handler_unregistered message when an additional local handler is unregistered', () => { + const slotName = 'getCarrotStock' + transport.registerHandler(slotName, param, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][1]) + transport.unregisterHandler(slotName, param, slots[slotName][1]) + channel.sendSpy + .withArgs({ type: 'handler_unregistered', slotName, param }) + .called + .should.be.False() + }) + it('should call the appropriate handler when a request is received', async () => { @@ -68,12 +80,13 @@ describe('Transport', () => { const handler = slots[slotName][0] // Register handler on slot - transport.registerHandler(slotName, handler) + transport.registerHandler(slotName, param, handler) const request: TransportMessage = { type: 'request', slotName, id: '5', + param, data: { height: 5, constitution: 'strong' @@ -88,6 +101,7 @@ describe('Transport', () => { slotName, type: 'response', id: '5', + param, data: { color: 'blue' } @@ -99,13 +113,14 @@ describe('Transport', () => { const slotName = 'buildCelery' // Register one handler on slot - transport.registerHandler(slotName, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][0]) // Unregister it - transport.unregisterHandler(slotName, slots[slotName][0]) + transport.unregisterHandler(slotName, param, slots[slotName][0]) channel.sendSpy.calledWith({ type: 'handler_unregistered', + param, slotName }).should.be.True() }) @@ -116,15 +131,16 @@ describe('Transport', () => { const handler = slots[slotName][0] // Register one handler on slot - transport.registerHandler(slotName, handler) + transport.registerHandler(slotName, param, handler) // Unregister it - transport.unregisterHandler(slotName, slots[slotName][0]) + transport.unregisterHandler(slotName, param, slots[slotName][0]) const request: TransportMessage = { type: 'request', slotName, id: '5', + param, data: { height: 5, constitution: 'strong' @@ -140,11 +156,11 @@ describe('Transport', () => { const slotName = 'getCarrotStock' // Register two handlers on slot - transport.registerHandler(slotName, slots[slotName][0]) - transport.registerHandler(slotName, slots[slotName][1]) + transport.registerHandler(slotName, param, slots[slotName][0]) + transport.registerHandler(slotName, param, slots[slotName][1]) // Unregister one handler only - transport.unregisterHandler(slotName, slots[slotName][0]) + transport.unregisterHandler(slotName, param, slots[slotName][0]) channel.sendSpy.calledWith({ type: 'handler_unregistered', @@ -163,16 +179,16 @@ describe('Transport', () => { beforeEach(() => { addLocalHandler = sinon.spy() removeLocalHandler = sinon.spy() - transport.onRemoteHandlerRegistered(slotName, addLocalHandler) - channel.fakeReceive({ type: 'handler_registered', slotName }) - localHandler = addLocalHandler.lastCall.args[0] + transport.addRemoteHandlerRegistrationCallback(slotName, addLocalHandler) + channel.fakeReceive({ type: 'handler_registered', slotName, param }) + localHandler = addLocalHandler.lastCall.args[1] }) it('should add a local handler when a remote handler registration is received', () => { addLocalHandler.called.should.be.True() }) - it('should resolve a local pending request when a response is received', () => { + it('should resolve a local pending request when a response is received', async () => { const requestData = { carrotType: 'red' } const pendingPromise = localHandler(requestData) const request = channel.sendSpy.lastCall.args[0] @@ -186,38 +202,52 @@ describe('Transport', () => { type: 'response', id: request.id, slotName, + param, data: responseData }) - return pendingPromise - .then((response: number) => response.should.eql(responseData)) + const response = await pendingPromise + response.should.eql(responseData) }) - it('should reject a local pending request when an error is received', () => { + it('should reject a local pending request when an error is received', async () => { const pendingPromise = localHandler({ carrotType: 'blue' }) - const { id } = channel.sendSpy.lastCall.args[0] + const { id, param } = channel.sendSpy.lastCall.args[0] channel.fakeReceive({ type: 'error', id, + param, slotName, message: 'all out of blue' }) - return pendingPromise - .then(() => { - throw new Error('Promise should have been rejected') - }) - .catch(err => `${err}`.should.eql('Error: all out of blue on getCarrotStock')) + try { + await pendingPromise + throw new Error('Promise should have been rejected') + } + catch (err) { + `${err}`.should.eql('Error: all out of blue on getCarrotStock with param param') + } }) it('should remove a local handler when a remote handler unregistration is received', () => { - transport.onRemoteHandlerUnregistered(slotName, removeLocalHandler) + transport.addRemoteHandlerUnregistrationCallback(slotName, removeLocalHandler) channel.fakeReceive({ type: 'handler_unregistered', + param, slotName }) removeLocalHandler.called.should.be.True() }) - }) - }) + it('should unregister all remote handlers when channel gets disconnected', () => { + // Add remote handler unregistration callback + transport.addRemoteHandlerUnregistrationCallback(slotName, removeLocalHandler) + // Disconnect channel + channel.callDisconnected() + + // Callback should have been called + removeLocalHandler.called.should.be.True() + }) + }) + }) }) diff --git a/yarn.lock b/yarn.lock index a0b6ae2..eb38450 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1530,10 +1530,6 @@ flush-write-stream@^1.0.0: inherits "^2.0.1" readable-stream "^2.0.4" -for-in@^1.0.2: - version "1.0.2" - resolved "https://registry.yarnpkg.com/for-in/-/for-in-1.0.2.tgz#81068d295a8142ec0ac726c6e2200c30fb6d5e80" - fragment-cache@^0.2.1: version "0.2.1" resolved "https://registry.yarnpkg.com/fragment-cache/-/fragment-cache-0.2.1.tgz#4290fad27f13e89be7f33799c6bc5a0abfff0d19" @@ -1955,7 +1951,7 @@ is-number@^3.0.0: dependencies: kind-of "^3.0.2" -is-plain-object@^2.0.1, is-plain-object@^2.0.3, is-plain-object@^2.0.4: +is-plain-object@^2.0.4: version "2.0.4" resolved "https://registry.yarnpkg.com/is-plain-object/-/is-plain-object-2.0.4.tgz#2c163b3fafb1b606d9d17928f05c2a1c38e07677" dependencies: @@ -2095,10 +2091,10 @@ lodash.get@^4.4.2: version "4.4.2" resolved "https://registry.yarnpkg.com/lodash.get/-/lodash.get-4.4.2.tgz#2d177f652fa31e939b4438d5341499dfa3825e99" -lodash@>=4.17.11, lodash@^4.17.4: - version "4.17.11" - resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.11.tgz#b39ea6229ef607ecd89e2c8df12536891cac9b8d" - integrity sha512-cQKh8igo5QUhZ7lg38DYWAxMvjSAKG0A8wGSVimP07SIUEK2UO+arSRKbRZWtelMtN5V0Hkwh5ryOto/SshYIg== +lodash@>=4.17.12, lodash@^4.17.4: + version "4.17.14" + resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.14.tgz#9ce487ae66c96254fe20b599f21b6816028078ba" + integrity sha512-mmKYbW3GLuJeX+iGP+Y7Gp1AiGHGbXHCOh/jZmrawMmsE7MS4znI3RL2FsjbqOyMayHInjOeykW7PEajUk1/xw== lolex@^2.2.0, lolex@^2.3.2: version "2.3.2" @@ -2245,12 +2241,10 @@ mississippi@^2.0.0: stream-each "^1.1.0" through2 "^2.0.0" -mixin-deep@^1.2.0: - version "1.3.1" - resolved "https://registry.yarnpkg.com/mixin-deep/-/mixin-deep-1.3.1.tgz#a49e7268dce1a0d9698e45326c5626df3543d0fe" - dependencies: - for-in "^1.0.2" - is-extendable "^1.0.1" +"mixin-deep@>=1.3.2 <2.0.0 || >=2.0.1", mixin-deep@^1.2.0: + version "2.0.1" + resolved "https://registry.yarnpkg.com/mixin-deep/-/mixin-deep-2.0.1.tgz#9a6946bef4a368401b784970ae3caaaa6bab02fa" + integrity sha512-imbHQNRglyaplMmjBLL3V5R6Bfq5oM+ivds3SKgc6oRtzErEnBUUc5No11Z2pilkUvl42gJvi285xTNswcKCMA== mkdirp@0.5.1, mkdirp@^0.5.0, mkdirp@^0.5.1, mkdirp@~0.5.0: version "0.5.1" @@ -2969,23 +2963,12 @@ set-blocking@^2.0.0, set-blocking@~2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/set-blocking/-/set-blocking-2.0.0.tgz#045f9782d011ae9a6803ddd382b24392b3d890f7" -set-value@^0.4.3: - version "0.4.3" - resolved "https://registry.yarnpkg.com/set-value/-/set-value-0.4.3.tgz#7db08f9d3d22dc7f78e53af3c3bf4666ecdfccf1" - dependencies: - extend-shallow "^2.0.1" - is-extendable "^0.1.1" - is-plain-object "^2.0.1" - to-object-path "^0.3.0" - -set-value@^2.0.0: - version "2.0.0" - resolved "https://registry.yarnpkg.com/set-value/-/set-value-2.0.0.tgz#71ae4a88f0feefbbf52d1ea604f3fb315ebb6274" +"set-value@>=2.0.1 <3.0.0 || >=3.0.1", set-value@^0.4.3, set-value@^2.0.0: + version "3.0.1" + resolved "https://registry.yarnpkg.com/set-value/-/set-value-3.0.1.tgz#52c82af7653ba69eb1db92e81f5cdb32739b9e95" + integrity sha512-w6n3GUPYAWQj4ZyHWzD7K2FnFXHx9OTwJYbWg+6nXjG8sCLfs9DGv+KlqglKIIJx+ks7MlFuwFW2RBPb+8V+xg== dependencies: - extend-shallow "^2.0.1" - is-extendable "^0.1.1" - is-plain-object "^2.0.3" - split-string "^3.0.1" + is-plain-object "^2.0.4" setimmediate@^1.0.4: version "1.0.5" @@ -3160,7 +3143,7 @@ spdx-license-ids@^3.0.0: resolved "https://registry.yarnpkg.com/spdx-license-ids/-/spdx-license-ids-3.0.4.tgz#75ecd1a88de8c184ef015eafb51b5b48bfd11bb1" integrity sha512-7j8LYJLeY/Yb6ACbQ7F76qy5jHkp0U6jgBfJsk97bwWlVUnUWsAgpyaCvo17h0/RQGnQ036tVDomiwoI4pDkQA== -split-string@^3.0.1, split-string@^3.0.2: +split-string@^3.0.2: version "3.1.0" resolved "https://registry.yarnpkg.com/split-string/-/split-string-3.1.0.tgz#7cb09dda3a86585705c64b39a6466038682e8fe2" dependencies: