Skip to content

Commit

Permalink
streams
Browse files Browse the repository at this point in the history
  • Loading branch information
mrluanma committed Mar 13, 2024
1 parent e63904b commit 31c8998
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 296 deletions.
296 changes: 122 additions & 174 deletions local.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import net from 'net';
import url from 'url';
import fs from 'fs';
import WebSocket from 'ws';
import WebSocket, {createWebSocketStream} from 'ws';
import parseArgs from 'minimist';
import {HttpsProxyAgent} from 'https-proxy-agent';
import {Encryptor} from './encrypt.js';
import {inetNtoa} from './utils.js';
import {inetNtoa, createTransformStream} from './utils.js';
import {Duplex} from 'node:stream';

const options = {
alias: {
Expand Down Expand Up @@ -69,195 +71,141 @@ const getServer = function () {
}
};

var server = net.createServer(function (connection) {
var server = net.createServer(async (connection) => {
console.log('local connected');
server.getConnections(function (err, count) {
console.log('concurrent connections:', count);
});
const encryptor = new Encryptor(KEY, METHOD);
let stage = 0;
let cachedPieces = [];
let ws = null;
let ws;
let remoteAddr = null;
let remotePort = null;
let addrToSend = '';
const aServer = getServer();
connection.on('data', function (data) {
if (stage === 5) {
// pipe sockets
data = encryptor.encrypt(data);
if (ws.readyState === WebSocket.OPEN) {
ws.send(data, {binary: true});
}
const conn = Duplex.toWeb(connection);
const reader = conn.readable.getReader();
// handshake
let {value: data} = await reader.read();
const writer = conn.writable.getWriter();
await writer.write(Buffer.from([5, 0]));

const nextCmd = data.indexOf(5, 1);
if (nextCmd !== -1) {
data = data.subarray(nextCmd);
} else {
data = (await reader.read()).value;
}
// +----+-----+-------+------+----------+----------+
// |VER | CMD | RSV | ATYP | DST.ADDR | DST.PORT |
// +----+-----+-------+------+----------+----------+
// | 1 | 1 | X'00' | 1 | Variable | 2 |
// +----+-----+-------+------+----------+----------+

let headerLength = 5;
if (data.length < headerLength) {
reader.cancel();
return;
}
const cmd = data[1];
const addrtype = data[3];
if (cmd !== 1) {
console.log('unsupported cmd:', cmd);
const reply = Buffer.from('\u0005\u0007\u0000\u0001', 'binary');
writer.write(reply);
reader.cancel();
return;
}
if (![1, 3, 4].includes(addrtype)) {
console.log('unsupported addrtype:', addrtype);
reader.cancel();
return;
}
addrToSend = data.subarray(3, 4).toString('binary');

// read address and port
if (addrtype === 1) {
// ipv4
headerLength = 4 + 4 + 2;
if (data.length < headerLength) {
reader.cancel();
return;
}
if (stage === 0) {
connection.write(Buffer.from([5, 0]));
stage = 1;
remoteAddr = inetNtoa(4, data.subarray(4, 8));
addrToSend += data.subarray(4, 10).toString('binary');
remotePort = new DataView(data.buffer).getUint16(8);
} else if (addrtype === 4) {
// ipv6
headerLength = 4 + 16 + 2;
if (data.length < headerLength) {
reader.cancel();
return;
}
if (stage === 1) {
// +----+-----+-------+------+----------+----------+
// |VER | CMD | RSV | ATYP | DST.ADDR | DST.PORT |
// +----+-----+-------+------+----------+----------+
// | 1 | 1 | X'00' | 1 | Variable | 2 |
// +----+-----+-------+------+----------+----------+

let headerLength = 5;

if (data.length < headerLength) {
connection.end();
return;
}
const cmd = data[1];
const addrtype = data[3];
if (cmd !== 1) {
console.log('unsupported cmd:', cmd);
const reply = Buffer.from('\u0005\u0007\u0000\u0001', 'binary');
connection.end(reply);
return;
}
if (![1, 3, 4].includes(addrtype)) {
console.log('unsupported addrtype:', addrtype);
connection.end();
return;
}
addrToSend = data.subarray(3, 4).toString('binary');

// read address and port
if (addrtype === 1) {
// ipv4
headerLength = 4 + 4 + 2;
if (data.length < headerLength) {
connection.end();
return;
}
remoteAddr = inetNtoa(4, data.subarray(4, 8));
addrToSend += data.subarray(4, 10).toString('binary');
remotePort = data.readUInt16BE(8);
} else if (addrtype === 4) {
// ipv6
headerLength = 4 + 16 + 2;
if (data.length < headerLength) {
connection.end();
return;
}
remoteAddr = inetNtoa(6, data.subarray(4, 20));
addrToSend += data.subarray(4, 22).toString('binary');
remotePort = data.readUInt16BE(20);
} else {
const addrLen = data[4];
headerLength = 5 + addrLen + 2;
if (data.length < headerLength) {
connection.end();
return;
}
remoteAddr = data.subarray(5, 5 + addrLen).toString('binary');
addrToSend += data.subarray(4, 5 + addrLen + 2).toString('binary');
remotePort = data.readUInt16BE(5 + addrLen);
}
let buf = Buffer.alloc(10);
buf.write('\u0005\u0000\u0000\u0001', 0, 4, 'binary');
buf.write('\u0000\u0000\u0000\u0000', 4, 4, 'binary');
buf.writeUInt16BE(remotePort, 8);
connection.write(buf);
// connect to remote server
// ws = new WebSocket aServer, protocol: "binary"

if (HTTPPROXY) {
// WebSocket endpoint for the proxy to connect to
const endpoint = aServer;
const parsed = new URL(endpoint);
//console.log('attempting to connect to WebSocket %j', endpoint);

// create an instance of the `HttpsProxyAgent` class with the proxy server information
const opts = new URL(HTTPPROXY);

// IMPORTANT! Set the `secureEndpoint` option to `false` when connecting
// over "ws://", but `true` when connecting over "wss://"
opts.secureEndpoint = parsed.protocol
? parsed.protocol == 'wss:'
: false;

const agent = new HttpsProxyAgent(opts);

ws = new WebSocket(aServer, {
protocol: 'binary',
agent,
});
} else {
ws = new WebSocket(aServer, {
protocol: 'binary',
});
}

ws.on('open', function () {
console.log(`connecting ${remoteAddr} via ${aServer}`);
const data = Buffer.concat([
Buffer.from(addrToSend, 'binary'),
...cachedPieces,
]);
cachedPieces = null;
ws.send(encryptor.encrypt(data), {
binary: true,
});
stage = 5;
});

ws.on('message', function (data, flags) {
connection.write(encryptor.decrypt(data));
});

ws.on('close', function () {
console.log('remote disconnected');
connection.destroy();
});

ws.on('error', function (e) {
console.log(`remote ${remoteAddr}:${remotePort} error: ${e}`);
connection.destroy();
server.getConnections(function (err, count) {
console.log('concurrent connections:', count);
});
});

if (data.length > headerLength) {
let buf = Buffer.alloc(data.length - headerLength);
data.copy(buf, 0, headerLength);
cachedPieces.push(buf);
}
stage = 4;
} else if (stage === 4) {
// remote server not connected
// cache received buffers
// make sure no data is lost
cachedPieces.push(data);
remoteAddr = inetNtoa(6, Buffer.from(data.subarray(4, 20)));
addrToSend += data.subarray(4, 22).toString('binary');
remotePort = new DataView(data.buffer).getUint16(20);
} else {
const addrLen = data[4];
headerLength = 5 + addrLen + 2;
if (data.length < headerLength) {
reader.cancel();
return;
}
});

connection.on('end', function () {
console.log('local disconnected');
ws?.terminate();

server.getConnections(function (err, count) {
console.log('concurrent connections:', count);
remoteAddr = new TextDecoder().decode(data.subarray(5, 5 + addrLen));
addrToSend += data.subarray(4, 5 + addrLen + 2).toString('binary');
remotePort = new DataView(data.buffer).getUint16(5 + addrLen);
}
let buf = Buffer.alloc(10);
buf.write('\u0005\u0000\u0000\u0001', 0, 4, 'binary');
buf.write('\u0000\u0000\u0000\u0000', 4, 4, 'binary');
buf.writeUInt16BE(remotePort, 8);
writer.write(buf);
// connect to remote server
// ws = new WebSocket aServer, protocol: "binary"

if (HTTPPROXY) {
// WebSocket endpoint for the proxy to connect to
const endpoint = aServer;
const parsed = new URL(endpoint);
//console.log('attempting to connect to WebSocket %j', endpoint);

// create an instance of the `HttpsProxyAgent` class with the proxy server information
const opts = new URL(HTTPPROXY);

// IMPORTANT! Set the `secureEndpoint` option to `false` when connecting
// over "ws://", but `true` when connecting over "wss://"
opts.secureEndpoint = parsed.protocol ? parsed.protocol == 'wss:' : false;

const agent = new HttpsProxyAgent(opts);

ws = new WebSocket(aServer, {
protocol: 'binary',
agent,
});
});

connection.on('error', function (e) {
console.log(`local error: ${e}`);
ws?.terminate();

server.getConnections(function (err, count) {
console.log('concurrent connections:', count);
} else {
ws = new WebSocket(aServer, {
protocol: 'binary',
});
});
}

connection.setTimeout(timeout, function () {
console.log('local timeout');
connection.destroy();
ws?.terminate();
});
reader.releaseLock();
writer.releaseLock();

const wss = Duplex.toWeb(createWebSocketStream(ws));
console.log(`connecting ${remoteAddr} via ${aServer}`);

conn.readable
.pipeThrough(
createTransformStream(
encryptor.encrypt.bind(encryptor),
data.subarray(3),
),
)
.pipeTo(wss.writable)
.catch((e) => e.name !== 'AbortError' && console.error(`local: ${e}`));
wss.readable
.pipeThrough(createTransformStream(encryptor.decrypt.bind(encryptor)))
.pipeTo(conn.writable)
.catch((e) => e.name !== 'AbortError' && console.error(`local: ${e}`));
});

server.listen(PORT, LOCAL_ADDRESS, function () {
Expand Down
Loading

0 comments on commit 31c8998

Please sign in to comment.