Skip to content

Commit

Permalink
feat(channel): addition of a chunked channel
Browse files Browse the repository at this point in the history
A new channel implementation has been created to allow sending &
receiving messages in chunks for underlying channel implementations that
cannot sustain large messages.

BREAKING CHANGES:
* GenericChannel now has a dedicate constructor which accepts an
optional timeout argumet that defaults to 5000ms
* GenericChannel's `_timeout` is now private, and a `timeout` getter is
available
  • Loading branch information
pyrho committed Nov 15, 2018
1 parent 88de635 commit 8b9f16e
Show file tree
Hide file tree
Showing 10 changed files with 939 additions and 61 deletions.
53 changes: 1 addition & 52 deletions src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,10 @@ import { TransportMessage } from './Message'
export type OnMessageCallback = (message: {}) => void

export interface Channel {
timeout?: number
timeout: number
send: (message: TransportMessage) => void
onData: (cb: OnMessageCallback) => void
onConnect: (cb: () => void) => void
onDisconnect: (cb: () => void) => void
onError: (cb: (e: Error) => void) => void
}

export abstract class GenericChannel implements Channel {

public timeout?: number

private _onMessageCallbacks: OnMessageCallback[] = []
private _onConnectCallbacks: Function[] = []
private _onDisconnectCallbacks: Function[] = []
private _onErrorCallbacks: Function[] = []
private _ready = false
public abstract send(message: TransportMessage): void

public onData(cb: OnMessageCallback): void {
if (this._onMessageCallbacks.indexOf(cb) === -1) {
this._onMessageCallbacks.push(cb)
}
}

public onConnect(cb: Function): void {
if (this._ready) {
cb()
}
this._onConnectCallbacks.push(cb)
}

public onDisconnect(cb: Function): void {
this._onDisconnectCallbacks.push(cb)
}

public onError(cb: Function): void {
this._onErrorCallbacks.push(cb)
}

protected _messageReceived(message: TransportMessage) {
this._onMessageCallbacks.forEach(cb => cb(message))
}

protected _error(error: any) {
this._onErrorCallbacks.forEach(cb => cb(error))
}

protected _connected() {
this._ready = true
this._onConnectCallbacks.forEach(cb => cb())
}

protected _disconnected() {
this._ready = false
this._onDisconnectCallbacks.forEach(cb => cb())
}
}
229 changes: 229 additions & 0 deletions src/Channels/ChunkedChannel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import { GenericChannel } from './GenericChannel'
import { TransportMessage, isTransportMessage } from '../Message'

export type ChunkedMessageStart = { type: 'chunk_start', chunkId: string, size: number }
export type ChunkedMessage = { type: 'chunk_data', chunkId: string, data: any }
export type ChunkedMessageEnd = { type: 'chunk_end', chunkId: string }
export type ChunkedTransportMessage = ChunkedMessageEnd | ChunkedMessageStart | ChunkedMessage

/**
* A chunk is a array of bytes.
* It is stored as a array of numbers and is manipulated using a Uint16Array.
*/
type Chunk = number[]

interface ChunkBuffer {
[chunkId: string]: {
id: string
chunks: Chunk[]
size: number
}
}

const utils = {

getRandomId: () => [...Array(30)].map(() => Math.random().toString(36)[3]).join(''),

str2byteArray: (str: string) => {
const bufView = new Uint16Array(str.length)
for (let i = 0, strLen = str.length; i < strLen; i++) {
bufView[i] = str.charCodeAt(i)
}

return bufView
},

convertUintArrayToString: (a: Uint16Array, maxStringAlloc: number) => {
if (maxStringAlloc === -1) {
return String.fromCharCode.apply(null, a)
} else {
let result = ''
for (let i = 0; i < a.length; i += maxStringAlloc) {
if (i + maxStringAlloc > a.length) {
result += String.fromCharCode.apply(null, a.subarray(i))
} else {
result += String.fromCharCode.apply(null, a.subarray(i, i + maxStringAlloc))
}
}
return result

}
},

checkForChunkId: (message: ChunkedTransportMessage) => {
if (!message.chunkId) {
throw new Error(`ChunkedMessage did not have a chunkId: ${JSON.stringify(message)}`)
}
}

}

