Skip to content

Commit

Permalink
update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xin.chen committed Sep 26, 2024
1 parent cd88865 commit 716a6a2
Showing 1 changed file with 8 additions and 25 deletions.
33 changes: 8 additions & 25 deletions python/aibrix/aibrix/batch/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, policy=SchedulePolicy.FIFO):
self._inactive_jobs = set()
self._due_jobs_list = []
# Start sliding process in an async way
asyncio.create_task(self.sliding_process())
asyncio.create_task(self.time_sliding_process())
self._policy = policy

def append_job(self, job_id, due_time_seconds):
Expand All @@ -59,6 +59,8 @@ def key_func(x):
def schedule_get_job(self):
# Scheduler outputs a job to be processed following the specified policy.
job_id = None

# [TODO] use class abstraction for SchedulingPolicy
if self._policy == SchedulePolicy.FIFO:
if not self._jobs_queue.empty():
job_id = self._jobs_queue.get()
Expand All @@ -77,11 +79,11 @@ def schedule_get_job(self):

return job_id

def get_inactive_jobs(self):
return self._inactive_jobs

async def expire_jobs(self):
# This is to expire jobs, which is important to achieve fairness.
# Here fairness means that not all jobs are executed guaranteed.
# Now we use the metric of due time to mark job expired.
# Later this can extend it to other metrics.
# This is to expire jobs based on specified due time per job.
if self._policy == SchedulePolicy.FIFO:
current_time = time.time()
idx = 0
Expand All @@ -101,7 +103,7 @@ async def expire_jobs(self):
else:
print("Unsupported scheduling policy!")

async def sliding_process(self):
async def time_sliding_process(self):
"""
This is a long-running process to check if jobs have expired or not.
"""
Expand All @@ -116,22 +118,3 @@ async def sliding_process(self):
print("Sliding, round: ", round_id)
round_id += 1
await asyncio.sleep(time_to_next_run) # Wait for the remaining time


async def driver_proc():
"""
This is main driver process on how to call scheduler.
"""
_scheduler = JobScheduler()
_scheduler.append_job(1, 2)
_scheduler.append_job(2, 4)
_scheduler.append_job(3, 7)
_scheduler.append_job(4, 15)
_scheduler.append_job(100, 7)
await asyncio.sleep(5 * EXPIRE_INTERVAL)
one_job = _scheduler.schedule_get_job()
print("###### an active job: ", one_job)
await asyncio.sleep(5 * EXPIRE_INTERVAL)

one_job = _scheduler.schedule_get_job()
print("###### an active job: ", one_job)

0 comments on commit 716a6a2

Please sign in to comment.