chore: support getting the latest iteration status #3414

Open
wants to merge 10 commits into main
1 change: 1 addition & 0 deletions tensorrt_llm/_torch/pyexecutor/config.py
@@ -62,6 +62,7 @@ class PyTorchConfig:
kv_cache_dtype: str = "auto"
use_kv_cache: bool = True
enable_iter_perf_stats: bool = False
iter_perf_latest_stats_size: Optional[int] = None
# If true, enables per request stats per iteration
# Must also set enable_iter_perf_stats to true to get request stats
enable_iter_req_stats: bool = False
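
Note on the new config knob: iter_perf_latest_stats_size caps how many per-iteration stats entries are buffered; leaving it at None keeps the previous unbounded behavior. A minimal sketch of enabling it (construction mirrors the test further down; any arguments not shown in this diff are omitted):

# Sketch: enable per-iteration perf stats and keep only the 3 newest entries.
from tensorrt_llm._torch.pyexecutor.config import PyTorchConfig

config = PyTorchConfig(
    enable_iter_perf_stats=True,        # required for any iteration stats
    iter_perf_latest_stats_size=3,      # None (default) means unbounded, as before
)
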
9 changes: 5 additions & 4 deletions tensorrt_llm/_torch/pyexecutor/py_executor.py
@@ -9,7 +9,7 @@
import time
import traceback
import weakref
from collections import namedtuple
from collections import deque, namedtuple
from contextlib import contextmanager
from itertools import chain
from typing import Dict, List, Optional, Tuple, Union
@@ -202,6 +202,7 @@ def __init__(self,
self.max_draft_tokens = max_draft_tokens
self.print_log = model_engine.pytorch_backend_config.print_iter_log
self.enable_iter_perf_stats = model_engine.pytorch_backend_config.enable_iter_perf_stats
self.iter_perf_latest_stats_size = model_engine.pytorch_backend_config.iter_perf_latest_stats_size
self.enable_iter_req_stats = model_engine.pytorch_backend_config.enable_iter_req_stats
self.num_fetch_requests_cur_rank = 0
self.num_fetch_requests = 0
@@ -249,7 +250,7 @@ def __init__(self,
self.is_shutdown = False

self.stats_lock = threading.Lock()
self.stats = []
self.stats = deque(maxlen=self.iter_perf_latest_stats_size)
self.start_times = {}
self.new_active_requests_queue_latency_ms = 0
self.gather_all_responses = False
@@ -391,8 +392,8 @@ def get_latest_iteration_stats(self):
latest_stats = (IterationStats(), None)
try:
self.stats_lock.acquire()
latest_stats = self.stats
self.stats = []
latest_stats = list(self.stats)
self.stats.clear()
finally:
self.stats_lock.release()

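
For context, the switch from a plain list to collections.deque is what enforces the size cap: deque(maxlen=N) silently discards the oldest entries, and deque(maxlen=None) behaves like the old unbounded list. A standalone sketch of that behavior:

# Sketch: deque(maxlen=...) keeps only the newest N entries.
from collections import deque

stats = deque(maxlen=3)                 # iter_perf_latest_stats_size = 3
for i in range(10):
    stats.append({"iter": i})
print(list(stats))                      # [{'iter': 7}, {'iter': 8}, {'iter': 9}]

unbounded = deque(maxlen=None)          # option unset: same behavior as the old list
for i in range(10):
    unbounded.append({"iter": i})
print(len(unbounded))                   # 10
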
14 changes: 14 additions & 0 deletions tensorrt_llm/executor/result.py
@@ -590,6 +590,20 @@ async def __anext__(self):
self._done = True
raise StopAsyncIteration

def get_latest(self, exp_size: Optional[int] = None) -> List[dict]:
results = []
cur_size = len(self.queue)
if cur_size == 0:
return results
if exp_size is None:
exp_size = cur_size
while cur_size > 0:
data = self.queue.get()
if cur_size <= exp_size:
results.append(json.loads(data))
cur_size -= 1
return results


def compute_logprobs(
k_prompt_logprobs: int,
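
get_latest drains the whole queue but only deserializes the trailing exp_size entries, so stale stats are discarded rather than returned. A small sketch of that drain-and-keep-last-N behavior, with a plain queue.Queue standing in for the result queue (the stand-in is an assumption for illustration):

# Sketch: drain everything, keep only the last exp_size entries.
import json
import queue

q = queue.Queue()
for i in range(5):
    q.put(json.dumps({"iter": i}))

exp_size = 2
results = []
cur_size = q.qsize()
while cur_size > 0:
    data = q.get()
    if cur_size <= exp_size:            # only the newest exp_size items survive
        results.append(json.loads(data))
    cur_size -= 1
print(results)                          # [{'iter': 3}, {'iter': 4}]
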
6 changes: 6 additions & 0 deletions tensorrt_llm/llmapi/utils.py
@@ -365,6 +365,9 @@ async def get(self, timeout=None):
self._event.clear()
return res

def __len__(self):
return len(self._q)


class _SyncQueue:
"""
@@ -435,6 +438,9 @@ def get(self, timeout=None):
except asyncio.QueueEmpty:
time.sleep(0.01)

def __len__(self):
return len(self._aq)


def set_sched_setaffinity(required_cores: int):
''' Set the CPU affinity of the current process to the required number of
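
The two __len__ additions exist so that IterationResult.get_latest can call len(self.queue) without caring which queue wrapper it holds. A minimal stand-in showing the delegation pattern (the class below is illustrative only, not the real llmapi wrapper):

# Hypothetical wrapper: __len__ just forwards to the wrapped container.
from collections import deque

class QueueWrapper:
    def __init__(self):
        self._q = deque()

    def put(self, item):
        self._q.append(item)

    def get(self):
        return self._q.popleft()

    def __len__(self):
        return len(self._q)

q = QueueWrapper()
q.put("stat")
print(len(q))                           # 1
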
15 changes: 13 additions & 2 deletions tensorrt_llm/serve/openai_server.py
@@ -17,6 +17,7 @@
# yapf: disable
from tensorrt_llm.executor import CppExecutorError
from tensorrt_llm.executor.postproc_worker import PostprocParams
from tensorrt_llm.executor.result import IterationResult
from tensorrt_llm.inputs import prompt_inputs
from tensorrt_llm.inputs.utils import ConversationMessage, apply_chat_template
from tensorrt_llm.llmapi import LLM
@@ -196,8 +197,18 @@ async def get_model(self) -> JSONResponse:

async def get_iteration_stats(self) -> JSONResponse:
stats = []
async for stat in self.llm.get_stats_async(2):
stats.append(stat)
enable_iter_perf_stats = self.llm.pytorch_backend_config.enable_iter_perf_stats
if not enable_iter_perf_stats:
return stats
iter_perf_latest_stats_size = self.llm.pytorch_backend_config.iter_perf_latest_stats_size
if iter_perf_latest_stats_size is not None:
iter_result = self.llm.get_stats_async(2)
if isinstance(iter_result, IterationResult):
for stat in iter_result.get_latest(iter_perf_latest_stats_size):
stats.append(stat)
else:
async for stat in self.llm.get_stats_async(2):
stats.append(stat)
return JSONResponse(content=stats)

async def get_kv_cache_events(self) -> JSONResponse:
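
With the branch above, /metrics returns at most iter_perf_latest_stats_size entries per poll when the option is set, and an empty list when enable_iter_perf_stats is off. A sketch of polling the endpoint from a client (host, port, and the requests dependency are assumptions):

# Sketch: poll the OpenAI server's metrics endpoint.
import requests

resp = requests.get("http://localhost:8000/metrics")
resp.raise_for_status()
stats = resp.json()                     # [] when stats are disabled or nothing new arrived
for entry in stats:                     # at most iter_perf_latest_stats_size entries
    print(entry.get("timestamp"), entry.get("gpuMemUsage"))
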
45 changes: 45 additions & 0 deletions tests/unittest/llmapi/apps/_test_openai_metrics.py
@@ -89,3 +89,48 @@ def test_metrics(client):
assert "pinnedMemUsage" in response_dict
assert "staticBatchingStats" in response_dict
assert "timestamp" in response_dict


def test_metrics_size():
build_config = BuildConfig()
build_config.max_batch_size = 8
build_config.max_seq_len = 512
iter_perf_latest_stats_size = 3
hf_tokenizer = AutoTokenizer.from_pretrained(llama_model_path)
llm = PyTorchLLM(
model=llama_model_path,
tokenizer=hf_tokenizer,
build_config=build_config,
kv_cache_config=KvCacheConfig(),
backend="pytorch",
pytorch_backend_config=PyTorchConfig(
enable_overlap_scheduler=True,
enable_iter_perf_stats=True,
iter_perf_latest_stats_size=iter_perf_latest_stats_size,
))

app_instance = OpenAIServer(llm, model=llama_model_path)
client = TestClient(app_instance.app)

response = client.get("/metrics")
assert response.status_code == 200
assert len(response.json()) == 0

response = client.post("/v1/completions",
json={
"prompt": "A B C",
"model": llama_model_path,
"max_tokens": 100
})
assert response.status_code == 200

response = client.get("/metrics")
assert response.status_code == 200
assert len(response.json()) == iter_perf_latest_stats_size
response_dict = response.json()[0]
assert "cpuMemUsage" in response_dict
assert "gpuMemUsage" in response_dict

response = client.get("/metrics")
assert response.status_code == 200
assert len(response.json()) == 0