@@ -9,10 +9,13 @@ import * as path from 'path';
99import { ConfigService } from '@nestjs/config' ;
1010import * as fs from 'fs' ;
1111import { promises as fsPromises } from 'fs' ;
12+ import { CACHE_DIR } from './constants' ;
13+ import { FileRemoval } from './cleanup/removalUtils' ;
14+ import * as kill from 'tree-kill' ;
1215
1316export interface Job {
1417 id : string ;
15- status : 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled' ;
18+ status : 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal' ;
1619 progress : number ;
1720 outputPath : string ;
1821 inputUrl : string ;
@@ -32,21 +35,23 @@ export class AppService {
3235 private jobQueue : string [ ] = [ ] ;
3336 private maxConcurrentJobs : number ;
3437 private cacheDir : string ;
38+ private immediateRemoval : boolean ;
3539
3640 constructor (
3741 private logger : Logger ,
3842 private configService : ConfigService ,
43+ private readonly fileRemoval : FileRemoval
44+
3945 ) {
40- this . cacheDir = path . join ( process . cwd ( ) , 'cache' ) ;
46+ this . cacheDir = CACHE_DIR ;
4147 this . maxConcurrentJobs = this . configService . get < number > (
4248 'MAX_CONCURRENT_JOBS' ,
4349 1 ,
4450 ) ;
45-
46- // Ensure the cache directory exists
47- if ( ! fs . existsSync ( this . cacheDir ) ) {
48- fs . mkdirSync ( this . cacheDir , { recursive : true } ) ;
49- }
51+ this . immediateRemoval = this . configService . get < boolean > (
52+ 'REMOVE_FILE_AFTER_RIGHT_DOWNLOAD' ,
53+ true ,
54+ ) ;
5055 }
5156
5257 async downloadAndCombine (
@@ -92,7 +97,7 @@ export class AppService {
9297 if ( ! deviceId ) {
9398 return this . activeJobs ;
9499 }
95- return this . activeJobs . filter ( ( job ) => job . deviceId === deviceId ) ;
100+ return this . activeJobs . filter ( ( job ) => job . deviceId === deviceId && job . status !== 'ready-for-removal' ) ;
96101 }
97102
98103 async deleteCache ( ) : Promise < { message : string } > {
@@ -110,23 +115,74 @@ export class AppService {
110115 }
111116 }
112117
118+ removeJob ( jobId : string ) : void {
119+ this . activeJobs = this . activeJobs . filter ( job => job . id !== jobId ) ;
120+ this . logger . log ( `Job ${ jobId } removed.` ) ;
121+ }
122+
113123 cancelJob ( jobId : string ) : boolean {
114- const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
124+ this . completeJob ( jobId ) ;
125+ const job = this . activeJobs . find ( job => job . id === jobId ) ;
115126 const process = this . ffmpegProcesses . get ( jobId ) ;
127+
128+ const finalizeJobRemoval = ( ) => {
129+ if ( job ) {
130+ this . jobQueue = this . jobQueue . filter ( id => id !== jobId ) ;
131+
132+ if ( this . immediateRemoval === true || job . progress < 100 ) {
133+ this . fileRemoval . cleanupReadyForRemovalJobs ( [ job ] ) ;
134+ this . activeJobs = this . activeJobs . filter ( activeJob => activeJob . id !== jobId ) ;
135+ this . logger . log ( `Job ${ jobId } removed` ) ;
136+ }
137+ else {
138+ this . logger . log ( 'Immediate removal is not allowed, cleanup service will take care in due time' )
139+ }
140+ }
141+ this . checkQueue ( ) ;
142+ } ;
143+
116144 if ( process ) {
117- process . kill ( 'SIGKILL' ) ;
145+ try {
146+ this . logger . log ( `Attempting to kill process tree for PID ${ process . pid } ` ) ;
147+ new Promise < void > ( ( resolve , reject ) => {
148+ kill ( process . pid , 'SIGINT' , ( err ) => {
149+ if ( err ) {
150+ this . logger . error ( `Failed to kill process tree for PID ${ process . pid } : ${ err . message } ` ) ;
151+ reject ( err ) ;
152+ } else {
153+ this . logger . log ( `Successfully killed process tree for PID ${ process . pid } ` ) ;
154+ resolve ( ) ;
155+ finalizeJobRemoval ( )
156+ }
157+ } ) ;
158+ } ) ;
159+ } catch ( err ) {
160+ this . logger . error ( `Error terminating process for job ${ jobId } : ${ err . message } ` ) ;
161+ }
118162 this . ffmpegProcesses . delete ( jobId ) ;
163+ return true ;
164+ } else {
165+ finalizeJobRemoval ( ) ;
166+ return true ;
119167 }
120-
168+ }
169+
170+ completeJob ( jobId : string ) :void {
171+ const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
121172 if ( job ) {
122- this . jobQueue = this . jobQueue . filter ( ( id ) => id !== jobId ) ;
123- this . activeJobs = this . activeJobs . filter ( ( job ) => job . id !== jobId ) ;
173+ job . status = 'ready-for-removal' ;
174+ job . timestamp = new Date ( )
175+ this . logger . log ( `Job ${ jobId } marked as completed and ready for removal.` ) ;
176+ } else {
177+ this . logger . warn ( `Job ${ jobId } not found. Cannot mark as completed.` ) ;
124178 }
179+ }
125180
126- this . checkQueue ( ) ;
127-
128- this . logger . log ( `Job ${ jobId } canceled` ) ;
129- return true ;
181+ cleanupJob ( jobId : string ) : void {
182+ const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
183+ this . activeJobs = this . activeJobs . filter ( ( job ) => job . id !== jobId ) ;
184+ this . ffmpegProcesses . delete ( jobId ) ;
185+ this . videoDurations . delete ( jobId ) ;
130186 }
131187
132188 getTranscodedFilePath ( jobId : string ) : string | null {
@@ -137,12 +193,6 @@ export class AppService {
137193 return null ;
138194 }
139195
140- cleanupJob ( jobId : string ) : void {
141- this . activeJobs = this . activeJobs . filter ( ( job ) => job . id !== jobId ) ;
142- this . ffmpegProcesses . delete ( jobId ) ;
143- this . videoDurations . delete ( jobId ) ;
144- }
145-
146196 getMaxConcurrentJobs ( ) : number {
147197 return this . maxConcurrentJobs ;
148198 }
@@ -219,24 +269,29 @@ export class AppService {
219269 }
220270
221271 private checkQueue ( ) {
222- const runningJobs = Array . from ( this . activeJobs . values ( ) ) . filter (
223- ( job ) => job . status === 'optimizing' ,
224- ) . length ;
272+ let runningJobs = this . activeJobs . filter ( ( job ) => job . status === 'optimizing' )
273+ . length ;
225274
226275 while ( runningJobs < this . maxConcurrentJobs && this . jobQueue . length > 0 ) {
227276 const nextJobId = this . jobQueue . shift ( ) ;
228277 if ( nextJobId ) {
229278 this . startJob ( nextJobId ) ;
279+ runningJobs ++ ; // Now we track the newly started job
230280 }
231281 }
232282 }
233283
284+
234285 private startJob ( jobId : string ) {
235286 const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
236287 if ( job ) {
237288 job . status = 'optimizing' ;
238289 const ffmpegArgs = this . getFfmpegArgs ( job . inputUrl , job . outputPath ) ;
239- this . startFFmpegProcess ( jobId , ffmpegArgs ) ;
290+ this . startFFmpegProcess ( jobId , ffmpegArgs )
291+ . finally ( ( ) => {
292+ // This runs after the returned Promise resolves or rejects.
293+ this . checkQueue ( ) ;
294+ } ) ;
240295 this . logger . log ( `Started job ${ jobId } ` ) ;
241296 }
242297 }
@@ -263,20 +318,19 @@ export class AppService {
263318 await this . getVideoDuration ( ffmpegArgs [ 1 ] , jobId ) ;
264319
265320 return new Promise ( ( resolve , reject ) => {
266- const ffmpegProcess = spawn ( 'ffmpeg' , ffmpegArgs ) ;
321+ const ffmpegProcess = spawn ( 'ffmpeg' , ffmpegArgs , { stdio : [ 'pipe' , 'pipe' , 'pipe' ] } ) ;
267322 this . ffmpegProcesses . set ( jobId , ffmpegProcess ) ;
268323
269324 ffmpegProcess . stderr . on ( 'data' , ( data ) => {
270325 this . updateProgress ( jobId , data . toString ( ) ) ;
271326 } ) ;
272-
327+
273328 ffmpegProcess . on ( 'close' , async ( code ) => {
274329 this . ffmpegProcesses . delete ( jobId ) ;
275330 this . videoDurations . delete ( jobId ) ;
276331
277332 const job = this . activeJobs . find ( ( job ) => job . id === jobId ) ;
278333 if ( ! job ) {
279- // Job was cancelled and removed, just resolve
280334 resolve ( ) ;
281335 return ;
282336 }
@@ -320,12 +374,10 @@ export class AppService {
320374 if ( job ) {
321375 job . status = 'failed' ;
322376 }
323- } finally {
324- // Check queue after job completion or failure
325- this . checkQueue ( ) ;
326377 }
327378 }
328379
380+
329381 private async getVideoDuration (
330382 inputUrl : string ,
331383 jobId : string ,
0 commit comments