Skip to content

Commit

Permalink
start incremental pendulum consolidation (#16808)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Jan 22, 2025
1 parent 8b7cd10 commit 7548f1b
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 42 deletions.
9 changes: 6 additions & 3 deletions flows/automation-assessments.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from uuid import uuid4

import anyio
import pendulum

from prefect import flow
from prefect.client.orchestration import get_client
Expand All @@ -19,6 +18,8 @@
EventResourceFilter,
)
from prefect.logging import get_run_logger
from prefect.types import DateTime
from prefect.types._datetime import parse_datetime


@asynccontextmanager
Expand All @@ -34,7 +35,9 @@ async def create_or_replace_automation(
for existing in response.json():
name = str(existing["name"])
if name.startswith(automation["name"]):
age = pendulum.now("UTC") - pendulum.parse(existing["created"])
parsed_datetime = parse_datetime(existing["created"])
assert isinstance(parsed_datetime, DateTime)
age = DateTime.now("UTC") - parsed_datetime
assert isinstance(age, timedelta)
if age > timedelta(minutes=10):
logger.info(
Expand Down Expand Up @@ -68,7 +71,7 @@ async def wait_for_event(
logger = get_run_logger()

filter = EventFilter(
occurred=EventOccurredFilter(since=pendulum.now("UTC")),
occurred=EventOccurredFilter(since=DateTime.now("UTC")),
event=EventNameFilter(name=[]),
resource=EventResourceFilter(id=[resource_id]),
)
Expand Down
68 changes: 34 additions & 34 deletions src/prefect/events/filters.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import List, Optional, Tuple, cast
from typing import Optional
from uuid import UUID

import pendulum
from pydantic import Field

from prefect._internal.schemas.bases import PrefectBaseModel
Expand All @@ -23,7 +22,7 @@ class AutomationFilterCreated(PrefectBaseModel):
class AutomationFilterName(PrefectBaseModel):
"""Filter by `Automation.created`."""

any_: Optional[List[str]] = Field(
any_: Optional[list[str]] = Field(
default=None,
description="Only include automations with names that match any of these strings",
)
Expand All @@ -41,8 +40,8 @@ class AutomationFilter(PrefectBaseModel):
class EventDataFilter(PrefectBaseModel, extra="forbid"): # type: ignore[call-arg]
"""A base class for filtering event data."""

def get_filters(self) -> List["EventDataFilter"]:
filters: List["EventDataFilter"] = [
def get_filters(self) -> list["EventDataFilter"]:
filters: list["EventDataFilter"] = [
filter
for filter in [getattr(self, name) for name in self.model_fields]
if isinstance(filter, EventDataFilter)
Expand All @@ -60,14 +59,11 @@ def excludes(self, event: Event) -> bool:

class EventOccurredFilter(EventDataFilter):
since: DateTime = Field(
default_factory=lambda: cast(
DateTime,
pendulum.now("UTC").start_of("day").subtract(days=180),
),
default_factory=lambda: DateTime.now("UTC").start_of("day").subtract(days=180),
description="Only include events after this time (inclusive)",
)
until: DateTime = Field(
default_factory=lambda: cast(DateTime, pendulum.now("UTC")),
default_factory=lambda: DateTime.now("UTC"),
description="Only include events prior to this time (inclusive)",
)

Expand All @@ -76,18 +72,18 @@ def includes(self, event: Event) -> bool:


class EventNameFilter(EventDataFilter):
prefix: Optional[List[str]] = Field(
prefix: Optional[list[str]] = Field(
default=None, description="Only include events matching one of these prefixes"
)
exclude_prefix: Optional[List[str]] = Field(
exclude_prefix: Optional[list[str]] = Field(
default=None, description="Exclude events matching one of these prefixes"
)

name: Optional[List[str]] = Field(
name: Optional[list[str]] = Field(
default=None,
description="Only include events matching one of these names exactly",
)
exclude_name: Optional[List[str]] = Field(
exclude_name: Optional[list[str]] = Field(
default=None, description="Exclude events matching one of these names exactly"
)

Expand All @@ -112,20 +108,20 @@ def includes(self, event: Event) -> bool:


class EventResourceFilter(EventDataFilter):
id: Optional[List[str]] = Field(
None, description="Only include events for resources with these IDs"
id: Optional[list[str]] = Field(
default=None, description="Only include events for resources with these IDs"
)
id_prefix: Optional[List[str]] = Field(
None,
id_prefix: Optional[list[str]] = Field(
default=None,
description=(
"Only include events for resources with IDs starting with these prefixes."
),
)
labels: Optional[ResourceSpecification] = Field(
None, description="Only include events for resources with these labels"
default=None, description="Only include events for resources with these labels"
)
distinct: bool = Field(
False,
default=False,
description="Only include events for distinct resources",
)

Expand All @@ -148,35 +144,39 @@ def includes(self, event: Event) -> bool:


class EventRelatedFilter(EventDataFilter):
id: Optional[List[str]] = Field(
None, description="Only include events for related resources with these IDs"
id: Optional[list[str]] = Field(
default=None,
description="Only include events for related resources with these IDs",
)
role: Optional[List[str]] = Field(
None, description="Only include events for related resources in these roles"
role: Optional[list[str]] = Field(
default=None,
description="Only include events for related resources in these roles",
)
resources_in_roles: Optional[List[Tuple[str, str]]] = Field(
None,
resources_in_roles: Optional[list[tuple[str, str]]] = Field(
default=None,
description=(
"Only include events with specific related resources in specific roles"
),
)
labels: Optional[ResourceSpecification] = Field(
None, description="Only include events for related resources with these labels"
default=None,
description="Only include events for related resources with these labels",
)


class EventAnyResourceFilter(EventDataFilter):
id: Optional[List[str]] = Field(
None, description="Only include events for resources with these IDs"
id: Optional[list[str]] = Field(
default=None, description="Only include events for resources with these IDs"
)
id_prefix: Optional[List[str]] = Field(
None,
id_prefix: Optional[list[str]] = Field(
default=None,
description=(
"Only include events for resources with IDs starting with these prefixes"
),
)
labels: Optional[ResourceSpecification] = Field(
None, description="Only include events for related resources with these labels"
default=None,
description="Only include events for related resources with these labels",
)

def includes(self, event: Event) -> bool:
Expand All @@ -202,8 +202,8 @@ def _includes(self, resource: Resource) -> bool:


class EventIDFilter(EventDataFilter):
id: Optional[List[UUID]] = Field(
None, description="Only include events with one of these IDs"
id: Optional[list[UUID]] = Field(
default=None, description="Only include events with one of these IDs"
)

def includes(self, event: Event) -> bool:
Expand Down
11 changes: 6 additions & 5 deletions src/prefect/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

from functools import partial
from typing import Annotated, Any, Dict, List, Optional, Set, TypeVar, Union
from typing_extensions import Literal, TypeAlias
from typing_extensions import Literal
import orjson
import pydantic
from pydantic_extra_types.pendulum_dt import DateTime as PydanticDateTime
from pydantic_extra_types.pendulum_dt import Date as PydanticDate


from ._datetime import DateTime, Date
from pydantic import (
BeforeValidator,
Field,
Expand Down Expand Up @@ -37,8 +38,6 @@
),
]

DateTime: TypeAlias = PydanticDateTime
Date: TypeAlias = PydanticDate

BANNED_CHARACTERS = ["/", "%", "&", ">", "<"]

Expand Down Expand Up @@ -171,6 +170,8 @@ def convert_none_to_empty_dict(v: Optional[KeyValueLabels]) -> KeyValueLabels:

__all__ = [
"ClientRetryExtraCodes",
"Date",
"DateTime",
"LogLevel",
"KeyValueLabelsField",
"NonNegativeInteger",
Expand Down
19 changes: 19 additions & 0 deletions src/prefect/types/_datetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations

import pendulum
from pendulum.date import Date as PendulumDate
from pendulum.datetime import DateTime as PendulumDateTime
from pendulum.duration import Duration as PendulumDuration
from pendulum.time import Time as PendulumTime
from pydantic_extra_types.pendulum_dt import Date as PydanticDate
from pydantic_extra_types.pendulum_dt import DateTime as PydanticDateTime
from typing_extensions import TypeAlias

DateTime: TypeAlias = PydanticDateTime
Date: TypeAlias = PydanticDate


def parse_datetime(
value: str,
) -> PendulumDateTime | PendulumDate | PendulumTime | PendulumDuration:
return pendulum.parse(value)

0 comments on commit 7548f1b

Please sign in to comment.