Skip to content

Commit 8d0303a

Browse files
committed
Drop times queue and handling timing in each worker
1 parent 94890cd commit 8d0303a

File tree

4 files changed

+95
-87
lines changed

4 files changed

+95
-87
lines changed

src/guidellm/scheduler/result.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"SchedulerRequestResult",
1818
"SchedulerResult",
1919
"SchedulerRunInfo",
20-
"WorkerProcessRequestTime",
20+
"WorkerProcessRequest",
2121
"WorkerProcessResult",
2222
]
2323

@@ -147,8 +147,8 @@ class SchedulerRequestResult(
147147

148148

149149
@dataclass
150-
class WorkerProcessRequestTime:
151-
start_time: float
150+
class WorkerProcessRequest(Generic[RequestT, ResponseT]):
151+
session: RequestSession[RequestT, ResponseT]
152152
timeout_time: float
153153
queued_time: float
154154

@@ -163,6 +163,5 @@ class WorkerProcessResult(Generic[RequestT, ResponseT]):
163163

164164
@dataclass
165165
class MPQueues(Generic[RequestT, ResponseT]):
166-
requests: Queue[RequestSession[RequestT, ResponseT]]
167-
times: Queue[WorkerProcessRequestTime]
166+
requests: Queue[WorkerProcessRequest[RequestT, ResponseT]]
168167
responses: Queue[WorkerProcessResult[RequestT, ResponseT]]

src/guidellm/scheduler/scheduler.py

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from loguru import logger
1818

1919
from guidellm.config import settings
20-
from guidellm.request.session import RequestSession
2120
from guidellm.request.types import (
2221
RequestT,
2322
ResponseT,
@@ -27,7 +26,7 @@
2726
SchedulerRequestResult,
2827
SchedulerResult,
2928
SchedulerRunInfo,
30-
WorkerProcessRequestTime,
29+
WorkerProcessRequest,
3130
WorkerProcessResult,
3231
)
3332
from guidellm.scheduler.strategy import SchedulingStrategy
@@ -127,10 +126,14 @@ async def run(
127126
) as executor,
128127
):
129128
requests_iter: Optional[Iterator[Any]] = None
129+
# TODO: Configurable delay and move somewhere more appropriate
130+
scheduling_strategy.start_time = (
131+
time.time()
132+
) # Add a small delay to allow processes to start
130133
futures, queues, stop_event = await self._start_processes(
131134
manager, executor, scheduling_strategy
132135
)
133-
run_info, requests_iter, times_iter = self._run_setup(
136+
run_info, requests_iter = self._run_setup(
134137
futures, scheduling_strategy, max_number, max_duration
135138
)
136139
yield SchedulerResult(
@@ -147,17 +150,16 @@ async def run(
147150

148151
if (
149152
requests_iter is None
150-
and run_info.completed_requests >= run_info.created_requests
153+
# FIXME: Need new way to handle max requests
154+
# and run_info.completed_requests >= run_info.created_requests
151155
):
152156
# we've exhausted all requests we've wanted to run
153157
# and yielded all responses
154158
break
155159

156160
requests_iter = self._add_requests(
157161
requests_iter,
158-
times_iter,
159162
queues.requests,
160-
queues.times,
161163
run_info,
162164
)
163165
await asyncio.sleep(0) # enable requests to start
@@ -196,7 +198,6 @@ async def _start_processes(
196198
requests=manager.Queue(
197199
maxsize=scheduling_strategy.processing_requests_limit
198200
),
199-
times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
200201
responses=manager.Queue(),
201202
)
202203
stop_event = manager.Event()
@@ -229,10 +230,12 @@ async def _start_processes(
229230
executor,
230231
self.worker.process_loop_asynchronous,
231232
queues,
233+
scheduling_strategy,
232234
stop_event,
233235
False, # TODO: Make configurable
234236
requests_limit,
235237
id_,
238+
num_processes,
236239
)
237240
)
238241

@@ -246,11 +249,9 @@ def _run_setup(
246249
scheduling_strategy: SchedulingStrategy,
247250
max_number: Optional[int],
248251
max_duration: Optional[float],
249-
) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]:
252+
) -> tuple[SchedulerRunInfo, Iterator[Any]]:
250253
requests_iter = iter(self.request_loader)
251-
start_time = time.time()
252-
times_iter = iter(scheduling_strategy.request_times())
253-
end_time = time.time() + (max_duration or math.inf)
254+
end_time = scheduling_strategy.start_time + (max_duration or math.inf)
254255
end_number = max_number or math.inf
255256

256257
try:
@@ -268,27 +269,28 @@ def _run_setup(
268269
)
269270

270271
info = SchedulerRunInfo(
271-
start_time=start_time,
272+
start_time=scheduling_strategy.start_time,
272273
end_time=end_time,
273274
end_number=end_number,
274275
processes=len(processes),
275276
strategy=scheduling_strategy,
276277
)
277278

278-
return info, requests_iter, times_iter
279+
return info, requests_iter
279280

