Merged
Changes from all commits · 18 commits
eea4e99  chore: add async and @types/async dependencies to package.json (larryrider, Jan 13, 2025)
59f278b  feat: add streamReadableIntoChunks method to StreamUtils for chunking… (larryrider, Jan 13, 2025)
e6e046d  feat: implement encryptStreamInParts method for chunked encryption of… (larryrider, Jan 13, 2025)
dc5e27d  feat: update uploadFile method to accept Buffer in addition to Readab… (larryrider, Jan 13, 2025)
0219dd7  feat: add uploadMultipartFromStream method for multi-part encrypted u… (larryrider, Jan 14, 2025)
c735f54  feat: implement multipart upload based on file size (larryrider, Jan 14, 2025)
0a89592  feat: add uncaughtException handler to log errors in WebDAV server (larryrider, Jan 14, 2025)
1a80eeb  feat: enhance PUT request handler to support multipart uploads (larryrider, Jan 14, 2025)
b2e3264  test: add progress reporting for file uploads in network facade (larryrider, Jan 14, 2025)
3599d38  Merge branch 'feat/improve-progress-reporting' into feat/pb-3405-add-… (larryrider, Jan 14, 2025)
74a0d49  test: add multipart file uploads in network facade service (larryrider, Jan 14, 2025)
eb5eb59  refactor: fix maintainability issue (larryrider, Jan 14, 2025)
d0241de  test: add multipart file upload handling in PUT request handler (larryrider, Jan 14, 2025)
141b10b  refactor: fix maintainability issue (larryrider, Jan 14, 2025)
0dc9d4a  refactor: rename worker function to uploadPart for clarity (larryrider, Jan 14, 2025)
0e4e4e5  Merge branch 'main' into feat/pb-3405-add-multipart-upload (larryrider, Jan 15, 2025)
0f129a1  Merge branch 'main' into feat/pb-3405-add-multipart-upload (larryrider, Jan 15, 2025)
8d8575a  chore(deps): pin async and @types/async to specific versions (larryrider, Jan 15, 2025)
2 changes: 2 additions & 0 deletions package.json
@@ -43,6 +43,7 @@
"@internxt/sdk": "1.7.0",
"@oclif/core": "4.2.3",
"@types/validator": "13.12.2",
"async": "3.2.6",
"axios": "1.7.9",
"bip39": "3.1.0",
"body-parser": "1.20.3",
@@ -70,6 +71,7 @@
"@internxt/prettier-config": "internxt/prettier-config#v1.0.2",
"@oclif/test": "4.1.7",
"@openpgp/web-stream-tools": "0.0.11-patch-1",
"@types/async": "3.2.24",
"@types/cli-progress": "3.11.6",
"@types/express": "5.0.0",
"@types/mime-types": "2.1.4",
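The pinned `async` dependency powers the bounded-concurrency part queue introduced later in `network-facade.service.ts`. A minimal sketch of the pattern it enables (the `Job` type and worker body are illustrative, not from the PR):

```ts
import { queue, QueueObject } from 'async';

type Job = { index: number };

async function main(): Promise<void> {
  // The queue runs up to 6 workers in parallel; extra pushes wait their turn.
  const q: QueueObject<Job> = queue<Job>(async (job: Job) => {
    console.log(`processing part ${job.index}`);
  }, 6);

  for (let i = 0; i < 20; i++) {
    q.push({ index: i });
  }

  // drain() resolves once every queued job has finished.
  await q.drain();
}

main();
```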
37 changes: 29 additions & 8 deletions src/commands/upload-file.ts
@@ -82,17 +82,38 @@ export default class UploadFile extends Command {
       linewrap: true,
     });
     progressBar.start(100, 0);
-    const [uploadPromise, abortable] = await networkFacade.uploadFromStream(
-      user.bucket,
-      user.mnemonic,
-      stats.size,
-      fileStream,
-      {
+
+    const minimumMultipartThreshold = 100 * 1024 * 1024;
+    const useMultipart = stats.size > minimumMultipartThreshold;
+    const partSize = 30 * 1024 * 1024;
+    const parts = Math.ceil(stats.size / partSize);
+
+    let uploadOperation: Promise<
+      [
+        Promise<{
+          fileId: string;
+          hash: Buffer;
+        }>,
+        AbortController,
+      ]
+    >;
+
+    if (useMultipart) {
+      uploadOperation = networkFacade.uploadMultipartFromStream(user.bucket, user.mnemonic, stats.size, fileStream, {
+        parts,
         progressCallback: (progress) => {
           progressBar.update(progress * 0.99);
         },
-      },
-    );
+      });
+    } else {
+      uploadOperation = networkFacade.uploadFromStream(user.bucket, user.mnemonic, stats.size, fileStream, {
+        progressCallback: (progress) => {
+          progressBar.update(progress * 0.99);
+        },
+      });
+    }
+
+    const [uploadPromise, abortable] = await uploadOperation;
 
     process.on('SIGINT', () => {
       abortable.abort('SIGINT received');
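For scale, the gating arithmetic above works out as follows (a worked example; only the two constants come from the diff):

```ts
const minimumMultipartThreshold = 100 * 1024 * 1024; // 104_857_600 bytes (100 MiB)
const partSize = 30 * 1024 * 1024; // 31_457_280 bytes (30 MiB)

const size = 1024 * 1024 * 1024; // e.g. a 1 GiB file
const useMultipart = size > minimumMultipartThreshold; // true
const parts = Math.ceil(size / partSize); // ceil(34.13...) = 35 pre-signed part URLs

// A 50 MiB file stays below the threshold and takes the single-request path.
```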
22 changes: 18 additions & 4 deletions src/services/crypto.service.ts
@@ -1,7 +1,7 @@
 import { CryptoProvider } from '@internxt/sdk';
 import { Keys, Password } from '@internxt/sdk/dist/auth';
 import { createCipheriv, createDecipheriv, createHash, Decipher, pbkdf2Sync, randomBytes } from 'node:crypto';
-import { Transform } from 'node:stream';
+import { Readable, Transform } from 'node:stream';
 import { KeysService } from './keys.service';
 import { ConfigService } from '../services/config.service';
 import { StreamUtils } from '../utils/stream.utils';
@@ -116,12 +116,26 @@ export class CryptoService {
     return Buffer.concat([decipher.update(contentsToDecrypt), decipher.final()]).toString('utf8');
   };
 
-  public async decryptStream(
+  public encryptStreamInParts = (
+    readable: Readable,
+    cipher: Transform,
+    size: number,
+    parts: number,
+  ): Transform => {
+    // We include a marginChunkSize because splitting at exactly size / parts would always
+    // leave one extra chunk, causing a mismatch with the number of upload URLs provided
+    const marginChunkSize = 1024;
+    const chunkSize = size / parts + marginChunkSize;
+    const readableChunks = StreamUtils.streamReadableIntoChunks(readable, chunkSize);
+
+    return readableChunks.pipe(cipher);
+  };
+
+  public decryptStream = (
     inputSlices: ReadableStream<Uint8Array>[],
     key: Buffer,
     iv: Buffer,
     startOffsetByte?: number,
-  ) {
+  ) => {
     let decipher: Decipher;
     if (startOffsetByte) {
       const aesBlockSize = 16;
@@ -164,7 +178,7 @@
     });
 
     return decryptedStream;
-  }
+  };
 
   public getEncryptionTransform = (key: Buffer, iv: Buffer): Transform => {
     const cipher = createCipheriv('aes-256-ctr', key, iv);
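The `marginChunkSize` comment in `encryptStreamInParts` is easiest to see with numbers. A sketch (the 1024-byte margin and the formula are from the diff; the sizes are illustrative):

```ts
const size = 100_000_000; // bytes in the stream
const parts = 3; // pre-signed URLs available

// Splitting at exactly size / parts truncates to 33_333_333-byte chunks,
// leaving one extra 1-byte chunk: 4 chunks for 3 URLs.
const naiveChunkSize = Math.trunc(size / parts); // 33_333_333
const naiveChunks = Math.ceil(size / naiveChunkSize); // 4

// Padding the chunk size guarantees at most `parts` chunks.
const marginChunkSize = 1024;
const chunkSize = size / parts + marginChunkSize; // ~33_334_357
const actualChunks = Math.ceil(size / chunkSize); // 3
```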
153 changes: 151 additions & 2 deletions src/services/network/network-facade.service.ts
@@ -6,17 +6,26 @@ import {
   DownloadFileFunction,
   EncryptFileFunction,
   UploadFileFunction,
+  UploadFileMultipartFunction,
 } from '@internxt/sdk/dist/network';
 import { Environment } from '@internxt/inxt-js';
 import { randomBytes } from 'node:crypto';
 import { Readable, Transform } from 'node:stream';
-import { DownloadOptions, UploadOptions, UploadProgressCallback, DownloadProgressCallback } from '../../types/network.types';
+import {
+  DownloadOptions,
+  UploadOptions,
+  UploadProgressCallback,
+  DownloadProgressCallback,
+  UploadMultipartOptions,
+  UploadTask,
+} from '../../types/network.types';
 import { CryptoService } from '../crypto.service';
 import { UploadService } from './upload.service';
 import { DownloadService } from './download.service';
 import { ValidationService } from '../validation.service';
 import { HashStream } from '../../utils/hash.utils';
 import { RangeOptions } from '../../utils/network.utils';
+import { queue, QueueObject } from 'async';
 
 export class NetworkFacade {
   private readonly cryptoLib: Network.Crypto;
@@ -73,7 +82,7 @@
       if (rangeOptions) {
         startOffsetByte = rangeOptions.parsed.start;
       }
-      fileStream = await this.cryptoService.decryptStream(
+      fileStream = this.cryptoService.decryptStream(
         encryptedContentStreams,
         Buffer.from(key as ArrayBuffer),
         Buffer.from(iv as ArrayBuffer),
@@ -183,4 +192,144 @@

     return [uploadOperation(), abortable];
   }
+
+  /**
+   * Performs a multi-part upload encrypting the stream content
+   *
+   * @param bucketId The bucket where the file will be uploaded
+   * @param mnemonic The plain mnemonic of the user
+   * @param size The total size of the stream content
+   * @param from The source Readable stream to upload from
+   * @param options The upload options
+   * @returns A promise to execute the upload and an abort controller to cancel the upload
+   */
+  async uploadMultipartFromStream(
+    bucketId: string,
+    mnemonic: string,
+    size: number,
+    from: Readable,
+    options: UploadMultipartOptions,
+  ): Promise<[Promise<{ fileId: string; hash: Buffer }>, AbortController]> {
+    const hashStream = new HashStream();
+    const abortable = options?.abortController ?? new AbortController();
+    let encryptionTransform: Transform;
+    let hash: Buffer;
+
+    const partsUploadedBytes: Record<number, number> = {};
+    type Part = {
+      PartNumber: number;
+      ETag: string;
+    };
+    const fileParts: Part[] = [];
+
+    const onProgress = (partId: number, loadedBytes: number) => {
+      if (!options?.progressCallback) return;
+      partsUploadedBytes[partId] = loadedBytes;
+      const currentTotalLoadedBytes = Object.values(partsUploadedBytes).reduce((a, p) => a + p, 0);
+      const reportedProgress = Math.round((currentTotalLoadedBytes / size) * 100);
+      options.progressCallback(reportedProgress);
+    };
+
+    const encryptFile: EncryptFileFunction = async (_, key, iv) => {
+      const encryptionCipher = this.cryptoService.getEncryptionTransform(
+        Buffer.from(key as ArrayBuffer),
+        Buffer.from(iv as ArrayBuffer),
+      );
+      const streamInParts = this.cryptoService.encryptStreamInParts(from, encryptionCipher, size, options.parts);
+      encryptionTransform = streamInParts.pipe(hashStream);
+    };
+
+    const uploadFileMultipart: UploadFileMultipartFunction = async (urls: string[]) => {
+      let partIndex = 0;
+      const limitConcurrency = 6;
+
+      const uploadPart = async (upload: UploadTask) => {
+        const { etag } = await this.uploadService.uploadFile(upload.urlToUpload, upload.contentToUpload, {
+          abortController: abortable,
+          progressCallback: (loadedBytes: number) => {
+            onProgress(upload.index, loadedBytes);
+          },
+        });
+
+        fileParts.push({
+          ETag: etag,
+          PartNumber: upload.index + 1,
+        });
+      };
+
+      const uploadQueue: QueueObject<UploadTask> = queue<UploadTask>(function (task, callback) {
+        uploadPart(task)
+          .then(() => {
+            callback();
+          })
+          .catch((e) => {
+            callback(e);
+          });
+      }, limitConcurrency);
+
+      for await (const chunk of encryptionTransform) {
+        const part: Buffer = chunk;
+
+        if (uploadQueue.running() === limitConcurrency) {
+          await uploadQueue.unsaturated();
+        }
+
+        if (abortable.signal.aborted) {
+          throw new Error('Upload cancelled by user');
+        }
+
+        let errorAlreadyThrown = false;
+
+        uploadQueue
+          .pushAsync({
+            contentToUpload: part,
+            urlToUpload: urls[partIndex],
+            index: partIndex++,
+          })
+          .catch((err: Error) => {
+            if (errorAlreadyThrown) return;
+
+            errorAlreadyThrown = true;
+            if (err) {
+              uploadQueue.kill();
+              if (!abortable?.signal.aborted) {
+                abortable.abort();
+              }
+            }
+          });
+      }
+
+      while (uploadQueue.running() > 0 || uploadQueue.length() > 0) {
+        await uploadQueue.drain();
+      }
+
+      hash = hashStream.getHash();
+      const compareParts = (pA: Part, pB: Part) => pA.PartNumber - pB.PartNumber;
+      const sortedParts = fileParts.sort(compareParts);
+      return {
+        hash: hash.toString('hex'),
+        parts: sortedParts,
+      };
+    };
+
+    const uploadOperation = async () => {
+      const uploadResult = await NetworkUpload.uploadMultipartFile(
+        this.network,
+        this.cryptoLib,
+        bucketId,
+        mnemonic,
+        size,
+        encryptFile,
+        uploadFileMultipart,
+        options.parts,
+      );
+
+      return {
+        fileId: uploadResult,
+        hash: hash,
+      };
+    };
+
+    return [uploadOperation(), abortable];
+  }
 }
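A hypothetical caller, mirroring how `upload-file.ts` consumes the new method; `networkFacade`, `bucket`, and `mnemonic` are assumed to come from the surrounding context:

```ts
import { createReadStream, statSync } from 'node:fs';
import { NetworkFacade } from './network-facade.service';

declare const networkFacade: NetworkFacade; // assumed to be wired up elsewhere
declare const bucket: string;
declare const mnemonic: string;

async function uploadLargeFile(filePath: string): Promise<string> {
  const size = statSync(filePath).size;
  const partSize = 30 * 1024 * 1024; // same part size the CLI command uses

  const [uploadPromise, abortController] = await networkFacade.uploadMultipartFromStream(
    bucket,
    mnemonic,
    size,
    createReadStream(filePath),
    {
      parts: Math.ceil(size / partSize),
      progressCallback: (progress) => console.log(`upload: ${progress}%`),
    },
  );

  // abortController.abort() would cancel all in-flight parts.
  const { fileId } = await uploadPromise;
  return fileId;
}
```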
2 changes: 1 addition & 1 deletion src/services/network/upload.service.ts
@@ -5,7 +5,7 @@ import { UploadOptions } from '../../types/network.types';
 export class UploadService {
   public static readonly instance: UploadService = new UploadService();
 
-  async uploadFile(url: string, from: Readable, options: UploadOptions): Promise<{ etag: string }> {
+  async uploadFile(url: string, from: Readable | Buffer, options: UploadOptions): Promise<{ etag: string }> {
     const response = await axios.put(url, from, {
       signal: options.abortController?.signal,
       onUploadProgress: (progressEvent) => {
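With the widened signature, an in-memory part (as produced by the multipart queue) can be sent directly. A minimal sketch, assuming the `UploadOptions` fields seen in the diff are optional; `url` is a placeholder for a pre-signed part URL:

```ts
import { UploadService } from './upload.service';

declare const url: string; // a pre-signed part URL

async function uploadBufferPart(part: Buffer): Promise<string> {
  const { etag } = await UploadService.instance.uploadFile(url, part, {
    progressCallback: (loadedBytes: number) => console.log(`sent ${loadedBytes} bytes`),
  });
  return etag;
}
```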
10 changes: 10 additions & 0 deletions src/types/network.types.ts
@@ -17,3 +17,13 @@ export interface SelfsignedCert {
   cert: string | Buffer;
   key: string | Buffer;
 }
+
+export interface UploadTask {
+  contentToUpload: Buffer;
+  urlToUpload: string;
+  index: number;
+}
+
+export interface UploadMultipartOptions extends UploadOptions {
+  parts: number;
+}
44 changes: 43 additions & 1 deletion src/utils/stream.utils.ts
@@ -1,5 +1,5 @@
 import { ReadStream, WriteStream } from 'node:fs';
-import { Transform, TransformCallback } from 'node:stream';
+import { Readable, Transform, TransformCallback } from 'node:stream';
 
 export class StreamUtils {
   static readStreamToReadableStream(readStream: ReadStream): ReadableStream<Uint8Array> {
@@ -64,6 +64,48 @@ export class StreamUtils {

     return stream;
   }
+
+  /**
+   * Given a readable stream, re-chunks its data into fixed-size chunks as it is read
+   * @param readable Readable stream
+   * @param chunkSize The size in bytes that we want each chunk to be
+   * @returns A readable stream whose output is chunks of size chunkSize
+   */
+  static streamReadableIntoChunks(readable: Readable, chunkSize: number): Readable {
+    let buffer = Buffer.alloc(0);
+
+    const mergeBuffers = (buffer1: Buffer, buffer2: Buffer): Buffer => {
+      return Buffer.concat([buffer1, buffer2]);
+    };
+
+    const outputStream = new Readable({
+      read() {
+        // noop
+      },
+    });
+
+    readable.on('data', (chunk: Buffer) => {
+      buffer = mergeBuffers(buffer, chunk);
+
+      while (buffer.length >= chunkSize) {
+        outputStream.push(buffer.subarray(0, chunkSize));
+        buffer = buffer.subarray(chunkSize);
+      }
+    });
+
+    readable.on('end', () => {
+      if (buffer.length > 0) {
+        outputStream.push(buffer);
+      }
+      outputStream.push(null); // Signal the end of the stream
+    });
+
+    readable.on('error', (err) => {
+      outputStream.destroy(err);
+    });
+
+    return outputStream;
+  }
 }
 
 export class ProgressTransform extends Transform {
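A quick usage sketch of the new chunking helper (the sample data and 4-byte chunk size are illustrative):

```ts
import { Readable } from 'node:stream';
import { StreamUtils } from './stream.utils';

async function demo(): Promise<void> {
  const source = Readable.from([Buffer.from('abcdef'), Buffer.from('ghij')]);
  const chunked = StreamUtils.streamReadableIntoChunks(source, 4);

  for await (const chunk of chunked) {
    // Emits 'abcd', then 'efgh', then the 2-byte remainder 'ij'.
    console.log((chunk as Buffer).toString());
  }
}

demo();
```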