diff --git a/tests/test_buff_engine.py b/tests/test_buff_engine.py new file mode 100644 index 00000000..e9f5e968 --- /dev/null +++ b/tests/test_buff_engine.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +from dataclasses import dataclass + +import pytest + +from zsim.sim_progress.buff_engine import ( + BuffDefinition, + BuffEngine, + BuffExecutionContext, + BuffInstance, + BuffRegistry, + BuffStore, + Condition, + ConditionEvaluator, + ConditionOperator, + Effect, + EffectExecutor, + EventRouter, + TargetSelector, + Trigger, +) + + +@dataclass +class DummyEvent: + type: str + payload: dict[str, int] + + +@dataclass +class DummyActor: + name: str + level: int + + +def build_definition() -> BuffDefinition: + return BuffDefinition( + buff_id="buff.test", + name="Test Buff", + tags=("test",), + max_stacks=3, + duration=10.0, + stacking_rule="refresh", + triggers=(Trigger(event_type="skill.cast", parameters={"skill": "E"}),), + conditions=( + Condition.logical( + ConditionOperator.AND, + Condition.comparison("event.type", "==", "skill.cast"), + Condition.logical( + ConditionOperator.OR, + Condition.comparison("actor.level", ">=", 10), + Condition.comparison("event.payload.combo", ">=", 3), + ), + ), + ), + effects=(Effect("add.atk", {"value": 120}),), + target_selector=TargetSelector(scope="self", filters=({"type": "ally"},)), + metadata={"category": "unit"}, + ) + + +def test_condition_evaluator_handles_nested_logic() -> None: + definition = build_definition() + evaluator = ConditionEvaluator() + context = BuffExecutionContext( + event={"type": "skill.cast", "payload": {"combo": 4}}, + actor=DummyActor(name="Hero", level=5), + ) + assert evaluator.matches(definition, context) + + failing_context = BuffExecutionContext( + event={"type": "skill.cast", "payload": {"combo": 1}}, + actor=DummyActor(name="Hero", level=5), + ) + assert not evaluator.matches(definition, failing_context) + + +def test_registry_round_trip() -> None: + definition = build_definition() + with BuffRegistry() as registry: + registry.register_definition(definition) + loaded = registry.get_definition(definition.buff_id) + + assert loaded.buff_id == definition.buff_id + assert loaded.name == definition.name + assert loaded.triggers[0].event_type == "skill.cast" + assert loaded.effects[0].template_id == "add.atk" + assert loaded.target_selector.scope == "self" + + +def test_event_router_and_engine_dispatch() -> None: + definition = build_definition() + registry = BuffRegistry() + engine = BuffEngine( + registry=registry, + router=EventRouter(registry), + evaluator=ConditionEvaluator(), + executor=EffectExecutor(), + ) + results: list[tuple[str, int]] = [] + + def handler(effect: Effect, buff: BuffDefinition, context: BuffExecutionContext) -> int: + value = effect.parameters.model_dump()["value"] + results.append((buff.buff_id, value)) + return value + + engine.executor.register_handler("add.atk", handler) + engine.register_definition(definition) + + context = BuffExecutionContext( + event={"type": "skill.cast", "payload": {"combo": 5}}, + actor=DummyActor(name="Hero", level=20), + ) + + dispatch_results = engine.dispatch("skill.cast", context) + assert dispatch_results == [(definition.buff_id, [120])] + assert results == [(definition.buff_id, 120)] + + +def test_buff_store_operations() -> None: + store = BuffStore() + instance = BuffInstance(buff_id="buff.test", owner_id="hero", remaining_duration=1.0) + store.add(instance) + assert store.get("hero") == (instance,) + + store.tick_all(0.5) + assert 
pytest.approx(store.get("hero")[0].remaining_duration, rel=1e-6) == 0.5 + + removed = store.purge_expired() + assert removed == 0 + + store.tick_all(0.5) + assert store.get("hero")[0].expired + removed = store.purge_expired() + assert removed == 1 + assert store.get("hero") == () diff --git a/zsim/sim_progress/buff_engine/__init__.py b/zsim/sim_progress/buff_engine/__init__.py new file mode 100644 index 00000000..a80de60a --- /dev/null +++ b/zsim/sim_progress/buff_engine/__init__.py @@ -0,0 +1,41 @@ +"""Modernized buff engine for ZSim. + +This package provides data definitions, registry access and runtime services for +handling buff lifecycle in the simulator. The module is intentionally +decoupled from legacy Buff logic and can be adopted incrementally. +""" + +from .definitions import ( + BuffDefinition, + Trigger, + Condition, + Effect, + TargetSelector, + ConditionOperator, +) +from .registry import BuffRegistry +from .engine import ( + EventRouter, + ConditionEvaluator, + EffectExecutor, + BuffEngine, + BuffExecutionContext, +) +from .store import BuffInstance, BuffStore + +__all__ = [ + "BuffDefinition", + "Trigger", + "Condition", + "ConditionOperator", + "Effect", + "TargetSelector", + "BuffRegistry", + "EventRouter", + "ConditionEvaluator", + "EffectExecutor", + "BuffEngine", + "BuffExecutionContext", + "BuffInstance", + "BuffStore", +] diff --git a/zsim/sim_progress/buff_engine/definitions.py b/zsim/sim_progress/buff_engine/definitions.py new file mode 100644 index 00000000..f1267f98 --- /dev/null +++ b/zsim/sim_progress/buff_engine/definitions.py @@ -0,0 +1,272 @@ +"""Data definitions for the modern buff engine.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from enum import StrEnum +from typing import Any, Callable, Iterable, Mapping, MutableMapping, Sequence + +from pydantic import BaseModel, ConfigDict + + +class ConditionOperator(StrEnum): + """Supported operators when evaluating conditions.""" + + ALWAYS = "always" + NEVER = "never" + COMPARISON = "comparison" + AND = "and" + OR = "or" + NOT = "not" + + +_COMPARATORS: Mapping[str, Callable[[Any, Any], bool]] = { + "==": lambda lhs, rhs: lhs == rhs, + "!=": lambda lhs, rhs: lhs != rhs, + ">": lambda lhs, rhs: lhs is not None and rhs is not None and lhs > rhs, + ">=": lambda lhs, rhs: lhs is not None and rhs is not None and lhs >= rhs, + "<": lambda lhs, rhs: lhs is not None and rhs is not None and lhs < rhs, + "<=": lambda lhs, rhs: lhs is not None and rhs is not None and lhs <= rhs, + "in": lambda lhs, rhs: lhs in rhs if rhs is not None else False, + "not_in": lambda lhs, rhs: lhs not in rhs if rhs is not None else True, +} + + +@dataclass(frozen=True, slots=True) +class Condition: + """Declarative condition tree used by the :class:`ConditionEvaluator`.""" + + operator: ConditionOperator + comparator: str | None = None + path: str | None = None + value: Any | None = None + operands: tuple["Condition", ...] = field(default_factory=tuple) + + def evaluate(self, context: Mapping[str, Any]) -> bool: + """Evaluate the condition against ``context``. + + ``context`` is expected to be a mapping that can resolve the fields + referenced by the condition. 
+ """ + + if self.operator is ConditionOperator.ALWAYS: + return True + if self.operator is ConditionOperator.NEVER: + return False + if self.operator is ConditionOperator.NOT: + if not self.operands: + raise ValueError("NOT condition requires an operand") + return not self.operands[0].evaluate(context) + if self.operator is ConditionOperator.AND: + if not self.operands: + raise ValueError("AND condition requires at least one operand") + return all(op.evaluate(context) for op in self.operands) + if self.operator is ConditionOperator.OR: + if not self.operands: + raise ValueError("OR condition requires at least one operand") + return any(op.evaluate(context) for op in self.operands) + if self.operator is ConditionOperator.COMPARISON: + if self.path is None or self.comparator is None: + raise ValueError("Comparison condition requires field and comparator") + comparator = _COMPARATORS.get(self.comparator) + if comparator is None: + raise ValueError(f"Unsupported comparator: {self.comparator}") + lhs = _resolve_field(context, self.path) + rhs = self.value + return comparator(lhs, rhs) + raise ValueError(f"Unsupported operator: {self.operator}") + + @staticmethod + def always() -> "Condition": + return Condition(operator=ConditionOperator.ALWAYS) + + @staticmethod + def never() -> "Condition": + return Condition(operator=ConditionOperator.NEVER) + + @staticmethod + def comparison(path: str, comparator: str, value: Any) -> "Condition": + return Condition( + operator=ConditionOperator.COMPARISON, + path=path, + comparator=comparator, + value=value, + ) + + @staticmethod + def logical(operator: ConditionOperator, *operands: "Condition") -> "Condition": + if operator not in {ConditionOperator.AND, ConditionOperator.OR, ConditionOperator.NOT}: + raise ValueError("Logical conditions must use AND/OR/NOT operator") + return Condition(operator=operator, operands=tuple(operands)) + + +def _resolve_field(context: Mapping[str, Any], dotted_path: str) -> Any: + """Resolve dotted field names against nested mappings.""" + + current: Any = context + for part in dotted_path.split("."): + if isinstance(current, Mapping): + current = current.get(part) + elif hasattr(current, part): + current = getattr(current, part) + else: + return None + return current + + +class EffectPayload(BaseModel): + """Loose schema for buff effects. + + Effect templates can optionally subclass this model to provide additional + validation. By default extra keys are permitted so that legacy payloads + can be loaded without immediate migration work. + """ + + model_config = ConfigDict(extra="allow") + + +@dataclass(frozen=True, slots=True) +class Effect: + """Represents an effect record bound to a buff definition.""" + + template_id: str + parameters: EffectPayload + + def __init__(self, template_id: str, parameters: Mapping[str, Any] | EffectPayload): + object.__setattr__(self, "template_id", template_id) + if isinstance(parameters, EffectPayload): + payload = parameters + else: + payload = EffectPayload.model_validate(parameters) + object.__setattr__(self, "parameters", payload) + + +@dataclass(frozen=True, slots=True) +class Trigger: + """Event trigger definition.""" + + event_type: str + parameters: Mapping[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True, slots=True) +class TargetSelector: + """Describes how targets should be resolved for a buff.""" + + scope: str + filters: tuple[Mapping[str, Any], ...] 
= field(default_factory=tuple) + + +@dataclass(slots=True) +class BuffDefinition: + """Top level registration entity for a buff.""" + + buff_id: str + name: str + tags: tuple[str, ...] + max_stacks: int + duration: float | None + stacking_rule: str + triggers: tuple[Trigger, ...] + conditions: tuple[Condition, ...] + effects: tuple[Effect, ...] + target_selector: TargetSelector + metadata: MutableMapping[str, Any] = field(default_factory=dict) + + def iter_triggers(self) -> Iterable[Trigger]: + return self.triggers + + def iter_effects(self) -> Iterable[Effect]: + return self.effects + + def to_record(self) -> Mapping[str, Any]: + """Serialize the definition to a mapping suitable for persistence.""" + + return { + "buff_id": self.buff_id, + "name": self.name, + "tags": list(self.tags), + "max_stacks": self.max_stacks, + "duration": self.duration, + "stacking_rule": self.stacking_rule, + "conditions": [self._condition_to_dict(cond) for cond in self.conditions], + "target_selector": { + "scope": self.target_selector.scope, + "filters": list(self.target_selector.filters), + }, + "metadata": dict(self.metadata), + } + + @staticmethod + def _condition_to_dict(condition: Condition) -> Mapping[str, Any]: + return { + "operator": condition.operator.value, + "comparator": condition.comparator, + "path": condition.path, + "value": condition.value, + "operands": [BuffDefinition._condition_to_dict(op) for op in condition.operands], + } + + @classmethod + def from_record( + cls, + record: Mapping[str, Any], + triggers: Sequence[Trigger], + effects: Sequence[Effect], + ) -> "BuffDefinition": + return cls( + buff_id=str(record["buff_id"]), + name=str(record.get("name", record["buff_id"])), + tags=_ensure_tuple(record.get("tags", ())), + max_stacks=int(record.get("max_stacks", 1)), + duration=record.get("duration"), + stacking_rule=str(record.get("stacking_rule", "refresh")), + triggers=tuple(triggers), + conditions=tuple( + _condition_from_dict(raw) for raw in _ensure_sequence(record.get("conditions", ())) + ), + effects=tuple(effects), + target_selector=_selector_from_payload(record.get("target_selector", {})), + metadata=_ensure_mapping(record.get("metadata", {})), + ) + + +def _condition_from_dict(payload: Mapping[str, Any]) -> Condition: + operator = ConditionOperator(str(payload.get("operator", ConditionOperator.ALWAYS.value))) + operands = tuple(_condition_from_dict(item) for item in payload.get("operands", ())) + return Condition( + operator=operator, + comparator=payload.get("comparator"), + path=payload.get("path"), + value=payload.get("value"), + operands=operands, + ) + + +def _ensure_sequence(value: Any) -> Sequence[Mapping[str, Any]]: + if isinstance(value, str): + value = json.loads(value) + return tuple(value or ()) + + +def _ensure_tuple(value: Any) -> tuple[str, ...]: + if isinstance(value, str): + value = json.loads(value) + return tuple(value or ()) + + +def _ensure_mapping(value: Any) -> MutableMapping[str, Any]: + if isinstance(value, str): + value = json.loads(value) + return dict(value or {}) + + +def _selector_from_payload(value: Any) -> TargetSelector: + if isinstance(value, str): + value = json.loads(value) + scope = str((value or {}).get("scope", "self")) + filters = value.get("filters", ()) if isinstance(value, Mapping) else () + if isinstance(filters, str): + filters = json.loads(filters) + return TargetSelector(scope=scope, filters=tuple(filters or ())) diff --git a/zsim/sim_progress/buff_engine/engine.py b/zsim/sim_progress/buff_engine/engine.py new file mode 100644 index 
00000000..d4eeadc3 --- /dev/null +++ b/zsim/sim_progress/buff_engine/engine.py @@ -0,0 +1,131 @@ +"""Runtime services that operate on the buff registry.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Callable, Iterable, Mapping, MutableMapping, Sequence + +from .definitions import BuffDefinition, Condition, Effect +from .registry import BuffRegistry +from .store import BuffStore + +EffectHandler = Callable[[Effect, BuffDefinition, "BuffExecutionContext"], Any] + + +@dataclass(slots=True) +class BuffExecutionContext: + """Container passed to effect handlers during dispatch.""" + + event: Mapping[str, Any] + actor: Any | None = None + targets: Sequence[Any] = field(default_factory=tuple) + store: BuffStore | None = None + extra: MutableMapping[str, Any] = field(default_factory=dict) + + def as_mapping(self) -> Mapping[str, Any]: + mapping: dict[str, Any] = dict(self.extra) + mapping.setdefault("event", self.event) + mapping.setdefault("actor", self.actor) + mapping.setdefault("targets", self.targets) + mapping.setdefault("store", self.store) + return mapping + + +class EventRouter: + """Index based dispatcher that maps events to candidate buffs.""" + + def __init__(self, registry: BuffRegistry) -> None: + self._registry = registry + self._cache: dict[str, tuple[str, ...]] = {} + self._dirty = True + + def invalidate(self) -> None: + self._dirty = True + + def _ensure_cache(self) -> None: + if self._dirty: + self._cache = dict(self._registry.as_event_index()) + self._dirty = False + + def route(self, event_type: str) -> tuple[str, ...]: + self._ensure_cache() + return self._cache.get(event_type, ()) + + +class ConditionEvaluator: + """Applies condition trees for a buff against a runtime context.""" + + def matches(self, definition: BuffDefinition, context: BuffExecutionContext) -> bool: + if not definition.conditions: + return True + mapping = context.as_mapping() + return all( + self._evaluate_condition(condition, mapping) for condition in definition.conditions + ) + + def _evaluate_condition(self, condition: Condition, mapping: Mapping[str, Any]) -> bool: + return condition.evaluate(mapping) + + +class EffectExecutor: + """Resolves effect templates and executes them.""" + + def __init__(self, handlers: Mapping[str, EffectHandler] | None = None) -> None: + self._handlers: dict[str, EffectHandler] = dict(handlers or {}) + + def register_handler(self, template_id: str, handler: EffectHandler) -> None: + self._handlers[template_id] = handler + + def execute(self, definition: BuffDefinition, context: BuffExecutionContext) -> list[Any]: + results: list[Any] = [] + for effect in definition.effects: + handler = self._handlers.get(effect.template_id) + if handler is None: + raise KeyError(f"Effect handler not registered for template {effect.template_id}") + results.append(handler(effect, definition, context)) + return results + + +class BuffEngine: + """Facade that wires the registry and runtime services together.""" + + def __init__( + self, + registry: BuffRegistry, + router: EventRouter | None = None, + evaluator: ConditionEvaluator | None = None, + executor: EffectExecutor | None = None, + ) -> None: + self.registry = registry + self.router = router or EventRouter(registry) + self.evaluator = evaluator or ConditionEvaluator() + self.executor = executor or EffectExecutor() + + def register_definition(self, definition: BuffDefinition) -> None: + self.registry.register_definition(definition) + self.router.invalidate() + + def dispatch( 
+ self, event_type: str, context: BuffExecutionContext + ) -> list[tuple[str, list[Any]]]: + """Dispatch an event and execute matched buff effects. + + Returns a list of tuples ``(buff_id, effect_results)`` representing the + buffs whose conditions matched and were executed. + """ + + executed: list[tuple[str, list[Any]]] = [] + for buff_id in self.router.route(event_type): + definition = self.registry.get_definition(buff_id) + if not self.evaluator.matches(definition, context): + continue + results = self.executor.execute(definition, context) + executed.append((buff_id, results)) + return executed + + def refresh_router(self) -> None: + self.router.invalidate() + + def iter_buffs_for_event(self, event_type: str) -> Iterable[BuffDefinition]: + for buff_id in self.router.route(event_type): + yield self.registry.get_definition(buff_id) diff --git a/zsim/sim_progress/buff_engine/registry.py b/zsim/sim_progress/buff_engine/registry.py new file mode 100644 index 00000000..826a8d74 --- /dev/null +++ b/zsim/sim_progress/buff_engine/registry.py @@ -0,0 +1,255 @@ +"""SQLite backed registry for buff definitions.""" + +from __future__ import annotations + +from contextlib import AbstractContextManager +from pathlib import Path +import json +import sqlite3 +from typing import Iterable, Iterator, Mapping, Sequence + +from .definitions import ( + BuffDefinition, + Condition, + Effect, + EffectPayload, + TargetSelector, + Trigger, + _condition_from_dict, +) + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS buffs ( + buff_id TEXT PRIMARY KEY, + name TEXT NOT NULL, + tags TEXT NOT NULL, + max_stacks INTEGER NOT NULL, + duration REAL, + stacking_rule TEXT NOT NULL, + conditions TEXT NOT NULL, + target_selector TEXT NOT NULL, + metadata TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS buff_triggers ( + buff_id TEXT NOT NULL, + trigger_order INTEGER NOT NULL, + event_type TEXT NOT NULL, + parameters TEXT NOT NULL, + PRIMARY KEY (buff_id, trigger_order), + FOREIGN KEY (buff_id) REFERENCES buffs(buff_id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS buff_effects ( + buff_id TEXT NOT NULL, + effect_order INTEGER NOT NULL, + template_id TEXT NOT NULL, + parameters TEXT NOT NULL, + PRIMARY KEY (buff_id, effect_order), + FOREIGN KEY (buff_id) REFERENCES buffs(buff_id) ON DELETE CASCADE +); +""" + + +class BuffRegistry(AbstractContextManager): + """Persisted registry for the modern buff system.""" + + def __init__(self, database: str | Path | None = None) -> None: + self._path = str(database or ":memory:") + self._conn = sqlite3.connect(self._path, check_same_thread=False) + self._conn.row_factory = sqlite3.Row + self._ensure_schema() + + def _ensure_schema(self) -> None: + with self._conn: # auto commit + self._conn.executescript(_SCHEMA) + + def close(self) -> None: + self._conn.close() + + # Context manager support ------------------------------------------------- + def __enter__(self) -> "BuffRegistry": + return self + + def __exit__(self, *exc_info) -> None: + self.close() + + # CRUD ------------------------------------------------------------------- + def register_definition(self, definition: BuffDefinition) -> None: + record = definition.to_record() + with self._conn: + self._conn.execute( + """ + INSERT INTO buffs (buff_id, name, tags, max_stacks, duration, stacking_rule, conditions, target_selector, metadata) + VALUES (:buff_id, :name, :tags, :max_stacks, :duration, :stacking_rule, :conditions, :target_selector, :metadata) + ON CONFLICT(buff_id) DO UPDATE SET + name = excluded.name, + tags = excluded.tags, 
+ max_stacks = excluded.max_stacks, + duration = excluded.duration, + stacking_rule = excluded.stacking_rule, + conditions = excluded.conditions, + target_selector = excluded.target_selector, + metadata = excluded.metadata + """, + { + "buff_id": definition.buff_id, + "name": record["name"], + "tags": json.dumps(record["tags"], ensure_ascii=False), + "max_stacks": record["max_stacks"], + "duration": record["duration"], + "stacking_rule": record["stacking_rule"], + "conditions": json.dumps(record["conditions"], ensure_ascii=False), + "target_selector": json.dumps(record["target_selector"], ensure_ascii=False), + "metadata": json.dumps(record["metadata"], ensure_ascii=False), + }, + ) + self._conn.execute( + "DELETE FROM buff_triggers WHERE buff_id = ?", + (definition.buff_id,), + ) + trigger_rows = [ + ( + definition.buff_id, + index, + trigger.event_type, + json.dumps(trigger.parameters, ensure_ascii=False), + ) + for index, trigger in enumerate(definition.triggers) + ] + if trigger_rows: + self._conn.executemany( + "INSERT INTO buff_triggers (buff_id, trigger_order, event_type, parameters) VALUES (?, ?, ?, ?)", + trigger_rows, + ) + self._conn.execute( + "DELETE FROM buff_effects WHERE buff_id = ?", + (definition.buff_id,), + ) + effect_rows = [ + ( + definition.buff_id, + index, + effect.template_id, + json.dumps(effect.parameters.model_dump(), ensure_ascii=False), + ) + for index, effect in enumerate(definition.effects) + ] + if effect_rows: + self._conn.executemany( + "INSERT INTO buff_effects (buff_id, effect_order, template_id, parameters) VALUES (?, ?, ?, ?)", + effect_rows, + ) + + def get_definition(self, buff_id: str) -> BuffDefinition: + cursor = self._conn.cursor() + row = cursor.execute("SELECT * FROM buffs WHERE buff_id = ?", (buff_id,)).fetchone() + if row is None: + raise KeyError(buff_id) + triggers = self._load_triggers_for_buff(buff_id) + effects = self._load_effects_for_buff(buff_id) + return BuffDefinition.from_record(dict(row), triggers=triggers, effects=effects) + + def iter_definitions(self) -> Iterator[BuffDefinition]: + cursor = self._conn.cursor() + for row in cursor.execute("SELECT * FROM buffs ORDER BY buff_id"): + buff_id = row["buff_id"] + triggers = self._load_triggers_for_buff(buff_id) + effects = self._load_effects_for_buff(buff_id) + yield BuffDefinition.from_record(dict(row), triggers=triggers, effects=effects) + + def list_triggers_for_event(self, event_type: str) -> Sequence[tuple[str, Trigger]]: + cursor = self._conn.cursor() + rows = cursor.execute( + """ + SELECT buff_id, trigger_order, event_type, parameters + FROM buff_triggers + WHERE event_type = ? + ORDER BY trigger_order + """, + (event_type,), + ).fetchall() + result: list[tuple[str, Trigger]] = [] + for row in rows: + trigger = Trigger( + event_type=row["event_type"], parameters=json.loads(row["parameters"]) + ) + result.append((row["buff_id"], trigger)) + return result + + def _load_triggers_for_buff(self, buff_id: str) -> Sequence[Trigger]: + cursor = self._conn.cursor() + rows = cursor.execute( + "SELECT event_type, parameters FROM buff_triggers WHERE buff_id = ? ORDER BY trigger_order", + (buff_id,), + ).fetchall() + return tuple( + Trigger(event_type=row["event_type"], parameters=json.loads(row["parameters"])) + for row in rows + ) + + def _load_effects_for_buff(self, buff_id: str) -> Sequence[Effect]: + cursor = self._conn.cursor() + rows = cursor.execute( + "SELECT template_id, parameters FROM buff_effects WHERE buff_id = ? 
ORDER BY effect_order", + (buff_id,), + ).fetchall() + return tuple( + Effect( + template_id=row["template_id"], + parameters=EffectPayload.model_validate_json(row["parameters"]), + ) + for row in rows + ) + + def delete(self, buff_id: str) -> None: + with self._conn: + self._conn.execute("DELETE FROM buffs WHERE buff_id = ?", (buff_id,)) + + def clear(self) -> None: + with self._conn: + self._conn.execute("DELETE FROM buff_triggers") + self._conn.execute("DELETE FROM buff_effects") + self._conn.execute("DELETE FROM buffs") + + def load_target_selector(self, buff_id: str) -> TargetSelector: + row = self._conn.execute( + "SELECT target_selector FROM buffs WHERE buff_id = ?", + (buff_id,), + ).fetchone() + if row is None: + raise KeyError(buff_id) + payload = json.loads(row["target_selector"]) + return TargetSelector(scope=payload["scope"], filters=tuple(payload.get("filters", ()))) + + def load_conditions(self, buff_id: str) -> Sequence[Condition]: + row = self._conn.execute( + "SELECT conditions FROM buffs WHERE buff_id = ?", + (buff_id,), + ).fetchone() + if row is None: + raise KeyError(buff_id) + payload = json.loads(row["conditions"]) + return tuple(_condition_from_dict(item) for item in payload) + + # Utility ---------------------------------------------------------------- + def count(self) -> int: + cursor = self._conn.cursor() + row = cursor.execute("SELECT COUNT(*) AS total FROM buffs").fetchone() + return int(row["total"]) if row else 0 + + def as_event_index(self) -> Mapping[str, tuple[str, ...]]: + cursor = self._conn.cursor() + rows = cursor.execute( + "SELECT DISTINCT event_type, buff_id FROM buff_triggers ORDER BY event_type, buff_id" + ).fetchall() + event_map: dict[str, list[str]] = {} + for row in rows: + event_map.setdefault(row["event_type"], []).append(row["buff_id"]) + return {key: tuple(value) for key, value in event_map.items()} + + def iter_effects(self, buff_id: str) -> Iterable[Effect]: + return self._load_effects_for_buff(buff_id) + + def iter_triggers(self, buff_id: str) -> Iterable[Trigger]: + return self._load_triggers_for_buff(buff_id) diff --git a/zsim/sim_progress/buff_engine/store.py b/zsim/sim_progress/buff_engine/store.py new file mode 100644 index 00000000..bffdd6cf --- /dev/null +++ b/zsim/sim_progress/buff_engine/store.py @@ -0,0 +1,96 @@ +"""Runtime storage abstraction for active buff instances.""" + +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Any, Callable, Iterator, MutableMapping + + +@dataclass(slots=True) +class BuffInstance: + """Light-weight runtime state for a buff.""" + + buff_id: str + owner_id: str + stacks: int = 1 + remaining_duration: float | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def tick(self, delta: float) -> None: + if self.remaining_duration is None: + return + self.remaining_duration = max(0.0, self.remaining_duration - delta) + + @property + def expired(self) -> bool: + return self.remaining_duration is not None and self.remaining_duration <= 0 + + +class BuffStore: + """Encapsulates CRUD operations on the dynamic buff dictionary.""" + + def __init__( + self, backing_store: MutableMapping[str, list[BuffInstance]] | None = None + ) -> None: + self._store: MutableMapping[str, list[BuffInstance]] + if backing_store is None: + self._store = defaultdict(list) + self._owns_store = True + else: + self._store = backing_store + self._owns_store = False + + def add(self, instance: BuffInstance) -> None: + 
self._store.setdefault(instance.owner_id, []).append(instance) + + def remove(self, owner_id: str, predicate: Callable[[BuffInstance], bool]) -> int: + instances = self._store.get(owner_id, []) + kept: list[BuffInstance] = [] + removed = 0 + for inst in instances: + if predicate(inst): + removed += 1 + else: + kept.append(inst) + if kept: + self._store[owner_id] = kept + else: + self._store.pop(owner_id, None) + return removed + + def get(self, owner_id: str) -> tuple[BuffInstance, ...]: + return tuple(self._store.get(owner_id, ())) + + def find(self, owner_id: str, buff_id: str) -> tuple[BuffInstance, ...]: + return tuple(inst for inst in self._store.get(owner_id, ()) if inst.buff_id == buff_id) + + def tick_all(self, delta: float) -> None: + for instances in list(self._store.values()): + for inst in instances: + inst.tick(delta) + + def purge_expired(self) -> int: + removed = 0 + for owner_id in list(self._store.keys()): + removed += self.remove(owner_id, lambda inst: inst.expired) + return removed + + def clear(self) -> None: + self._store.clear() + + def iter_all(self) -> Iterator[tuple[str, BuffInstance]]: + for owner_id, instances in self._store.items(): + for inst in instances: + yield owner_id, inst + + def as_dict(self) -> MutableMapping[str, list[BuffInstance]]: + return self._store + + def owns_store(self) -> bool: + return self._owns_store + + def sync_from(self, source: MutableMapping[str, list[BuffInstance]]) -> None: + self._store.clear() + for owner_id, instances in source.items(): + self._store[owner_id] = list(instances)
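
For reviewers, a minimal end-to-end sketch of the intended wiring (not part of the patch itself; the "atk_plus" buff id, the lambda handler, and the numeric values are illustrative and simply mirror the unit tests above):

from zsim.sim_progress.buff_engine import (
    BuffDefinition,
    BuffEngine,
    BuffExecutionContext,
    BuffInstance,
    BuffRegistry,
    BuffStore,
    Condition,
    Effect,
    TargetSelector,
    Trigger,
)

# Declare a buff that listens for "skill.cast" events and grants a flat ATK bonus.
definition = BuffDefinition(
    buff_id="atk_plus",  # illustrative id, not shipped with the patch
    name="ATK Up",
    tags=("example",),
    max_stacks=1,
    duration=5.0,
    stacking_rule="refresh",
    triggers=(Trigger(event_type="skill.cast"),),
    conditions=(Condition.comparison("event.type", "==", "skill.cast"),),
    effects=(Effect("add.atk", {"value": 100}),),
    target_selector=TargetSelector(scope="self"),
)

# Wire the facade around an in-memory registry; router, evaluator and executor
# fall back to their defaults when not supplied.
engine = BuffEngine(registry=BuffRegistry())
engine.executor.register_handler(
    "add.atk",
    lambda effect, buff, ctx: effect.parameters.model_dump()["value"],
)
engine.register_definition(definition)

context = BuffExecutionContext(event={"type": "skill.cast", "payload": {}})
engine.dispatch("skill.cast", context)  # -> [("atk_plus", [100])]

# Runtime instances are tracked separately in a BuffStore and decay over time.
store = BuffStore()
store.add(BuffInstance(buff_id="atk_plus", owner_id="hero", remaining_duration=5.0))
store.tick_all(5.0)
store.purge_expired()  # -> 1, the instance reached 0.0 remaining and expired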