diff --git a/package.json b/package.json index d7941e19..6cecad9d 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "@internxt/sdk": "1.7.0", "@oclif/core": "4.2.3", "@types/validator": "13.12.2", + "async": "3.2.6", "axios": "1.7.9", "bip39": "3.1.0", "body-parser": "1.20.3", @@ -70,6 +71,7 @@ "@internxt/prettier-config": "internxt/prettier-config#v1.0.2", "@oclif/test": "4.1.7", "@openpgp/web-stream-tools": "0.0.11-patch-1", + "@types/async": "3.2.24", "@types/cli-progress": "3.11.6", "@types/express": "5.0.0", "@types/mime-types": "2.1.4", diff --git a/src/commands/upload-file.ts b/src/commands/upload-file.ts index b63f86a4..feb85ae2 100644 --- a/src/commands/upload-file.ts +++ b/src/commands/upload-file.ts @@ -82,17 +82,38 @@ export default class UploadFile extends Command { linewrap: true, }); progressBar.start(100, 0); - const [uploadPromise, abortable] = await networkFacade.uploadFromStream( - user.bucket, - user.mnemonic, - stats.size, - fileStream, - { + + const minimumMultipartThreshold = 100 * 1024 * 1024; + const useMultipart = stats.size > minimumMultipartThreshold; + const partSize = 30 * 1024 * 1024; + const parts = Math.ceil(stats.size / partSize); + + let uploadOperation: Promise< + [ + Promise<{ + fileId: string; + hash: Buffer; + }>, + AbortController, + ] + >; + + if (useMultipart) { + uploadOperation = networkFacade.uploadMultipartFromStream(user.bucket, user.mnemonic, stats.size, fileStream, { + parts, progressCallback: (progress) => { progressBar.update(progress * 0.99); }, - }, - ); + }); + } else { + uploadOperation = networkFacade.uploadFromStream(user.bucket, user.mnemonic, stats.size, fileStream, { + progressCallback: (progress) => { + progressBar.update(progress * 0.99); + }, + }); + } + + const [uploadPromise, abortable] = await uploadOperation; process.on('SIGINT', () => { abortable.abort('SIGINT received'); diff --git a/src/services/crypto.service.ts b/src/services/crypto.service.ts index b43dd6f7..c71febbe 100644 --- a/src/services/crypto.service.ts +++ b/src/services/crypto.service.ts @@ -1,7 +1,7 @@ import { CryptoProvider } from '@internxt/sdk'; import { Keys, Password } from '@internxt/sdk/dist/auth'; import { createCipheriv, createDecipheriv, createHash, Decipher, pbkdf2Sync, randomBytes } from 'node:crypto'; -import { Transform } from 'node:stream'; +import { Readable, Transform } from 'node:stream'; import { KeysService } from './keys.service'; import { ConfigService } from '../services/config.service'; import { StreamUtils } from '../utils/stream.utils'; @@ -116,12 +116,26 @@ export class CryptoService { return Buffer.concat([decipher.update(contentsToDecrypt), decipher.final()]).toString('utf8'); }; - public async decryptStream( + public encryptStreamInParts = ( + readable: Readable, + cipher: Transform, + size: number, + parts: number, + ): Transform => { + // We include a marginChunkSize because if we split the chunk directly, there will always be one more chunk left, this will cause a mismatch with the urls provided + const marginChunkSize = 1024; + const chunkSize = size / parts + marginChunkSize; + const readableChunks = StreamUtils.streamReadableIntoChunks(readable, chunkSize); + + return readableChunks.pipe(cipher); + }; + + public decryptStream = ( inputSlices: ReadableStream[], key: Buffer, iv: Buffer, startOffsetByte?: number, - ) { + ) => { let decipher: Decipher; if (startOffsetByte) { const aesBlockSize = 16; @@ -164,7 +178,7 @@ export class CryptoService { }); return decryptedStream; - } + }; public getEncryptionTransform = (key: Buffer, iv: Buffer): Transform => { const cipher = createCipheriv('aes-256-ctr', key, iv); diff --git a/src/services/network/network-facade.service.ts b/src/services/network/network-facade.service.ts index 04b4bb10..b2f56277 100644 --- a/src/services/network/network-facade.service.ts +++ b/src/services/network/network-facade.service.ts @@ -6,17 +6,26 @@ import { DownloadFileFunction, EncryptFileFunction, UploadFileFunction, + UploadFileMultipartFunction, } from '@internxt/sdk/dist/network'; import { Environment } from '@internxt/inxt-js'; import { randomBytes } from 'node:crypto'; import { Readable, Transform } from 'node:stream'; -import { DownloadOptions, UploadOptions, UploadProgressCallback, DownloadProgressCallback } from '../../types/network.types'; +import { + DownloadOptions, + UploadOptions, + UploadProgressCallback, + DownloadProgressCallback, + UploadMultipartOptions, + UploadTask, +} from '../../types/network.types'; import { CryptoService } from '../crypto.service'; import { UploadService } from './upload.service'; import { DownloadService } from './download.service'; import { ValidationService } from '../validation.service'; import { HashStream } from '../../utils/hash.utils'; import { RangeOptions } from '../../utils/network.utils'; +import { queue, QueueObject } from 'async'; export class NetworkFacade { private readonly cryptoLib: Network.Crypto; @@ -73,7 +82,7 @@ export class NetworkFacade { if (rangeOptions) { startOffsetByte = rangeOptions.parsed.start; } - fileStream = await this.cryptoService.decryptStream( + fileStream = this.cryptoService.decryptStream( encryptedContentStreams, Buffer.from(key as ArrayBuffer), Buffer.from(iv as ArrayBuffer), @@ -183,4 +192,144 @@ export class NetworkFacade { return [uploadOperation(), abortable]; } + + /** + * Performs a multi-part upload encrypting the stream content + * + * @param bucketId The bucket where the file will be uploaded + * @param mnemonic The plain mnemonic of the user + * @param size The total size of the stream content + * @param from The source ReadStream to upload from + * @param options The upload options + * @returns A promise to execute the upload and an abort controller to cancel the upload + */ + async uploadMultipartFromStream( + bucketId: string, + mnemonic: string, + size: number, + from: Readable, + options: UploadMultipartOptions, + ): Promise<[Promise<{ fileId: string; hash: Buffer }>, AbortController]> { + const hashStream = new HashStream(); + const abortable = options?.abortController ?? new AbortController(); + let encryptionTransform: Transform; + let hash: Buffer; + + const partsUploadedBytes: Record = {}; + type Part = { + PartNumber: number; + ETag: string; + }; + const fileParts: Part[] = []; + + const onProgress = (partId: number, loadedBytes: number) => { + if (!options?.progressCallback) return; + partsUploadedBytes[partId] = loadedBytes; + const currentTotalLoadedBytes = Object.values(partsUploadedBytes).reduce((a, p) => a + p, 0); + const reportedProgress = Math.round((currentTotalLoadedBytes / size) * 100); + options.progressCallback(reportedProgress); + }; + + const encryptFile: EncryptFileFunction = async (_, key, iv) => { + const encryptionCipher = this.cryptoService.getEncryptionTransform( + Buffer.from(key as ArrayBuffer), + Buffer.from(iv as ArrayBuffer), + ); + const streamInParts = this.cryptoService.encryptStreamInParts(from, encryptionCipher, size, options.parts); + encryptionTransform = streamInParts.pipe(hashStream); + }; + + const uploadFileMultipart: UploadFileMultipartFunction = async (urls: string[]) => { + let partIndex = 0; + const limitConcurrency = 6; + + const uploadPart = async (upload: UploadTask) => { + const { etag } = await this.uploadService.uploadFile(upload.urlToUpload, upload.contentToUpload, { + abortController: abortable, + progressCallback: (loadedBytes: number) => { + onProgress(upload.index, loadedBytes); + }, + }); + + fileParts.push({ + ETag: etag, + PartNumber: upload.index + 1, + }); + }; + + const uploadQueue: QueueObject = queue(function (task, callback) { + uploadPart(task) + .then(() => { + callback(); + }) + .catch((e) => { + callback(e); + }); + }, limitConcurrency); + + for await (const chunk of encryptionTransform) { + const part: Buffer = chunk; + + if (uploadQueue.running() === limitConcurrency) { + await uploadQueue.unsaturated(); + } + + if (abortable.signal.aborted) { + throw new Error('Upload cancelled by user'); + } + + let errorAlreadyThrown = false; + + uploadQueue + .pushAsync({ + contentToUpload: part, + urlToUpload: urls[partIndex], + index: partIndex++, + }) + .catch((err: Error) => { + if (errorAlreadyThrown) return; + + errorAlreadyThrown = true; + if (err) { + uploadQueue.kill(); + if (!abortable?.signal.aborted) { + abortable.abort(); + } + } + }); + } + + while (uploadQueue.running() > 0 || uploadQueue.length() > 0) { + await uploadQueue.drain(); + } + + hash = hashStream.getHash(); + const compareParts = (pA: Part, pB: Part) => pA.PartNumber - pB.PartNumber; + const sortedParts = fileParts.sort(compareParts); + return { + hash: hash.toString('hex'), + parts: sortedParts, + }; + }; + + const uploadOperation = async () => { + const uploadResult = await NetworkUpload.uploadMultipartFile( + this.network, + this.cryptoLib, + bucketId, + mnemonic, + size, + encryptFile, + uploadFileMultipart, + options.parts, + ); + + return { + fileId: uploadResult, + hash: hash, + }; + }; + + return [uploadOperation(), abortable]; + } } diff --git a/src/services/network/upload.service.ts b/src/services/network/upload.service.ts index f11a2a4e..c1ce0fb1 100644 --- a/src/services/network/upload.service.ts +++ b/src/services/network/upload.service.ts @@ -5,7 +5,7 @@ import { UploadOptions } from '../../types/network.types'; export class UploadService { public static readonly instance: UploadService = new UploadService(); - async uploadFile(url: string, from: Readable, options: UploadOptions): Promise<{ etag: string }> { + async uploadFile(url: string, from: Readable | Buffer, options: UploadOptions): Promise<{ etag: string }> { const response = await axios.put(url, from, { signal: options.abortController?.signal, onUploadProgress: (progressEvent) => { diff --git a/src/types/network.types.ts b/src/types/network.types.ts index 800fe09e..72c3869d 100644 --- a/src/types/network.types.ts +++ b/src/types/network.types.ts @@ -17,3 +17,13 @@ export interface SelfsignedCert { cert: string | Buffer; key: string | Buffer; } + +export interface UploadTask { + contentToUpload: Buffer; + urlToUpload: string; + index: number; +} + +export interface UploadMultipartOptions extends UploadOptions { + parts: number; +} diff --git a/src/utils/stream.utils.ts b/src/utils/stream.utils.ts index 2075a455..769d49ed 100644 --- a/src/utils/stream.utils.ts +++ b/src/utils/stream.utils.ts @@ -1,5 +1,5 @@ import { ReadStream, WriteStream } from 'node:fs'; -import { Transform, TransformCallback } from 'node:stream'; +import { Readable, Transform, TransformCallback } from 'node:stream'; export class StreamUtils { static readStreamToReadableStream(readStream: ReadStream): ReadableStream { @@ -64,6 +64,48 @@ export class StreamUtils { return stream; } + + /** + * Given a readable stream, it enqueues its parts into chunks as it is being read + * @param readable Readable stream + * @param chunkSize The chunkSize in bytes that we want each chunk to be + * @returns A readable stream whose output is chunks of size chunkSize + */ + static streamReadableIntoChunks(readable: Readable, chunkSize: number): Readable { + let buffer = Buffer.alloc(0); + + const mergeBuffers = (buffer1: Buffer, buffer2: Buffer): Buffer => { + return Buffer.concat([buffer1, buffer2]); + }; + + const outputStream = new Readable({ + read() { + // noop + }, + }); + + readable.on('data', (chunk: Buffer) => { + buffer = mergeBuffers(buffer, chunk); + + while (buffer.length >= chunkSize) { + outputStream.push(buffer.subarray(0, chunkSize)); + buffer = buffer.subarray(chunkSize); + } + }); + + readable.on('end', () => { + if (buffer.length > 0) { + outputStream.push(buffer); + } + outputStream.push(null); // Signal the end of the stream + }); + + readable.on('error', (err) => { + outputStream.destroy(err); + }); + + return outputStream; + } } export class ProgressTransform extends Transform { diff --git a/src/webdav/handlers/PUT.handler.ts b/src/webdav/handlers/PUT.handler.ts index 3e9ae149..8e5d43aa 100644 --- a/src/webdav/handlers/PUT.handler.ts +++ b/src/webdav/handlers/PUT.handler.ts @@ -11,6 +11,7 @@ import { DriveFileItem, DriveFolderItem } from '../../types/drive.types'; import { DriveFolderService } from '../../services/drive/drive-folder.service'; import { TrashService } from '../../services/drive/trash.service'; import { EncryptionVersion } from '@internxt/sdk/dist/drive/storage/types'; +import { CLIUtils } from '../../utils/cli.utils'; export class PUTRequestHandler implements WebDavMethodHandler { constructor( @@ -68,19 +69,50 @@ export class PUTRequestHandler implements WebDavMethodHandler { const { user } = await authService.getAuthDetails(); - let lastLoggedProgress = 0; - const [uploadPromise] = await networkFacade.uploadFromStream(user.bucket, user.mnemonic, contentLength, req, { - progressCallback: (progress) => { - const percentage = Math.floor(100 * progress); + const timer = CLIUtils.timer(); + + const minimumMultipartThreshold = 100 * 1024 * 1024; + const useMultipart = contentLength > minimumMultipartThreshold; + const partSize = 30 * 1024 * 1024; + const parts = Math.ceil(contentLength / partSize); + + let uploadOperation: Promise< + [ + Promise<{ + fileId: string; + hash: Buffer; + }>, + AbortController, + ] + >; + + const progressCallback = (progress: number) => { + webdavLogger.info(`[PUT] Upload progress for file ${resource.name}: ${progress}%`); + }; + + if (useMultipart) { + uploadOperation = networkFacade.uploadMultipartFromStream(user.bucket, user.mnemonic, contentLength, req, { + parts, + progressCallback, + }); + } else { + uploadOperation = networkFacade.uploadFromStream(user.bucket, user.mnemonic, contentLength, req, { + progressCallback, + }); + } + + const [uploadPromise, abortable] = await uploadOperation; - if (percentage >= lastLoggedProgress + 1) { - lastLoggedProgress = percentage; - webdavLogger.info(`[PUT] Upload progress for file ${resource.name}: ${percentage}%`); - } - }, + let uploaded = false; + res.on('close', () => { + if (!uploaded) { + webdavLogger.info('[PUT] ❌ HTTP Client has been disconnected, res has been closed.'); + abortable.abort('HTTP Client has been disconnected.'); + } }); const uploadResult = await uploadPromise; + uploaded = true; webdavLogger.info('[PUT] ✅ File uploaded to network'); @@ -95,7 +127,8 @@ export class PUTRequestHandler implements WebDavMethodHandler { name: '', }); - webdavLogger.info('[PUT] ✅ File uploaded to internxt drive'); + const uploadTime = timer.stop(); + webdavLogger.info(`[PUT] ✅ File uploaded in ${uploadTime}ms to Internxt Drive`); await driveDatabaseManager.createFile(file, resource.path.dir + '/'); diff --git a/src/webdav/index.ts b/src/webdav/index.ts index 28314bc6..67143a30 100644 --- a/src/webdav/index.ts +++ b/src/webdav/index.ts @@ -47,4 +47,8 @@ const init = async () => { .catch((err) => webdavLogger.error('Failed to start WebDAV server', err)); }; +process.on('uncaughtException', (err) => { + webdavLogger.error('Unhandled exception:', err); +}); + init(); diff --git a/test/services/network/network-facade.service.test.ts b/test/services/network/network-facade.service.test.ts index 69fad07c..9fe4ed2c 100644 --- a/test/services/network/network-facade.service.test.ts +++ b/test/services/network/network-facade.service.test.ts @@ -12,6 +12,7 @@ import axios from 'axios'; import { fail } from 'node:assert'; import crypto from 'node:crypto'; import { HashStream } from '../../../src/utils/hash.utils'; +import { UploadMultipartOptions } from '../../../src/types/network.types'; describe('Network Facade Service', () => { beforeEach(() => { @@ -304,4 +305,76 @@ describe('Network Facade Service', () => { expect(options.progressCallback).toHaveBeenCalledWith(100); }); + + it('When a file is uploaded via multipart, then it should report progress', async () => { + const bucket = 'f1858bc9675f9e4f7ab29429'; + const networkMock = getNetworkMock(); + + const sut = new NetworkFacade( + networkMock, + UploadService.instance, + DownloadService.instance, + CryptoService.instance, + ); + const file = crypto.randomBytes(16).toString('hex'); + const readStream = new Readable({ + read() { + this.push(file); + this.push(null); + }, + }); + const options: UploadMultipartOptions = { + progressCallback: vi.fn(), + abortController: new AbortController(), + parts: 2, + }; + + vi.spyOn(HashStream.prototype, 'getHash').mockImplementation(() => Buffer.from('')); + + vi.spyOn(axios, 'put').mockImplementation((_, __, config) => { + config?.onUploadProgress?.({ + loaded: file.length, + total: file.length, + bytes: file.length, + lengthComputable: true, + }); + return Promise.resolve({ + data: readStream, + headers: { + etag: 'any-etag', + }, + }); + }); + + vi.spyOn(networkMock, 'startUpload').mockResolvedValue({ + uploads: [ + { + index: 0, + url: 'any-url', + uuid: 'any-uuid', + UploadId: 'any-UploadId', + urls: ['url_1', 'url_2'], + }, + ], + }); + + vi.spyOn(networkMock, 'finishUpload') + // @ts-expect-error - We only mock the properties we need + .mockResolvedValue({ + id: 'uploaded_file_id', + }); + + const [executeUpload] = await sut.uploadMultipartFromStream( + bucket, + 'animal fog wink trade december thumb sight cousin crunch plunge captain enforce letter creek text', + file.length, + readStream, + options, + ); + + const uploadResult = await executeUpload; + + expect(uploadResult.fileId).to.be.equal('uploaded_file_id'); + expect(options.progressCallback).toHaveBeenCalledWith(100); + }); }); diff --git a/test/webdav/handlers/PUT.handler.test.ts b/test/webdav/handlers/PUT.handler.test.ts index e2791c82..8a201157 100644 --- a/test/webdav/handlers/PUT.handler.test.ts +++ b/test/webdav/handlers/PUT.handler.test.ts @@ -207,4 +207,76 @@ describe('PUT request handler', () => { expect(deleteDBFileStub).toHaveBeenCalledOnce(); expect(deleteDriveFileStub).toHaveBeenCalledOnce(); }); + + it('When the Drive destination folder is found, then it should upload the multipart file', async () => { + const driveDatabaseManager = getDriveDatabaseManager(); + const downloadService = DownloadService.instance; + const uploadService = UploadService.instance; + const cryptoService = CryptoService.instance; + const authService = AuthService.instance; + const networkFacade = new NetworkFacade(getNetworkMock(), uploadService, downloadService, cryptoService); + const sut = new PUTRequestHandler({ + driveFileService: DriveFileService.instance, + driveFolderService: DriveFolderService.instance, + authService: AuthService.instance, + trashService: TrashService.instance, + networkFacade, + driveDatabaseManager, + }); + + const multipartFileSize = 105 * 1024 * 1024; + + const requestedFileResource: WebDavRequestedResource = getRequestedFileResource(); + const requestedParentFolderResource: WebDavRequestedResource = getRequestedFolderResource({ + parentFolder: '/', + folderName: '', + }); + const folderFixture = newFolderItem({ name: requestedParentFolderResource.name }); + const fileFixture = newDriveFile({ + folderId: folderFixture.id, + folderUuid: folderFixture.uuid, + size: multipartFileSize, + }); + + const request = createWebDavRequestFixture({ + method: 'PUT', + url: requestedFileResource.url, + headers: { + 'content-length': multipartFileSize.toString(), + }, + }); + + const response = createWebDavResponseFixture({ + status: vi.fn().mockReturnValue({ send: vi.fn() }), + }); + + const getRequestedResourceStub = vi + .spyOn(WebDavUtils, 'getRequestedResource') + .mockResolvedValueOnce(requestedFileResource) + .mockResolvedValueOnce(requestedParentFolderResource); + const getAndSearchItemFromResourceStub = vi + .spyOn(WebDavUtils, 'getAndSearchItemFromResource') + .mockResolvedValueOnce(folderFixture) + .mockRejectedValue(new Error()); + const getAuthDetailsStub = vi.spyOn(authService, 'getAuthDetails').mockResolvedValue(UserCredentialsFixture); + const uploadMultipartFromStreamStub = vi + .spyOn(networkFacade, 'uploadMultipartFromStream') + .mockResolvedValue([ + Promise.resolve({ fileId: '09218313209', hash: Buffer.from('test') }), + new AbortController(), + ]); + const createDriveFileStub = vi + .spyOn(DriveFileService.instance, 'createFile') + .mockResolvedValue(fileFixture.toItem()); + const createDBFileStub = vi.spyOn(driveDatabaseManager, 'createFile').mockResolvedValue(fileFixture); + + await sut.handle(request, response); + expect(response.status).toHaveBeenCalledWith(200); + expect(getRequestedResourceStub).toHaveBeenCalledTimes(2); + expect(getAndSearchItemFromResourceStub).toHaveBeenCalledTimes(2); + expect(getAuthDetailsStub).toHaveBeenCalledOnce(); + expect(uploadMultipartFromStreamStub).toHaveBeenCalledOnce(); + expect(createDriveFileStub).toHaveBeenCalledOnce(); + expect(createDBFileStub).toHaveBeenCalledOnce(); + }); }); diff --git a/yarn.lock b/yarn.lock index 3ea1aeea..82141fcf 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2192,6 +2192,11 @@ resolved "https://registry.yarnpkg.com/@tsconfig/node16/-/node16-1.0.4.tgz#0b92dcc0cc1c81f6f306a381f28e31b1a56536e9" integrity sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA== +"@types/async@3.2.24": + version "3.2.24" + resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.24.tgz#3a96351047575bbcf2340541b2d955a35339608f" + integrity sha512-8iHVLHsCCOBKjCF2KwFe0p9Z3rfM9mL+sSP8btyR5vTjJRAqpBYD28/ZLgXPf0pjG1VxOvtCV/BgXkQbpSe8Hw== + "@types/body-parser@*": version "1.19.5" resolved "https://registry.yarnpkg.com/@types/body-parser/-/body-parser-1.19.5.tgz#04ce9a3b677dc8bd681a17da1ab9835dc9d3ede4" @@ -2761,6 +2766,11 @@ async-retry@^1.3.3: dependencies: retry "0.13.1" +async@3.2.6, async@^3.2.0, async@^3.2.3, async@~3.2.0: + version "3.2.6" + resolved "https://registry.yarnpkg.com/async/-/async-3.2.6.tgz#1b0728e14929d51b85b449b7f06e27c1145e38ce" + integrity sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA== + async@^2.6.3, async@~2.6.1: version "2.6.4" resolved "https://registry.yarnpkg.com/async/-/async-2.6.4.tgz#706b7ff6084664cd7eae713f6f965433b5504221" @@ -2768,11 +2778,6 @@ async@^2.6.3, async@~2.6.1: dependencies: lodash "^4.17.14" -async@^3.2.0, async@^3.2.3, async@~3.2.0: - version "3.2.6" - resolved "https://registry.yarnpkg.com/async/-/async-3.2.6.tgz#1b0728e14929d51b85b449b7f06e27c1145e38ce" - integrity sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA== - asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"