diff --git a/src/Events.ts b/src/Events.ts index 09b1c09..bf05cdf 100644 --- a/src/Events.ts +++ b/src/Events.ts @@ -34,15 +34,33 @@ export function combineEvents< return Object.assign({}, ...args) } -export function createEventBus(args: { events: C, channels?: Channel[] }): C { - const transports = (args.channels || []).map(c => new Transport(c)) +export function createEventBus< + C extends EventDeclaration, + T extends Array +>(args: { + events: C; + channels?: Channel[]; + ignoredEvents?: T; +}): Omit { + + const transports = (args.channels || []).map( + (c) => new Transport(c, (args.ignoredEvents as string[])) + ) const eventBus: Partial = {} for (const event in args.events) { - if (args.events.hasOwnProperty(event)) { - eventBus[event] = (connectSlot(event, transports, args.events[event].config) as C[Extract]) + if ( + args.events.hasOwnProperty(event) && + (!args.ignoredEvents || + (args.ignoredEvents && !args.ignoredEvents.includes(event))) + ) { + eventBus[event] = connectSlot( + event, + transports, + args.events[event].config + ) as C[Extract] } } - return eventBus as C + return eventBus as Omit } diff --git a/src/Message.ts b/src/Message.ts index c2115b2..a3a512d 100644 --- a/src/Message.ts +++ b/src/Message.ts @@ -28,17 +28,25 @@ export type TransportRegistrationMessage = { slotName: string, param: string } + export type TransportUnregistrationMessage = { type: 'handler_unregistered', slotName: string, param: string } + +export type TransportEventListMessage = { + type: 'event_list', + ignoredEvents: string[] +} + export type TransportMessage = TransportError | TransportRegistrationMessage | TransportRequest | TransportResponse | TransportUnregistrationMessage + | TransportEventListMessage export function isTransportMessage(m: { type: string }): m is TransportMessage { switch (m.type) { @@ -47,6 +55,7 @@ export function isTransportMessage(m: { type: string }): m is TransportMessage { case 'error': case 'handler_unregistered': case 'handler_registered': + case 'event_list': return true default: return false diff --git a/src/Slot.ts b/src/Slot.ts index 597ee9b..5d33ffc 100644 --- a/src/Slot.ts +++ b/src/Slot.ts @@ -165,23 +165,34 @@ export function connectSlot( // Signal to all transports that we will accept handlers for this slotName transports.forEach((transport, transportKey) => { - const remoteHandlerRegistered = ( param = DEFAULT_PARAM, handler: Handler ) => { + // If the remote end of the communication channel had blacklisted an + // event but is now trying to register a handler. We ignore it and + // consider the blacklist to be the source of truth + if (!remoteHandlersConnected[transportKey]) { + return + } + // Store handler const paramHandlers = handlers[transportKey][param] || [] handlers[transportKey][param] = paramHandlers.concat(handler) // Call lazy callbacks if needed - if (getParamHandlers(param, handlers).length === 1) callLazyConnectCallbacks(param) + if (getParamHandlers(param, handlers).length === 1) { + callLazyConnectCallbacks(param) + } // Release potential buffered events if (!remoteHandlersConnected[transportKey][param]) { awaitHandlerRegistration(String(transportKey), param) } + // call onRegister callback on slot for each transport. It will + // release the event once triggered. If one is not registered then + // event will not be sent. remoteHandlersConnected[transportKey][param].onRegister() } @@ -191,13 +202,50 @@ export function connectSlot( ) => { const paramHandlers = handlers[transportKey][param] || [] const handlerIndex = paramHandlers.indexOf(handler) - if (handlerIndex > -1) handlers[transportKey][param].splice(handlerIndex, 1) - if (getParamHandlers(param, handlers).length === 0) callLazyDisonnectCallbacks(param) - awaitHandlerRegistration(String(transportKey), param) + + if (handlerIndex > -1) + handlers[transportKey][param].splice(handlerIndex, 1) + + if (getParamHandlers(param, handlers).length === 0) + callLazyDisonnectCallbacks(param) + + if (remoteHandlersConnected[transportKey]) + awaitHandlerRegistration(String(transportKey), param) + } - transport.addRemoteHandlerRegistrationCallback(slotName, remoteHandlerRegistered) - transport.addRemoteHandlerUnregistrationCallback(slotName, remoteHandlerUnregistered) + const remoteEventListChangedListener = () => { + // Because the remote end communicated a blacklist of event it does + // not want to listen, and because the local end has registered a + // handler for the remote end for this blacklisted events. The local + // ends need to: + // 1 - resolve the onRegister promise + // 2 - remove the useless handler from the remote handlers list + if (remoteHandlersConnected[transportKey]) { + Object.keys(remoteHandlersConnected[transportKey]).forEach( + (param) => { + remoteHandlersConnected[transportKey][ + param + ].onRegister() + } + ) + } + delete remoteHandlersConnected[transportKey] + } + + transport.addRemoteHandlerRegistrationCallback( + slotName, + remoteHandlerRegistered + ) + transport.addRemoteHandlerUnregistrationCallback( + slotName, + remoteHandlerUnregistered + ) + + transport.addRemoteEventListChangedListener( + slotName, + remoteEventListChangedListener + ) }) /* @@ -255,11 +303,12 @@ export function connectSlot( return Promise.all(transportConnectionPromises).then(() => { return callHandlersWithParameters() }) - } - - else { + } else { transports.forEach((_t, transportKey) => { - if (!remoteHandlersConnected[transportKey][param]) { + if ( + remoteHandlersConnected[transportKey] && + !remoteHandlersConnected[transportKey][param] + ) { awaitHandlerRegistration(String(transportKey), param) } }) @@ -267,8 +316,12 @@ export function connectSlot( const transportPromises: Promise[] = transports.reduce( (acc, _t, transportKey) => [ ...acc, - remoteHandlersConnected[transportKey][param].registered - ], [] + ...((remoteHandlersConnected[transportKey] && [ + remoteHandlersConnected[transportKey][param].registered + ]) ?? + []) + ], + [] ) return Promise.all(transportPromises).then(() => { diff --git a/src/Transport.ts b/src/Transport.ts index 1cb3c4b..4702f80 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -6,7 +6,8 @@ import { TransportRegistrationMessage, TransportUnregistrationMessage, TransportResponse, - TransportRequest + TransportRequest, + TransportEventListMessage } from './Message' let _ID = 0 @@ -74,6 +75,13 @@ export class Transport { private _remoteHandlerDeletionCallbacks: { [slotName: string]: RemoteHandlerCallback } = {} + /** + * Callbacks provided by each slot allowing to remove blacklisted events + * declaration from the remote handlers. + */ + private _remoteIgnoredEventsCallbacks: + { [slotName: string]: () => void } = {} + /** * Requests that have been sent to the far end, but have yet to be fulfilled */ @@ -81,7 +89,7 @@ export class Transport { private _channelReady = false - constructor(private _channel: Channel) { + constructor(private _channel: Channel, ignoredEvents?: string[]) { this._channel.onData((message: TransportMessage) => { switch (message.type) { case 'request': @@ -94,6 +102,8 @@ export class Transport { return this._unregisterRemoteHandler(message) case 'error': return this._errorReceived(message) + case 'event_list': + return this._remoteIgnoredEventsReceived(message) default: assertNever(message) } @@ -107,6 +117,17 @@ export class Transport { this._channel.send(msg) }) }) + + // Also send the list of events this end is not interested in so the + // far end can know when to wait or not for this end to be ready + // when triggering a specific slot. This is necessary only when some + // events have been listed as ignored when calling createEventBus + if (ignoredEvents) { + this._channel.send({ + type: "event_list", + ignoredEvents + }) + } }) this._channel.onDisconnect(() => { this._channelReady = false @@ -122,6 +143,24 @@ export class Transport { this._channel.onError(e => this._rejectAllPendingRequests(e)) } + /** + * This event is triggered when events have been listed as ignored by the far + * end. It will call onRegister on ignored events' handlers to fake their + * registration so this end doesn't wait on the far end to have registered + * them to be able to trigger them. + */ + private _remoteIgnoredEventsReceived({ + ignoredEvents + }: TransportEventListMessage): void { + Object.keys(this._remoteIgnoredEventsCallbacks).forEach( + (slotName) => { + if (ignoredEvents.includes(slotName)) { + this._remoteIgnoredEventsCallbacks[slotName]() + } + } + ) + } + /** * When a request is received from the far end, call all the local subscribers, * and send either a response or an error mirroring the request id, @@ -298,6 +337,16 @@ export class Transport { } } + public addRemoteEventListChangedListener( + slotName: string, + eventListChangedListener: () => void + ): void { + if (!this._remoteIgnoredEventsCallbacks[slotName]) { + this._remoteIgnoredEventsCallbacks[slotName] = + eventListChangedListener + } + } + /** * Called when a local handler is registered, to send a `handler_registered` * message to the far end. diff --git a/src/index.ts b/src/index.ts index c3a7167..9777a82 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,9 @@ export { slot, Slot } from './Slot' -export { EventDeclaration, combineEvents, createEventBus } from './Events' +export { + EventDeclaration, + combineEvents, + createEventBus +} from './Events' export { Channel } from './Channel' export { GenericChannel } from './Channels/GenericChannel' export { ChunkedChannel } from './Channels/ChunkedChannel' diff --git a/test/Event.spec.ts b/test/Event.spec.ts index a1b2576..912212f 100644 --- a/test/Event.spec.ts +++ b/test/Event.spec.ts @@ -1,5 +1,5 @@ import { slot } from './../src/Slot' -import { combineEvents, createEventBus } from './../src/Events' +import { combineEvents, createEventBus, omitEvents } from './../src/Events' import { TestChannel } from './TestChannel' import { DEFAULT_PARAM } from './../src/Constants' @@ -44,7 +44,8 @@ describe('combineEvents()', () => { describe('createEventBus()', () => { const events = { - numberToString: slot() + numberToString: slot(), + eventToIgnore: slot() } const param = DEFAULT_PARAM @@ -101,4 +102,24 @@ describe('createEventBus()', () => { data: '5' }) }) + + describe('when a ignored list is passed to createEventBus()', () => { + it('should not connect the ignored slot and should return a filtered eventBus', () => { + const channel = new TestChannel() + const eventBus = createEventBus({ + events, + channels: [channel], + ignoredEvents: ['eventToIgnore'] + }) + channel.callConnected() + const isIncluded = Object.keys(eventBus).includes('eventToIgnore') + expect(isIncluded).toBe(false) + // An event_list message should have been received with the list of + // ignoredEvents + expect(channel.sendSpy).toHaveBeenCalledWith({ + type: 'event_list', + ignoredEvents: ['eventToIgnore'] + }) + }) + }) }) diff --git a/test/Slot.spec.ts b/test/Slot.spec.ts index 3296038..85c8fed 100644 --- a/test/Slot.spec.ts +++ b/test/Slot.spec.ts @@ -30,9 +30,7 @@ describe('slot', () => { }) describe('connectSlot', () => { - describe('without parameter', () => { - describe('trigger', () => { it('should use default parameter', async () => { const numberToString = connectSlot('numberToString', []) @@ -55,7 +53,6 @@ describe('connectSlot', () => { }) describe('with no transports', () => { - it('should call a single local handler registered for a parameter', async () => { const numberToString = connectSlot('numberToString', []) numberToString.on('a', num => `a${num.toString()}`) @@ -130,7 +127,6 @@ describe('connectSlot', () => { }) describe('with local and remote handlers', () => { - it('should call both local handlers and remote handlers', async () => { const { channel, transport } = makeTestTransport() const broadcastBool = connectSlot('broadcastBool', [transport]) @@ -140,6 +136,7 @@ describe('connectSlot', () => { // Handlers should not be called until a remote handler is registered await Promise.resolve() + expect(localCalled).toEqual(false) channel.fakeReceive({ @@ -321,4 +318,171 @@ describe('connectSlot', () => { }) }) }) + + describe('with two remote endpoints: A and B', () => { + describe('no ignoredEvents list sent by any endpoints', () => { + it('should wait for all remote endpoints to have signaled registration before sending the event', async () => { + const { channel: channelA, transport: transportA } = + makeTestTransport() + const { channel: channelB, transport: transportB } = + makeTestTransport() + const broadcastBool = connectSlot('broadcastBool', [ + transportA, + transportB, + ]) + + broadcastBool(true) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(channelA.sendSpy.mock.calls.length).toBe(0) + expect(channelB.sendSpy.mock.calls.length).toBe(0) + + // Endpoint A signals registration + channelA.fakeReceive({ + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered', + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(channelA.sendSpy.mock.calls.length).toBe(0) + expect(channelB.sendSpy.mock.calls.length).toBe(0) + + // Endpoint B signals registration + channelB.fakeReceive({ + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered', + }) + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(channelA.sendSpy.mock.calls.length).toBe(1) + expect(channelB.sendSpy.mock.calls.length).toBe(1) + + const messageToA = + channelA.sendSpy.mock.calls[ + channelA.sendSpy.mock.calls.length - 1 + ][0] + expect(messageToA).toMatchObject({ + data: true, + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'request', + }) + + const messageToB = + channelB.sendSpy.mock.calls[ + channelB.sendSpy.mock.calls.length - 1 + ][0] + expect(messageToB).toMatchObject({ + data: true, + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'request', + }) + }) + }) + + describe('an ignoredEvents list is sent to one endpoint (A)', () => { + it('should NOT wait for remote endpoint A but SHOULD wait on remote endpoint B to have signaled registration before sending the event', async () => { + const { channel: channelA, transport: transportA } = + makeTestTransport() + const { channel: channelB, transport: transportB } = + makeTestTransport() + + const broadcastBool = connectSlot('broadcastBool', [ + transportA, + transportB, + ]) + + // Receiving a list of events to ignore + channelA.fakeReceive({ + type: 'event_list', + ignoredEvents: ['broadcastBool'], + }) + + // This will be called only when B is ready we don't care about + // A as his event white list is empty + let called = false + broadcastBool.on((_b) => { + called = true + }) + + broadcastBool(true) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should not fire as there B is not registered + expect(called).toBe(false) + + // Endpoint B signals registration + channelB.fakeReceive({ + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered', + }) + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(called).toBe(true) + }) + }) + + describe('an empty ignoredEvents list is sent to one endpoint (A)', () => { + // This is the same test as before. But this time the because the + // ignoredEvents list is empty, we need to wait for A AND B to been + // registered to trigger the event. + it('should wait for remote endpoint A and remote endpoint B to have signaled registration before sending the event', async () => { + const { channel: channelA, transport: transportA } = + makeTestTransport() + const { channel: channelB, transport: transportB } = + makeTestTransport() + + const broadcastBool = connectSlot('broadcastBool', [ + transportA, + transportB, + ]) + + // This will be called only when A and B are ready + let called = false + broadcastBool.on((_b) => { + called = true + }) + + channelA.fakeReceive({ + type: 'event_list', + ignoredEvents: [], + }) + + broadcastBool(true) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should not fire as none of A and B are registered + expect(called).toBe(false) + + // Endpoint A signals registration + channelA.fakeReceive({ + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered', + }) + + // Should not fire as there B is not registered + expect(called).toBe(false) + + // Endpoint B signals registration + channelB.fakeReceive({ + param: DEFAULT_PARAM, + slotName: 'broadcastBool', + type: 'handler_registered', + }) + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should fire as A and B are registered + expect(called).toBe(true) + }) + }) + }) })