From 8fd6bbfc0a3364f6bcdb2f48565c7c9507453e6a Mon Sep 17 00:00:00 2001 From: lujianghu Date: Fri, 31 Jan 2025 11:13:53 +0800 Subject: [PATCH] fix is_local bugs --- README.md | 2 +- requirements/mlx.txt | 3 +-- setup.py | 2 +- tllm/entrypoints/api_server.py | 7 ++++--- tllm/entrypoints/utils.py | 4 ++-- tllm/grpc/master_service/master_server.py | 3 ++- tllm/grpc/worker_service/worker_server.py | 4 ++-- 7 files changed, 13 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 2e1a99f..678d0fe 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ 1. install dependencies -- for mlx (macos arm): `pip install -e ".[mlx]" && pip install -r requirements/mlx.txt` +- for mlx (macos arm): `pip install -U -e ".[mlx]" && pip install -e git+https://github.com/wnma3mz/mlx_clip.git#egg=mlx_clip` - for nvidia: `pip install -e ".[torch]"` 2. run server diff --git a/requirements/mlx.txt b/requirements/mlx.txt index 1aa9fee..36f2aa3 100644 --- a/requirements/mlx.txt +++ b/requirements/mlx.txt @@ -1,4 +1,3 @@ mlx==0.22.0 mlx-lm==0.21.1 -mlx-vlm==0.1.12 --e git+https://github.com/wnma3mz/mlx_clip.git#egg=mlx_clip \ No newline at end of file +mlx-vlm==0.1.12 \ No newline at end of file diff --git a/setup.py b/setup.py index cdc66bb..0bd1349 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ # 平台特定依赖 with open(root_dir / "requirements" / "mlx.txt") as fid: - mlx_requires = [l.strip() for l in fid.readlines() if not l.startswith("-e")] + mlx_requires = [l.strip() for l in fid.readlines()] with open(root_dir / "requirements" / "torch.txt") as fid: torch_requires = [l.strip() for l in fid.readlines()] diff --git a/tllm/entrypoints/api_server.py b/tllm/entrypoints/api_server.py index 2ff0a71..bfe5ff8 100644 --- a/tllm/entrypoints/api_server.py +++ b/tllm/entrypoints/api_server.py @@ -151,7 +151,7 @@ async def update_model_url(): for clients in host_list ] worker_rpc_manager.update_url(host_list) - master_url = args.hostname if is_local(args.hostname) else 
f"{args.hostname}:{args.grpc_port}" + master_url = args.hostname if args.is_local else f"{args.hostname}:{args.grpc_port}" await worker_rpc_manager.send_config(master_url, host_list) # 后台持续进行健康检查,如果有节点挂掉,需要重新分配 await worker_rpc_manager.start_health_check() @@ -185,7 +185,7 @@ async def init_model_func( async def init_app(engine: AsyncEngine, args): global app, openai_serving_chat, image_serving - logger.info("args: %s", args) + logger.info("Master Args: %s", args) if args.is_image: image_serving = ImageServing(engine, args) else: @@ -225,7 +225,8 @@ async def run_server(args) -> None: uvicorn_kwargs = {"host": ["::", "0.0.0.0"], "port": args.http_port, "timeout_graceful_shutdown": 5} - if is_local(args.hostname): + args.is_local = is_local(args.hostname) + if args.is_local: if os.path.isfile(MASTER_SOCKET_PATH): os.remove(MASTER_SOCKET_PATH) if os.path.isfile(CLIENT_SOCKET_PATH): diff --git a/tllm/entrypoints/utils.py b/tllm/entrypoints/utils.py index faea790..220a32e 100644 --- a/tllm/entrypoints/utils.py +++ b/tllm/entrypoints/utils.py @@ -137,7 +137,7 @@ def start(self): return self.process = Process(target=self.run_server) self.process.start() - self.logger.info(f"Started gRPC process (PID: {self.process.pid})") + # self.logger.info(f"Worker gRPC process (PID: {self.process.pid})") def shutdown(self): """关闭 gRPC 服务器进程""" @@ -148,7 +148,7 @@ def shutdown(self): self.process.join(timeout=5) if self.process.is_alive(): self.process.kill() - self.logger.info("gRPC process stopped") + self.logger.info("Worker gRPC process stopped") async def serve_http(app: FastAPI, grpc_process, engine, master_server, **uvicorn_kwargs: Dict): diff --git a/tllm/grpc/master_service/master_server.py b/tllm/grpc/master_service/master_server.py index d55ea0c..47ea7e0 100644 --- a/tllm/grpc/master_service/master_server.py +++ b/tllm/grpc/master_service/master_server.py @@ -30,12 +30,13 @@ async def stop(self): await self.server.stop(grace=5) await self.server.wait_for_termination() 
except (Exception, asyncio.CancelledError) as e: - print("master handler error", str(e)) + self.logger.info("master handler error %s", str(e)) async def Forward( self, request: schemas_pb2.ForwardRequest, context: grpc.ServicerContext ) -> schemas_pb2.ForwardResponse: """处理从最后一个节点返回的结果""" + self.logger.info("master handler request") request_id = "-".join(x for x in list(request.uuid_list)) try: diff --git a/tllm/grpc/worker_service/worker_server.py b/tllm/grpc/worker_service/worker_server.py index 11b34c4..cc05422 100644 --- a/tllm/grpc/worker_service/worker_server.py +++ b/tllm/grpc/worker_service/worker_server.py @@ -41,7 +41,7 @@ async def start(self, ip_addr_list: List[str], port: int = 50051): schemas_pb2_grpc.add_RPCServiceServicer_to_server(self, self.server) self.server.add_insecure_port(f"[::]:{port}") self.server.add_insecure_port(f"unix://{CLIENT_SOCKET_PATH}") - self.logger.info(f"Starting gRPC server on [::]:{port}") + self.logger.info(f"Starting Worker gRPC server on [::]:{port}") await self.server.start() self.http_client.is_running = True @@ -174,7 +174,7 @@ async def run(args): client_id = f"test-{str(uuid.uuid4())[:8]}-{comm.rank}" rpc_servicer = WorkerServer(comm, logger, args.master_addr, client_id) - logger.info("args: %s", args) + logger.info("Worker Args: %s", args) try: await rpc_servicer.start(ip_addr_list, args.grpc_port) except Exception as e: