diff --git a/benchmarks/run_async_requests.py b/benchmarks/run_async_requests.py
index f6c908c..6ccb10d 100644
--- a/benchmarks/run_async_requests.py
+++ b/benchmarks/run_async_requests.py
@@ -26,10 +26,10 @@ async def requests_func(messages: List[Dict[str, Any]]):
 
 
 async def main(messages_list: List[List[Dict[str, Any]]]):
-    print("Async concurrent request results")
-    s1 = time.time()
-    await asyncio.gather(*[requests_func(messages) for messages in messages_list])
-    print(f"time cost: {time.time() - s1:.4f} s")
+    # print("Async concurrent request results")
+    # s1 = time.time()
+    # await asyncio.gather(*[requests_func(messages) for messages in messages_list])
+    # print(f"time cost: {time.time() - s1:.4f} s")
 
     print("Sequential request results")
     s1 = time.time()
@@ -40,7 +40,7 @@ async def main(messages_list: List[List[Dict[str, Any]]]):
 
 
 def load_message():
-    with open("asserts/messages.json", "r") as f:
+    with open("asserts/debug_messages.json", "r") as f:
         messages_dict = json.load(f)
     return messages_dict
 
diff --git a/tllm/engine.py b/tllm/engine.py
index f24196e..2e6ad8e 100644
--- a/tllm/engine.py
+++ b/tllm/engine.py
@@ -13,11 +13,11 @@ def __init__(self, generator: Union[LLMGenerator, ImageGenerator], sleep_time: f
         self.generator = generator
         self.prefill_queue: asyncio.Queue = asyncio.Queue()
         self.decoding_queue: asyncio.Queue = asyncio.Queue()
+        self.abort_queue: asyncio.Queue = asyncio.Queue()
         self.processing_task = None
         self.limit_size: int = limit_size  # process at most 5 requests at a time, prefill + decode
         self.sleep_time: float = sleep_time
         self.logger = SingletonLogger.setup_master_logger()
-        self.abort_queue: asyncio.Queue = asyncio.Queue()
         self.queue_not_empty: asyncio.Event = asyncio.Event()
         self._loop = None
 
@@ -70,7 +70,7 @@ async def _generate(self):
             try:
                 await self.queue_not_empty.wait()
             except Exception as e:
-                self.logger.debug("exception: " + str(e))
+                self.logger.debug(f"exception: {str(e)}")
                 await asyncio.sleep(0.01)
                 continue
             try:
@@ -79,18 +79,12 @@ async def _generate(self):
                 for request_data in request_data_list:
                     if not request_data.is_stop:
                         self.decoding_queue.put_nowait(request_data)
-                    async with request_data.condition:
-                        request_data.condition.notify()
-                # await asyncio.sleep(self.sleep_time)
             except asyncio.CancelledError:
                 # LLM Generate Error or Server Cancel Engine
-                self.logger.error("CancelledError")
                 for request_data in request_data_list:
-                    request_data.is_stop = True
-                    request_data.finish_reason_list = ["Server Error"]
-                    async with request_data.condition:
-                        request_data.condition.notify()
+                    request_data.is_normal_process = False
+                self.logger.error("CancelledError")
                 traceback.print_exc()
             except Exception as e:
                 self.logger.error(f"Error input_ids: {'\n'.join(x.input_ids for x in request_data_list)}")
@@ -100,9 +94,12 @@
                 self.logger.error(f"BaseException Error processing prefill_queue data: {str(e)}")
                 traceback.print_exc()
             finally:
+                for request_data in request_data_list:
+                    async with request_data.condition:
+                        request_data.condition.notify()
                 if self.prefill_queue.empty():
                     self.queue_not_empty.clear()
-            await asyncio.sleep(0)
+            await asyncio.sleep(self.sleep_time)
 
     async def generate_stream(self, request_data: SequenceRequestData):
         self.prefill_queue.put_nowait(request_data)
         self.queue_not_empty.set()
@@ -114,39 +111,15 @@ async def generate_stream(self, request_data: SequenceRequestData):
                     await asyncio.wait_for(request_data.condition.wait(), request_data.timeout)
                     # streamed response content
                     yield request_data.to_request_output()
-            try:
-                if hasattr(request_data, "ttft_cost_time"):
-                    self.logger.info(
-                        f"[request_id] {request_data.request_id}] ttft: {request_data.ttft_cost_time:.4f} s"
-                    )
-                    self.logger.info(
-                        f"[request_id] {request_data.request_id}] tpot: {(len(request_data.output_ids) - 1) / (time.perf_counter() - request_data.decode_start_ts):.4f} token/s"
-                    )
-            except:
-                pass
-            yield request_data.to_request_output()  # Need it?
-        except asyncio.CancelledError:
-            self.logger.debug(f"CancelledError: {request_data.request_id}")
-            raise asyncio.CancelledError("CancelledError")
-        except asyncio.TimeoutError:
-            self.logger.debug(f"Processing timed out: {request_data.request_id}")
-            raise asyncio.CancelledError("TimeoutError")
-        except Exception as e:
-            traceback.print_exc()
-            raise asyncio.CancelledError("UnknownException")
-        except BaseException as e:
-            traceback.print_exc()
-            raise asyncio.CancelledError("UnknownBaseException")
-
-    async def generate(self, request_data: SequenceRequestData):
-        self.prefill_queue.put_nowait(request_data)
-        self.queue_not_empty.set()
-
-        try:
-            async with request_data.condition:
-                while not request_data.is_stop:
-                    await asyncio.wait_for(request_data.condition.wait(), request_data.timeout)
-                return request_data.to_request_output()  # return the final data object
+            if getattr(request_data, "ttft_cost_time", None) is not None:
+                self.logger.info(
+                    f"[request_id] {request_data.request_id}] ttft: {request_data.ttft_cost_time:.4f} s"
+                )
+                self.logger.info(
+                    f"[request_id] {request_data.request_id}] tpot: {(len(request_data.output_ids) - 1) / (time.perf_counter() - request_data.decode_start_ts):.4f} token/s"
+                )
+            # Need it?
+            yield request_data.to_request_output()
         except asyncio.CancelledError:
             self.logger.debug(f"CancelledError: {request_data.request_id}")
             raise asyncio.CancelledError("CancelledError")
diff --git a/tllm/entrypoints/api_server.py b/tllm/entrypoints/api_server.py
index bfe5ff8..0cea6d3 100644
--- a/tllm/entrypoints/api_server.py
+++ b/tllm/entrypoints/api_server.py
@@ -57,6 +57,7 @@ async def create_chat_completion(request: ChatCompletionRequest, raw_request: Re
         raise ValueError("OpenAIServing instance is not initialized")
     if raw_request.headers.get("authorization") == "Bearer anythingllm":
         request.max_tokens = openai_serving_chat.max_model_len
+    logger.debug(f"request: {request.model_dump()}")
     try:
         generator = await openai_serving_chat.create_chat_completion(request, raw_request)
         if request.stream:
@@ -64,23 +65,11 @@ async def create_chat_completion(request: ChatCompletionRequest, raw_request: Re
         else:
             assert isinstance(generator, ChatCompletionResponse)
             return JSONResponse(content=generator.model_dump())
+    except HTTPException as e:
+        return JSONResponse(content={"error": str(e)}, status_code=e.status_code)
     except Exception as e:
-        logger.error(f"Error processing request: {str(e)}")
-        return JSONResponse(content={"error": str(e)}, status_code=499)
-
-
-@app.post("/v1/completions")
-async def create_completion(request: ChatCompletionRequest, raw_request: Request) -> ChatCompletionResponse:
-    if not ws_manager.has_full_model:
-        raise ValueError("No available Full Node to process the request")
-    if openai_serving_chat is None:
-        raise ValueError("OpenAIServing instance is not initialized")
-    generator = await openai_serving_chat.create_chat_completion(request, raw_request)
-    if request.stream:
-        return StreamingResponse(content=generator, media_type="text/event-stream")
-    else:
-        assert isinstance(generator, ChatCompletionResponse)
-        return JSONResponse(content=generator.model_dump())
+        logger.error(f"Unknown Exception: {str(e)}")
+        return JSONResponse(content={"error": str(e)}, status_code=500)
 
 
 @app.post("/v1/create_image")
diff --git a/tllm/entrypoints/server_chat.py b/tllm/entrypoints/server_chat.py
index 6bd2be2..a8205a6 100644
--- a/tllm/entrypoints/server_chat.py
+++ b/tllm/entrypoints/server_chat.py
@@ -28,6 +28,10 @@ def create_error_response(message: str) -> ChatCompletionResponse:
     raise HTTPException(status_code=499, detail=message)
 
 
+def create_server_error_response(message: str) -> ChatCompletionResponse:
+    raise HTTPException(status_code=500, detail=message)
+
+
 class OpenAIServing:
     def __init__(self, engine: AsyncEngine, args):
         self.engine = engine
@@ -86,6 +90,9 @@ async def chat_completion_stream_generator(
                 await self.engine.abort(request_id)
                 create_error_response("Client disconnected")
             res: RequestOutput
+            if res is None:
+                create_server_error_response("Server Engine Error")
+
             output = res.outputs[0]
             i = output.index
 
@@ -112,7 +119,7 @@ async def chat_completion_stream_generator(
             yield "data: [DONE]\n\n"
         except asyncio.CancelledError:
             await self.engine.abort(request_id)
-            create_error_response("Client disconnected")
+            create_server_error_response("Server Engine Error")
 
     async def chat_completion_full_generator(
         self,
@@ -130,10 +137,12 @@ async def chat_completion_full_generator(
                     await self.engine.abort(request_id)
                     create_error_response("Client disconnected")
                 await asyncio.sleep(0.01)
+                if res is None:
+                    create_server_error_response("Server Engine Error")
                 final_res: RequestOutput = res
         except asyncio.CancelledError:
             await self.engine.abort(request_id)
-            create_error_response("Client disconnected")
+            create_server_error_response("Server Engine Error")
 
         output = final_res.outputs[0]
         message = ChatMessage(role=self.response_role, content=output.text)
diff --git a/tllm/schemas.py b/tllm/schemas.py
index bded29a..8b68477 100644
--- a/tllm/schemas.py
+++ b/tllm/schemas.py
@@ -188,6 +188,7 @@ class SequenceRequestData:
     is_stop: bool = False
     is_prefill: bool = True
    is_gen_image: bool = False  # whether in image-generation mode
+    is_normal_process: bool = True  # whether the request is still being processed normally
 
     condition: asyncio.Condition = field(default_factory=asyncio.Condition)
 
@@ -202,6 +203,9 @@ def __repr__(self) -> str:
         return f"request_id={self.request_id}; output_ids={self.output_ids}"
 
     def to_request_output(self) -> RequestOutput:
+        if not self.is_normal_process:
+            self.is_stop = True
+            return None
         if not self.is_stop:
             return RequestOutput(
                 self.request_id,