280281
def _add_requests(
281282
self,
282283
requests_iter: Optional[Iterator[Any]],
283-
times_iter: Iterator[float],
284-
requests_queue: Queue[RequestSession[RequestT, ResponseT]],
285-
times_queue: Queue[WorkerProcessRequestTime],
284+
requests_queue: Queue[WorkerProcessRequest[RequestT, ResponseT]],
286285
run_info: SchedulerRunInfo,
287286
) -> Optional[Iterator[Any]]:
288287
if requests_iter is not None:
289288
try:
290289
added_count = 0
291290

291+
if time.time() >= run_info.end_time:
292+
raise StopIteration
293+
292294
while (
293295
not requests_queue.full()
294296
and added_count < settings.max_add_requests_per_loop
@@ -297,23 +299,16 @@ def _add_requests(
297299
raise StopIteration
298300

299301
session = next(requests_iter)
300-
requests_queue.put(session)
301-
for _ in range(len(session)):
302-
if (
303-
request_time := next(times_iter)
304-
) >= run_info.end_time or time.time() >= run_info.end_time:
305-
raise StopIteration
306-
307-
work_req = WorkerProcessRequestTime(
308-
start_time=request_time,
309-
timeout_time=run_info.end_time,
310-
queued_time=time.time(),
311-
)
312-
times_queue.put(work_req)
313-
314-
run_info.created_requests += 1
315-
run_info.queued_requests += 1
316-
added_count += 1
302+
work_req = WorkerProcessRequest(
303+
session=session,
304+
timeout_time=run_info.end_time,
305+
queued_time=time.time(),
306+
)
307+
requests_queue.put(work_req)
308+
309+
run_info.created_requests += len(session)
310+
run_info.queued_requests += len(session)
311+
added_count += len(session)
317312
except StopIteration:
318313
# we've reached the limit number, limit time, or exhausted the requests
319314
# set to None to stop adding more and tell the loop no more requests

src/guidellm/scheduler/strategy.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ class SchedulingStrategy(StandardBaseModel):
4444
type_: Literal["strategy"] = Field(
4545
description="The type of scheduling strategy schedule requests with.",
4646
)
47+
start_time: float = Field(
48+
default_factory=time.time,
49+
description="The start time for the scheduling strategy.",
50+
)
4751

4852
@property
4953
def processing_mode(self) -> Literal["sync", "async"]:
@@ -175,8 +179,9 @@ def request_times(self) -> Generator[float, None, None]:
175179
176180
:return: A generator that yields time.time() for immediate request scheduling.
177181
"""
182+
init_time = self.start_time
178183
while True:
179-
yield time.time()
184+
yield max(init_time, time.time())
180185

181186

182187
class ConcurrentStrategy(SchedulingStrategy):
@@ -262,8 +267,9 @@ def request_times(self) -> Generator[float, None, None]:
262267
263268
:return: A generator that yields time.time() for immediate request scheduling.
264269
"""
270+
init_time = self.start_time
265271
while True:
266-
yield time.time()
272+
yield max(init_time, time.time())
267273

268274

269275
class ThroughputStrategy(SchedulingStrategy):
@@ -336,10 +342,9 @@ def request_times(self) -> Generator[float, None, None]:
336342
:return: A generator that yields the start time.time()
337343
for immediate request scheduling.
338344
"""
339-
start_time = time.time()
340-
345+
init_time = self.start_time
341346
while True:
342-
yield start_time
347+
yield init_time
343348

344349

345350
class AsyncConstantStrategy(ThroughputStrategy):
@@ -391,24 +396,24 @@ def request_times(self) -> Generator[float, None, None]:
391396
392397
:return: A generator that yields timestamps for request scheduling.
393398
"""
394-
start_time = time.time()
395399
constant_increment = 1.0 / self.rate
396400

401+
init_time = self.start_time
397402
# handle bursts first to get to the desired rate
398403
if self.initial_burst is not None:
399404
# send an initial burst equal to the rate
400405
# to reach the target rate
401406
burst_count = math.floor(self.rate)
402407
for _ in range(burst_count):
403-
yield start_time
408+
yield init_time
404409

405-
start_time += constant_increment
410+
init_time += constant_increment
406411

407412
counter = 0
408413

409414
# continue with constant rate after bursting
410415
while True:
411-
yield start_time + constant_increment * counter
416+
yield init_time + constant_increment * counter
412417
counter += 1
413418

414419

@@ -461,24 +466,23 @@ def request_times(self) -> Generator[float, None, None]:
461466
462467
:return: A generator that yields timestamps for request scheduling.
463468
"""
464-
start_time = time.time()
465-
469+
init_time = self.start_time
466470
if self.initial_burst is not None:
467471
# send an initial burst equal to the rate
468472
# to reach the target rate
469473
burst_count = math.floor(self.rate)
470474
for _ in range(burst_count):
471-
yield start_time
475+
yield init_time
472476
else:
473-
yield start_time
477+
yield init_time
474478

475479
# set the random seed for reproducibility
476480
rand = random.Random(self.random_seed) # noqa: S311
477481

478482
while True:
479483
inter_arrival_time = rand.expovariate(self.rate)
480-
start_time += inter_arrival_time
481-
yield start_time
484+
init_time += inter_arrival_time
485+
yield init_time
482486

483487

484488
def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> str:

0 commit comments

Comments
 (0)