Skip to content

Commit

Permalink
Merge pull request #3 from exposr/buffer-initial-writes
Browse files Browse the repository at this point in the history
WebSocketMultiplexSocket: buffer calls to _write/_writev during opening
  • Loading branch information
fredriklindberg authored Dec 6, 2023
2 parents 119966c + 144601f commit 383393d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
27 changes: 25 additions & 2 deletions src/ws-multiplex-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ export class WebSocketMultiplexSocket extends Duplex {
private readBufferSize: number;
private wantData: boolean = false;

private writeBuffer: Array<{ channel: number, data: Buffer | Array<Buffer>, callback: (error?: Error) => void }>;
private writer: (channel: number, data: Buffer | Array<Buffer>, callback: (error?: Error) => void) => void;

public connecting: boolean;
public pending: boolean;
public readyState: SocketReadyState;
Expand Down Expand Up @@ -69,6 +72,8 @@ export class WebSocketMultiplexSocket extends Duplex {
this.constructCallback = undefined;
this.readBuffer = [];
this.readBufferSize = 0;
this.writeBuffer = [];
this.writer = this.bufferedWrite;

Object.defineProperty(this, "bytesWritten", {
get() {
Expand All @@ -90,7 +95,11 @@ export class WebSocketMultiplexSocket extends Duplex {
this.pending = false;
this.readyState = "open";

this.flushWriteBuffer();
this.writer = this.wsm.send.bind(this.wsm);

typeof this.constructCallback == 'function' && this.constructCallback();

this.emit('connect');
this.emit('ready');
this.resetTimeout();
Expand Down Expand Up @@ -204,9 +213,23 @@ export class WebSocketMultiplexSocket extends Duplex {
return this;
}

private bufferedWrite(channel: number, data: Buffer | Array<Buffer>, callback: (error?: Error) => void): void {
this.writeBuffer.push({channel, data, callback});
}

private flushWriteBuffer(): void {
while (true) {
const buffer = this.writeBuffer.shift();
if (!buffer) {
break;
}
this.wsm.send(buffer.channel, buffer.data, buffer.callback);
}
}

_write(data: Buffer, encoding: BufferEncoding, callback: (error?: Error) => void): void {
assert(this._destroyed == false, "_write on destroyed");
this.wsm.send(<number>this.channel, data, callback);
this.writer(this.channel, data, callback);
this.resetTimeout();
}

Expand All @@ -216,7 +239,7 @@ export class WebSocketMultiplexSocket extends Duplex {
for (const item of chunks) {
buffers.push(item.chunk)
}
this.wsm.send(<number>this.channel, buffers, callback);
this.writer(this.channel, buffers, callback);
this.resetTimeout();
}

Expand Down
1 change: 0 additions & 1 deletion test/ws-multiplex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { WebSocketMultiplex } from '../src/ws-multiplex'
import { WebSocketMultiplexError, WebSocketMultiplexErrorCode } from '../src/ws-multiplex-error';
import { WebSocketMultiplexSocket } from '../src/ws-multiplex-socket';
import EventEmitter from 'events';
import { setTimeout } from 'timers/promises';

describe('ws-multiplex', () => {

Expand Down

0 comments on commit 383393d

Please sign in to comment.