export interface ChunkedChannelConstructorOptions {
chunkSize: number
sender: (m: TransportMessage) => void
timeout?: number
maxStringAlloc?: number
}

/**
* Overrides the `send` and `_messageReceived` methods of the GenericChannel class
* to offer transparent message chunking over a fragile underlying channel.
*/
export class ChunkedChannel extends GenericChannel {
constructor(opts: ChunkedChannelConstructorOptions) {
super(opts.timeout)
this._chunkSize = opts.chunkSize
this._sender = opts.sender
this._maxStringAlloc = opts.maxStringAlloc || -1
}

/**
* The size of the data array in each chunk.
* Note that the total "size" of the message will be larger
* because of the chunking metadata.
*/
private _chunkSize: number

/**
* Defines the maximum string length that will be allocated at once when
* merging the buffered chunks into the original string.
* This is only needed if the environment where this instance is running applies restriction
* on memory for string allocation.
* Omitting to set this will just create the string from the chunks in one go.
*/
private _maxStringAlloc: number

/** The actual sending via the underlying channel (eg. websocket) */
protected _sender: (m: TransportMessage | ChunkedTransportMessage) => void

/** Stores chunks pending flush */
private _buffer: ChunkBuffer = {}

/**
* This method override will chunk messages so that an array of no more than
* `chunkSize` bytes (excluding internal metadata) will be sent for each call
* to a given slot.
*/
public send(message: TransportMessage) {
const stringified = JSON.stringify(message)
if (stringified.length <= this._chunkSize) {
this._sender(message)
return
}

const messageAsByteArray = utils.str2byteArray(stringified)
const chunkId = utils.getRandomId()

this._sender({
type: 'chunk_start',
chunkId,
size: stringified.length
})

const sendChunks = (start = 0) => {
let chunk = messageAsByteArray.slice(start, start + this._chunkSize)
if (chunk.length) {
this._sender({
type: 'chunk_data',
chunkId,

// To avoid having the underlying channel implemetation interpret/cast
// the UintArray into something else, we explicitely send an array
data: Array.from(chunk)
})
sendChunks(start + this._chunkSize)
}
}
sendChunks()

this._sender({
type: 'chunk_end',
chunkId
})

}

/**
* When a message is received on this channel, either it has been chunked because its original size
* was greater than the chunkSize in which case it will be a `ChunkedTransportMessage`,
* or it was small enough so that it could be sent un chunked in which
* case it will be a plain `TransportMessage`.
*/
protected _messageReceived(message: TransportMessage | ChunkedTransportMessage) {

switch (message.type) {
case 'chunk_start':
this._receiveNewChunk(message)
break

case 'chunk_data':
this._receiveChunkData(message)
break

case 'chunk_end':
const decodedMessage: TransportMessage = this._mergeChunks(message)
super._messageReceived(decodedMessage)
break

default:
// If the message is small enough, it won't be chunked before sending
// so it won't need merging/buffering here
super._messageReceived(message as TransportMessage)
}

}

private _receiveNewChunk(message: ChunkedMessageStart) {
utils.checkForChunkId(message)
if (this._buffer[message.chunkId]) {
throw new Error(`There was already an entry in the buffer for chunkId ${message.chunkId}`)
}

this._buffer[message.chunkId] = {
id: message.chunkId,
chunks: [],
size: message.size
}
}

private _receiveChunkData(message: ChunkedMessage) {
utils.checkForChunkId(message)
if (!this._buffer[message.chunkId]) {
throw new Error(`ChunkId ${message.chunkId} was not found in the buffer`)
}

this._buffer[message.chunkId].chunks.push(message.data)
}

private _mergeChunks(message: ChunkedMessageEnd): TransportMessage {
utils.checkForChunkId(message)
if (!this._buffer[message.chunkId]) {
throw new Error(`ChunkId ${message.chunkId} was not found in the buffer`)
}

// Store all the chunks into one Uint16Array
const mergedChunks = this._buffer[message.chunkId].chunks.reduce((d, chunk, ix) => {
chunk.forEach((byte, i) => d.uintArray[d.currentIx + i] = byte)
d.currentIx += chunk.length
return d
}, { uintArray: new Uint16Array(this._buffer[message.chunkId].size), currentIx: 0 })

let transportMessage: TransportMessage

// Then rebuild the object from the merged chunk, now stored as one string
const dataAsString = utils.convertUintArrayToString(mergedChunks.uintArray, this._maxStringAlloc)
try {
transportMessage = JSON.parse(dataAsString) as TransportMessage
} catch (e) {
throw new Error(`Not a valid JSON string: ${dataAsString}`)
}

if (!isTransportMessage(transportMessage)) {
throw new Error(`Not a transport message: ${JSON.stringify(transportMessage)}`)
}

return transportMessage
}

}

