diff --git a/python/examples/dining_philosophers.py b/python/examples/dining_philosophers.py
index c520ba0ae..4d6cb5d01 100644
--- a/python/examples/dining_philosophers.py
+++ b/python/examples/dining_philosophers.py
@@ -44,9 +44,10 @@
 from enum import auto, Enum
 from typing import Any, cast
 
+import monarch.actor
 from monarch._src.actor.actor_mesh import ActorMesh
 from monarch._src.actor.host_mesh import _spawn_admin
-from monarch.actor import Actor, current_rank, endpoint, this_host
+from monarch.actor import Actor, current_rank, endpoint, MeshFailure, this_host
 from monarch.distributed_telemetry.actor import start_telemetry
 
 
@@ -76,13 +77,19 @@ async def _request_chopsticks(self) -> None:
         left, right = self._chopstick_indices()
         self.left_status = ChopstickStatus.REQUESTED
         self.right_status = ChopstickStatus.REQUESTED
-        await self.waiter.request_chopsticks.call_one(self.rank, left, right)
+        try:
+            await self.waiter.request_chopsticks.call_one(self.rank, left, right)
+        except Exception:
+            print(f"philosopher {self.rank}: waiter unreachable, stopping", flush=True)
 
     async def _release_chopsticks(self) -> None:
         left, right = self._chopstick_indices()
         self.left_status = ChopstickStatus.NONE
         self.right_status = ChopstickStatus.NONE
-        await self.waiter.release_chopsticks.call_one(left, right)
+        try:
+            await self.waiter.release_chopsticks.call_one(left, right)
+        except Exception:
+            print(f"philosopher {self.rank}: waiter unreachable, stopping", flush=True)
 
     @endpoint
     async def start(self, waiter: Any) -> None:
@@ -154,12 +161,29 @@ async def release_chopsticks(self, left: int, right: int) -> None:
 NUM_PHILOSOPHERS = 5
 
 
+def _swallow_fault(failure: MeshFailure) -> None:
+    """Log an unhandled actor fault instead of letting it crash the process.
+
+    Only the first 120 characters of the failure text are printed; the
+    ellipsis is appended only when the text was actually truncated.
+    """
+    message = str(failure)
+    if len(message) > 120:
+        message = f"{message[:120]}..."
+    print(f"[fault suppressed] {message}", flush=True)
+
+
 async def async_main(
     dashboard: bool = False,
     dashboard_port: int = 8265,
     kill_waiter_after: float | None = None,
 ) -> None:
     telemetry_url = None
+
+    monarch.actor.unhandled_fault_hook = (
+        _swallow_fault  # pyre-ignore[9]: monkey-patching module hook
+    )
+
     if dashboard:
         _, telemetry_url = start_telemetry(
             include_dashboard=True, dashboard_port=dashboard_port
@@ -203,6 +227,10 @@ async def async_main(
             await asyncio.sleep(kill_waiter_after)
             print("Killing the waiter...")
             cast(ActorMesh[Waiter], waiter).stop().get()
+            print(
+                "\nWaiter killed. Dashboard still running — press Ctrl+C to exit.",
+                flush=True,
+            )
         await asyncio.sleep(float("inf"))
     except (KeyboardInterrupt, asyncio.CancelledError):
         pass