Skip to content

Commit 48a9a69

Browse files
committed
feat: implement smart workflow completion detection
- Replace hardcoded job count (5) with dynamic thresholds - Add job arrival time tracking for intelligent processing - Implement timeout-based detection for small workflows - Support workflows of any size (1+ jobs) with appropriate timing - Add cleanup for arrival time tracking data - Improve logging for better debugging Smart thresholds: - 10+ jobs: Process immediately when all complete - 5-9 jobs: Process when all complete - 3-4 jobs: Process when all complete - 1-2 jobs: Process after 3s timeout or immediately if single job
1 parent b844639 commit 48a9a69

File tree

1 file changed

+50
-26
lines changed

1 file changed

+50
-26
lines changed

src/web_app_handler.py

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(self, dsn: str, token: str, dry_run: bool = False):
3131
self.processed_jobs = set() # Track processed job IDs to avoid duplicates
3232
self.workflow_timers = {} # run_id -> timer for delayed processing
3333
self.processed_workflows = set() # Track processed workflow runs to avoid duplicates
34+
self.job_arrival_times = defaultdict(list) # run_id -> list of arrival timestamps
3435
self._lock = threading.Lock() # Thread lock for preventing race conditions
3536

3637
def add_job(self, job_data: Dict[str, Any]):
@@ -47,18 +48,22 @@ def add_job(self, job_data: Dict[str, Any]):
4748
self.processed_jobs.add(job_id)
4849
self.workflow_jobs[run_id].append(job)
4950

51+
# Track job arrival time for smart detection
52+
self.job_arrival_times[run_id].append(time.time())
53+
5054
logger.info(f"Added job {job['name']} (ID: {job_id}) to workflow run {run_id}")
5155

52-
# Check if we have enough jobs to process the workflow
53-
# For testing, we'll wait for 5 jobs (the expected number in our test workflow)
54-
if len(self.workflow_jobs[run_id]) >= 5 and run_id not in self.processed_workflows:
55-
logger.info(f"Workflow run {run_id} has {len(self.workflow_jobs[run_id])} jobs, setting timer to process in 2 seconds")
56-
# Set a short timer to allow all jobs to arrive
57-
timer = threading.Timer(2.0, self._process_workflow_immediately, args=[run_id])
58-
self.workflow_timers[run_id] = timer
59-
timer.start()
60-
else:
61-
logger.info(f"Workflow run {run_id} has {len(self.workflow_jobs[run_id])} jobs, waiting for more")
56+
# Smart workflow completion detection
57+
jobs_count = len(self.workflow_jobs[run_id])
58+
if run_id not in self.processed_workflows:
59+
if self._should_process_workflow(run_id, jobs_count):
60+
logger.info(f"Workflow run {run_id} has {jobs_count} jobs, setting timer to process in 2 seconds")
61+
# Set a short timer to allow all jobs to arrive
62+
timer = threading.Timer(2.0, self._process_workflow_immediately, args=[run_id])
63+
self.workflow_timers[run_id] = timer
64+
timer.start()
65+
else:
66+
logger.info(f"Workflow run {run_id} has {jobs_count} jobs, waiting for more")
6267

6368
def _process_workflow_immediately(self, run_id: int):
6469
"""Process workflow immediately when we have enough jobs"""
@@ -112,29 +117,46 @@ def _process_workflow_delayed(self, run_id: int):
112117
self.workflow_timers[run_id].cancel()
113118
del self.workflow_timers[run_id]
114119

115-
def _is_workflow_complete(self, run_id: int, current_job: Dict[str, Any]) -> bool:
116-
"""Check if all jobs in the workflow are complete"""
120+
def _should_process_workflow(self, run_id: int, jobs_count: int) -> bool:
121+
"""Smart detection of when to process workflow based on job patterns and timing"""
122+
117123
jobs = self.workflow_jobs[run_id]
124+
arrival_times = self.job_arrival_times[run_id]
118125

119-
# For webhook testing, wait for multiple jobs to complete
120-
# Based on the logs, we expect around 6-7 jobs per workflow
121-
expected_jobs = 6 # Adjust based on actual workflow structure
126+
# All jobs must be completed
127+
all_completed = all(job.get("conclusion") is not None for job in jobs)
128+
if not all_completed:
129+
return False
122130

123-
if len(jobs) >= expected_jobs:
124-
all_completed = all(job.get("conclusion") is not None for job in jobs)
125-
if all_completed:
126-
logger.info(f"Workflow run {run_id} appears complete with {len(jobs)} jobs")
127-
return True
128-
elif len(jobs) >= 1:
129-
# For testing, also trigger if we have at least 1 job and it's been a while
130-
# This handles cases where not all jobs arrive
131-
all_completed = all(job.get("conclusion") is not None for job in jobs)
132-
if all_completed:
133-
logger.info(f"Workflow run {run_id} appears complete with {len(jobs)} jobs (partial)")
131+
# Smart thresholds based on job count patterns
132+
if jobs_count >= 10:
133+
# Large workflows (10+ jobs) - process immediately when all complete
134+
return True
135+
elif jobs_count >= 5:
136+
# Medium workflows (5-9 jobs) - process when all complete
137+
return True
138+
elif jobs_count >= 3:
139+
# Small workflows (3-4 jobs) - process when all complete
140+
return True
141+
elif jobs_count >= 1:
142+
# Single or few jobs - check if enough time has passed since last arrival
143+
if len(arrival_times) >= 1:
144+
time_since_last_job = time.time() - arrival_times[-1]
145+
# If no new jobs for 3 seconds, process what we have
146+
if time_since_last_job > 3.0:
147+
return True
148+
149+
# For single jobs, process immediately
150+
if jobs_count == 1:
134151
return True
135152

136153
return False
137154

155+
def _is_workflow_complete(self, run_id: int, current_job: Dict[str, Any]) -> bool:
156+
"""Check if all jobs in the workflow are complete (legacy method)"""
157+
jobs_count = len(self.workflow_jobs[run_id])
158+
return self._should_process_workflow(run_id, jobs_count)
159+
138160
def _send_workflow_trace(self, run_id: int):
139161
"""Send workflow-level trace for all jobs in the run"""
140162
# Check if already processed to prevent duplicates
@@ -171,6 +193,8 @@ def _send_workflow_trace(self, run_id: int):
171193
if run_id in self.workflow_timers:
172194
self.workflow_timers[run_id].cancel()
173195
del self.workflow_timers[run_id]
196+
if run_id in self.job_arrival_times:
197+
del self.job_arrival_times[run_id]
174198

175199
def _send_individual_traces(self, jobs: List[Dict[str, Any]]):
176200
"""DISABLED: Individual job traces are now handled by WorkflowTracer"""

0 commit comments

Comments
 (0)