Skip to content

Commit

Permalink
Map NOOP volume before call request through storage session mgr
Browse files Browse the repository at this point in the history
  • Loading branch information
fregataa committed Feb 17, 2025
1 parent 65e3a14 commit b2ff6dd
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 41 deletions.
5 changes: 4 additions & 1 deletion src/ai/backend/manager/api/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
SESSION_PRIORITY_MAX,
SESSION_PRIORITY_MIN,
)
from ..models.vfolder import is_unmanaged
from ..types import UserScope
from ..utils import query_userinfo as _query_userinfo
from .auth import auth_required
Expand Down Expand Up @@ -2368,7 +2369,9 @@ async def get_task_logs(request: web.Request, params: Any) -> web.StreamResponse
)
log_vfolder = matched_vfolders[0]

proxy_name, volume_name = root_ctx.storage_manager.split_host(log_vfolder["host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
log_vfolder["host"], is_unmanaged(log_vfolder["unmanaged_path"])
)
response = web.StreamResponse(status=200)
response.headers[hdrs.CONTENT_TYPE] = "text/plain"
prepared = False
Expand Down
69 changes: 51 additions & 18 deletions src/ai/backend/manager/api/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
from ..models.vfolder import (
VFolderPermissionRow,
delete_vfolder_relation_rows,
is_unmanaged,
)
from ..models.vfolder import VFolderRow as VFolderDBRow
from .auth import admin_required, auth_required, superadmin_required
Expand Down Expand Up @@ -613,7 +614,9 @@ async def create(request: web.Request, params: CreateRequestModel) -> web.Respon
if max_quota_scope_size and max_quota_scope_size > 0:
options["initial_max_size_for_quota_scope"] = max_quota_scope_size
body_data: dict[str, Any] = {
"volume": root_ctx.storage_manager.split_host(folder_host)[1],
"volume": root_ctx.storage_manager.get_proxy_and_volume(
folder_host, is_unmanaged(unmanaged_path)
)[1],
"vfid": str(vfid),
"options": options,
}
Expand Down Expand Up @@ -918,7 +921,7 @@ async def get_volume_perf_metric(request: web.Request, params: Any) -> web.Respo
request["user"]["email"],
request["keypair"]["access_key"],
)
proxy_name, volume_name = root_ctx.storage_manager.split_host(params["folder_host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(params["folder_host"])
async with root_ctx.storage_manager.request(
proxy_name,
"GET",
Expand Down Expand Up @@ -964,7 +967,9 @@ async def get_info(request: web.Request, row: VFolderRow) -> web.Response:
else:
is_owner = row["is_owner"]
permission = row["permission"]
proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
row["host"], is_unmanaged(row["unmanaged_path"])
)
async with root_ctx.storage_manager.request(
proxy_name,
"GET",
Expand Down Expand Up @@ -1014,7 +1019,9 @@ async def get_quota(request: web.Request, params: Any) -> web.Response:
)[0]
await check_vfolder_status(vfolder_row, VFolderStatusSet.READABLE)
root_ctx: RootContext = request.app["_root.context"]
proxy_name, volume_name = root_ctx.storage_manager.split_host(params["folder_host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
params["folder_host"], is_unmanaged(vfolder_row["unmanaged_path"])
)
log.info(
"VFOLDER.GET_QUOTA (email:{}, volume_name:{}, vf:{})",
request["user"]["email"],
Expand Down Expand Up @@ -1072,7 +1079,9 @@ async def update_quota(request: web.Request, params: Any) -> web.Response:
await check_vfolder_status(vfolder_row, VFolderStatusSet.READABLE)
root_ctx: RootContext = request.app["_root.context"]
folder_host = params["folder_host"]
proxy_name, volume_name = root_ctx.storage_manager.split_host(folder_host)
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
folder_host, is_unmanaged(vfolder_row["unmanaged_path"])
)
quota = int(params["input"]["size_bytes"])
log.info(
"VFOLDER.UPDATE_QUOTA (email:{}, volume_name:{}, quota:{}, vf:{})",
Expand Down Expand Up @@ -1158,7 +1167,9 @@ async def get_usage(request: web.Request, params: Any) -> web.Response:
)[0]
await check_vfolder_status(vfolder_row, VFolderStatusSet.READABLE)
root_ctx: RootContext = request.app["_root.context"]
proxy_name, volume_name = root_ctx.storage_manager.split_host(params["folder_host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
params["folder_host"], is_unmanaged(vfolder_row["unmanaged_path"])
)
log.info(
"VFOLDER.GET_USAGE (email:{}, volume_name:{}, vf:{})",
request["user"]["email"],
Expand Down Expand Up @@ -1192,7 +1203,9 @@ async def get_used_bytes(request: web.Request, params: Any) -> web.Response:
)[0]
await check_vfolder_status(vfolder_row, VFolderStatusSet.READABLE)
root_ctx: RootContext = request.app["_root.context"]
proxy_name, volume_name = root_ctx.storage_manager.split_host(params["folder_host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
params["folder_host"], is_unmanaged(vfolder_row["unmanaged_path"])
)
log.info("VFOLDER.GET_USED_BYTES (volume_name:{}, vf:{})", volume_name, params["id"])
async with root_ctx.storage_manager.request(
proxy_name,
Expand Down Expand Up @@ -1353,7 +1366,9 @@ async def mkdir(request: web.Request, params: Any, row: VFolderRow) -> web.Respo
request.match_info["name"],
params["path"],
)
proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
row["host"], is_unmanaged(row["unmanaged_path"])
)
async with root_ctx.storage_manager.request(
proxy_name,
"POST",
Expand Down Expand Up @@ -1413,7 +1428,9 @@ async def create_download_session(
domain_name=domain_name,
permission=VFolderHostPermission.DOWNLOAD_FILE,
)
proxy_name, volume_name = root_ctx.storage_manager.split_host(folder_host)
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
folder_host, is_unmanaged(row["unmanaged_path"])
)
async with root_ctx.storage_manager.request(
proxy_name,
"POST",
Expand Down Expand Up @@ -1469,7 +1486,9 @@ async def create_upload_session(request: web.Request, params: Any, row: VFolderR
domain_name=domain_name,
permission=VFolderHostPermission.UPLOAD_FILE,
)
proxy_name, volume_name = root_ctx.storage_manager.split_host(folder_host)
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
folder_host, is_unmanaged(row["unmanaged_path"])
)
async with root_ctx.storage_manager.request(
proxy_name,
"POST",
Expand Down Expand Up @@ -1526,7 +1545,9 @@ async def rename_file(request: web.Request, params: Any, row: VFolderRow) -> web
params["target_path"],
params["new_name"],
)
proxy_name, volume_name = root_ctx.storage_manager.split_host(folder_host)
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
folder_host, is_unmanaged(row["unmanaged_path"])
)
async with root_ctx.storage_manager.request(
proxy_name,
"POST",
Expand Down Expand Up @@ -1563,7 +1584,9 @@ async def move_file(request: web.Request, params: Any, row: VFolderRow) -> web.R
params["src"],
params["dst"],
)
proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
row["host"], is_unmanaged(row["unmanaged_path"])
)
async with root_ctx.storage_manager.request(
proxy_name,
"POST",
Expand Down Expand Up @@ -1601,7 +1624,9 @@ async def delete_files(request: web.Request, params: Any, row: VFolderRow) -> we
params["files"],
recursive,
)
proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
row["host"], is_unmanaged(row["unmanaged_path"])
)
async with root_ctx.storage_manager.request(
proxy_name,
"POST",
Expand Down Expand Up @@ -1638,7 +1663,9 @@ async def list_files(request: web.Request, params: Any, row: VFolderRow) -> web.
request.match_info["name"],
params["path"],
)
proxy_name, volume_name = root_ctx.storage_manager.split_host(row["host"])
proxy_name, volume_name = root_ctx.storage_manager.get_proxy_and_volume(
row["host"], is_unmanaged(row["unmanaged_path"])
)
async with root_ctx.storage_manager.request(
proxy_name,
"POST",
Expand Down Expand Up @@ -2460,7 +2487,7 @@ async def delete_from_trash_bin(
# fs-level deletion may fail or take longer time
await initiate_vfolder_deletion(
root_ctx.db,
[VFolderDeletionInfo(VFolderID.from_row(row), row["host"])],
[VFolderDeletionInfo(VFolderID.from_row(row), row["host"], row["unmanaged_path"])],
root_ctx.storage_manager,
app_ctx.storage_ptask_group,
)
Expand Down Expand Up @@ -2496,7 +2523,7 @@ async def force_delete(request: web.Request) -> web.Response:
)
await initiate_vfolder_deletion(
root_ctx.db,
[VFolderDeletionInfo(VFolderID.from_row(row), row["host"])],
[VFolderDeletionInfo(VFolderID.from_row(row), row["host"], row["unmanaged_path"])],
root_ctx.storage_manager,
force=True,
)
Expand Down Expand Up @@ -2706,8 +2733,13 @@ async def clone(request: web.Request, params: Any, row: VFolderRow) -> web.Respo
source_folder_id = VFolderID(row["quota_scope_id"], row["id"])
target_folder_host = params["folder_host"]
target_quota_scope_id = "..." # TODO: implement
source_proxy_name, source_volume_name = root_ctx.storage_manager.split_host(source_folder_host)
target_proxy_name, target_volume_name = root_ctx.storage_manager.split_host(target_folder_host)
source_unmanaged_path = row["unmanaged_path"]
source_proxy_name, source_volume_name = root_ctx.storage_manager.get_proxy_and_volume(
source_folder_host, is_unmanaged(source_unmanaged_path)
)
target_proxy_name, target_volume_name = root_ctx.storage_manager.get_proxy_and_volume(
target_folder_host
)

# check if the source vfolder is allowed to be cloned
if not row["cloneable"]:
Expand Down Expand Up @@ -2814,6 +2846,7 @@ async def clone(request: web.Request, params: Any, row: VFolderRow) -> web.Respo
VFolderCloneInfo(
source_folder_id,
source_folder_host,
source_unmanaged_path,
domain_name,
target_quota_scope_id,
params["target_name"],
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/manager/models/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ async def validate_model_definition(
vfid = VFolderID(model_vfolder_row["quota_scope_id"], model_vfolder_row["id"])
folder_host = model_vfolder_row["host"]

proxy_name, volume_name = storage_manager.split_host(folder_host)
proxy_name, volume_name = storage_manager.get_proxy_and_volume(folder_host)

if model_definition_path:
path = Path(model_definition_path)
Expand Down
5 changes: 4 additions & 1 deletion src/ai/backend/manager/models/gql_models/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
VFolderRow,
VirtualFolder,
get_permission_ctx,
is_unmanaged,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -566,7 +567,9 @@ async def _fetch_file(
quota_scope_id = vfolder_row.quota_scope_id
host = vfolder_row.host
vfolder_id = VFolderID(quota_scope_id, vfolder_row_id)
proxy_name, volume_name = graph_ctx.storage_manager.split_host(host)
proxy_name, volume_name = graph_ctx.storage_manager.get_proxy_and_volume(
host, is_unmanaged(vfolder_row.unmanaged_path)
)
try:
async with graph_ctx.storage_manager.request(
proxy_name,
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/manager/models/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,9 @@ async def delete_vfolders(
result = await db_session.scalars(query)
rows = cast(list[VFolderRow], result.fetchall())
for vf in rows:
target_vfs.append(VFolderDeletionInfo(VFolderID.from_row(vf), vf.host))
target_vfs.append(
VFolderDeletionInfo(VFolderID.from_row(vf), vf.host, vf.unmanaged_path)
)

storage_ptask_group = aiotools.PersistentTaskGroup()
try:
Expand Down
23 changes: 16 additions & 7 deletions src/ai/backend/manager/models/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,26 @@ async def aclose(self) -> None:
await asyncio.gather(*close_aws, return_exceptions=True)

@staticmethod
def split_host(vfolder_host: str) -> Tuple[str, str]:
def _split_host(vfolder_host: str) -> Tuple[str, str]:
proxy_name, _, volume_name = vfolder_host.partition(":")
return proxy_name, volume_name

@classmethod
def get_proxy_and_volume(
cls, vfolder_host: str, should_be_noop: bool = False
) -> tuple[str, str]:
proxy_name, volume_name = cls._split_host(vfolder_host)
if should_be_noop:
volume_name = NOOP_STORAGE_VOLUME_NAME
return proxy_name, volume_name

@staticmethod
def parse_host(proxy_name: str, volume_name: str) -> str:
return f"{proxy_name}:{volume_name}"

@classmethod
def is_noop_host(cls, vfolder_host: str) -> bool:
return cls.split_host(vfolder_host)[1] == NOOP_STORAGE_VOLUME_NAME
return cls._split_host(vfolder_host)[1] == NOOP_STORAGE_VOLUME_NAME

async def get_all_volumes(self) -> Iterable[Tuple[str, VolumeInfo]]:
"""
Expand Down Expand Up @@ -178,7 +187,7 @@ async def get_mount_path(
"GET",
"folder/mount",
json={
"volume": self.split_host(vfolder_host)[1],
"volume": self.get_proxy_and_volume(vfolder_host)[1],
"vfid": str(vfolder_id),
"subpath": str(subpath),
},
Expand All @@ -195,7 +204,7 @@ async def request(
*args,
**kwargs,
) -> AsyncIterator[Tuple[yarl.URL, aiohttp.ClientResponse]]:
proxy_name, _ = self.split_host(vfolder_host_or_proxy_name)
proxy_name, _ = self.get_proxy_and_volume(vfolder_host_or_proxy_name)
try:
proxy_info = self._proxies[proxy_name]
except KeyError:
Expand Down Expand Up @@ -256,7 +265,7 @@ async def resolve_hardware_metadata(self, info: graphene.ResolveInfo) -> Hardwar

async def resolve_performance_metric(self, info: graphene.ResolveInfo) -> Mapping[str, Any]:
ctx: GraphQueryContext = info.context
proxy_name, volume_name = ctx.storage_manager.split_host(self.id)
proxy_name, volume_name = ctx.storage_manager.get_proxy_and_volume(self.id)
try:
proxy_info = ctx.storage_manager._proxies[proxy_name]
except KeyError:
Expand All @@ -276,7 +285,7 @@ async def resolve_performance_metric(self, info: graphene.ResolveInfo) -> Mappin

async def resolve_usage(self, info: graphene.ResolveInfo) -> Mapping[str, Any]:
ctx: GraphQueryContext = info.context
proxy_name, volume_name = ctx.storage_manager.split_host(self.id)
proxy_name, volume_name = ctx.storage_manager.get_proxy_and_volume(self.id)
try:
proxy_info = ctx.storage_manager._proxies[proxy_name]
except KeyError:
Expand Down Expand Up @@ -341,7 +350,7 @@ async def load_by_id(
ctx: GraphQueryContext,
id: str,
) -> StorageVolume:
proxy_name, volume_name = ctx.storage_manager.split_host(id)
proxy_name, volume_name = ctx.storage_manager.get_proxy_and_volume(id)
try:
proxy_info = ctx.storage_manager._proxies[proxy_name]
except KeyError:
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/manager/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,9 @@ async def delete_vfolders(
)
rows = cast(list[VFolderRow], result.fetchall())
for vf in rows:
target_vfs.append(VFolderDeletionInfo(VFolderID.from_row(vf), vf.host))
target_vfs.append(
VFolderDeletionInfo(VFolderID.from_row(vf), vf.host, vf.unmanaged_path)
)

storage_ptask_group = aiotools.PersistentTaskGroup()
try:
Expand Down
Loading

0 comments on commit b2ff6dd

Please sign in to comment.