feat(BA-675): Implement Image status filtering logic #3647

Draft: wants to merge 3 commits into base: topic/02-10-feat_implment_image_soft_hard_delete_apis
3 changes: 3 additions & 0 deletions docs/manager/graphql-reference/schema.graphql
@@ -83,6 +83,9 @@
is_installed: Boolean
is_operation: Boolean @deprecated(reason: "Deprecated since 24.03.4. This field is ignored if `load_filters` is specified and is not null.")

"""Added in 25.3.0."""
load_only_active: Boolean = true

[Warning on line 87 in docs/manager/graphql-reference/schema.graphql, from GitHub Actions / GraphQL Inspector]
Argument 'load_only_active: Boolean' (with default value) added to field 'Queries.images'.
Adding a new argument to an existing field may involve a change in resolve function logic that potentially may cause some side effects.

"""
Added in 24.03.8. Allowed values are: [general, operational, customized]. When a superuser queries with the `customized` option set, the resolver will return every customized image (including those not owned by the callee). To resolve only images owned by the user, call `customized_images`.
"""
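
A minimal sketch of how a client could exercise the new `load_only_active` argument; the selected sub-fields and the exact result shape are assumptions drawn from this diff, not a documented contract:

# Sketch only: the sub-field names on `images` are assumptions about the Image type.
LIST_IMAGES_INCLUDING_DELETED = """
query {
  images(load_only_active: false) {
    id
    name
    status
  }
}
"""
# With the default (load_only_active: true), the resolver changed below is
# expected to return only rows whose status is ALIVE.
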
25 changes: 15 additions & 10 deletions src/ai/backend/manager/api/session.py
@@ -52,7 +52,7 @@
from ai.backend.common.docker import ImageRef
from ai.backend.manager.models.container_registry import ContainerRegistryRow
from ai.backend.manager.models.group import GroupRow
from ai.backend.manager.models.image import ImageIdentifier, rescan_images
from ai.backend.manager.models.image import ImageIdentifier, ImageStatus, rescan_images

if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncConnection as SAConnection
@@ -1259,6 +1259,7 @@ async def _commit_and_upload(reporter: ProgressReporter) -> None:
== f"{params.image_visibility.value}:{image_owner_id}"
)
)
.where(ImageRow.status == ImageStatus.ALIVE)
)
existing_image_count = await sess.scalar(query)

@@ -1275,16 +1276,20 @@
)

# check if an image with the same name exists and reuse its ID if so
query = sa.select(ImageRow).where(
ImageRow.name.like(f"{new_canonical}%")
& (
ImageRow.labels["ai.backend.customized-image.owner"].as_string()
== f"{params.image_visibility.value}:{image_owner_id}"
)
& (
ImageRow.labels["ai.backend.customized-image.name"].as_string()
== params.image_name
query = (
sa.select(ImageRow)
.where(
ImageRow.name.like(f"{new_canonical}%")
& (
ImageRow.labels["ai.backend.customized-image.owner"].as_string()
== f"{params.image_visibility.value}:{image_owner_id}"
)
& (
ImageRow.labels["ai.backend.customized-image.name"].as_string()
== params.image_name
)
)
.where(ImageRow.status == ImageStatus.ALIVE)
)
existing_row = await sess.scalar(query)

18 changes: 11 additions & 7 deletions src/ai/backend/manager/cli/image_impl.py
@@ -33,6 +33,7 @@ async def list_images(cli_ctx, short, installed_only):
):
displayed_items = []
try:
# Idea: Add `deleted` option to include deleted images.
items = await ImageRow.list(session)
# NOTE: installed/installed_agents fields are no longer provided in CLI,
# until we finish the epic refactoring of image metadata db.
@@ -250,20 +251,23 @@ async def validate_image_canonical(
if current or architecture is not None:
if current:
architecture = architecture or CURRENT_ARCH
image_row = await session.scalar(
sa.select(ImageRow).where(
(ImageRow.name == canonical) & (ImageRow.architecture == architecture)
)

assert architecture is not None
image_row = await ImageRow.resolve(
session, [ImageIdentifier(canonical, architecture)]
)
if image_row is None:
raise UnknownImageReference(f"{canonical}/{architecture}")

for key, value in validate_image_labels(image_row.labels).items():
print(f"{key:<40}: ", end="")
if isinstance(value, list):
value = f"{', '.join(value)}"
print(value)
else:
rows = await session.scalars(sa.select(ImageRow).where(ImageRow.name == canonical))
rows = await session.scalars(
sa.select(ImageRow)
.where(ImageRow.name == canonical)
.where(ImageRow.status == ImageStatus.ALIVE)
)
image_rows = rows.fetchall()
if not image_rows:
raise UnknownImageReference(f"{canonical}")
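
A rough sketch of the `# Idea:` note above (letting the CLI also list soft-deleted images); the helper name, the flag, and any `load_only_active` keyword on `ImageRow.list` are assumptions, not code in this diff:

import sqlalchemy as sa

from ai.backend.manager.models.image import ImageRow, ImageStatus


def build_image_list_query(include_deleted: bool = False):
    # Hypothetical query builder for an `image list --include-deleted` flag:
    # by default only ALIVE rows are listed, matching the rest of this PR.
    query = sa.select(ImageRow)
    if not include_deleted:
        query = query.where(ImageRow.status == ImageStatus.ALIVE)
    return query
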
14 changes: 12 additions & 2 deletions src/ai/backend/manager/container_registry/base.py
@@ -32,7 +32,7 @@
from ai.backend.manager.models.container_registry import ContainerRegistryRow

from ..defs import INTRINSIC_SLOTS_MIN
from ..models.image import ImageIdentifier, ImageRow, ImageType
from ..models.image import ImageIdentifier, ImageRow, ImageStatus, ImageType
from ..models.utils import ExtendedAsyncSAEngine

log = BraceStyleAdapter(logging.getLogger(__spec__.name))
@@ -131,7 +131,7 @@ async def commit_rescan_result(self) -> None:
existing_images = await session.scalars(
sa.select(ImageRow).where(
sa.func.ROW(ImageRow.name, ImageRow.architecture).in_(image_identifiers),
),
)
)
is_local = self.registry_name == "local"

@@ -146,6 +146,15 @@ async def commit_rescan_result(self) -> None:
image_row.resources = update["resources"]
image_row.is_local = is_local

if image_row.status == ImageStatus.DELETED:
image_row.status = ImageStatus.ALIVE

progress_msg = f"Restored deleted image - {image_ref.canonical}/{image_ref.architecture} ({update['config_digest']})"
log.info(progress_msg)

if (reporter := progress_reporter.get()) is not None:
await reporter.update(1, message=progress_msg)

for image_identifier, update in _all_updates.items():
try:
parsed_img = ImageRef.from_image_str(
Expand Down Expand Up @@ -178,6 +187,7 @@ async def commit_rescan_result(self) -> None:
accelerators=update.get("accels"),
labels=update["labels"],
resources=update["resources"],
status=ImageStatus.ALIVE,
)
)
progress_msg = f"Updated image - {parsed_img.canonical}/{image_identifier.architecture} ({update['config_digest']})"
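
The hunk above boils down to a single status-transition rule; a distilled sketch (not part of the diff) for reference:

from ai.backend.manager.models.image import ImageStatus


def reconciled_status(current: ImageStatus) -> ImageStatus:
    # Every image seen during a rescan exists in the registry, so a previously
    # soft-deleted row is revived; rows that are already ALIVE stay ALIVE, and
    # newly inserted rows are created with status=ImageStatus.ALIVE.
    return ImageStatus.ALIVE if current is ImageStatus.DELETED else current
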
6 changes: 4 additions & 2 deletions src/ai/backend/manager/container_registry/local.py
@@ -12,7 +12,7 @@
from ai.backend.common.docker import arch_name_aliases, get_docker_connector
from ai.backend.logging import BraceStyleAdapter

from ..models.image import ImageRow
from ..models.image import ImageRow, ImageStatus
from .base import (
BaseContainerRegistry,
concurrency_sema,
@@ -82,10 +82,12 @@ async def _read_image_info(
config_digest = data["Id"]
async with self.db.begin_readonly_session() as db_session:
already_exists = await db_session.scalar(
sa.select([sa.func.count(ImageRow.id)]).where(
sa.select([sa.func.count(ImageRow.id)])
.where(
ImageRow.config_digest == config_digest,
ImageRow.is_local == sa.false(),
)
.where(ImageRow.status == ImageStatus.ALIVE),
)
if already_exists > 0:
return {}, "already synchronized from a remote registry"
13 changes: 10 additions & 3 deletions src/ai/backend/manager/models/gql.py
@@ -518,6 +518,10 @@ class Queries(graphene.ObjectType):
is_operation=graphene.Boolean(
deprecation_reason="Deprecated since 24.03.4. This field is ignored if `load_filters` is specified and is not null."
),
load_only_active=graphene.Boolean(
default_value=True,
description="Added in 25.3.0.",
),
load_filters=graphene.List(
graphene.String,
default_value=None,
@@ -1373,13 +1377,15 @@ async def resolve_image(
client_role = ctx.user["role"]
client_domain = ctx.user["domain_name"]
if id:
item = await Image.load_item_by_id(info.context, uuid.UUID(id))
item = await Image.load_item_by_id(info.context, uuid.UUID(id), load_only_active=False)
else:
if not (reference and architecture):
raise InvalidAPIParameters(
"reference/architecture and id can't be omitted at the same time!"
)
item = await Image.load_item(info.context, reference, architecture)
item = await Image.load_item(
info.context, reference, architecture, load_only_active=False
)
if client_role == UserRole.SUPERADMIN:
pass
elif client_role in (UserRole.ADMIN, UserRole.USER):
@@ -1428,6 +1434,7 @@ async def resolve_images(
*,
is_installed: bool | None = None,
is_operation=False,
load_only_active: bool = True,
load_filters: list[str] | None = None,
image_filters: list[str] | None = None,
) -> Sequence[Image]:
@@ -1459,7 +1466,7 @@
# but to conform with previous implementation...
image_load_types.add(ImageLoadFilter.OPERATIONAL)

items = await Image.load_all(ctx, types=image_load_types)
items = await Image.load_all(ctx, types=image_load_types, load_only_active=load_only_active)
if client_role == UserRole.SUPERADMIN:
pass
elif client_role in (UserRole.ADMIN, UserRole.USER):
41 changes: 34 additions & 7 deletions src/ai/backend/manager/models/gql_models/image.py
@@ -195,12 +195,15 @@ async def batch_load_by_canonical(
cls,
graph_ctx: GraphQueryContext,
image_names: Sequence[str],
load_only_active: bool = True,
) -> Sequence[Optional[Image]]:
query = (
sa.select(ImageRow)
.where(ImageRow.name.in_(image_names))
.options(selectinload(ImageRow.aliases))
)
if load_only_active:
query = query.where(ImageRow.status == ImageStatus.ALIVE)
async with graph_ctx.db.begin_readonly_session() as session:
result = await session.execute(query)
return [await Image.from_row(graph_ctx, row) for row in result.scalars().all()]
@@ -210,18 +213,22 @@ async def batch_load_by_image_ref(
cls,
graph_ctx: GraphQueryContext,
image_refs: Sequence[ImageRef],
load_only_active: bool = True,
) -> Sequence[Optional[Image]]:
image_names = [x.canonical for x in image_refs]
return await cls.batch_load_by_canonical(graph_ctx, image_names)
return await cls.batch_load_by_canonical(graph_ctx, image_names, load_only_active)

@classmethod
async def load_item_by_id(
cls,
ctx: GraphQueryContext,
id: UUID,
load_only_active: bool = True,
) -> Image:
async with ctx.db.begin_readonly_session() as session:
row = await ImageRow.get(session, id, load_aliases=True)
row = await ImageRow.get(
session, id, load_aliases=True, load_only_active=load_only_active
)
if not row:
raise ImageNotFound

@@ -233,6 +240,7 @@ async def load_item(
ctx: GraphQueryContext,
reference: str,
architecture: str,
load_only_active: bool = True,
) -> Image:
try:
async with ctx.db.begin_readonly_session() as session:
Expand All @@ -242,6 +250,7 @@ async def load_item(
ImageIdentifier(reference, architecture),
ImageAlias(reference),
],
load_only_active=load_only_active,
)
except UnknownImageReference:
raise ImageNotFound
@@ -253,9 +262,12 @@ async def load_all(
ctx: GraphQueryContext,
*,
types: set[ImageLoadFilter] = set(),
load_only_active: bool = True,
) -> Sequence[Image]:
async with ctx.db.begin_readonly_session() as session:
rows = await ImageRow.list(session, load_aliases=True)
rows = await ImageRow.list(
session, load_aliases=True, load_only_active=load_only_active
)
items: list[Image] = [
item async for item in cls.bulk_load(ctx, rows) if item.matches_filter(ctx, types)
]
@@ -355,12 +367,16 @@ async def batch_load_by_name_and_arch(
cls,
graph_ctx: GraphQueryContext,
name_and_arch: Sequence[tuple[str, str]],
load_only_active: bool = True,
) -> Sequence[Sequence[ImageNode]]:
query = (
sa.select(ImageRow)
.where(sa.tuple_(ImageRow.name, ImageRow.architecture).in_(name_and_arch))
.options(selectinload(ImageRow.aliases))
)
if load_only_active:
query = query.where(ImageRow.status == ImageStatus.ALIVE)

async with graph_ctx.db.begin_readonly_session() as db_session:
return await batch_multiresult_in_scalar_stream(
graph_ctx,
@@ -376,9 +392,12 @@ async def batch_load_by_image_identifier(
cls,
graph_ctx: GraphQueryContext,
image_ids: Sequence[ImageIdentifier],
load_only_active: bool = True,
) -> Sequence[Sequence[ImageNode]]:
name_and_arch_tuples = [(img.canonical, img.architecture) for img in image_ids]
return await cls.batch_load_by_name_and_arch(graph_ctx, name_and_arch_tuples)
return await cls.batch_load_by_name_and_arch(
graph_ctx, name_and_arch_tuples, load_only_active
)

@overload
@classmethod
@@ -421,6 +440,7 @@ def from_row(cls, row: ImageRow | None) -> ImageNode | None:
],
supported_accelerators=(row.accelerators or "").split(","),
aliases=[alias_row.alias for alias_row in row.aliases],
status=row.status,
)

@classmethod
@@ -445,6 +465,7 @@ def from_legacy_image(cls, row: Image) -> ImageNode:
resource_limits=row.resource_limits,
supported_accelerators=row.supported_accelerators,
aliases=row.aliases,
status=row.status,
)

@classmethod
@@ -500,7 +521,9 @@ async def mutate(
client_role = ctx.user["role"]

async with ctx.db.begin_session() as session:
image_row = await ImageRow.get(session, _image_id, load_aliases=True)
image_row = await ImageRow.get(
session, _image_id, load_only_active=True, load_aliases=True
)
if not image_row:
raise ObjectNotFound("image")
if client_role != UserRole.SUPERADMIN:
Expand Down Expand Up @@ -648,7 +671,9 @@ async def mutate(
client_role = ctx.user["role"]

async with ctx.db.begin_session() as session:
image_row = await ImageRow.get(session, _image_id, load_aliases=True)
image_row = await ImageRow.get(
session, _image_id, load_only_active=True, load_aliases=True
)
if not image_row:
raise ObjectNotFound("image")
if client_role != UserRole.SUPERADMIN:
Expand Down Expand Up @@ -702,7 +727,9 @@ async def mutate(
client_role = ctx.user["role"]

async with ctx.db.begin_readonly_session() as session:
image_row = await ImageRow.get(session, _image_id, load_aliases=True)
image_row = await ImageRow.get(
session, _image_id, load_only_active=True, load_aliases=True
)
if not image_row:
raise ImageNotFound
if client_role != UserRole.SUPERADMIN:
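
The `load_only_active` keyword passed to `ImageRow.get`, `ImageRow.list`, and `ImageRow.resolve` throughout this file is defined in the base branch and not shown in this diff; a minimal sketch of the assumed behavior, mirroring the ALIVE filter used in the batch loaders above:

import uuid

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from ai.backend.manager.models.image import ImageRow, ImageStatus


async def get_image_row(
    session: AsyncSession,
    image_id: uuid.UUID,
    *,
    load_aliases: bool = False,
    load_only_active: bool = True,
):
    # Hypothetical stand-in for ImageRow.get: optionally restrict to ALIVE rows.
    query = sa.select(ImageRow).where(ImageRow.id == image_id)
    if load_only_active:
        query = query.where(ImageRow.status == ImageStatus.ALIVE)
    if load_aliases:
        query = query.options(selectinload(ImageRow.aliases))
    return await session.scalar(query)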