Skip to content

Commit 0e7c4f5

Browse files
committed
process updates through job update loop
1 parent 2973bee commit 0e7c4f5

File tree

1 file changed

+10
-18
lines changed

1 file changed

+10
-18
lines changed

openeo/extra/job_management/_manager.py

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -511,18 +511,21 @@ def run_jobs(
511511
stats = collections.defaultdict(int)
512512

513513
self._worker_pool = _JobManagerWorkerThreadPool()
514-
self._download_pool = _JobManagerWorkerThreadPool()
514+
515+
if self._download_results:
516+
self._download_pool = _JobManagerWorkerThreadPool()
515517

516518

517519
while (
518520
sum(
519521
job_db.count_by_status(
520522
statuses=["not_started", "created", "queued_for_start", "queued", "running"]
521-
).values()
522-
)
523-
> 0
523+
).values()) > 0
524+
525+
or (self._worker_pool is not None and self._worker_pool.num_pending_tasks() > 0)
524526

525-
or self._worker_pool.num_pending_tasks() > 0
527+
or (self._download_pool is not None and self._download_pool.num_pending_tasks() > 0)
528+
526529
):
527530
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
528531
stats["run_jobs loop"] += 1
@@ -532,18 +535,6 @@ def run_jobs(
532535
time.sleep(self.poll_sleep)
533536
stats["sleep"] += 1
534537

535-
# TODO; run post process after shutdown once more to ensure completion?
536-
# Wait for all download tasks to complete
537-
if self._download_results and self._download_pool is not None:
538-
_log.info("Waiting for download tasks to complete...")
539-
while self._download_pool.num_pending_tasks() > 0:
540-
self._process_threadworker_updates(
541-
worker_pool=self._download_pool,
542-
job_db=job_db,
543-
stats=stats
544-
)
545-
time.sleep(1) # Brief pause to avoid busy waiting
546-
_log.info("All download tasks completed.")
547538

548539
self._worker_pool.shutdown()
549540
self._download_pool.shutdown()
@@ -775,13 +766,14 @@ def on_job_done(self, job: BatchJob, row):
775766
root_url=job_con.root_url,
776767
bearer_token=job_con.auth.bearer if isinstance(job_con.auth, BearerAuth) else None,
777768
job_id=job.job_id,
778-
df_idx=row.name, # TODO figure out correct index usage
769+
df_idx=row.name, #this is going to be the index in the not saterted dataframe; should not be an issue as there is no db update for download task
779770
download_dir=job_dir,
780771
)
781772
_log.info(f"Submitting download task {task} to download thread pool")
782773

783774
if self._download_pool is None:
784775
self._download_pool = _JobManagerWorkerThreadPool()
776+
785777
self._download_pool.submit_task(task)
786778

787779
def on_job_error(self, job: BatchJob, row):

0 commit comments

Comments
 (0)