Skip to content

Commit

Permalink
fix(scheduler): query scheduler should remove failed extraction job f…
Browse files Browse the repository at this point in the history
…rom waiting jobs dictionaries (y-scope#605)
  • Loading branch information
aestriplex committed Feb 16, 2025
1 parent 2aa5c5c commit 7886f3f
Showing 1 changed file with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,17 @@ async def handle_finished_search_job(
logger.info(f"Completed job {job_id} with failing tasks.")
del active_jobs[job_id]

def get_waiting_jobs(job: QueryJob) -> List[str]:
waiting_jobs: List[str]

if QueryJobType.EXTRACT_IR == job.get_type():
extract_ir_config: ExtractIrJobConfig = job.get_config()
waiting_jobs = active_file_split_ir_extractions.pop(extract_ir_config.file_split_id)
else:
extract_json_config: ExtractJsonJobConfig = job.get_config()
waiting_jobs = active_archive_json_extractions.pop(extract_json_config.archive_id)

return waiting_jobs

async def handle_finished_stream_extraction_job(
db_conn, job: QueryJob, task_results: List[Any]
Expand Down Expand Up @@ -953,15 +964,9 @@ async def handle_finished_stream_extraction_job(
else:
logger.info(f"Completed stream extraction job {job_id} with failing tasks.")

waiting_jobs: List[str]
if QueryJobType.EXTRACT_IR == job.get_type():
extract_ir_config: ExtractIrJobConfig = job.get_config()
waiting_jobs = active_file_split_ir_extractions.pop(extract_ir_config.file_split_id)
else:
extract_json_config: ExtractJsonJobConfig = job.get_config()
waiting_jobs = active_archive_json_extractions.pop(extract_json_config.archive_id)

waiting_jobs = get_waiting_jobs(job)
waiting_jobs.remove(job_id)

for waiting_job in waiting_jobs:
logger.info(f"Setting status to {new_job_status.to_str()} for waiting jobs: {waiting_job}.")
set_job_or_task_status(
Expand Down Expand Up @@ -995,7 +1000,11 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri):
msg = ReducerHandlerMessage(ReducerHandlerMessageType.FAILURE)
await job.reducer_handler_msg_queues.put_to_handler(msg)

waiting_jobs = get_waiting_jobs(job)

waiting_jobs.remove(job_id)
del active_jobs[job_id]

set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
Expand Down

0 comments on commit 7886f3f

Please sign in to comment.