@@ -67,27 +67,32 @@ def add_job(self, job_data: Dict[str, Any]):
6767
def _process_workflow_immediately(self, run_id: int):
    """Process a workflow run right away once enough jobs have arrived.

    While holding ``self._lock`` this method:

    * returns early if ``run_id`` is already in ``self.processed_workflows``;
    * logs a warning and returns if no jobs are recorded for the run;
    * calls ``self._send_workflow_trace(run_id)`` only when every recorded
      job has a non-``None`` ``"conclusion"``; otherwise it just logs that
      the run is not complete yet.

    Any exception raised along the way is logged with its traceback and the
    run is handed to ``self._cleanup_workflow_run`` so its bookkeeping is
    released. The exception is not re-raised.

    Args:
        run_id: Identifier of the workflow run to process.
    """
    try:
        with self._lock:
            # Skip if already processed
            if run_id in self.processed_workflows:
                logger.info(f"Workflow run { run_id } already processed, skipping")
                return

            # Use .get() instead of indexing: a run with no recorded jobs must
            # reach the "No jobs found" branch below. Indexing would either
            # raise KeyError (handled below as a failure, which marks the run
            # as processed for good) or, on a defaultdict, insert an empty
            # entry that is never removed on this path.
            jobs = self.workflow_jobs.get(run_id)

            if not jobs:
                logger.warning(f"No jobs found for workflow run { run_id } ")
                return

            logger.info(f"Processing workflow run { run_id } immediately with { len ( jobs ) } jobs")

            # A job counts as complete once its "conclusion" is set
            all_completed = all(job.get("conclusion") is not None for job in jobs)
            if all_completed:
                logger.info(f"All jobs complete for workflow run { run_id } , sending trace")
                self._send_workflow_trace(run_id)
            else:
                logger.info(f"Not all jobs complete for workflow run { run_id } , skipping")
    except Exception as e:
        logger.error(f"Error processing workflow run { run_id } immediately: { e } ", exc_info=True)
        # The ``with`` block above has already exited at this point, so the
        # cleanup helper can take self._lock itself.
        self._cleanup_workflow_run(run_id)
9196
9297 def _process_workflow_delayed (self , run_id : int ):
9398 """Process workflow after delay to allow all jobs to arrive"""
@@ -196,6 +201,26 @@ def _send_workflow_trace(self, run_id: int):
196201 if run_id in self .job_arrival_times :
197202 del self .job_arrival_times [run_id ]
198203
def _cleanup_workflow_run(self, run_id: int):
    """Release the bookkeeping held for a workflow run.

    Under ``self._lock`` the run is added to ``self.processed_workflows``,
    its entry in ``self.workflow_jobs`` is dropped, its timer in
    ``self.workflow_timers`` (if any) is cancelled and dropped, and its
    entry in ``self.job_arrival_times`` is dropped. Errors raised while
    doing so are logged with a traceback and not propagated.

    Args:
        run_id: Identifier of the workflow run to clean up.
    """
    try:
        with self._lock:
            # Flag the run first so it is not picked up for processing again
            self.processed_workflows.add(run_id)

            # Drop the stored jobs, if any
            self.workflow_jobs.pop(run_id, None)

            # Cancel the pending timer before forgetting it
            timers = self.workflow_timers
            if run_id in timers:
                timers[run_id].cancel()
                del timers[run_id]

            # Drop the arrival-time record, if any
            self.job_arrival_times.pop(run_id, None)

            logger.info(f"Cleaned up workflow run { run_id } after exception")
    except Exception as cleanup_error:
        logger.error(f"Error during cleanup of workflow run { run_id } : { cleanup_error } ", exc_info=True)
223+
199224 def _send_individual_traces (self , jobs : List [Dict [str , Any ]]):
200225 """DISABLED: Individual job traces are now handled by WorkflowTracer"""
201226 logger .info (f"DISABLED: Individual traces for { len (jobs )} jobs - now handled by WorkflowTracer" )
0 commit comments