Skip to content

Commit

Permalink
to native Node streams.
Browse files Browse the repository at this point in the history
I failed to handle backpressure in Node's Web Streams API.
  • Loading branch information
mrluanma committed Mar 14, 2024
1 parent ea90872 commit bfe0c20
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 67 deletions.
64 changes: 31 additions & 33 deletions local.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import WebSocket, {createWebSocketStream} from 'ws';
import parseArgs from 'minimist';
import {HttpsProxyAgent} from 'https-proxy-agent';
import {Encryptor} from './encrypt.js';
import {inetNtoa, createTransformStream} from './utils.js';
import {Duplex} from 'node:stream';
import {inetNtoa, createTransform} from './utils.js';
import {pipeline} from 'node:stream/promises';

const options = {
alias: {
Expand Down Expand Up @@ -70,7 +70,7 @@ const getServer = function () {
}
};

var server = net.createServer(async (connection) => {
var server = net.createServer(async (conn) => {
console.log('local connected');
server.getConnections(function (err, count) {
console.log('concurrent connections:', count);
Expand All @@ -81,18 +81,23 @@ var server = net.createServer(async (connection) => {
let remotePort = null;
let addrToSend = '';
const aServer = getServer();
const conn = Duplex.toWeb(connection);
const reader = conn.readable.getReader();

await new Promise((resolve, reject) => {
conn.once('readable', resolve);
});

// handshake
let {value: data} = await reader.read();
const writer = conn.writable.getWriter();
await writer.write(Buffer.from([5, 0]));
let data = await conn.read();
conn.write(Buffer.from([5, 0]));

const nextCmd = data.indexOf(5, 1);
if (nextCmd !== -1) {
data = data.subarray(nextCmd);
} else {
data = (await reader.read()).value;
await new Promise((resolve, reject) => {
conn.once('readable', resolve);
});
data = await conn.read();
}
// +----+-----+-------+------+----------+----------+
// |VER | CMD | RSV | ATYP | DST.ADDR | DST.PORT |
Expand All @@ -102,7 +107,7 @@ var server = net.createServer(async (connection) => {

let headerLength = 5;
if (data.length < headerLength) {
reader.cancel();
conn.end();
return;
}
const cmd = data[1];
Expand All @@ -111,12 +116,12 @@ var server = net.createServer(async (connection) => {
console.log('unsupported cmd:', cmd);
const reply = Buffer.from('\u0005\u0007\u0000\u0001', 'binary');
writer.write(reply);
reader.cancel();
conn.end();
return;
}
if (![1, 3, 4].includes(addrtype)) {
console.log('unsupported addrtype:', addrtype);
reader.cancel();
conn.end();
return;
}
addrToSend = data.subarray(3, 4).toString('binary');
Expand All @@ -126,7 +131,7 @@ var server = net.createServer(async (connection) => {
// ipv4
headerLength = 4 + 4 + 2;
if (data.length < headerLength) {
reader.cancel();
conn.end();
return;
}
remoteAddr = inetNtoa(4, data.subarray(4, 8));
Expand All @@ -136,7 +141,7 @@ var server = net.createServer(async (connection) => {
// ipv6
headerLength = 4 + 16 + 2;
if (data.length < headerLength) {
reader.cancel();
conn.end();
return;
}
remoteAddr = inetNtoa(6, Buffer.from(data.subarray(4, 20)));
Expand All @@ -146,7 +151,7 @@ var server = net.createServer(async (connection) => {
const addrLen = data[4];
headerLength = 5 + addrLen + 2;
if (data.length < headerLength) {
reader.cancel();
conn.end();
return;
}
remoteAddr = new TextDecoder().decode(data.subarray(5, 5 + addrLen));
Expand All @@ -157,7 +162,7 @@ var server = net.createServer(async (connection) => {
buf.write('\u0005\u0000\u0000\u0001', 0, 4, 'binary');
buf.write('\u0000\u0000\u0000\u0000', 4, 4, 'binary');
buf.writeUInt16BE(remotePort, 8);
writer.write(buf);
conn.write(buf);
// connect to remote server
// ws = new WebSocket aServer, protocol: "binary"

Expand Down Expand Up @@ -186,25 +191,18 @@ var server = net.createServer(async (connection) => {
});
}

reader.releaseLock();
writer.releaseLock();

const wss = Duplex.toWeb(createWebSocketStream(ws));
const wss = createWebSocketStream(ws);
console.log(`connecting ${remoteAddr} via ${aServer}`);

conn.readable
.pipeThrough(
createTransformStream(
encryptor.encrypt.bind(encryptor),
data.subarray(3),
),
)
.pipeTo(wss.writable)
.catch((e) => e.name !== 'AbortError' && console.error(`local: ${e}`));
wss.readable
.pipeThrough(createTransformStream(encryptor.decrypt.bind(encryptor)))
.pipeTo(conn.writable)
.catch((e) => e.name !== 'AbortError' && console.error(`local: ${e}`));
const writable = createTransform(encryptor.encrypt.bind(encryptor));
writable.pipe(wss);
writable.write(data.subarray(3));
pipeline(conn, writable).catch(
(e) => e.name !== 'AbortError' && console.error(`local: ${e}`),
);
pipeline(wss, createTransform(encryptor.decrypt.bind(encryptor)), conn).catch(
(e) => e.name !== 'AbortError' && console.error(`local: ${e}`),
);
});

server.listen(PORT, LOCAL_ADDRESS, function () {
Expand Down
50 changes: 25 additions & 25 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import http from 'http';
import {WebSocketServer, createWebSocketStream} from 'ws';
import parseArgs from 'minimist';
import {Encryptor} from './encrypt.js';
import {inetNtoa, createTransformStream} from './utils.js';
import {Duplex} from 'node:stream';
import {inetNtoa, createTransform} from './utils.js';
import {pipeline} from 'node:stream/promises';

const options = {
alias: {
Expand Down Expand Up @@ -64,30 +64,33 @@ wsserver.on('connection', async (ws) => {
let remoteAddr;
let remotePort;

const wss = Duplex.toWeb(createWebSocketStream(ws));
const readable = wss.readable.pipeThrough(
createTransformStream(encryptor.decrypt.bind(encryptor)),
const conn = createWebSocketStream(ws);
const readable = conn.pipe(
createTransform(encryptor.decrypt.bind(encryptor)),
);
const reader = readable.getReader();
const {value: data} = await reader.read();
await new Promise((resolve, reject) => {
readable.once('readable', resolve);
});

const data = await readable.read();

let headerLength = 2;
if (data.length < headerLength) {
reader.cancel();
conn.end();
return;
}
const addrtype = data[0];
if (![1, 3, 4].includes(addrtype)) {
console.warn(`unsupported addrtype: ${addrtype}`);
reader.cancel();
conn.end();
return;
}
// read address and port
if (addrtype === 1) {
// ipv4
headerLength = 1 + 4 + 2;
if (data.length < headerLength) {
reader.cancel();
conn.end();
return;
}
remoteAddr = inetNtoa(4, data.subarray(1, 5));
Expand All @@ -96,7 +99,7 @@ wsserver.on('connection', async (ws) => {
// ipv6
headerLength = 1 + 16 + 2;
if (data.length < headerLength) {
reader.cancel();
conn.end();
return;
}
remoteAddr = inetNtoa(6, data.subarray(1, 17));
Expand All @@ -105,30 +108,27 @@ wsserver.on('connection', async (ws) => {
let addrLen = data[1];
headerLength = 2 + addrLen + 2;
if (data.length < headerLength) {
reader.cancel();
conn.end();
return;
}
remoteAddr = data.subarray(2, 2 + addrLen).toString('binary');
remotePort = data.readUInt16BE(2 + addrLen);
}

const remote = Duplex.toWeb(net.connect(remotePort, remoteAddr));
const remote = net.connect(remotePort, remoteAddr);
console.log('connecting', remoteAddr);
const writer = remote.writable.getWriter();
if (data.length > headerLength) {
await writer.write(data.subarray(headerLength));
remote.write(data.subarray(headerLength));
}

reader.releaseLock();
writer.releaseLock();

readable
.pipeTo(remote.writable)
.catch((e) => e.name !== 'AbortError' && console.error(`server: ${e}`));
remote.readable
.pipeThrough(createTransformStream(encryptor.encrypt.bind(encryptor)))
.pipeTo(wss.writable)
.catch((e) => e.name !== 'AbortError' && console.error(`server: ${e}`));
pipeline(readable, remote).catch(
(e) => e.name !== 'AbortError' && console.error(`server: ${e}`),
);
pipeline(
remote,
createTransform(encryptor.encrypt.bind(encryptor)),
conn,
).catch((e) => e.name !== 'AbortError' && console.error(`server: ${e}`));
});

server.listen(PORT, LOCAL_ADDRESS, function () {
Expand Down
15 changes: 6 additions & 9 deletions utils.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import {Transform} from 'node:stream';

export function inetNtoa(family, buf) {
if (family === 4) return buf[0] + '.' + buf[1] + '.' + buf[2] + '.' + buf[3];
else if (family === 6) {
Expand All @@ -23,15 +25,10 @@ export function memoize(func) {
};
}

export function createTransformStream(withFn, initial) {
return new TransformStream({
start(controller) {
if (initial) {
controller.enqueue(withFn(initial));
}
},
async transform(chunk, controller) {
controller.enqueue(withFn(chunk));
export function createTransform(withFn) {
return new Transform({
transform(chunk, encoding, callback) {
callback(null, withFn(chunk));
},
});
}

0 comments on commit bfe0c20

Please sign in to comment.