60 changes: 60 additions & 0 deletions src/Channels/GenericChannel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Channel, OnMessageCallback } from '../Channel'
import { TransportMessage } from '../Message'

const DEFAULT_TIMEOUT = 5000
export abstract class GenericChannel implements Channel {

public constructor(private _timeout = DEFAULT_TIMEOUT) { }

get timeout() {
return this._timeout
}

protected _onMessageCallbacks: OnMessageCallback[] = []
private _onConnectCallbacks: Function[] = []
private _onDisconnectCallbacks: Function[] = []
private _onErrorCallbacks: Function[] = []
private _ready = false

public abstract send(message: TransportMessage): void

public onData(cb: OnMessageCallback): void {
if (this._onMessageCallbacks.indexOf(cb) === -1) {
this._onMessageCallbacks.push(cb)
}
}

public onConnect(cb: Function): void {
if (this._ready) {
cb()
}
this._onConnectCallbacks.push(cb)
}

public onDisconnect(cb: Function): void {
this._onDisconnectCallbacks.push(cb)
}

public onError(cb: Function): void {
this._onErrorCallbacks.push(cb)
}

protected _messageReceived(message: TransportMessage) {
this._onMessageCallbacks.forEach(cb => cb(message))
}

protected _error(error: any) {
this._onErrorCallbacks.forEach(cb => cb(error))
}

protected _connected() {
this._ready = true
this._onConnectCallbacks.forEach(cb => cb())
}

protected _disconnected() {
this._ready = false
this._onDisconnectCallbacks.forEach(cb => cb())
}
}

14 changes: 13 additions & 1 deletion src/Message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

export type TransportRequest = { type: 'request', slotName: string, id: string, data: any }
export type TransportResponse = { type: 'response', slotName: string, id: string, data: any }
export type TransportError = { type: 'error', slotName: string, id: string, message: string, stack?: string }
Expand All @@ -10,3 +9,16 @@ export type TransportMessage =
| TransportRequest
| TransportResponse
| TransportError

export function isTransportMessage(m: { type: string }): m is TransportMessage {
switch (m.type) {
case 'request':
case 'response':
case 'error':
case 'handler_unregistered':
case 'handler_registered':
return true
default:
return false
}
}
4 changes: 1 addition & 3 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ const assertNever = (a: never) => {
throw new Error(`Should not happen: ${a}`)
}

const DEFAULT_TIMEOUT = 5000

const ERRORS = {
TIMED_OUT: 'TIMED_OUT',
REMOTE_CONNECTION_CLOSED: 'REMOTE_CONNECTION_CLOSED',
Expand Down Expand Up @@ -209,7 +207,7 @@ export class Transport {
this._pendingRequests[slotName][id].reject(new Error(`${ERRORS.TIMED_OUT} on ${slotName}`))
delete this._pendingRequests[slotName][id]
}
}, this._channel.timeout || DEFAULT_TIMEOUT)
}, this._channel.timeout)
})
this._remoteHandlers[slotName] = remoteHandler
addHandler(remoteHandler)
Expand Down
Loading

0 comments on commit 8b9f16e

Please sign in to comment.