Skip to content

Commit

Permalink
Refactor schema.py
Browse files Browse the repository at this point in the history
  • Loading branch information
dnknth committed Oct 30, 2024
1 parent e2ea997 commit b34dc62
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"type": "debugpy",
"request": "launch",
"module": "uvicorn",
"args": ["app:app", "--port", "5000", "--reload"]
"args": ["ldap_ui.app:app", "--port", "5000", "--reload"]
}
]
}
18 changes: 13 additions & 5 deletions backend/ldap_ui/ldap_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
ldap_connect,
result,
)
from .schema import ObjectClass as OC
from .schema import frontend_schema

__all__ = ("api",)
Expand Down Expand Up @@ -97,7 +98,7 @@ def _entry(schema: SubSchema, res: Tuple[str, Any]) -> dict[str, Any]:
soc = [
oc.names[0]
for oc in map(lambda o: schema.get_obj(ObjectClass, o), ocs)
if oc.kind == 0
if oc.kind == OC.Kind.structural.value
]
aux = set(
schema.get_obj(ObjectClass, a).names[0]
Expand Down Expand Up @@ -207,7 +208,9 @@ async def blob(request: Request) -> Response:

if request.method == "GET":
if attr not in attrs or len(attrs[attr]) <= index:
raise HTTPException(404, f"Attribute {attr} not found for DN {dn}")
raise HTTPException(
HTTPStatus.NOT_FOUND.value, f"Attribute {attr} not found for DN {dn}"
)

return Response(
attrs[attr][index],
Expand Down Expand Up @@ -235,7 +238,9 @@ async def blob(request: Request) -> Response:

if request.method == "DELETE":
if attr not in attrs or len(attrs[attr]) <= index:
raise HTTPException(404, f"Attribute {attr} not found for DN {dn}")
raise HTTPException(
HTTPStatus.NOT_FOUND.value, f"Attribute {attr} not found for DN {dn}"
)
await empty(connection, connection.modify(dn, [(1, attr, None)]))
data = attrs[attr][:index] + attrs[attr][index + 1 :]
if data:
Expand Down Expand Up @@ -421,7 +426,9 @@ async def attribute_range(request: Request) -> JSONResponse:
)

if not values:
raise HTTPException(404, f"No values found for attribute {attribute}")
raise HTTPException(
HTTPStatus.NOT_FOUND.value, f"No values found for attribute {attribute}"
)

minimum, maximum = min(values), max(values)
return JSONResponse(
Expand All @@ -436,4 +443,5 @@ async def attribute_range(request: Request) -> JSONResponse:
@api.route("/schema")
async def json_schema(request: Request) -> JSONResponse:
"Dump the LDAP schema as JSON"
return JSONResponse(frontend_schema(request.app.state.schema))
schema = frontend_schema(request.app.state.schema)
return JSONResponse(schema.model_dump())
12 changes: 8 additions & 4 deletions backend/ldap_ui/ldap_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""

import contextlib
from http import HTTPStatus
from typing import AsyncGenerator, Generator, Tuple

import ldap
Expand Down Expand Up @@ -83,9 +84,12 @@ async def unique(
res = r
else:
connection.abandon(msgid)
raise HTTPException(500, "Non-unique result")
raise HTTPException(
HTTPStatus.INTERNAL_SERVER_ERROR.value,
"Non-unique result",
)
if res is None:
raise HTTPException(404, "Empty search result")
raise HTTPException(HTTPStatus.NOT_FOUND.value, "Empty search result")
return res


Expand All @@ -97,7 +101,7 @@ async def empty(

async for r in result(connection, msgid):
connection.abandon(msgid)
raise HTTPException(500, "Unexpected result")
raise HTTPException(HTTPStatus.INTERNAL_SERVER_ERROR.value, "Unexpected result")


async def get_entry_by_dn(
Expand All @@ -109,4 +113,4 @@ async def get_entry_by_dn(
try:
return await unique(connection, connection.search(dn, ldap.SCOPE_BASE))
except ldap.NO_SUCH_OBJECT:
raise HTTPException(404, f"DN not found: {dn}")
raise HTTPException(HTTPStatus.NOT_FOUND.value, f"DN not found: {dn}")
188 changes: 109 additions & 79 deletions backend/ldap_ui/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,125 +6,155 @@
to the user.
"""

from typing import Any, Generator
from enum import IntEnum
from typing import Generator, Optional, Type, TypeVar, Union

from ldap.schema import SubSchema
from ldap.schema.models import AttributeType, LDAPSyntax, ObjectClass
from ldap.schema.models import AttributeType, SchemaElement
from ldap.schema.models import LDAPSyntax as LDAPSyntaxType
from ldap.schema.models import ObjectClass as ObjectClassType
from pydantic import BaseModel, Field, field_serializer

__all__ = ("frontend_schema",)
__all__ = ("frontend_schema", "Attribute", "ObjectClass")

T = TypeVar("T", bound=SchemaElement)

# Object class constants
SCHEMA_OC_KIND = {
0: "structural",
1: "abstract",
2: "auxiliary",
}

# Attribute usage constants
SCHEMA_ATTR_USAGE = {
0: "userApplications",
1: "directoryOperation",
2: "distributedOperation",
3: "dSAOperation",
}
class Element(BaseModel):
"Common attributes od schema elements"

oid: str
name: str
names: list[str] = Field(min_length=1)
desc: Optional[str]
obsolete: bool
sup: list[str] # TODO check

def element(obj) -> dict:
"Basic information about an schema element"

def element(obj: Union[AttributeType, ObjectClassType]) -> Element:
name = obj.names[0]
return {
"oid": obj.oid,
"name": name[:1].lower() + name[1:],
"names": obj.names,
"desc": obj.desc,
"obsolete": bool(obj.obsolete),
"sup": sorted(obj.sup),
}


def object_class_dict(obj) -> dict:
"Additional information about an object class"
r = element(obj)
r.update(
{
"may": sorted(obj.may),
"must": sorted(obj.must),
"kind": SCHEMA_OC_KIND[obj.kind],
}
)
return r


def attribute_dict(obj) -> dict:
"Additional information about an attribute"
r = element(obj)
r.update(
{
"single_value": bool(obj.single_value),
"no_user_mod": bool(obj.no_user_mod),
"usage": SCHEMA_ATTR_USAGE[obj.usage],
# FIXME avoid null values below
"equality": obj.equality,
"syntax": obj.syntax,
"substr": obj.substr,
"ordering": obj.ordering,
}
return Element(
oid=obj.oid,
name=name[:1].lower() + name[1:],
names=obj.names,
desc=obj.desc,
obsolete=bool(obj.obsolete),
sup=sorted(obj.sup),
)
return r


def syntax_dict(obj) -> dict:
"Information about an attribute syntax"
return {
"oid": obj.oid,
"desc": obj.desc,
"not_human_readable": bool(obj.not_human_readable),
}
class ObjectClass(Element):
class Kind(IntEnum):
structural = 0
abstract = 1
auxiliary = 2

may: list[str]
must: list[str]
kind: Kind

@field_serializer("kind")
def serialize_kind(self, kind: Kind, _info) -> str:
return kind.name


class Attribute(Element):
class Usage(IntEnum):
userApplications = 0
directoryOperation = 1
distributedOperation = 2
dSAOperation = 3

single_value: bool
no_user_mod: bool
usage: Usage
equality: Optional[str]
syntax: Optional[str]
substr: Optional[str]
ordering: Optional[str]

def lowercase_dict(attr: str, items) -> dict:
@field_serializer("usage")
def serialize_kind(self, usage: Usage, _info) -> str:
return usage.name


class Syntax(BaseModel):
oid: str
desc: str
not_human_readable: bool


def lowercase_dict(attr: str, items: list[T]) -> dict[str, T]:
"Create an dictionary with lowercased keys extracted from a given attribute"
return {obj[attr].lower(): obj for obj in items}
return {getattr(obj, attr).lower(): obj for obj in items}


def extract_type(
sub_schema: SubSchema, schema_class: Any
) -> Generator[Any, None, None]:
sub_schema: SubSchema, schema_class: Type[T]
) -> Generator[T, None, None]:
"Get non-obsolete objects from the schema for a type"

for oid in sub_schema.listall(schema_class):
obj = sub_schema.get_obj(schema_class, oid)
if schema_class is LDAPSyntax or not obj.obsolete:
if schema_class is LDAPSyntaxType or not obj.obsolete:
yield obj


class Schema(BaseModel):
attributes: dict[str, Attribute]
objectClasses: dict[str, ObjectClass]
syntaxes: dict[str, Syntax]


# See: https://www.python-ldap.org/en/latest/reference/ldap-schema.html
def frontend_schema(sub_schema: SubSchema) -> dict[Any]:
def frontend_schema(sub_schema: SubSchema) -> Schema:
"Dump an LDAP SubSchema"

return dict(
return Schema(
attributes=lowercase_dict(
"name",
sorted(
map(
attribute_dict,
extract_type(sub_schema, AttributeType),
(
Attribute(
single_value=bool(attr.single_value),
no_user_mod=bool(attr.no_user_mod),
usage=Attribute.Usage(attr.usage),
# FIXME avoid null values below
equality=attr.equality,
syntax=attr.syntax,
substr=attr.substr,
ordering=attr.ordering,
**element(attr).model_dump(),
)
for attr in extract_type(sub_schema, AttributeType)
),
key=lambda x: x["name"],
key=lambda x: x.name,
),
),
objectClasses=lowercase_dict(
"name",
sorted(
map(
object_class_dict,
extract_type(sub_schema, ObjectClass),
(
ObjectClass(
may=sorted(oc.may),
must=sorted(oc.must),
kind=ObjectClass.Kind(oc.kind),
**element(oc).model_dump(),
)
for oc in extract_type(sub_schema, ObjectClassType)
),
key=lambda x: x["name"],
key=lambda x: x.name,
),
),
syntaxes=lowercase_dict(
"oid", map(syntax_dict, extract_type(sub_schema, LDAPSyntax))
"oid",
[
Syntax(
oid=stx.oid,
desc=stx.desc,
not_human_readable=bool(stx.not_human_readable),
)
for stx in extract_type(sub_schema, LDAPSyntaxType)
],
),
)

0 comments on commit b34dc62

Please sign in to comment.