diff --git a/tensorrt_llm/_torch/pyexecutor/config.py b/tensorrt_llm/_torch/pyexecutor/config.py
index 533b21b050..d44bdf129b 100644
--- a/tensorrt_llm/_torch/pyexecutor/config.py
+++ b/tensorrt_llm/_torch/pyexecutor/config.py
@@ -62,6 +62,7 @@ class PyTorchConfig:
     kv_cache_dtype: str = "auto"
     use_kv_cache: bool = True
     enable_iter_perf_stats: bool = False
+    iter_perf_latest_stats_size: Optional[int] = None
     # If true, enables per request stats per iteration
     # Must also set enable_iter_perf_stats to true to get request stats
     enable_iter_req_stats: bool = False
diff --git a/tensorrt_llm/_torch/pyexecutor/py_executor.py b/tensorrt_llm/_torch/pyexecutor/py_executor.py
index 415e92445b..95c2dfe897 100644
--- a/tensorrt_llm/_torch/pyexecutor/py_executor.py
+++ b/tensorrt_llm/_torch/pyexecutor/py_executor.py
@@ -9,7 +9,7 @@ import time
 import traceback
 import weakref
-from collections import namedtuple
+from collections import deque, namedtuple
 from contextlib import contextmanager
 from itertools import chain
 from typing import Dict, List, Optional, Tuple, Union
@@ -202,6 +202,7 @@ def __init__(self,
         self.max_draft_tokens = max_draft_tokens
         self.print_log = model_engine.pytorch_backend_config.print_iter_log
         self.enable_iter_perf_stats = model_engine.pytorch_backend_config.enable_iter_perf_stats
+        self.iter_perf_latest_stats_size = model_engine.pytorch_backend_config.iter_perf_latest_stats_size
         self.enable_iter_req_stats = model_engine.pytorch_backend_config.enable_iter_req_stats
         self.num_fetch_requests_cur_rank = 0
         self.num_fetch_requests = 0
@@ -249,7 +250,7 @@ def __init__(self,
         self.is_shutdown = False
         self.stats_lock = threading.Lock()
-        self.stats = []
+        self.stats = deque(maxlen=self.iter_perf_latest_stats_size)
         self.start_times = {}
         self.new_active_requests_queue_latency_ms = 0
         self.gather_all_responses = False
@@ -391,8 +392,8 @@ def get_latest_iteration_stats(self):
         latest_stats = (IterationStats(), None)
         try:
             self.stats_lock.acquire()
-            latest_stats = self.stats
-            self.stats = []
+            latest_stats = list(self.stats)
+            self.stats.clear()
         finally:
             self.stats_lock.release()
diff --git a/tensorrt_llm/executor/result.py b/tensorrt_llm/executor/result.py
index 0f2e1581ca..ff39fc6a76 100644
--- a/tensorrt_llm/executor/result.py
+++ b/tensorrt_llm/executor/result.py
@@ -590,6 +590,20 @@ async def __anext__(self):
             self._done = True
             raise StopAsyncIteration
 
+    def get_latest(self, exp_size: Optional[int] = None) -> List[dict]:
+        results = []
+        cur_size = len(self.queue)
+        if cur_size == 0:
+            return results
+        if exp_size is None:
+            exp_size = cur_size
+        while cur_size > 0:
+            data = self.queue.get()
+            if cur_size <= exp_size:
+                results.append(json.loads(data))
+            cur_size -= 1
+        return results
+
 
 def compute_logprobs(
     k_prompt_logprobs: int,
diff --git a/tensorrt_llm/llmapi/utils.py b/tensorrt_llm/llmapi/utils.py
index 5872174ab9..2d1c82c8cd 100644
--- a/tensorrt_llm/llmapi/utils.py
+++ b/tensorrt_llm/llmapi/utils.py
@@ -365,6 +365,9 @@ async def get(self, timeout=None):
                 self._event.clear()
         return res
 
+    def __len__(self):
+        return len(self._q)
+
 
 class _SyncQueue:
     """
@@ -435,6 +438,9 @@ def get(self, timeout=None):
             except asyncio.QueueEmpty:
                 time.sleep(0.01)
 
+    def __len__(self):
+        return len(self._aq)
+
 def set_sched_setaffinity(required_cores: int):
     '''
     Set the CPU affinity of the current process to the required number of
diff --git a/tensorrt_llm/serve/openai_server.py b/tensorrt_llm/serve/openai_server.py
index dcc71e43bf..de661c051a 100644
--- a/tensorrt_llm/serve/openai_server.py
+++ b/tensorrt_llm/serve/openai_server.py
@@ -17,6 +17,7 @@
 # yapf: disable
 from tensorrt_llm.executor import CppExecutorError
 from tensorrt_llm.executor.postproc_worker import PostprocParams
+from tensorrt_llm.executor.result import IterationResult
 from tensorrt_llm.inputs import prompt_inputs
 from tensorrt_llm.inputs.utils import ConversationMessage, apply_chat_template
 from tensorrt_llm.llmapi import LLM
@@ -196,8 +197,18 @@ async def get_model(self) -> JSONResponse:
 
     async def get_iteration_stats(self) -> JSONResponse:
         stats = []
-        async for stat in self.llm.get_stats_async(2):
-            stats.append(stat)
+        enable_iter_perf_stats = self.llm.pytorch_backend_config.enable_iter_perf_stats
+        if not enable_iter_perf_stats:
+            return stats
+        iter_perf_latest_stats_size = self.llm.pytorch_backend_config.iter_perf_latest_stats_size
+        if iter_perf_latest_stats_size is not None:
+            iter_result = self.llm.get_stats_async(2)
+            if isinstance(iter_result, IterationResult):
+                for stat in iter_result.get_latest(iter_perf_latest_stats_size):
+                    stats.append(stat)
+        else:
+            async for stat in self.llm.get_stats_async(2):
+                stats.append(stat)
         return JSONResponse(content=stats)
 
     async def get_kv_cache_events(self) -> JSONResponse:
diff --git a/tests/unittest/llmapi/apps/_test_openai_metrics.py b/tests/unittest/llmapi/apps/_test_openai_metrics.py
index 1b075b6756..ba40ef51fd 100755
--- a/tests/unittest/llmapi/apps/_test_openai_metrics.py
+++ b/tests/unittest/llmapi/apps/_test_openai_metrics.py
@@ -89,3 +89,48 @@ def test_metrics(client):
     assert "pinnedMemUsage" in response_dict
     assert "staticBatchingStats" in response_dict
     assert "timestamp" in response_dict
+
+
+def test_metrics_size():
+    build_config = BuildConfig()
+    build_config.max_batch_size = 8
+    build_config.max_seq_len = 512
+    iter_perf_latest_stats_size = 3
+    hf_tokenizer = AutoTokenizer.from_pretrained(llama_model_path)
+    llm = PyTorchLLM(
+        model=llama_model_path,
+        tokenizer=hf_tokenizer,
+        build_config=build_config,
+        kv_cache_config=KvCacheConfig(),
+        backend="pytorch",
+        pytorch_backend_config=PyTorchConfig(
+            enable_overlap_scheduler=True,
+            enable_iter_perf_stats=True,
+            iter_perf_latest_stats_size=iter_perf_latest_stats_size,
+        ))
+
+    app_instance = OpenAIServer(llm, model=llama_model_path)
+    client = TestClient(app_instance.app)
+
+    response = client.get("/metrics")
+    assert response.status_code == 200
+    assert len(response.json()) == 0
+
+    response = client.post("/v1/completions",
+                           json={
+                               "prompt": "A B C",
+                               "model": llama_model_path,
+                               "max_tokens": 100
+                           })
+    assert response.status_code == 200
+
+    response = client.get("/metrics")
+    assert response.status_code == 200
+    assert len(response.json()) == iter_perf_latest_stats_size
+    response_dict = response.json()[0]
+    assert "cpuMemUsage" in response_dict
+    assert "gpuMemUsage" in response_dict
+
+    response = client.get("/metrics")
+    assert response.status_code == 200
+    assert len(response.json()) == 0
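
For reference, a minimal standard-library sketch (not part of the patch) of the retention semantics the bounded deque gives `PyExecutor.stats`: once more than `maxlen` iteration stats have been appended, the oldest entries are silently dropped, so `get_latest_iteration_stats()` can only ever hand back the newest `iter_perf_latest_stats_size` iterations; leaving the option at its default `None` keeps the deque unbounded, preserving the previous behavior.

```python
# Sketch only: mirrors the `self.stats = deque(maxlen=...)` behavior in
# PyExecutor; plain dicts stand in for real IterationStats objects.
from collections import deque

iter_perf_latest_stats_size = 3            # None would keep the deque unbounded
stats = deque(maxlen=iter_perf_latest_stats_size)

for iteration in range(10):
    stats.append({"iter": iteration})      # older entries are evicted automatically

latest = list(stats)                       # snapshot, as get_latest_iteration_stats() does
stats.clear()                              # then reset (done under the stats lock there)
assert [s["iter"] for s in latest] == [7, 8, 9]
```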