diff --git a/src/services/network/upload/upload-file.service.ts b/src/services/network/upload/upload-file.service.ts new file mode 100644 index 0000000..b0e8acc --- /dev/null +++ b/src/services/network/upload/upload-file.service.ts @@ -0,0 +1,146 @@ +import { logger } from '../../../utils/logger.utils'; +import { + DELAYS_MS, + MAX_CONCURRENT_UPLOADS, + MAX_RETRIES, + UploadFilesInBatchesParams, + UploadFileWithRetryParams, +} from './upload.types'; +import { DriveFileService } from '../../drive/drive-file.service'; +import { dirname, extname } from 'node:path'; +import { isAlreadyExistsError } from '../../../utils/errors.utils'; +import { stat } from 'node:fs/promises'; +import { EncryptionVersion } from '@internxt/sdk/dist/drive/storage/types'; +import { createFileStreamWithBuffer, tryUploadThumbnail } from '../../../utils/thumbnail.utils'; + +export class UploadFileService { + static readonly instance = new UploadFileService(); + + async uploadFilesInChunks({ + network, + filesToUpload, + folderMap, + bucket, + destinationFolderUuid, + currentProgress, + emitProgress, + }: UploadFilesInBatchesParams): Promise { + let bytesUploaded = 0; + + const chunks = this.chunkArray(filesToUpload, MAX_CONCURRENT_UPLOADS); + + for (const chunk of chunks) { + await Promise.allSettled( + chunk.map(async (file) => { + const parentPath = dirname(file.relativePath); + const parentFolderUuid = + parentPath === '.' || parentPath === '' ? destinationFolderUuid : folderMap.get(parentPath); + + if (!parentFolderUuid) { + logger.warn(`Parent folder not found for ${file.relativePath}, skipping...`); + return null; + } + const createdFileUuid = await this.uploadFileWithRetry({ + file, + network, + bucket, + parentFolderUuid, + }); + if (createdFileUuid) { + bytesUploaded += file.size; + currentProgress.bytesUploaded += file.size; + currentProgress.itemsUploaded++; + } + emitProgress(); + }), + ); + } + return bytesUploaded; + } + + async uploadFileWithRetry({ + file, + network, + bucket, + parentFolderUuid, + }: UploadFileWithRetryParams): Promise { + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const stats = await stat(file.absolutePath); + if (!stats.size) { + logger.warn(`Skipping empty file: ${file.relativePath}`); + return null; + } + + const fileType = extname(file.absolutePath).replaceAll('.', ''); + const { fileStream, bufferStream } = createFileStreamWithBuffer({ + path: file.absolutePath, + fileType, + }); + + const fileId = await new Promise((resolve, reject) => { + network.uploadFile( + fileStream, + stats.size, + bucket, + (err: Error | null, res: string | null) => { + if (err) { + return reject(err); + } + resolve(res as string); + }, + () => {}, + ); + }); + + const createdDriveFile = await DriveFileService.instance.createFile({ + plainName: file.name, + type: fileType, + size: stats.size, + folderUuid: parentFolderUuid, + fileId, + bucket, + encryptVersion: EncryptionVersion.Aes03, + creationTime: stats.birthtime?.toISOString(), + modificationTime: stats.mtime?.toISOString(), + }); + + if (bufferStream) { + void tryUploadThumbnail({ + bufferStream, + fileType, + userBucket: bucket, + fileUuid: createdDriveFile.uuid, + networkFacade: network, + }); + } + + return createdDriveFile.fileId; + } catch (error: unknown) { + if (isAlreadyExistsError(error)) { + const msg = `File ${file.name} already exists, skipping...`; + logger.info(msg); + return null; + } + + if (attempt < MAX_RETRIES) { + const delay = DELAYS_MS[attempt]; + const retryMsg = `Failed to upload file ${file.name}, retrying in ${delay}ms...`; + logger.warn(`${retryMsg} (attempt ${attempt + 1}/${MAX_RETRIES + 1})`); + await new Promise((resolve) => setTimeout(resolve, delay)); + } else { + logger.error(`Failed to upload file ${file.name} after ${MAX_RETRIES + 1} attempts`); + return null; + } + } + } + return null; + } + private chunkArray(array: T[], chunkSize: number): T[][] { + const chunks: T[][] = []; + for (let i = 0; i < array.length; i += chunkSize) { + chunks.push(array.slice(i, i + chunkSize)); + } + return chunks; + } +} diff --git a/src/utils/thumbnail.utils.ts b/src/utils/thumbnail.utils.ts index 0684826..1ae7529 100644 --- a/src/utils/thumbnail.utils.ts +++ b/src/utils/thumbnail.utils.ts @@ -1,3 +1,10 @@ +import { Readable } from 'node:stream'; +import { NetworkFacade } from '../services/network/network-facade.service'; +import { ThumbnailService } from '../services/thumbnail.service'; +import { ErrorUtils } from './errors.utils'; +import { BufferStream } from './stream.utils'; +import { createReadStream } from 'node:fs'; + export const ThumbnailConfig = { MaxWidth: 300, MaxHeight: 300, @@ -41,3 +48,47 @@ export const isPDFThumbnailable = (fileType: string) => { export const isImageThumbnailable = (fileType: string) => { return fileType.trim().length > 0 && thumbnailableImageExtension.includes(fileType.trim().toLowerCase()); }; + +export const tryUploadThumbnail = async ({ + bufferStream, + fileType, + userBucket, + fileUuid, + networkFacade, +}: { + bufferStream: BufferStream; + fileType: string; + userBucket: string; + fileUuid: string; + networkFacade: NetworkFacade; +}) => { + try { + const thumbnailBuffer = bufferStream.getBuffer(); + if (thumbnailBuffer) { + await ThumbnailService.instance.uploadThumbnail(thumbnailBuffer, fileType, userBucket, fileUuid, networkFacade); + } + } catch (error) { + ErrorUtils.report(error); + } +}; + +export const createFileStreamWithBuffer = ({ + path, + fileType, +}: { + path: string; + fileType: string; +}): { + bufferStream?: BufferStream; + fileStream: Readable; +} => { + const readable: Readable = createReadStream(path); + if (isFileThumbnailable(fileType)) { + const bufferStream = new BufferStream(); + return { + bufferStream, + fileStream: readable.pipe(bufferStream), + }; + } + return { fileStream: readable }; +}; diff --git a/test/services/network/upload/upload-file.service.test.ts b/test/services/network/upload/upload-file.service.test.ts new file mode 100644 index 0000000..6cf58e3 --- /dev/null +++ b/test/services/network/upload/upload-file.service.test.ts @@ -0,0 +1,401 @@ +import { beforeEach, describe, it, vi, expect } from 'vitest'; +import { UploadFileService } from '../../../../src/services/network/upload/upload-file.service'; +import { NetworkFacade } from '../../../../src/services/network/network-facade.service'; +import { DriveFileService } from '../../../../src/services/drive/drive-file.service'; +import { logger } from '../../../../src/utils/logger.utils'; +import { isAlreadyExistsError } from '../../../../src/utils/errors.utils'; +import { stat } from 'fs/promises'; +import { createReadStream } from 'fs'; +import { + createFileStreamWithBuffer, + isFileThumbnailable, + tryUploadThumbnail, +} from '../../../../src/utils/thumbnail.utils'; +import { + createFileSystemNodeFixture, + createMockReadStream, + createMockStats, + createProgressFixtures, +} from './upload.service.helpers'; + +vi.mock('fs', () => ({ + createReadStream: vi.fn(), +})); + +vi.mock('fs/promises', () => ({ + stat: vi.fn(), +})); + +vi.mock('../../../../src/services/drive/drive-file.service', () => ({ + DriveFileService: { + instance: { + createFile: vi.fn(), + }, + }, +})); + +vi.mock('../../../../src/utils/thumbnail.utils', () => ({ + isFileThumbnailable: vi.fn(), + tryUploadThumbnail: vi.fn(), + createFileStreamWithBuffer: vi.fn(), +})); + +vi.mock('../../../../src/utils/stream.utils', () => ({ + StreamUtils: { + createFileStreamWithBuffer: vi.fn(), + }, +})); + +vi.mock('../../../../src/utils/logger.utils', () => ({ + logger: { + warn: vi.fn(), + info: vi.fn(), + error: vi.fn(), + }, +})); + +vi.mock('../../../../src/utils/errors.utils', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + isAlreadyExistsError: vi.fn(), + }; +}); + +describe('UploadFileService', () => { + let sut: UploadFileService; + + const mockNetworkFacade = { + uploadFile: vi.fn((_stream, _size, _bucket, callback) => { + callback(null, 'mock-uploaded-file-id'); + return { stop: vi.fn() }; + }), + } as unknown as NetworkFacade; + + beforeEach(() => { + vi.clearAllMocks(); + sut = UploadFileService.instance; + vi.mocked(isAlreadyExistsError).mockReturnValue(false); + vi.mocked(stat).mockResolvedValue(createMockStats(1024) as Awaited>); + vi.mocked(createReadStream).mockReturnValue(createMockReadStream() as ReturnType); + vi.mocked(isFileThumbnailable).mockReturnValue(false); + vi.mocked(createFileStreamWithBuffer).mockReturnValue({ + fileStream: createMockReadStream() as ReturnType, + bufferStream: undefined, + }); + vi.mocked(tryUploadThumbnail).mockResolvedValue(undefined); + vi.mocked(DriveFileService.instance.createFile).mockResolvedValue({ + uuid: 'mock-file-uuid', + fileId: 'mock-file-id', + } as Awaited>); + }); + + describe('uploadFilesInChunks', () => { + const bucket = 'test-bucket'; + const destinationFolderUuid = 'dest-uuid'; + const folderMap = new Map(); + + it('should properly return the total amount of bytes uploaded once finished with the uploads', async () => { + const files = [ + createFileSystemNodeFixture({ type: 'file', name: 'file1.txt', relativePath: 'file1.txt', size: 100 }), + createFileSystemNodeFixture({ type: 'file', name: 'file2.txt', relativePath: 'file2.txt', size: 200 }), + createFileSystemNodeFixture({ type: 'file', name: 'file3.txt', relativePath: 'file3.txt', size: 300 }), + ]; + const { currentProgress, emitProgress } = createProgressFixtures(); + + const uploadFileWithRetrySpy = vi.spyOn(sut, 'uploadFileWithRetry').mockResolvedValue('mock-file-id'); + + const result = await sut.uploadFilesInChunks({ + network: mockNetworkFacade, + filesToUpload: files, + folderMap, + bucket, + destinationFolderUuid, + currentProgress, + emitProgress, + }); + + expect(result).toBe(600); + expect(uploadFileWithRetrySpy).toHaveBeenCalledTimes(3); + uploadFileWithRetrySpy.mockRestore(); + }); + + it('should properly upload files in chunks of max 5', async () => { + const files = Array.from({ length: 12 }, (_, i) => + createFileSystemNodeFixture({ + type: 'file', + name: `file${i}.txt`, + relativePath: `file${i}.txt`, + size: 100, + }), + ); + const { currentProgress, emitProgress } = createProgressFixtures(); + + const uploadFileWithRetrySpy = vi.spyOn(sut, 'uploadFileWithRetry').mockResolvedValue('mock-file-id'); + + await sut.uploadFilesInChunks({ + network: mockNetworkFacade, + filesToUpload: files, + folderMap, + bucket, + destinationFolderUuid, + currentProgress, + emitProgress, + }); + + expect(uploadFileWithRetrySpy).toHaveBeenCalledTimes(12); + uploadFileWithRetrySpy.mockRestore(); + }); + + it('should properly emit progress and update the currentProgress object', async () => { + const files = [ + createFileSystemNodeFixture({ type: 'file', name: 'file1.txt', relativePath: 'file1.txt', size: 500 }), + createFileSystemNodeFixture({ type: 'file', name: 'file2.txt', relativePath: 'file2.txt', size: 1000 }), + ]; + const { currentProgress, emitProgress } = createProgressFixtures(); + + const uploadFileWithRetrySpy = vi.spyOn(sut, 'uploadFileWithRetry').mockResolvedValue('mock-file-id'); + + await sut.uploadFilesInChunks({ + network: mockNetworkFacade, + filesToUpload: files, + folderMap, + bucket, + destinationFolderUuid, + currentProgress, + emitProgress, + }); + + expect(currentProgress.itemsUploaded).toBe(2); + expect(currentProgress.bytesUploaded).toBe(1500); + expect(emitProgress).toHaveBeenCalledTimes(2); + uploadFileWithRetrySpy.mockRestore(); + }); + it('should skip files when parent folder is not found in folderMap', async () => { + const bucket = 'test-bucket'; + const destinationFolderUuid = 'dest-uuid'; + const folderMap = new Map(); + const files = [ + createFileSystemNodeFixture({ + type: 'file', + name: 'nested.txt', + relativePath: 'subfolder/nested.txt', + size: 100, + }), + ]; + const { currentProgress, emitProgress } = createProgressFixtures(); + + const uploadFileWithRetrySpy = vi.spyOn(sut, 'uploadFileWithRetry'); + + const result = await sut.uploadFilesInChunks({ + network: mockNetworkFacade, + filesToUpload: files, + folderMap, + bucket, + destinationFolderUuid, + currentProgress, + emitProgress, + }); + + expect(result).toBe(0); + expect(logger.warn).toHaveBeenCalledWith('Parent folder not found for subfolder/nested.txt, skipping...'); + expect(uploadFileWithRetrySpy).not.toHaveBeenCalled(); + + uploadFileWithRetrySpy.mockRestore(); + }); + }); + describe('uploadFileWithRetry', () => { + const bucket = 'test-bucket'; + const destinationFolderUuid = 'dest-uuid'; + + it('should properly create a file and return the created file id', async () => { + const file = createFileSystemNodeFixture({ + type: 'file', + name: 'test', + relativePath: 'test.txt', + size: 1024, + absolutePath: '/path/to/test.txt', + }); + + const result = await sut.uploadFileWithRetry({ + file, + network: mockNetworkFacade, + bucket, + parentFolderUuid: destinationFolderUuid, + }); + + expect(result).toBe('mock-file-id'); + expect(stat).toHaveBeenCalledWith(file.absolutePath); + expect(mockNetworkFacade.uploadFile).toHaveBeenCalledWith( + expect.anything(), + 1024, + bucket, + expect.any(Function), + expect.any(Function), + ); + expect(DriveFileService.instance.createFile).toHaveBeenCalledWith( + expect.objectContaining({ + plainName: 'test', + type: 'txt', + size: 1024, + folderUuid: destinationFolderUuid, + fileId: 'mock-uploaded-file-id', + bucket, + encryptVersion: '03-aes', + }), + ); + }); + + it('should retry a maximum of 2 retries (3 total attempts) if an exception is thrown while uploading', async () => { + vi.useFakeTimers(); + const file = createFileSystemNodeFixture({ + type: 'file', + name: 'test', + relativePath: 'test.txt', + size: 1024, + }); + const error = new Error('Network error'); + + vi.mocked(mockNetworkFacade.uploadFile) + .mockImplementationOnce((_stream, _size, _bucket, callback) => { + callback(error, null); + return { stop: vi.fn() } as unknown as ReturnType; + }) + .mockImplementationOnce((_stream, _size, _bucket, callback) => { + callback(error, null); + return { stop: vi.fn() } as unknown as ReturnType; + }) + .mockImplementationOnce((_stream, _size, _bucket, callback) => { + callback(null, 'success-file-id'); + return { stop: vi.fn() } as unknown as ReturnType; + }); + + const resultPromise = sut.uploadFileWithRetry({ + file, + network: mockNetworkFacade, + bucket, + parentFolderUuid: destinationFolderUuid, + }); + + await vi.runAllTimersAsync(); + + const result = await resultPromise; + + expect(result).toBe('mock-file-id'); + expect(mockNetworkFacade.uploadFile).toHaveBeenCalledTimes(3); + expect(logger.warn).toHaveBeenCalledTimes(2); + + vi.useRealTimers(); + }); + + it('should skip empty files and return null', async () => { + const file = createFileSystemNodeFixture({ + type: 'file', + name: 'empty.txt', + relativePath: 'empty.txt', + size: 0, + }); + + vi.mocked(stat).mockResolvedValue(createMockStats(0) as Awaited>); + + const result = await sut.uploadFileWithRetry({ + file, + network: mockNetworkFacade, + bucket, + parentFolderUuid: destinationFolderUuid, + }); + + expect(result).toBeNull(); + expect(logger.warn).toHaveBeenCalledWith('Skipping empty file: empty.txt'); + expect(mockNetworkFacade.uploadFile).not.toHaveBeenCalled(); + }); + + it('should call tryUploadThumbnail when bufferStream is present', async () => { + const mockBufferStream = { getBuffer: vi.fn() }; + vi.mocked(createFileStreamWithBuffer).mockReturnValue({ + fileStream: createMockReadStream() as ReturnType, + bufferStream: mockBufferStream as unknown as ReturnType['bufferStream'], + }); + + const file = createFileSystemNodeFixture({ + type: 'file', + name: 'image.png', + relativePath: 'image.png', + size: 1024, + }); + + await sut.uploadFileWithRetry({ + file, + network: mockNetworkFacade, + bucket, + parentFolderUuid: destinationFolderUuid, + }); + + expect(tryUploadThumbnail).toHaveBeenCalledWith({ + bufferStream: mockBufferStream, + fileType: 'png', + userBucket: bucket, + fileUuid: 'mock-file-uuid', + networkFacade: mockNetworkFacade, + }); + }); + + it('should return null when file already exists', async () => { + vi.mocked(isAlreadyExistsError).mockReturnValue(true); + vi.mocked(mockNetworkFacade.uploadFile).mockImplementation((_stream, _size, _bucket, callback) => { + callback(new Error('File already exists'), null); + return { stop: vi.fn() } as unknown as ReturnType; + }); + + const file = createFileSystemNodeFixture({ + type: 'file', + name: 'duplicate.txt', + relativePath: 'duplicate.txt', + size: 1024, + }); + + const result = await sut.uploadFileWithRetry({ + file, + network: mockNetworkFacade, + bucket, + parentFolderUuid: destinationFolderUuid, + }); + + expect(result).toBeNull(); + expect(logger.info).toHaveBeenCalledWith('File duplicate.txt already exists, skipping...'); + }); + + it('should return null after max retries exceeded', async () => { + vi.useFakeTimers(); + const file = createFileSystemNodeFixture({ + type: 'file', + name: 'fail.txt', + relativePath: 'fail.txt', + size: 1024, + }); + const error = new Error('Network error'); + + vi.mocked(mockNetworkFacade.uploadFile).mockImplementation((_stream, _size, _bucket, callback) => { + callback(error, null); + return { stop: vi.fn() } as unknown as ReturnType; + }); + + const resultPromise = sut.uploadFileWithRetry({ + file, + network: mockNetworkFacade, + bucket, + parentFolderUuid: destinationFolderUuid, + }); + + await vi.runAllTimersAsync(); + + const result = await resultPromise; + + expect(result).toBeNull(); + expect(logger.error).toHaveBeenCalledWith('Failed to upload file fail.txt after 3 attempts'); + expect(mockNetworkFacade.uploadFile).toHaveBeenCalledTimes(3); + + vi.useRealTimers(); + }); + }); +}); diff --git a/test/services/network/upload/upload.service.helpers.ts b/test/services/network/upload/upload.service.helpers.ts index 4a4795f..9d43a91 100644 --- a/test/services/network/upload/upload.service.helpers.ts +++ b/test/services/network/upload/upload.service.helpers.ts @@ -1,3 +1,4 @@ +import { Readable } from 'stream'; import { vi } from 'vitest'; import { FileSystemNode } from '../../../../src/services/local-filesystem/local-filesystem.types'; @@ -22,3 +23,17 @@ export function createProgressFixtures() { emitProgress: vi.fn(), }; } + +export function createMockStats(size: number) { + return { + size, + birthtime: new Date('2024-01-01'), + mtime: new Date('2024-01-02'), + }; +} + +export function createMockReadStream() { + const stream = new Readable(); + stream.push(null); + return stream; +} diff --git a/test/utils/thumbnail.utils.test.ts b/test/utils/thumbnail.utils.test.ts new file mode 100644 index 0000000..60d1037 --- /dev/null +++ b/test/utils/thumbnail.utils.test.ts @@ -0,0 +1,30 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { createFileStreamWithBuffer } from '../../src/utils/thumbnail.utils'; +import { BufferStream } from '../../src/utils/stream.utils'; +import path from 'node:path'; +import { Readable } from 'node:stream'; + +describe('createFileStreamWithBuffer', () => { + const testFilePath = path.join(process.cwd(), 'test/fixtures/test-content.fixture.txt'); + + beforeEach(() => { + vi.restoreAllMocks(); + }); + + it('should create BufferStream and pipe stream when file type is thumbnailable', () => { + const result = createFileStreamWithBuffer({ path: testFilePath, fileType: 'png' }); + + expect(result.bufferStream).toBeDefined(); + expect(result.bufferStream).toBeInstanceOf(BufferStream); + expect(result.fileStream).toBeDefined(); + expect(result.fileStream).toBeInstanceOf(Readable); + }); + + it('should not create BufferStream when file type is not thumbnailable', () => { + const result = createFileStreamWithBuffer({ path: testFilePath, fileType: 'txt' }); + + expect(result.bufferStream).toBeUndefined(); + expect(result.fileStream).toBeDefined(); + expect(result.fileStream).toBeInstanceOf(Readable); + }); +});