From cc08c2906d12142f7adcdcb7aefe727c05ff0a26 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 17 Feb 2025 23:40:22 +0900 Subject: [PATCH] Map NOOP volume before call request through storage session mgr --- src/ai/backend/manager/api/session.py | 5 +- src/ai/backend/manager/api/vfolder.py | 69 ++++++++++++++----- src/ai/backend/manager/models/endpoint.py | 2 +- .../manager/models/gql_models/vfolder.py | 5 +- src/ai/backend/manager/models/group.py | 4 +- src/ai/backend/manager/models/storage.py | 23 +++++-- src/ai/backend/manager/models/user.py | 4 +- src/ai/backend/manager/models/vfolder.py | 32 ++++++--- src/ai/backend/manager/registry.py | 2 +- 9 files changed, 105 insertions(+), 41 deletions(-) diff --git a/src/ai/backend/manager/api/session.py b/src/ai/backend/manager/api/session.py index 8cfbb071671..e1bb5cc2400 100644 --- a/src/ai/backend/manager/api/session.py +++ b/src/ai/backend/manager/api/session.py @@ -107,6 +107,7 @@ SESSION_PRIORITY_MAX, SESSION_PRIORITY_MIN, ) +from ..models.vfolder import is_unmanaged from ..types import UserScope from ..utils import query_userinfo as _query_userinfo from .auth import auth_required @@ -2368,7 +2369,9 @@ async def get_task_logs(request: web.Request, params: Any) -> web.StreamResponse ) log_vfolder = matched_vfolders[0] - proxy_name, volume_name = root_ctx.storage_manager.split_host(log_vfolder["host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + log_vfolder["host"], is_unmanaged(log_vfolder["unmanaged_path"]) + ) response = web.StreamResponse(status=200) response.headers[hdrs.CONTENT_TYPE] = "text/plain" prepared = False diff --git a/src/ai/backend/manager/api/vfolder.py b/src/ai/backend/manager/api/vfolder.py index 85e51624546..7e44c883912 100644 --- a/src/ai/backend/manager/api/vfolder.py +++ b/src/ai/backend/manager/api/vfolder.py @@ -109,6 +109,7 @@ from ..models.vfolder import ( VFolderPermissionRow, delete_vfolder_relation_rows, + is_unmanaged, ) from ..models.vfolder import VFolderRow as VFolderDBRow from .auth import admin_required, auth_required, superadmin_required @@ -613,7 +614,9 @@ async def create(request: web.Request, params: CreateRequestModel) -> web.Respon if max_quota_scope_size and max_quota_scope_size > 0: options["initial_max_size_for_quota_scope"] = max_quota_scope_size body_data: dict[str, Any] = { - "volume": root_ctx.storage_manager.split_host(folder_host)[1], + "volume": root_ctx.storage_manager.get_proxy_and_volume( + folder_host, is_unmanaged(unmanaged_path) + )[1], "vfid": str(vfid), "options": options, } @@ -918,7 +921,7 @@ async def get_volume_perf_metric(request: web.Request, params: Any) -> web.Respo request["user"]["email"], request["keypair"]["access_key"], ) - proxy_name, volume_name = root_ctx.storage_manager.split_host(params["folder_host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(params["folder_host"]) async with root_ctx.storage_manager.request( proxy_name, "GET", @@ -964,7 +967,9 @@ async def get_info(request: web.Request, row: VFolderRow) -> web.Response: else: is_owner = row["is_owner"] permission = row["permission"] - proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + row["host"], is_unmanaged(row["unmanaged_path"]) + ) async with root_ctx.storage_manager.request( proxy_name, "GET", @@ -1014,7 +1019,9 @@ async def get_quota(request: web.Request, params: Any) -> web.Response: )[0] await check_vfolder_status(vfolder_row, VFolderStatusSet.READABLE) root_ctx: RootContext = request.app["_root.context"] - proxy_name, volume_name = root_ctx.storage_manager.split_host(params["folder_host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + params["folder_host"], is_unmanaged(vfolder_row["unmanaged_path"]) + ) log.info( "VFOLDER.GET_QUOTA (email:{}, volume_name:{}, vf:{})", request["user"]["email"], @@ -1072,7 +1079,9 @@ async def update_quota(request: web.Request, params: Any) -> web.Response: await check_vfolder_status(vfolder_row, VFolderStatusSet.READABLE) root_ctx: RootContext = request.app["_root.context"] folder_host = params["folder_host"] - proxy_name, volume_name = root_ctx.storage_manager.split_host(folder_host) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + folder_host, is_unmanaged(vfolder_row["unmanaged_path"]) + ) quota = int(params["input"]["size_bytes"]) log.info( "VFOLDER.UPDATE_QUOTA (email:{}, volume_name:{}, quota:{}, vf:{})", @@ -1158,7 +1167,9 @@ async def get_usage(request: web.Request, params: Any) -> web.Response: )[0] await check_vfolder_status(vfolder_row, VFolderStatusSet.READABLE) root_ctx: RootContext = request.app["_root.context"] - proxy_name, volume_name = root_ctx.storage_manager.split_host(params["folder_host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + params["folder_host"], is_unmanaged(vfolder_row["unmanaged_path"]) + ) log.info( "VFOLDER.GET_USAGE (email:{}, volume_name:{}, vf:{})", request["user"]["email"], @@ -1192,7 +1203,9 @@ async def get_used_bytes(request: web.Request, params: Any) -> web.Response: )[0] await check_vfolder_status(vfolder_row, VFolderStatusSet.READABLE) root_ctx: RootContext = request.app["_root.context"] - proxy_name, volume_name = root_ctx.storage_manager.split_host(params["folder_host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + params["folder_host"], is_unmanaged(vfolder_row["unmanaged_path"]) + ) log.info("VFOLDER.GET_USED_BYTES (volume_name:{}, vf:{})", volume_name, params["id"]) async with root_ctx.storage_manager.request( proxy_name, @@ -1353,7 +1366,9 @@ async def mkdir(request: web.Request, params: Any, row: VFolderRow) -> web.Respo request.match_info["name"], params["path"], ) - proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + row["host"], is_unmanaged(row["unmanaged_path"]) + ) async with root_ctx.storage_manager.request( proxy_name, "POST", @@ -1413,7 +1428,9 @@ async def create_download_session( domain_name=domain_name, permission=VFolderHostPermission.DOWNLOAD_FILE, ) - proxy_name, volume_name = root_ctx.storage_manager.split_host(folder_host) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + folder_host, is_unmanaged(row["unmanaged_path"]) + ) async with root_ctx.storage_manager.request( proxy_name, "POST", @@ -1469,7 +1486,9 @@ async def create_upload_session(request: web.Request, params: Any, row: VFolderR domain_name=domain_name, permission=VFolderHostPermission.UPLOAD_FILE, ) - proxy_name, volume_name = root_ctx.storage_manager.split_host(folder_host) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + folder_host, is_unmanaged(row["unmanaged_path"]) + ) async with root_ctx.storage_manager.request( proxy_name, "POST", @@ -1526,7 +1545,9 @@ async def rename_file(request: web.Request, params: Any, row: VFolderRow) -> web params["target_path"], params["new_name"], ) - proxy_name, volume_name = root_ctx.storage_manager.split_host(folder_host) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + folder_host, is_unmanaged(row["unmanaged_path"]) + ) async with root_ctx.storage_manager.request( proxy_name, "POST", @@ -1563,7 +1584,9 @@ async def move_file(request: web.Request, params: Any, row: VFolderRow) -> web.R params["src"], params["dst"], ) - proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + row["host"], is_unmanaged(row["unmanaged_path"]) + ) async with root_ctx.storage_manager.request( proxy_name, "POST", @@ -1601,7 +1624,9 @@ async def delete_files(request: web.Request, params: Any, row: VFolderRow) -> we params["files"], recursive, ) - proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + row["host"], is_unmanaged(row["unmanaged_path"]) + ) async with root_ctx.storage_manager.request( proxy_name, "POST", @@ -1638,7 +1663,9 @@ async def list_files(request: web.Request, params: Any, row: VFolderRow) -> web. request.match_info["name"], params["path"], ) - proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"]) + proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume( + row["host"], is_unmanaged(row["unmanaged_path"]) + ) async with root_ctx.storage_manager.request( proxy_name, "POST", @@ -2460,7 +2487,7 @@ async def delete_from_trash_bin( # fs-level deletion may fail or take longer time await initiate_vfolder_deletion( root_ctx.db, - [VFolderDeletionInfo(VFolderID.from_row(row), row["host"])], + [VFolderDeletionInfo(VFolderID.from_row(row), row["host"], row["unmanaged_path"])], root_ctx.storage_manager, app_ctx.storage_ptask_group, ) @@ -2496,7 +2523,7 @@ async def force_delete(request: web.Request) -> web.Response: ) await initiate_vfolder_deletion( root_ctx.db, - [VFolderDeletionInfo(VFolderID.from_row(row), row["host"])], + [VFolderDeletionInfo(VFolderID.from_row(row), row["host"], row["unmanaged_path"])], root_ctx.storage_manager, force=True, ) @@ -2706,8 +2733,13 @@ async def clone(request: web.Request, params: Any, row: VFolderRow) -> web.Respo source_folder_id = VFolderID(row["quota_scope_id"], row["id"]) target_folder_host = params["folder_host"] target_quota_scope_id = "..." # TODO: implement - source_proxy_name, source_volume_name = root_ctx.storage_manager.split_host(source_folder_host) - target_proxy_name, target_volume_name = root_ctx.storage_manager.split_host(target_folder_host) + source_unmanaged_path = row["unmanaged_path"] + source_proxy_name, source_volume_name = root_ctx.storage_manager.get_proxy_and_volume( + source_folder_host, is_unmanaged(source_unmanaged_path) + ) + target_proxy_name, target_volume_name = root_ctx.storage_manager.get_proxy_and_volume( + target_folder_host + ) # check if the source vfolder is allowed to be cloned if not row["cloneable"]: @@ -2814,6 +2846,7 @@ async def clone(request: web.Request, params: Any, row: VFolderRow) -> web.Respo VFolderCloneInfo( source_folder_id, source_folder_host, + source_unmanaged_path, domain_name, target_quota_scope_id, params["target_name"], diff --git a/src/ai/backend/manager/models/endpoint.py b/src/ai/backend/manager/models/endpoint.py index 1812605b6b8..836063783eb 100644 --- a/src/ai/backend/manager/models/endpoint.py +++ b/src/ai/backend/manager/models/endpoint.py @@ -814,7 +814,7 @@ async def validate_model_definition( vfid = VFolderID(model_vfolder_row["quota_scope_id"], model_vfolder_row["id"]) folder_host = model_vfolder_row["host"] - proxy_name, volume_name = storage_manager.split_host(folder_host) + proxy_name, volume_name = storage_manager.get_proxy_and_volume(folder_host) if model_definition_path: path = Path(model_definition_path) diff --git a/src/ai/backend/manager/models/gql_models/vfolder.py b/src/ai/backend/manager/models/gql_models/vfolder.py index 63795d7f0d9..8f35dffab37 100644 --- a/src/ai/backend/manager/models/gql_models/vfolder.py +++ b/src/ai/backend/manager/models/gql_models/vfolder.py @@ -56,6 +56,7 @@ VFolderRow, VirtualFolder, get_permission_ctx, + is_unmanaged, ) if TYPE_CHECKING: @@ -566,7 +567,9 @@ async def _fetch_file( quota_scope_id = vfolder_row.quota_scope_id host = vfolder_row.host vfolder_id = VFolderID(quota_scope_id, vfolder_row_id) - proxy_name, volume_name = graph_ctx.storage_manager.split_host(host) + proxy_name, volume_name = graph_ctx.storage_manager.get_proxy_and_volume( + host, is_unmanaged(vfolder_row.unmanaged_path) + ) try: async with graph_ctx.storage_manager.request( proxy_name, diff --git a/src/ai/backend/manager/models/group.py b/src/ai/backend/manager/models/group.py index e1f0263e0bb..16306aa8adb 100644 --- a/src/ai/backend/manager/models/group.py +++ b/src/ai/backend/manager/models/group.py @@ -824,7 +824,9 @@ async def delete_vfolders( result = await db_session.scalars(query) rows = cast(list[VFolderRow], result.fetchall()) for vf in rows: - target_vfs.append(VFolderDeletionInfo(VFolderID.from_row(vf), vf.host)) + target_vfs.append( + VFolderDeletionInfo(VFolderID.from_row(vf), vf.host, vf.unmanaged_path) + ) storage_ptask_group = aiotools.PersistentTaskGroup() try: diff --git a/src/ai/backend/manager/models/storage.py b/src/ai/backend/manager/models/storage.py index ce95f413602..d369240c81b 100644 --- a/src/ai/backend/manager/models/storage.py +++ b/src/ai/backend/manager/models/storage.py @@ -119,17 +119,26 @@ async def aclose(self) -> None: await asyncio.gather(*close_aws, return_exceptions=True) @staticmethod - def split_host(vfolder_host: str) -> Tuple[str, str]: + def _split_host(vfolder_host: str) -> Tuple[str, str]: proxy_name, _, volume_name = vfolder_host.partition(":") return proxy_name, volume_name + @classmethod + def get_proxy_and_volume( + cls, vfolder_host: str, should_be_noop: bool = False + ) -> tuple[str, str]: + proxy_name, volume_name = cls._split_host(vfolder_host) + if should_be_noop: + volume_name = NOOP_STORAGE_VOLUME_NAME + return proxy_name, volume_name + @staticmethod def parse_host(proxy_name: str, volume_name: str) -> str: return f"{proxy_name}:{volume_name}" @classmethod def is_noop_host(cls, vfolder_host: str) -> bool: - return cls.split_host(vfolder_host)[1] == NOOP_STORAGE_VOLUME_NAME + return cls._split_host(vfolder_host)[1] == NOOP_STORAGE_VOLUME_NAME async def get_all_volumes(self) -> Iterable[Tuple[str, VolumeInfo]]: """ @@ -178,7 +187,7 @@ async def get_mount_path( "GET", "folder/mount", json={ - "volume": self.split_host(vfolder_host)[1], + "volume": self.get_proxy_and_volume(vfolder_host)[1], "vfid": str(vfolder_id), "subpath": str(subpath), }, @@ -195,7 +204,7 @@ async def request( *args, **kwargs, ) -> AsyncIterator[Tuple[yarl.URL, aiohttp.ClientResponse]]: - proxy_name, _ = self.split_host(vfolder_host_or_proxy_name) + proxy_name, _ = self.get_proxy_and_volume(vfolder_host_or_proxy_name) try: proxy_info = self._proxies[proxy_name] except KeyError: @@ -256,7 +265,7 @@ async def resolve_hardware_metadata(self, info: graphene.ResolveInfo) -> Hardwar async def resolve_performance_metric(self, info: graphene.ResolveInfo) -> Mapping[str, Any]: ctx: GraphQueryContext = info.context - proxy_name, volume_name = ctx.storage_manager.split_host(self.id) + proxy_name, volume_name = ctx.storage_manager.get_proxy_and_volume(self.id) try: proxy_info = ctx.storage_manager._proxies[proxy_name] except KeyError: @@ -276,7 +285,7 @@ async def resolve_performance_metric(self, info: graphene.ResolveInfo) -> Mappin async def resolve_usage(self, info: graphene.ResolveInfo) -> Mapping[str, Any]: ctx: GraphQueryContext = info.context - proxy_name, volume_name = ctx.storage_manager.split_host(self.id) + proxy_name, volume_name = ctx.storage_manager.get_proxy_and_volume(self.id) try: proxy_info = ctx.storage_manager._proxies[proxy_name] except KeyError: @@ -341,7 +350,7 @@ async def load_by_id( ctx: GraphQueryContext, id: str, ) -> StorageVolume: - proxy_name, volume_name = ctx.storage_manager.split_host(id) + proxy_name, volume_name = ctx.storage_manager.get_proxy_and_volume(id) try: proxy_info = ctx.storage_manager._proxies[proxy_name] except KeyError: diff --git a/src/ai/backend/manager/models/user.py b/src/ai/backend/manager/models/user.py index e856f4acc48..a5a40c9f0af 100644 --- a/src/ai/backend/manager/models/user.py +++ b/src/ai/backend/manager/models/user.py @@ -1179,7 +1179,9 @@ async def delete_vfolders( ) rows = cast(list[VFolderRow], result.fetchall()) for vf in rows: - target_vfs.append(VFolderDeletionInfo(VFolderID.from_row(vf), vf.host)) + target_vfs.append( + VFolderDeletionInfo(VFolderID.from_row(vf), vf.host, vf.unmanaged_path) + ) storage_ptask_group = aiotools.PersistentTaskGroup() try: diff --git a/src/ai/backend/manager/models/vfolder.py b/src/ai/backend/manager/models/vfolder.py index a74e12830cc..000544cdc41 100644 --- a/src/ai/backend/manager/models/vfolder.py +++ b/src/ai/backend/manager/models/vfolder.py @@ -342,11 +342,13 @@ class VFolderPermissionSetAlias(enum.Enum): class VFolderDeletionInfo(NamedTuple): vfolder_id: VFolderID host: str + unmanaged_path: Optional[str] class VFolderCloneInfo(NamedTuple): source_vfolder_id: VFolderID source_host: str + unmanaged_path: Optional[str] domain_name: str # Target Vfolder infos @@ -541,6 +543,10 @@ def vfid(self) -> VFolderID: return VFolderID(self.quota_scope_id, self.id) +def is_unmanaged(unmanaged_path: Optional[str]) -> bool: + return (unmanaged_path is not None) and bool(unmanaged_path) + + def verify_vfolder_name(folder: str) -> bool: if folder in RESERVED_VFOLDERS: return False @@ -1030,7 +1036,7 @@ async def prepare_vfolder_mounts( "POST", "folder/file/mkdir", params={ - "volume": storage_manager.split_host(vfolder["host"])[1], + "volume": storage_manager.get_proxy_and_volume(vfolder["host"])[1], "vfid": str(VFolderID(vfolder["quota_scope_id"], vfolder["id"])), "relpaths": [str(user_scope.user_uuid.hex)], "exist_ok": True, @@ -1224,8 +1230,10 @@ async def _update_status() -> None: await execute_with_retry(_update_status) - target_proxy, target_volume = storage_manager.split_host(vfolder_info.target_host) - source_proxy, source_volume = storage_manager.split_host(vfolder_info.source_host) + target_proxy, target_volume = storage_manager.get_proxy_and_volume(vfolder_info.target_host) + source_proxy, source_volume = storage_manager.get_proxy_and_volume( + vfolder_info.source_host, is_unmanaged(vfolder_info.unmanaged_path) + ) # Generate the ID of the destination vfolder. # TODO: If we refactor to use ORM, the folder ID will be created from the database by inserting @@ -1332,7 +1340,7 @@ async def initiate_vfolder_deletion( ) -> int: """Purges VFolder content from storage host.""" vfolder_info_len = len(requested_vfolders) - vfolder_ids = tuple(vf_id.folder_id for vf_id, _ in requested_vfolders) + vfolder_ids = tuple(vf_id.folder_id for vf_id, _, _ in requested_vfolders) vfolders.c.id.in_(vfolder_ids) if vfolder_info_len == 0: return 0 @@ -1352,8 +1360,10 @@ async def initiate_vfolder_deletion( already_deleted: list[VFolderDeletionInfo] = [] for vfolder_info in requested_vfolders: - folder_id, host_name = vfolder_info - proxy_name, volume_name = storage_manager.split_host(host_name) + folder_id, host_name, unmanaged_path = vfolder_info + proxy_name, volume_name = storage_manager.get_proxy_and_volume( + host_name, is_unmanaged(unmanaged_path) + ) try: async with storage_manager.request( proxy_name, @@ -1369,7 +1379,7 @@ async def initiate_vfolder_deletion( if e.status == 410: already_deleted.append(vfolder_info) if already_deleted: - vfolder_ids = tuple(vf_id.folder_id for vf_id, _ in already_deleted) + vfolder_ids = tuple(vf_id.folder_id for vf_id, _, _ in already_deleted) await update_vfolder_status( db_engine, vfolder_ids, VFolderOperationStatus.DELETE_COMPLETE, do_log=False @@ -2082,7 +2092,9 @@ def resolve_id(self, info: graphene.ResolveInfo) -> str: async def resolve_details(self, info: graphene.ResolveInfo) -> Optional[int]: graph_ctx: GraphQueryContext = info.context - proxy_name, volume_name = graph_ctx.storage_manager.split_host(self.storage_host_name) + proxy_name, volume_name = graph_ctx.storage_manager.get_proxy_and_volume( + self.storage_host_name + ) try: async with graph_ctx.storage_manager.request( proxy_name, @@ -2168,7 +2180,7 @@ async def mutate( ) ) max_vfolder_size = props.hard_limit_bytes - proxy_name, volume_name = graph_ctx.storage_manager.split_host(storage_host_name) + proxy_name, volume_name = graph_ctx.storage_manager.get_proxy_and_volume(storage_host_name) request_body = { "volume": volume_name, "qsid": str(qsid), @@ -2212,7 +2224,7 @@ async def mutate( ) -> SetQuotaScope: qsid = QuotaScopeID.parse(quota_scope_id) graph_ctx: GraphQueryContext = info.context - proxy_name, volume_name = graph_ctx.storage_manager.split_host(storage_host_name) + proxy_name, volume_name = graph_ctx.storage_manager.get_proxy_and_volume(storage_host_name) request_body: dict[str, Any] = { "volume": volume_name, "qsid": str(qsid), diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index 024ea8d683e..3da07ffefed 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -422,7 +422,7 @@ async def gather_agent_hwinfo(self, instance_id: AgentId) -> Mapping[str, Hardwa return {k: check_type(v, HardwareMetadata) for k, v in result.items()} async def gather_storage_hwinfo(self, vfolder_host: str) -> HardwareMetadata: - proxy_name, volume_name = self.storage_manager.split_host(vfolder_host) + proxy_name, volume_name = self.storage_manager.get_proxy_and_volume(vfolder_host) async with self.storage_manager.request( proxy_name, "GET",