Handle server engine errors and prevent a failed request from blocking other requests
wnma3mz committed Feb 2, 2025
1 parent 9d127c6 commit cf7fbfb
Showing 5 changed files with 42 additions and 67 deletions.
10 changes: 5 additions & 5 deletions benchmarks/run_async_requests.py
@@ -26,10 +26,10 @@ async def requests_func(messages: List[Dict[str, Any]]):


async def main(messages_list: List[List[Dict[str, Any]]]):
print("异步并发请求结果")
s1 = time.time()
await asyncio.gather(*[requests_func(messages) for messages in messages_list])
print(f"time cost: {time.time() - s1:.4f} s")
# print("异步并发请求结果")
# s1 = time.time()
# await asyncio.gather(*[requests_func(messages) for messages in messages_list])
# print(f"time cost: {time.time() - s1:.4f} s")

print("单独请求结果")
s1 = time.time()
@@ -40,7 +40,7 @@ async def main(messages_list: List[List[Dict[str, Any]]]):


def load_message():
with open("asserts/messages.json", "r") as f:
with open("asserts/debug_messages.json", "r") as f:
messages_dict = json.load(f)
return messages_dict

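For context on what this benchmark measures: the script times the same batch of chat requests issued concurrently via asyncio.gather and then one by one, and this commit simply disables the concurrent pass and switches to a debug message file. Below is a rough standalone sketch of that pattern; it assumes aiohttp and an OpenAI-style endpoint on localhost, and the URL, port, and payload are invented for illustration rather than taken from the repository.

import asyncio
import time
from typing import Any, Dict, List

import aiohttp  # assumed HTTP client; the real benchmark may use a different one


async def requests_func(messages: List[Dict[str, Any]]) -> None:
    # Hypothetical endpoint and request body.
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://localhost:8022/v1/chat/completions",
            json={"model": "default", "messages": messages},
        ) as resp:
            await resp.json()


async def main(messages_list: List[List[Dict[str, Any]]]) -> None:
    # Concurrent pass: every request is in flight at once.
    s1 = time.time()
    await asyncio.gather(*[requests_func(m) for m in messages_list])
    print(f"concurrent: {time.time() - s1:.4f} s")

    # Sequential pass: one request at a time, for comparison.
    s1 = time.time()
    for m in messages_list:
        await requests_func(m)
    print(f"sequential: {time.time() - s1:.4f} s")


if __name__ == "__main__":
    # Hypothetical demo payload; the real benchmark loads messages from a JSON file.
    asyncio.run(main([[{"role": "user", "content": "Hello"}] for _ in range(4)]))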
61 changes: 17 additions & 44 deletions tllm/engine.py
@@ -13,11 +13,11 @@ def __init__(self, generator: Union[LLMGenerator, ImageGenerator], sleep_time: f
self.generator = generator
self.prefill_queue: asyncio.Queue = asyncio.Queue()
self.decoding_queue: asyncio.Queue = asyncio.Queue()
self.abort_queue: asyncio.Queue = asyncio.Queue()
self.processing_task = None
self.limit_size: int = limit_size # process at most 5 requests at a time, prefill + decode
self.sleep_time: float = sleep_time
self.logger = SingletonLogger.setup_master_logger()
self.abort_queue: asyncio.Queue = asyncio.Queue()
self.queue_not_empty: asyncio.Event = asyncio.Event()
self._loop = None

@@ -70,7 +70,7 @@ async def _generate(self):
try:
await self.queue_not_empty.wait()
except Exception as e:
self.logger.debug("exception: " + str(e))
self.logger.debug(f"exception: {str(e)}")
await asyncio.sleep(0.01)
continue
try:
@@ -79,18 +79,12 @@
for request_data in request_data_list:
if not request_data.is_stop:
self.decoding_queue.put_nowait(request_data)
async with request_data.condition:
request_data.condition.notify()

# await asyncio.sleep(self.sleep_time)
except asyncio.CancelledError:
# LLM Generate Error or Server Cancel Engine
self.logger.error("CancelledError")
for request_data in request_data_list:
request_data.is_stop = True
request_data.finish_reason_list = ["Server Error"]
async with request_data.condition:
request_data.condition.notify()
request_data.is_normal_process = False
self.logger.error("CancelledError")
traceback.print_exc()
except Exception as e:
self.logger.error(f"Error input_ids: {'\n'.join(x.input_ids for x in request_data_list)}")
@@ -100,9 +94,12 @@ async def generate_stream(self, request_data: SequenceRequestData):
self.logger.error(f"BaseException Error processing prefill_queue data: {str(e)}")
traceback.print_exc()
finally:
for request_data in request_data_list:
async with request_data.condition:
request_data.condition.notify()
if self.prefill_queue.empty():
self.queue_not_empty.clear()
await asyncio.sleep(0)
await asyncio.sleep(self.sleep_time)

async def generate_stream(self, request_data: SequenceRequestData):
self.prefill_queue.put_nowait(request_data)
@@ -114,39 +111,15 @@ async def generate_stream(self, request_data: SequenceRequestData):
await asyncio.wait_for(request_data.condition.wait(), request_data.timeout)
# stream the response content back to the caller
yield request_data.to_request_output()
try:
if hasattr(request_data, "ttft_cost_time"):
self.logger.info(
f"[request_id] {request_data.request_id}] ttft: {request_data.ttft_cost_time:.4f} s"
)
self.logger.info(
f"[request_id] {request_data.request_id}] tpot: {(len(request_data.output_ids) - 1) / (time.perf_counter() - request_data.decode_start_ts):.4f} token/s"
)
except:
pass
yield request_data.to_request_output() # Need it?
except asyncio.CancelledError:
self.logger.debug(f"CancelledError: {request_data.request_id}")
raise asyncio.CancelledError("CancelledError")
except asyncio.TimeoutError:
self.logger.debug(f"Processing timed out: {request_data.request_id}")
raise asyncio.CancelledError("TimeoutError")
except Exception as e:
traceback.print_exc()
raise asyncio.CancelledError("UnknownException")
except BaseException as e:
traceback.print_exc()
raise asyncio.CancelledError("UnknownBaseException")

async def generate(self, request_data: SequenceRequestData):
self.prefill_queue.put_nowait(request_data)
self.queue_not_empty.set()

try:
async with request_data.condition:
while not request_data.is_stop:
await asyncio.wait_for(request_data.condition.wait(), request_data.timeout)
return request_data.to_request_output() # return the final data object
if getattr(request_data, "ttft_cost_time", None) is not None:
self.logger.info(
f"[request_id] {request_data.request_id}] ttft: {request_data.ttft_cost_time:.4f} s"
)
self.logger.info(
f"[request_id] {request_data.request_id}] tpot: {(len(request_data.output_ids) - 1) / (time.perf_counter() - request_data.decode_start_ts):.4f} token/s"
)
# Need it?
yield request_data.to_request_output()
except asyncio.CancelledError:
self.logger.debug(f"CancelledError: {request_data.request_id}")
raise asyncio.CancelledError("CancelledError")
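The substance of the engine change is visible in the _generate loop above: waiting requests are now woken in a finally block, and a failed batch is flagged via is_normal_process instead of being left hanging, so an exception while generating for one request can no longer block every other request waiting on its condition. What follows is a minimal, self-contained sketch of that pattern only; Request, process_batch, and the single queue are illustrative stand-ins rather than the engine's real classes, and the timeout value is arbitrary.

import asyncio
from dataclasses import dataclass, field


@dataclass
class Request:
    request_id: str
    is_stop: bool = False
    is_normal_process: bool = True
    condition: asyncio.Condition = field(default_factory=asyncio.Condition)


async def process_batch(batch):
    # Stand-in for one generation step; the real engine re-queues unfinished
    # requests for further decoding instead of finishing them in one call.
    await asyncio.sleep(0.01)
    for req in batch:
        req.is_stop = True


async def worker(queue: asyncio.Queue) -> None:
    while True:
        batch = [await queue.get()]
        while not queue.empty():  # drain whatever else is already queued
            batch.append(queue.get_nowait())
        try:
            await process_batch(batch)
        except Exception:
            # A failing batch is marked abnormal so its callers stop waiting.
            for req in batch:
                req.is_stop = True
                req.is_normal_process = False
        finally:
            # Always wake the coroutines waiting in generate(), even after an
            # error, so one bad request cannot block the others.
            for req in batch:
                async with req.condition:
                    req.condition.notify()


async def generate(queue: asyncio.Queue, req: Request) -> Request:
    queue.put_nowait(req)
    async with req.condition:
        while not req.is_stop:
            await asyncio.wait_for(req.condition.wait(), timeout=30)
    return req


async def demo() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    worker_task = asyncio.create_task(worker(queue))
    done = await asyncio.gather(*[generate(queue, Request(f"r{i}")) for i in range(3)])
    print([r.request_id for r in done])
    worker_task.cancel()


if __name__ == "__main__":
    asyncio.run(demo())

Even if process_batch raised for one batch here, the finally clause would still notify its waiters and later batches would keep flowing, which is the behaviour this commit is after.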
21 changes: 5 additions & 16 deletions tllm/entrypoints/api_server.py
@@ -57,30 +57,19 @@ async def create_chat_completion(request: ChatCompletionRequest, raw_request: Re
raise ValueError("OpenAIServing instance is not initialized")
if raw_request.headers.get("authorization") == "Bearer anythingllm":
request.max_tokens = openai_serving_chat.max_model_len
logger.debug(f"request: {request.model_dump()}")
try:
generator = await openai_serving_chat.create_chat_completion(request, raw_request)
if request.stream:
return StreamingResponse(content=generator, media_type="text/event-stream")
else:
assert isinstance(generator, ChatCompletionResponse)
return JSONResponse(content=generator.model_dump())
except HTTPException as e:
return JSONResponse(content={"error": str(e)}, status_code=e.status_code)
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
return JSONResponse(content={"error": str(e)}, status_code=499)


@app.post("/v1/completions")
async def create_completion(request: ChatCompletionRequest, raw_request: Request) -> ChatCompletionResponse:
if not ws_manager.has_full_model:
raise ValueError("No available Full Node to process the request")
if openai_serving_chat is None:
raise ValueError("OpenAIServing instance is not initialized")
generator = await openai_serving_chat.create_chat_completion(request, raw_request)
if request.stream:
return StreamingResponse(content=generator, media_type="text/event-stream")
else:
assert isinstance(generator, ChatCompletionResponse)
return JSONResponse(content=generator.model_dump())
logger.error(f"Unknown Exception: {str(e)}")
return JSONResponse(content={"error": str(e)}, status_code=500)


@app.post("/v1/create_image")
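The endpoint-level change follows a common FastAPI shape: the serving call is wrapped in try/except and failures become JSON error responses instead of unhandled exceptions, with an HTTPException keeping its own status code, a processing error apparently mapped to 499, and an unknown exception to 500. Below is a rough sketch of that shape under those assumptions; serve_chat is a hypothetical stand-in for openai_serving_chat.create_chat_completion, and the streaming branch is omitted.

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse

app = FastAPI()


async def serve_chat(body: dict) -> dict:
    # Hypothetical stand-in for the real serving layer, which drives the engine.
    return {"id": "chatcmpl-0", "choices": [{"message": {"role": "assistant", "content": "ok"}}]}


@app.post("/v1/chat/completions")
async def create_chat_completion(raw_request: Request) -> JSONResponse:
    body = await raw_request.json()
    try:
        result = await serve_chat(body)
        return JSONResponse(content=result)
    except HTTPException as e:
        # Errors raised deliberately by the serving layer keep their status code
        # (e.g. 499 for "Client disconnected", 500 for "Server Engine Error").
        return JSONResponse(content={"error": str(e.detail)}, status_code=e.status_code)
    except Exception as e:
        # Any other failure is reported to the caller instead of crashing the handler.
        return JSONResponse(content={"error": str(e)}, status_code=500)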
13 changes: 11 additions & 2 deletions tllm/entrypoints/server_chat.py
@@ -28,6 +28,10 @@ def create_error_response(message: str) -> ChatCompletionResponse:
raise HTTPException(status_code=499, detail=message)


def create_server_error_response(message: str) -> ChatCompletionResponse:
raise HTTPException(status_code=500, detail=message)


class OpenAIServing:
def __init__(self, engine: AsyncEngine, args):
self.engine = engine
@@ -86,6 +90,9 @@ async def chat_completion_stream_generator(
await self.engine.abort(request_id)
create_error_response("Client disconnected")
res: RequestOutput
if res is None:
create_server_error_response("Server Engine Error")

output = res.outputs[0]
i = output.index

@@ -112,7 +119,7 @@ async def chat_completion_stream_generator(
yield "data: [DONE]\n\n"
except asyncio.CancelledError:
await self.engine.abort(request_id)
create_error_response("Client disconnected")
create_server_error_response("Server Engine Error")

async def chat_completion_full_generator(
self,
@@ -130,10 +137,12 @@ async def chat_completion_full_generator(
await self.engine.abort(request_id)
create_error_response("Client disconnected")
await asyncio.sleep(0.01)
if res is None:
create_server_error_response("Server Engine Error")
final_res: RequestOutput = res
except asyncio.CancelledError:
await self.engine.abort(request_id)
create_error_response("Client disconnected")
create_server_error_response("Server Engine Error")

output = final_res.outputs[0]
message = ChatMessage(role=self.response_role, content=output.text)
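The serving layer now distinguishes client aborts (create_error_response, 499) from engine failures (the new create_server_error_response, 500), and a None result coming back from the engine is treated as an engine failure. A compressed sketch of how the non-streaming path uses these helpers is below; collect_final_result and is_disconnected are illustrative names, not the project's.

from typing import AsyncIterator, Awaitable, Callable, Optional

from fastapi import HTTPException


def create_error_response(message: str) -> None:
    # Client-side problem, e.g. the caller disconnected mid-request.
    raise HTTPException(status_code=499, detail=message)


def create_server_error_response(message: str) -> None:
    # Engine-side problem: generation failed and the result is unusable.
    raise HTTPException(status_code=500, detail=message)


async def collect_final_result(
    result_generator: AsyncIterator[Optional[object]],
    is_disconnected: Callable[[], Awaitable[bool]],
) -> object:
    # Stand-in for chat_completion_full_generator's inner loop.
    final_res: Optional[object] = None
    async for res in result_generator:
        if await is_disconnected():
            create_error_response("Client disconnected")
        if res is None:
            # The engine yields None for an abnormally terminated request
            # (see the schemas.py change below).
            create_server_error_response("Server Engine Error")
        final_res = res
    return final_res

Returning 500 for an engine failure rather than reusing the 499 path matters to clients: 499 implies the caller went away, while 500 tells it the server itself failed.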
4 changes: 4 additions & 0 deletions tllm/schemas.py
@@ -188,6 +188,7 @@ class SequenceRequestData:
is_stop: bool = False
is_prefill: bool = True
is_gen_image: bool = False # whether this request runs in image-generation mode
is_normal_process: bool = True # whether the request is still being processed normally

condition: asyncio.Condition = field(default_factory=asyncio.Condition)

@@ -202,6 +203,9 @@ def __repr__(self) -> str:
return f"request_id={self.request_id}; output_ids={self.output_ids}"

def to_request_output(self) -> RequestOutput:
if not self.is_normal_process:
self.is_stop = True
return None
if not self.is_stop:
return RequestOutput(
self.request_id,
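The new is_normal_process flag is how the engine's error handling reaches the serving layer: to_request_output() returns None for a request that did not finish normally, and the endpoints above turn that None into a 500. A trimmed sketch of that behaviour, keeping only the fields relevant to the flag (the real dataclass carries many more):

from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SequenceRequestData:
    request_id: str
    output_ids: List[int] = field(default_factory=list)
    is_stop: bool = False
    is_normal_process: bool = True  # set to False by the engine when generation fails

    def to_request_output(self) -> Optional["RequestOutput"]:
        if not self.is_normal_process:
            # Abnormal termination: mark the request finished and return None,
            # which the server maps to a "Server Engine Error" response.
            self.is_stop = True
            return None
        ...  # otherwise build and return a RequestOutput as before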
