Skip to content

Commit

Permalink
Merge pull request #50 from Dashlane/feat/dont-await-for-non-interest…
Browse files Browse the repository at this point in the history
…ed-remotes

Feat/dont await for non interested remotes
  • Loading branch information
bastienGranger authored Jul 22, 2022
2 parents 94562ca + b3c8216 commit 8f9bd3a
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 27 deletions.
28 changes: 23 additions & 5 deletions src/Events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,33 @@ export function combineEvents<
return Object.assign({}, ...args)
}

export function createEventBus<C extends EventDeclaration>(args: { events: C, channels?: Channel[] }): C {
const transports = (args.channels || []).map(c => new Transport(c))
export function createEventBus<
C extends EventDeclaration,
T extends Array<keyof C>
>(args: {
events: C;
channels?: Channel[];
ignoredEvents?: T;
}): Omit<C, T[number]> {

const transports = (args.channels || []).map(
(c) => new Transport(c, (args.ignoredEvents as string[]))
)

const eventBus: Partial<C> = {}
for (const event in args.events) {
if (args.events.hasOwnProperty(event)) {
eventBus[event] = (connectSlot(event, transports, args.events[event].config) as C[Extract<keyof C, string>])
if (
args.events.hasOwnProperty(event) &&
(!args.ignoredEvents ||
(args.ignoredEvents && !args.ignoredEvents.includes(event)))
) {
eventBus[event] = connectSlot(
event,
transports,
args.events[event].config
) as C[Extract<keyof C, string>]
}
}

return eventBus as C
return eventBus as Omit<C, T[number]>
}
9 changes: 9 additions & 0 deletions src/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,25 @@ export type TransportRegistrationMessage = {
slotName: string,
param: string
}

export type TransportUnregistrationMessage = {
type: 'handler_unregistered',
slotName: string,
param: string
}

export type TransportEventListMessage = {
type: 'event_list',
ignoredEvents: string[]
}

export type TransportMessage =
TransportError
| TransportRegistrationMessage
| TransportRequest
| TransportResponse
| TransportUnregistrationMessage
| TransportEventListMessage

export function isTransportMessage(m: { type: string }): m is TransportMessage {
switch (m.type) {
Expand All @@ -47,6 +55,7 @@ export function isTransportMessage(m: { type: string }): m is TransportMessage {
case 'error':
case 'handler_unregistered':
case 'handler_registered':
case 'event_list':
return true
default:
return false
Expand Down
79 changes: 66 additions & 13 deletions src/Slot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,34 @@ export function connectSlot<T = void, T2 = void>(

// Signal to all transports that we will accept handlers for this slotName
transports.forEach((transport, transportKey) => {

const remoteHandlerRegistered = (
param = DEFAULT_PARAM,
handler: Handler<any, any>
) => {
// If the remote end of the communication channel had blacklisted an
// event but is now trying to register a handler. We ignore it and
// consider the blacklist to be the source of truth
if (!remoteHandlersConnected[transportKey]) {
return
}

// Store handler
const paramHandlers = handlers[transportKey][param] || []
handlers[transportKey][param] = paramHandlers.concat(handler)

// Call lazy callbacks if needed
if (getParamHandlers(param, handlers).length === 1) callLazyConnectCallbacks(param)
if (getParamHandlers(param, handlers).length === 1) {
callLazyConnectCallbacks(param)
}

// Release potential buffered events
if (!remoteHandlersConnected[transportKey][param]) {
awaitHandlerRegistration(String(transportKey), param)
}

// call onRegister callback on slot for each transport. It will
// release the event once triggered. If one is not registered then
// event will not be sent.
remoteHandlersConnected[transportKey][param].onRegister()
}

Expand All @@ -191,13 +202,50 @@ export function connectSlot<T = void, T2 = void>(
) => {
const paramHandlers = handlers[transportKey][param] || []
const handlerIndex = paramHandlers.indexOf(handler)
if (handlerIndex > -1) handlers[transportKey][param].splice(handlerIndex, 1)
if (getParamHandlers(param, handlers).length === 0) callLazyDisonnectCallbacks(param)
awaitHandlerRegistration(String(transportKey), param)

if (handlerIndex > -1)
handlers[transportKey][param].splice(handlerIndex, 1)

if (getParamHandlers(param, handlers).length === 0)
callLazyDisonnectCallbacks(param)

if (remoteHandlersConnected[transportKey])
awaitHandlerRegistration(String(transportKey), param)

}

transport.addRemoteHandlerRegistrationCallback(slotName, remoteHandlerRegistered)
transport.addRemoteHandlerUnregistrationCallback(slotName, remoteHandlerUnregistered)
const remoteEventListChangedListener = () => {
// Because the remote end communicated a blacklist of event it does
// not want to listen, and because the local end has registered a
// handler for the remote end for this blacklisted events. The local
// ends need to:
// 1 - resolve the onRegister promise
// 2 - remove the useless handler from the remote handlers list
if (remoteHandlersConnected[transportKey]) {
Object.keys(remoteHandlersConnected[transportKey]).forEach(
(param) => {
remoteHandlersConnected[transportKey][
param
].onRegister()
}
)
}
delete remoteHandlersConnected[transportKey]
}

transport.addRemoteHandlerRegistrationCallback(
slotName,
remoteHandlerRegistered
)
transport.addRemoteHandlerUnregistrationCallback(
slotName,
remoteHandlerUnregistered
)

transport.addRemoteEventListChangedListener(
slotName,
remoteEventListChangedListener
)
})

/*
Expand Down Expand Up @@ -255,20 +303,25 @@ export function connectSlot<T = void, T2 = void>(
return Promise.all(transportConnectionPromises).then(() => {
return callHandlersWithParameters()
})
}

else {
} else {
transports.forEach((_t, transportKey) => {
if (!remoteHandlersConnected[transportKey][param]) {
if (
remoteHandlersConnected[transportKey] &&
!remoteHandlersConnected[transportKey][param]
) {
awaitHandlerRegistration(String(transportKey), param)
}
})

const transportPromises: Promise<void>[] = transports.reduce(
(acc, _t, transportKey) => [
...acc,
remoteHandlersConnected[transportKey][param].registered
], []
...((remoteHandlersConnected[transportKey] && [
remoteHandlersConnected[transportKey][param].registered
]) ??
[])
],
[]
)

return Promise.all(transportPromises).then(() => {
Expand Down
53 changes: 51 additions & 2 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import {
TransportRegistrationMessage,
TransportUnregistrationMessage,
TransportResponse,
TransportRequest
TransportRequest,
TransportEventListMessage
} from './Message'

let _ID = 0
Expand Down Expand Up @@ -74,14 +75,21 @@ export class Transport {
private _remoteHandlerDeletionCallbacks:
{ [slotName: string]: RemoteHandlerCallback } = {}

/**
* Callbacks provided by each slot allowing to remove blacklisted events
* declaration from the remote handlers.
*/
private _remoteIgnoredEventsCallbacks:
{ [slotName: string]: () => void } = {}

/**
* Requests that have been sent to the far end, but have yet to be fulfilled
*/
private _pendingRequests: { [param: string]: PendingRequests } = {}

private _channelReady = false

constructor(private _channel: Channel) {
constructor(private _channel: Channel, ignoredEvents?: string[]) {
this._channel.onData((message: TransportMessage) => {
switch (message.type) {
case 'request':
Expand All @@ -94,6 +102,8 @@ export class Transport {
return this._unregisterRemoteHandler(message)
case 'error':
return this._errorReceived(message)
case 'event_list':
return this._remoteIgnoredEventsReceived(message)
default:
assertNever(message)
}
Expand All @@ -107,6 +117,17 @@ export class Transport {
this._channel.send(msg)
})
})

// Also send the list of events this end is not interested in so the
// far end can know when to wait or not for this end to be ready
// when triggering a specific slot. This is necessary only when some
// events have been listed as ignored when calling createEventBus
if (ignoredEvents) {
this._channel.send({
type: "event_list",
ignoredEvents
})
}
})
this._channel.onDisconnect(() => {
this._channelReady = false
Expand All @@ -122,6 +143,24 @@ export class Transport {
this._channel.onError(e => this._rejectAllPendingRequests(e))
}

/**
* This event is triggered when events have been listed as ignored by the far
* end. It will call onRegister on ignored events' handlers to fake their
* registration so this end doesn't wait on the far end to have registered
* them to be able to trigger them.
*/
private _remoteIgnoredEventsReceived({
ignoredEvents
}: TransportEventListMessage): void {
Object.keys(this._remoteIgnoredEventsCallbacks).forEach(
(slotName) => {
if (ignoredEvents.includes(slotName)) {
this._remoteIgnoredEventsCallbacks[slotName]()
}
}
)
}

/**
* When a request is received from the far end, call all the local subscribers,
* and send either a response or an error mirroring the request id,
Expand Down Expand Up @@ -298,6 +337,16 @@ export class Transport {
}
}

public addRemoteEventListChangedListener(
slotName: string,
eventListChangedListener: () => void
): void {
if (!this._remoteIgnoredEventsCallbacks[slotName]) {
this._remoteIgnoredEventsCallbacks[slotName] =
eventListChangedListener
}
}

/**
* Called when a local handler is registered, to send a `handler_registered`
* message to the far end.
Expand Down
6 changes: 5 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
export { slot, Slot } from './Slot'
export { EventDeclaration, combineEvents, createEventBus } from './Events'
export {
EventDeclaration,
combineEvents,
createEventBus
} from './Events'
export { Channel } from './Channel'
export { GenericChannel } from './Channels/GenericChannel'
export { ChunkedChannel } from './Channels/ChunkedChannel'
Expand Down
25 changes: 23 additions & 2 deletions test/Event.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { slot } from './../src/Slot'
import { combineEvents, createEventBus } from './../src/Events'
import { combineEvents, createEventBus, omitEvents } from './../src/Events'
import { TestChannel } from './TestChannel'
import { DEFAULT_PARAM } from './../src/Constants'

Expand Down Expand Up @@ -44,7 +44,8 @@ describe('combineEvents()', () => {
describe('createEventBus()', () => {

const events = {
numberToString: slot<number, string>()
numberToString: slot<number, string>(),
eventToIgnore: slot<void, void>()
}

const param = DEFAULT_PARAM
Expand Down Expand Up @@ -101,4 +102,24 @@ describe('createEventBus()', () => {
data: '5'
})
})

describe('when a ignored list is passed to createEventBus()', () => {
it('should not connect the ignored slot and should return a filtered eventBus', () => {
const channel = new TestChannel()
const eventBus = createEventBus({
events,
channels: [channel],
ignoredEvents: ['eventToIgnore']
})
channel.callConnected()
const isIncluded = Object.keys(eventBus).includes('eventToIgnore')
expect(isIncluded).toBe(false)
// An event_list message should have been received with the list of
// ignoredEvents
expect(channel.sendSpy).toHaveBeenCalledWith({
type: 'event_list',
ignoredEvents: ['eventToIgnore']
})
})
})
})
Loading

0 comments on commit 8f9bd3a

Please sign in to comment.