Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions python/examples/dining_philosophers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@
from enum import auto, Enum
from typing import Any, cast

import monarch.actor
from monarch._src.actor.actor_mesh import ActorMesh
from monarch._src.actor.host_mesh import _spawn_admin
from monarch.actor import Actor, current_rank, endpoint, this_host
from monarch.actor import Actor, current_rank, endpoint, MeshFailure, this_host
from monarch.distributed_telemetry.actor import start_telemetry


Expand Down Expand Up @@ -76,13 +77,19 @@ async def _request_chopsticks(self) -> None:
left, right = self._chopstick_indices()
self.left_status = ChopstickStatus.REQUESTED
self.right_status = ChopstickStatus.REQUESTED
await self.waiter.request_chopsticks.call_one(self.rank, left, right)
try:
await self.waiter.request_chopsticks.call_one(self.rank, left, right)
except Exception:
print(f"philosopher {self.rank}: waiter unreachable, stopping", flush=True)

async def _release_chopsticks(self) -> None:
    """Mark both chopsticks as not held and tell the waiter they are free.

    The local status fields are reset first, then the waiter's
    ``release_chopsticks`` endpoint is called exactly once, inside the
    ``try`` so that a failure to reach the waiter is reported instead of
    propagating out of this method.
    """
    left, right = self._chopstick_indices()
    self.left_status = ChopstickStatus.NONE
    self.right_status = ChopstickStatus.NONE
    try:
        # Single, guarded call: an earlier unguarded call ahead of the
        # ``try`` would raise before the handler below could run, and
        # would release the same chopsticks twice when the waiter is up.
        await self.waiter.release_chopsticks.call_one(left, right)
    except Exception:
        # Deliberate best effort: the waiter may have been stopped.
        # NOTE(review): the message says "stopping", but this method only
        # logs and returns; confirm the caller actually halts the loop.
        print(f"philosopher {self.rank}: waiter unreachable, stopping", flush=True)

@endpoint
async def start(self, waiter: Any) -> None:
Expand Down Expand Up @@ -154,12 +161,21 @@ async def release_chopsticks(self, left: int, right: int) -> None:
# Number of philosophers in the simulation (inferred from the name; the
# code that consumes this constant is outside this excerpt — confirm).
NUM_PHILOSOPHERS = 5


def _swallow_fault(failure: MeshFailure) -> None:
    """Log an unhandled actor fault instead of letting it crash the process.

    Meant to be installed as ``monarch.actor.unhandled_fault_hook``. The
    failure is rendered with ``str()`` and printed on one flushed line.

    Args:
        failure: The mesh failure reported to the hook.
    """
    max_chars = 120  # keep the log line short; faults can be verbose
    text = str(failure)
    if len(text) > max_chars:
        # Only mark the message as truncated when something was cut off.
        text = f"{text[:max_chars]}..."
    print(f"[fault suppressed] {text}", flush=True)


async def async_main(
dashboard: bool = False,
dashboard_port: int = 8265,
kill_waiter_after: float | None = None,
) -> None:
telemetry_url = None
monarch.actor.unhandled_fault_hook = (
_swallow_fault # pyre-ignore[9]: monkey-patching module hook
)

if dashboard:
_, telemetry_url = start_telemetry(
include_dashboard=True, dashboard_port=dashboard_port
Expand Down Expand Up @@ -203,6 +219,10 @@ async def async_main(
await asyncio.sleep(kill_waiter_after)
print("Killing the waiter...")
cast(ActorMesh[Waiter], waiter).stop().get()
print(
"\nWaiter killed. Dashboard still running — press Ctrl+C to exit.",
flush=True,
)
await asyncio.sleep(float("inf"))
except (KeyboardInterrupt, asyncio.CancelledError):
pass
Expand Down
Loading