diff --git a/.vscode/launch.json b/.vscode/launch.json index 1bcf9ac..3853466 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -9,7 +9,7 @@ "type": "debugpy", "request": "launch", "module": "uvicorn", - "args": ["app:app", "--port", "5000", "--reload"] + "args": ["ldap_ui.app:app", "--port", "5000", "--reload"] } ] } diff --git a/backend/ldap_ui/ldap_api.py b/backend/ldap_ui/ldap_api.py index 9997f5b..4221894 100644 --- a/backend/ldap_ui/ldap_api.py +++ b/backend/ldap_ui/ldap_api.py @@ -33,6 +33,7 @@ ldap_connect, result, ) +from .schema import ObjectClass as OC from .schema import frontend_schema __all__ = ("api",) @@ -97,7 +98,7 @@ def _entry(schema: SubSchema, res: Tuple[str, Any]) -> dict[str, Any]: soc = [ oc.names[0] for oc in map(lambda o: schema.get_obj(ObjectClass, o), ocs) - if oc.kind == 0 + if oc.kind == OC.Kind.structural.value ] aux = set( schema.get_obj(ObjectClass, a).names[0] @@ -207,7 +208,9 @@ async def blob(request: Request) -> Response: if request.method == "GET": if attr not in attrs or len(attrs[attr]) <= index: - raise HTTPException(404, f"Attribute {attr} not found for DN {dn}") + raise HTTPException( + HTTPStatus.NOT_FOUND.value, f"Attribute {attr} not found for DN {dn}" + ) return Response( attrs[attr][index], @@ -235,7 +238,9 @@ async def blob(request: Request) -> Response: if request.method == "DELETE": if attr not in attrs or len(attrs[attr]) <= index: - raise HTTPException(404, f"Attribute {attr} not found for DN {dn}") + raise HTTPException( + HTTPStatus.NOT_FOUND.value, f"Attribute {attr} not found for DN {dn}" + ) await empty(connection, connection.modify(dn, [(1, attr, None)])) data = attrs[attr][:index] + attrs[attr][index + 1 :] if data: @@ -421,7 +426,9 @@ async def attribute_range(request: Request) -> JSONResponse: ) if not values: - raise HTTPException(404, f"No values found for attribute {attribute}") + raise HTTPException( + HTTPStatus.NOT_FOUND.value, f"No values found for attribute {attribute}" + ) minimum, maximum = min(values), max(values) return JSONResponse( @@ -436,4 +443,5 @@ async def attribute_range(request: Request) -> JSONResponse: @api.route("/schema") async def json_schema(request: Request) -> JSONResponse: "Dump the LDAP schema as JSON" - return JSONResponse(frontend_schema(request.app.state.schema)) + schema = frontend_schema(request.app.state.schema) + return JSONResponse(schema.model_dump()) diff --git a/backend/ldap_ui/ldap_helpers.py b/backend/ldap_ui/ldap_helpers.py index d891784..4ba7b7c 100644 --- a/backend/ldap_ui/ldap_helpers.py +++ b/backend/ldap_ui/ldap_helpers.py @@ -11,6 +11,7 @@ """ import contextlib +from http import HTTPStatus from typing import AsyncGenerator, Generator, Tuple import ldap @@ -83,9 +84,12 @@ async def unique( res = r else: connection.abandon(msgid) - raise HTTPException(500, "Non-unique result") + raise HTTPException( + HTTPStatus.INTERNAL_SERVER_ERROR.value, + "Non-unique result", + ) if res is None: - raise HTTPException(404, "Empty search result") + raise HTTPException(HTTPStatus.NOT_FOUND.value, "Empty search result") return res @@ -97,7 +101,7 @@ async def empty( async for r in result(connection, msgid): connection.abandon(msgid) - raise HTTPException(500, "Unexpected result") + raise HTTPException(HTTPStatus.INTERNAL_SERVER_ERROR.value, "Unexpected result") async def get_entry_by_dn( @@ -109,4 +113,4 @@ async def get_entry_by_dn( try: return await unique(connection, connection.search(dn, ldap.SCOPE_BASE)) except ldap.NO_SUCH_OBJECT: - raise HTTPException(404, f"DN not found: {dn}") + raise HTTPException(HTTPStatus.NOT_FOUND.value, f"DN not found: {dn}") diff --git a/backend/ldap_ui/schema.py b/backend/ldap_ui/schema.py index d8b4841..c6d0c30 100644 --- a/backend/ldap_ui/schema.py +++ b/backend/ldap_ui/schema.py @@ -6,125 +6,155 @@ to the user. """ -from typing import Any, Generator +from enum import IntEnum +from typing import Generator, Optional, Type, TypeVar, Union from ldap.schema import SubSchema -from ldap.schema.models import AttributeType, LDAPSyntax, ObjectClass +from ldap.schema.models import AttributeType, SchemaElement +from ldap.schema.models import LDAPSyntax as LDAPSyntaxType +from ldap.schema.models import ObjectClass as ObjectClassType +from pydantic import BaseModel, Field, field_serializer -__all__ = ("frontend_schema",) +__all__ = ("frontend_schema", "Attribute", "ObjectClass") +T = TypeVar("T", bound=SchemaElement) -# Object class constants -SCHEMA_OC_KIND = { - 0: "structural", - 1: "abstract", - 2: "auxiliary", -} -# Attribute usage constants -SCHEMA_ATTR_USAGE = { - 0: "userApplications", - 1: "directoryOperation", - 2: "distributedOperation", - 3: "dSAOperation", -} +class Element(BaseModel): + "Common attributes od schema elements" + oid: str + name: str + names: list[str] = Field(min_length=1) + desc: Optional[str] + obsolete: bool + sup: list[str] # TODO check -def element(obj) -> dict: - "Basic information about an schema element" + +def element(obj: Union[AttributeType, ObjectClassType]) -> Element: name = obj.names[0] - return { - "oid": obj.oid, - "name": name[:1].lower() + name[1:], - "names": obj.names, - "desc": obj.desc, - "obsolete": bool(obj.obsolete), - "sup": sorted(obj.sup), - } - - -def object_class_dict(obj) -> dict: - "Additional information about an object class" - r = element(obj) - r.update( - { - "may": sorted(obj.may), - "must": sorted(obj.must), - "kind": SCHEMA_OC_KIND[obj.kind], - } - ) - return r - - -def attribute_dict(obj) -> dict: - "Additional information about an attribute" - r = element(obj) - r.update( - { - "single_value": bool(obj.single_value), - "no_user_mod": bool(obj.no_user_mod), - "usage": SCHEMA_ATTR_USAGE[obj.usage], - # FIXME avoid null values below - "equality": obj.equality, - "syntax": obj.syntax, - "substr": obj.substr, - "ordering": obj.ordering, - } + return Element( + oid=obj.oid, + name=name[:1].lower() + name[1:], + names=obj.names, + desc=obj.desc, + obsolete=bool(obj.obsolete), + sup=sorted(obj.sup), ) - return r -def syntax_dict(obj) -> dict: - "Information about an attribute syntax" - return { - "oid": obj.oid, - "desc": obj.desc, - "not_human_readable": bool(obj.not_human_readable), - } +class ObjectClass(Element): + class Kind(IntEnum): + structural = 0 + abstract = 1 + auxiliary = 2 + + may: list[str] + must: list[str] + kind: Kind + + @field_serializer("kind") + def serialize_kind(self, kind: Kind, _info) -> str: + return kind.name + + +class Attribute(Element): + class Usage(IntEnum): + userApplications = 0 + directoryOperation = 1 + distributedOperation = 2 + dSAOperation = 3 + single_value: bool + no_user_mod: bool + usage: Usage + equality: Optional[str] + syntax: Optional[str] + substr: Optional[str] + ordering: Optional[str] -def lowercase_dict(attr: str, items) -> dict: + @field_serializer("usage") + def serialize_kind(self, usage: Usage, _info) -> str: + return usage.name + + +class Syntax(BaseModel): + oid: str + desc: str + not_human_readable: bool + + +def lowercase_dict(attr: str, items: list[T]) -> dict[str, T]: "Create an dictionary with lowercased keys extracted from a given attribute" - return {obj[attr].lower(): obj for obj in items} + return {getattr(obj, attr).lower(): obj for obj in items} def extract_type( - sub_schema: SubSchema, schema_class: Any -) -> Generator[Any, None, None]: + sub_schema: SubSchema, schema_class: Type[T] +) -> Generator[T, None, None]: "Get non-obsolete objects from the schema for a type" for oid in sub_schema.listall(schema_class): obj = sub_schema.get_obj(schema_class, oid) - if schema_class is LDAPSyntax or not obj.obsolete: + if schema_class is LDAPSyntaxType or not obj.obsolete: yield obj +class Schema(BaseModel): + attributes: dict[str, Attribute] + objectClasses: dict[str, ObjectClass] + syntaxes: dict[str, Syntax] + + # See: https://www.python-ldap.org/en/latest/reference/ldap-schema.html -def frontend_schema(sub_schema: SubSchema) -> dict[Any]: +def frontend_schema(sub_schema: SubSchema) -> Schema: "Dump an LDAP SubSchema" - return dict( + return Schema( attributes=lowercase_dict( "name", sorted( - map( - attribute_dict, - extract_type(sub_schema, AttributeType), + ( + Attribute( + single_value=bool(attr.single_value), + no_user_mod=bool(attr.no_user_mod), + usage=Attribute.Usage(attr.usage), + # FIXME avoid null values below + equality=attr.equality, + syntax=attr.syntax, + substr=attr.substr, + ordering=attr.ordering, + **element(attr).model_dump(), + ) + for attr in extract_type(sub_schema, AttributeType) ), - key=lambda x: x["name"], + key=lambda x: x.name, ), ), objectClasses=lowercase_dict( "name", sorted( - map( - object_class_dict, - extract_type(sub_schema, ObjectClass), + ( + ObjectClass( + may=sorted(oc.may), + must=sorted(oc.must), + kind=ObjectClass.Kind(oc.kind), + **element(oc).model_dump(), + ) + for oc in extract_type(sub_schema, ObjectClassType) ), - key=lambda x: x["name"], + key=lambda x: x.name, ), ), syntaxes=lowercase_dict( - "oid", map(syntax_dict, extract_type(sub_schema, LDAPSyntax)) + "oid", + [ + Syntax( + oid=stx.oid, + desc=stx.desc, + not_human_readable=bool(stx.not_human_readable), + ) + for stx in extract_type(sub_schema, LDAPSyntaxType) + ], ), )