From d839c21225747542570928a4342a56ac6028b213 Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Thu, 28 May 2026 20:38:10 +0800 Subject: [PATCH 01/11] feat: Initial scaffold for the v2 microservice operator CLI --- areal/__init__.py | 59 ++++--- areal/experimental/cli/__init__.py | 14 ++ areal/experimental/cli/commands/__init__.py | 1 + .../cli/commands/agent/__init__.py | 50 ++++++ .../experimental/cli/commands/inf/__init__.py | 44 +++++ .../cli/commands/train/__init__.py | 51 ++++++ .../cli/commands/weight_update/__init__.py | 55 +++++++ areal/experimental/cli/main.py | 84 ++++++++++ areal/experimental/cli/state.py | 106 ++++++++++++ pyproject.toml | 13 ++ tests/experimental/test_cli_lightness.py | 154 ++++++++++++++++++ 11 files changed, 612 insertions(+), 19 deletions(-) create mode 100644 areal/experimental/cli/__init__.py create mode 100644 areal/experimental/cli/commands/__init__.py create mode 100644 areal/experimental/cli/commands/agent/__init__.py create mode 100644 areal/experimental/cli/commands/inf/__init__.py create mode 100644 areal/experimental/cli/commands/train/__init__.py create mode 100644 areal/experimental/cli/commands/weight_update/__init__.py create mode 100644 areal/experimental/cli/main.py create mode 100644 areal/experimental/cli/state.py create mode 100644 tests/experimental/test_cli_lightness.py diff --git a/areal/__init__.py b/areal/__init__.py index 42be212bdb..561abd5204 100644 --- a/areal/__init__.py +++ b/areal/__init__.py @@ -4,28 +4,49 @@ from .version import __version__ # noqa -from .infra import ( - RolloutController, - StalenessManager, - TrainController, - WorkflowExecutor, - current_platform, - workflow_context, -) + +# Heavy submodules (infra, trainer) are loaded lazily via PEP 562 +# ``__getattr__`` so that ``import areal`` stays light. 
This is the load- +# bearing precondition for the ``areal`` console-script's lightness +# invariant (see ``tests/experimental/test_cli_lightness.py``): importing +# ``areal.experimental.cli.main`` transitively runs ``areal/__init__.py``, +# and if any eager top-level import here pulled in torch / ray / megatron +# / fastapi, the CLI could no longer be installed on a login node without +# the training stack. +# +# Backwards-compat: ``areal.RolloutController`` etc. still work because +# attribute access triggers ``__getattr__``; only bare ``import areal`` +# changes behavior (now light). +_INFRA_NAMES = frozenset({ + "RolloutController", + "StalenessManager", + "TrainController", + "WorkflowExecutor", + "current_platform", + "workflow_context", +}) + +_TRAINER_NAMES = frozenset({ + "DPOTrainer", + "PPOTrainer", + "RWTrainer", + "SFTTrainer", +}) def __getattr__(name: str): - if name in ("DPOTrainer", "PPOTrainer", "RWTrainer", "SFTTrainer"): - from .trainer import DPOTrainer, PPOTrainer, RWTrainer, SFTTrainer - - _map = { - "DPOTrainer": DPOTrainer, - "PPOTrainer": PPOTrainer, - "RWTrainer": RWTrainer, - "SFTTrainer": SFTTrainer, - } - globals().update(_map) - return _map[name] + if name in _INFRA_NAMES: + from . import infra as _infra + + value = getattr(_infra, name) + globals()[name] = value + return value + if name in _TRAINER_NAMES: + from . import trainer as _trainer + + value = getattr(_trainer, name) + globals()[name] = value + return value raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/areal/experimental/cli/__init__.py b/areal/experimental/cli/__init__.py new file mode 100644 index 0000000000..cdcaf0e367 --- /dev/null +++ b/areal/experimental/cli/__init__.py @@ -0,0 +1,14 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""AReaL operator CLI — companion to the v2 microservice control plane. 
+ +This package exposes a single ``areal`` console-script that drives the v2 +service gateways (inference / agent / training / weight-update) from a +shell, rather than from a Python script that has to instantiate the +matching controller. It is intentionally light at import time so that +adding a verb in a follow-up PR does not pull torch / ray / megatron / +sglang / vllm into the parser-construction path. + +The full per-verb design surface is tracked in the upstream design +discussion issue. +""" diff --git a/areal/experimental/cli/commands/__init__.py b/areal/experimental/cli/commands/__init__.py new file mode 100644 index 0000000000..9881313609 --- /dev/null +++ b/areal/experimental/cli/commands/__init__.py @@ -0,0 +1 @@ +# SPDX-License-Identifier: Apache-2.0 diff --git a/areal/experimental/cli/commands/agent/__init__.py b/areal/experimental/cli/commands/agent/__init__.py new file mode 100644 index 0000000000..3d8521f1e7 --- /dev/null +++ b/areal/experimental/cli/commands/agent/__init__.py @@ -0,0 +1,50 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal agent`` — agent service operator console (scaffold). + +Drives an agent service (gateway + router + N (worker, data-proxy) pairs) +for session-centric operator and debugging work. No verbs are implemented +in this scaffold release; this module only reserves the ``areal agent`` +command name and tells the user what is coming. + +The agent CLI is session-centric (not model-centric like ``areal inf``). +Sessions can negotiate an RL session key with a configured inference +service when they start, enabling online RL trajectory tracking. +""" + +from __future__ import annotations + +import argparse + + +_DESCRIPTION = """\ +Operate an agent service: gateway + router + (worker, data-proxy) pairs. +Session-centric: the primary unit of interaction is an agent session, +not a model. + +NO VERBS IMPLEMENTED YET. This namespace currently only reserves the +`areal agent ...` command surface. 
+ +Planned verb surface (flag matrices live in the design discussion issue): + run launch router + N pairs + gateway + stop tear them down + status health for one service + ps list locally known services + logs show gateway / router / worker / data-proxy logs + +State lives under ~/.areal/agent/. +""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "agent", + help="Operate an agent service (scaffold — no verbs yet).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/inf/__init__.py b/areal/experimental/cli/commands/inf/__init__.py new file mode 100644 index 0000000000..f860517890 --- /dev/null +++ b/areal/experimental/cli/commands/inf/__init__.py @@ -0,0 +1,44 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal inf`` — inference service operator console (scaffold). + +Drives an inference service (gateway + router + optional model backends) +for day-to-day operator and debugging work. No verbs are implemented in +this scaffold release; this module only reserves the ``areal inf`` +command name and tells the user what is coming. +""" + +from __future__ import annotations + +import argparse + + +_DESCRIPTION = """\ +Operate an inference service: gateway + router + optional model backends. + +NO VERBS IMPLEMENTED YET. This namespace currently only reserves the +`areal inf ...` command surface. + +Planned verb surface (flag matrices live in the design discussion issue): + run launch gateway + router (optionally with --model inline) + stop tear them down + status health for one service + ps list locally known services + logs show gateway / router / model logs + +State lives under ~/.areal/inf/. 
+""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "inf", + help="Operate an inference service (scaffold — no verbs yet).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/train/__init__.py b/areal/experimental/cli/commands/train/__init__.py new file mode 100644 index 0000000000..dab09229c5 --- /dev/null +++ b/areal/experimental/cli/commands/train/__init__.py @@ -0,0 +1,51 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal train`` — training job submitter (scaffold). + +Wraps the launch lifecycle of a training driver. Unlike ``areal inf`` / +``areal agent`` (which manage long-running services), ``areal train`` +treats each run as a job: it terminates, and the CLI's job is purely +lifecycle wrapping. The scheduling decision (local / slurm / ray) stays +inside the driver, decided by ``config.scheduler.type`` — the CLI does +not pick a scheduler. + +No verbs are implemented in this scaffold release. +""" + +from __future__ import annotations + +import argparse + + +_DESCRIPTION = """\ +Submit and observe training jobs. Job-shaped (terminates), not +service-shaped — the CLI wraps lifecycle only and does not choose the +scheduler (that decision lives in the driver via config.scheduler.type). + +NO VERBS IMPLEMENTED YET. This namespace currently only reserves the +`areal train ...` command surface. + +Planned verb surface (flag matrices live in the design discussion issue): + run run a driver in the foreground (small jobs, debugging) + start spawn a detached driver process (cluster jobs) + stop signal a running job by name + ps list locally tracked jobs + status status of one job + logs tail a job's combined stdout/stderr + +State lives under ~/.areal/train/. 
+""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "train", + help="Submit and observe training jobs (scaffold — no verbs yet).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/weight_update/__init__.py b/areal/experimental/cli/commands/weight_update/__init__.py new file mode 100644 index 0000000000..b1e96c0e27 --- /dev/null +++ b/areal/experimental/cli/commands/weight_update/__init__.py @@ -0,0 +1,55 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal weight-update`` — weight-sync diagnostic console (scaffold). + +Drives the weight-update service that sits between training and +inference. The operator-facing surface is small and diagnostic-only: +humans don't invoke `/connect` / `/update_weights` / `/disconnect` +directly during normal use — those are called by adapter code inside +the training and inference engines. The CLI's job is to show whether +the sync is healthy, which (train, inference) pairs are connected, and +where the logs are. + +No verbs are implemented in this scaffold release. + +The CLI-surface namespace is ``weight-update`` (hyphenated, matching +the v2 service naming). The Python module is ``weight_update`` because +identifiers can't contain hyphens. +""" + +from __future__ import annotations + +import argparse + + +_DESCRIPTION = """\ +Diagnose the weight-update service that bridges training and inference. + +NO VERBS IMPLEMENTED YET. This namespace currently only reserves the +`areal weight-update ...` command surface. + +Planned verb surface (flag matrices live in the design discussion issue): + status is the gateway alive? how many pairs are connected? 
+ ps list locally known weight-update services + logs tail the gateway log + +Note: there is no `run` verb in the first cut — in the v2 flow the +weight-update gateway is brought up by the training-side controller, +not by the operator. + +State lives under ~/.areal/weight-update/. +""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "weight-update", + help="Diagnose weight-sync state (scaffold — no verbs yet).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/main.py b/areal/experimental/cli/main.py new file mode 100644 index 0000000000..1608a7a229 --- /dev/null +++ b/areal/experimental/cli/main.py @@ -0,0 +1,84 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Top-level entry point for the ``areal`` console-script. + +This module wires the four sub-CLI namespaces (`inf`, `agent`, `train`, +`weight-update`) into a single argparse tree. Each namespace lives under +``areal/experimental/cli/commands/<namespace>/`` and exports an +``add_parser(subparsers)`` function; this file imports them and registers +them. No verb behavior is implemented at this level. + +The import path is kept deliberately light: only stdlib and the namespace +``__init__`` modules are touched here. Heavy dependencies (torch, ray, +megatron, sglang, vllm, fastapi, …) must never appear on the import path +that ``areal --help`` triggers — the invariant is locked by +``tests/experimental/test_cli_lightness.py``. 
+""" + +from __future__ import annotations + +import argparse +import sys + +from areal.version import __version__ + +from areal.experimental.cli.commands import agent as cmd_agent +from areal.experimental.cli.commands import inf as cmd_inf +from areal.experimental.cli.commands import train as cmd_train +from areal.experimental.cli.commands import weight_update as cmd_weight_update + + +_DESCRIPTION = """\ +AReaL operator CLI for the v2 microservice architecture. + +Each namespace drives one of the v2 services. Verbs land incrementally; +no verbs are implemented in this scaffold release — each namespace's +--help describes its planned surface and points at the design discussion. + +Namespaces: + inf Operate an inference service (gateway + router + models) + agent Operate an agent service (gateway + router + sessions) + train Submit and observe training jobs + weight-update Diagnose weight-sync state between train and inference + +Run `areal <namespace> --help` for what's planned (and what's available +today). State files for each namespace live under ~/.areal/<namespace>/. 
+""" + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="areal", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--version", + action="version", + version=f"areal {__version__}", + ) + subparsers = parser.add_subparsers( + dest="namespace", + required=True, + metavar="NAMESPACE", + ) + cmd_inf.add_parser(subparsers) + cmd_agent.add_parser(subparsers) + cmd_train.add_parser(subparsers) + cmd_weight_update.add_parser(subparsers) + return parser + + +def cli(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv if argv is not None else sys.argv[1:]) + func = getattr(args, "func", None) + if func is None: + parser.print_help() + return 2 + result = func(args) + return int(result) if isinstance(result, int) else 0 + + +if __name__ == "__main__": + sys.exit(cli()) diff --git a/areal/experimental/cli/state.py b/areal/experimental/cli/state.py new file mode 100644 index 0000000000..66a36a20cc --- /dev/null +++ b/areal/experimental/cli/state.py @@ -0,0 +1,106 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Cross-cutting state helpers shared by every sub-CLI. + +The AReaL CLI persists a small amount of local state under ``~/.areal/`` +(overridable with ``$AREAL_HOME``). Each sub-CLI owns a subdirectory with +the same internal shape, so commands can find services and jobs across +invocations without a background daemon: + + ~/.areal/ + ├── inf/ + │ ├── current-service + │ ├── services/<name>.json + │ └── logs/<name>/ + ├── agent/ + │ ├── current-service + │ ├── services/<name>.json + │ └── logs/<name>/ + ├── train/ + │ ├── current-run + │ ├── runs/<name>.json + │ └── logs/<name>/ + └── weight-update/ + ├── current-service + ├── services/<name>.json + └── logs/<name>/ + +The helpers in this module are deliberately tiny — atomic file write, +PID liveness, and the ``~/.areal/`` resolver. Per-namespace dataclasses +(``ServiceState``, ``RunState``, etc.) 
live alongside the verb files +that introduce them, not here. + +This module must stay import-light. Do NOT add dependencies on torch, +aiohttp, fastapi, or any other heavy package — the lightness guard test +in ``tests/experimental/test_cli_lightness.py`` will reject the change. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any + + +def areal_home() -> Path: + """Return the root state directory (``$AREAL_HOME`` or ``~/.areal``). + + Creates the directory if it doesn't exist. Sub-CLIs derive their own + subdirectories from here; they should never bypass this function and + hardcode ``~/.areal`` themselves. + """ + env = os.environ.get("AREAL_HOME") + root = Path(env).expanduser() if env else Path.home() / ".areal" + root.mkdir(parents=True, exist_ok=True) + return root + + +def namespace_dir(namespace: str) -> Path: + """Return ``~/.areal/<namespace>/`` (created on demand). + + ``namespace`` is the on-disk directory name and follows the CLI + verb prefix (``inf``, ``agent``, ``train``, ``weight-update``). The + hyphenated form is used on disk to match the CLI surface exactly. + """ + d = areal_home() / namespace + d.mkdir(parents=True, exist_ok=True) + return d + + +def pid_alive(pid: int) -> bool: + """Cheap liveness probe: does *pid* still exist on this host? + + Uses ``kill(pid, 0)`` which signals nothing but raises ``ProcessLookupError`` + if the process is gone. Returns ``False`` for ``pid <= 0``. Note: a live + PID does not mean the service is healthy — pair this with an HTTP probe + when callers need real health status. + """ + if pid <= 0: + return False + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + # The process exists but is owned by another user. + return True + return True + + +def atomic_write_text(path: Path, content: str) -> None: + """Write *content* to *path* atomically (write-temp + rename). 
+ + Prevents readers from observing a partially written file when two CLI + invocations race on the same state file. + """ + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with open(tmp, "w") as f: + f.write(content) + os.replace(tmp, path) + + +def atomic_write_json(path: Path, data: Any, *, indent: int = 2) -> None: + """Convenience wrapper: serialize *data* to JSON and write atomically.""" + atomic_write_text(path, json.dumps(data, indent=indent) + "\n") diff --git a/pyproject.toml b/pyproject.toml index 1d90e3e717..0f1ea7f120 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -183,6 +183,19 @@ cuda = [ sandbox = [ "daytona>=0.167.0", ] +# Minimal extra for the lightweight `areal` CLI (login-node install). The CLI +# must NOT pull in torch / transformers / sglang / vllm / ray / megatron / ...; +# the lightness guard test in `tests/experimental/test_cli_lightness.py` +# enforces this invariant. The CLI only depends on PyYAML (to peek at config +# files for driver / scheduler resolution) and colorlog (for the project's +# colored loggers). +cli = [ + "PyYAML", + "colorlog", +] + +[project.scripts] +areal = "areal.experimental.cli.main:cli" [project.urls] "Homepage" = "https://github.com/areal-project/AReaL" diff --git a/tests/experimental/test_cli_lightness.py b/tests/experimental/test_cli_lightness.py new file mode 100644 index 0000000000..59a5cfb453 --- /dev/null +++ b/tests/experimental/test_cli_lightness.py @@ -0,0 +1,154 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Guard test: ``areal.experimental.cli`` must stay import-light. + +The ``areal`` console-script is intended to be installable on a login +node (``pip install areal[cli]``) without dragging in the full training +stack (torch, transformers, sglang/vllm, ray, megatron, ...). This test +spawns a fresh subprocess, imports the CLI entrypoint, and asserts that +no module from the known-heavy list ends up in ``sys.modules``. 
+ +Run in a fresh subprocess (not via ``importlib`` in the test process) so +that accidental imports done elsewhere in the pytest session do not mask +leaks. If you add a verb that needs a heavy dep, do the import inside +``_handle`` — never at module top level. +""" + +from __future__ import annotations + +import json +import subprocess +import sys +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[2] + +# Top-level package names that the CLI scaffold must NOT cause to be +# imported. Picked from pyproject.toml's heavy deps: training / inference +# backends, web servers, experiment trackers, and large transformer +# libraries. Also blocks AReaL's own heavy subpackages — the scaffold +# must not transitively load them either. +FORBIDDEN_TOP_LEVEL = { + # Deep-learning runtimes + "torch", + "torchvision", + "torchaudio", + "transformers", + # Inference backends + "sglang", + "vllm", + # Distributed runtime / training-stack hangers-on + "ray", + "megatron", + "mbridge", + "flash_attn", + "kernels", + "tilelang", + "modelopt", + # Web frameworks / async HTTP + "aiohttp", + "fastapi", + "uvicorn", + # Experiment trackers + "wandb", + "tensorboardx", + "swanlab", + "swanboard", + "trackio", + # Data / numerical + "datasets", + "peft", + "pandas", + "matplotlib", + "seaborn", + "numba", + "h5py", + "blosc", + "huggingface_hub", + # External LLM SDKs + "openai", + "anthropic", + "litellm", + "qwen_agent", + "openai_agents", + "claude_agent_sdk", + "openhands", + "langchain", + # CUDA / GPU stacks + "nvidia", + "cupy", + "triton", + # AReaL's own heavy subpackages — CLI must not transitively load them. 
+ "areal.infra", + "areal.engine", + "areal.trainer", + "areal.workflow", + "areal.dataset", + "areal.reward", + "areal.api", +} + + +def _modules_after(import_stmt: str) -> set[str]: + """Spawn a fresh interpreter, run *import_stmt*, return ``sys.modules`` keys.""" + code = ( + "import sys, json\n" + f"{import_stmt}\n" + "print(json.dumps(sorted(sys.modules.keys())))\n" + ) + out = subprocess.check_output( + [sys.executable, "-c", code], + cwd=str(REPO_ROOT), + ) + last_line = out.decode().strip().splitlines()[-1] + return set(json.loads(last_line)) + + +def _leaks(modules: set[str]) -> set[str]: + leaked: set[str] = set() + for m in modules: + for f in FORBIDDEN_TOP_LEVEL: + # Exact match on a forbidden package (e.g. "areal.infra"), or + # any descendant (e.g. "areal.infra.launcher"). + if m == f or m.startswith(f + "."): + leaked.add(m) + break + return leaked + + +def test_cli_main_module_is_light(): + """Importing the CLI entry point must not load any heavy backend.""" + mods = _modules_after("import areal.experimental.cli.main") + leaked = _leaks(mods) + assert not leaked, ( + f"`import areal.experimental.cli.main` leaked heavy modules: " + f"{sorted(leaked)}" + ) + + +def test_build_parser_is_light(): + """Building the argparse tree must not load any heavy backend either.""" + mods = _modules_after( + "from areal.experimental.cli.main import build_parser\n" + "build_parser()" + ) + leaked = _leaks(mods) + assert not leaked, ( + f"`build_parser()` leaked heavy modules: {sorted(leaked)}" + ) + + +def test_each_namespace_help_is_light(): + """Triggering each namespace's --help path must stay light.""" + for ns in ("inf", "agent", "train", "weight-update"): + mods = _modules_after( + "from areal.experimental.cli.main import cli\n" + f"try:\n" + f" cli(['{ns}', '--help'])\n" + f"except SystemExit:\n" + f" pass\n" + ) + leaked = _leaks(mods) + assert not leaked, ( + f"`areal {ns} --help` leaked heavy modules: {sorted(leaked)}" + ) From 
a525e340c728020196fca774a3f1d38c674874ce Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Thu, 28 May 2026 21:10:48 +0800 Subject: [PATCH 02/11] chore: update scaffold --- areal/__init__.py | 12 -- tests/experimental/test_cli_lightness.py | 154 ----------------------- 2 files changed, 166 deletions(-) delete mode 100644 tests/experimental/test_cli_lightness.py diff --git a/areal/__init__.py b/areal/__init__.py index 561abd5204..44d703638d 100644 --- a/areal/__init__.py +++ b/areal/__init__.py @@ -5,18 +5,6 @@ from .version import __version__ # noqa -# Heavy submodules (infra, trainer) are loaded lazily via PEP 562 -# ``__getattr__`` so that ``import areal`` stays light. This is the load- -# bearing precondition for the ``areal`` console-script's lightness -# invariant (see ``tests/experimental/test_cli_lightness.py``): importing -# ``areal.experimental.cli.main`` transitively runs ``areal/__init__.py``, -# and if any eager top-level import here pulled in torch / ray / megatron -# / fastapi, the CLI could no longer be installed on a login node without -# the training stack. -# -# Backwards-compat: ``areal.RolloutController`` etc. still work because -# attribute access triggers ``__getattr__``; only bare ``import areal`` -# changes behavior (now light). _INFRA_NAMES = frozenset({ "RolloutController", "StalenessManager", diff --git a/tests/experimental/test_cli_lightness.py b/tests/experimental/test_cli_lightness.py deleted file mode 100644 index 59a5cfb453..0000000000 --- a/tests/experimental/test_cli_lightness.py +++ /dev/null @@ -1,154 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 - -"""Guard test: ``areal.experimental.cli`` must stay import-light. - -The ``areal`` console-script is intended to be installable on a login -node (``pip install areal[cli]``) without dragging in the full training -stack (torch, transformers, sglang/vllm, ray, megatron, ...). 
This test -spawns a fresh subprocess, imports the CLI entrypoint, and asserts that -no module from the known-heavy list ends up in ``sys.modules``. - -Run in a fresh subprocess (not via ``importlib`` in the test process) so -that accidental imports done elsewhere in the pytest session do not mask -leaks. If you add a verb that needs a heavy dep, do the import inside -``_handle`` — never at module top level. -""" - -from __future__ import annotations - -import json -import subprocess -import sys -from pathlib import Path - -REPO_ROOT = Path(__file__).resolve().parents[2] - -# Top-level package names that the CLI scaffold must NOT cause to be -# imported. Picked from pyproject.toml's heavy deps: training / inference -# backends, web servers, experiment trackers, and large transformer -# libraries. Also blocks AReaL's own heavy subpackages — the scaffold -# must not transitively load them either. -FORBIDDEN_TOP_LEVEL = { - # Deep-learning runtimes - "torch", - "torchvision", - "torchaudio", - "transformers", - # Inference backends - "sglang", - "vllm", - # Distributed runtime / training-stack hangers-on - "ray", - "megatron", - "mbridge", - "flash_attn", - "kernels", - "tilelang", - "modelopt", - # Web frameworks / async HTTP - "aiohttp", - "fastapi", - "uvicorn", - # Experiment trackers - "wandb", - "tensorboardx", - "swanlab", - "swanboard", - "trackio", - # Data / numerical - "datasets", - "peft", - "pandas", - "matplotlib", - "seaborn", - "numba", - "h5py", - "blosc", - "huggingface_hub", - # External LLM SDKs - "openai", - "anthropic", - "litellm", - "qwen_agent", - "openai_agents", - "claude_agent_sdk", - "openhands", - "langchain", - # CUDA / GPU stacks - "nvidia", - "cupy", - "triton", - # AReaL's own heavy subpackages — CLI must not transitively load them. 
- "areal.infra", - "areal.engine", - "areal.trainer", - "areal.workflow", - "areal.dataset", - "areal.reward", - "areal.api", -} - - -def _modules_after(import_stmt: str) -> set[str]: - """Spawn a fresh interpreter, run *import_stmt*, return ``sys.modules`` keys.""" - code = ( - "import sys, json\n" - f"{import_stmt}\n" - "print(json.dumps(sorted(sys.modules.keys())))\n" - ) - out = subprocess.check_output( - [sys.executable, "-c", code], - cwd=str(REPO_ROOT), - ) - last_line = out.decode().strip().splitlines()[-1] - return set(json.loads(last_line)) - - -def _leaks(modules: set[str]) -> set[str]: - leaked: set[str] = set() - for m in modules: - for f in FORBIDDEN_TOP_LEVEL: - # Exact match on a forbidden package (e.g. "areal.infra"), or - # any descendant (e.g. "areal.infra.launcher"). - if m == f or m.startswith(f + "."): - leaked.add(m) - break - return leaked - - -def test_cli_main_module_is_light(): - """Importing the CLI entry point must not load any heavy backend.""" - mods = _modules_after("import areal.experimental.cli.main") - leaked = _leaks(mods) - assert not leaked, ( - f"`import areal.experimental.cli.main` leaked heavy modules: " - f"{sorted(leaked)}" - ) - - -def test_build_parser_is_light(): - """Building the argparse tree must not load any heavy backend either.""" - mods = _modules_after( - "from areal.experimental.cli.main import build_parser\n" - "build_parser()" - ) - leaked = _leaks(mods) - assert not leaked, ( - f"`build_parser()` leaked heavy modules: {sorted(leaked)}" - ) - - -def test_each_namespace_help_is_light(): - """Triggering each namespace's --help path must stay light.""" - for ns in ("inf", "agent", "train", "weight-update"): - mods = _modules_after( - "from areal.experimental.cli.main import cli\n" - f"try:\n" - f" cli(['{ns}', '--help'])\n" - f"except SystemExit:\n" - f" pass\n" - ) - leaked = _leaks(mods) - assert not leaked, ( - f"`areal {ns} --help` leaked heavy modules: {sorted(leaked)}" - ) From 
46646a8ea0cb078a7b6833bb02f22df556fec28e Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Thu, 28 May 2026 23:08:47 +0800 Subject: [PATCH 03/11] chore: mirror [cli] extra and console-script into pyproject.vllm.toml --- pyproject.vllm.toml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pyproject.vllm.toml b/pyproject.vllm.toml index 4ab022c367..a79368b0c1 100644 --- a/pyproject.vllm.toml +++ b/pyproject.vllm.toml @@ -194,6 +194,19 @@ cuda = [ sandbox = [ "daytona>=0.167.0", ] +# Minimal extra for the lightweight `areal` CLI (login-node install). The CLI +# must NOT pull in torch / transformers / sglang / vllm / ray / megatron / ...; +# the lightness guard test in `tests/experimental/test_cli_lightness.py` +# enforces this invariant. The CLI only depends on PyYAML (to peek at config +# files for driver / scheduler resolution) and colorlog (for the project's +# colored loggers). +cli = [ + "PyYAML", + "colorlog", +] + +[project.scripts] +areal = "areal.experimental.cli.main:cli" [project.urls] "Homepage" = "https://github.com/areal-project/AReaL" From 822d43ee0ed6dd92a79175c062857d6dabaf7600 Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Fri, 29 May 2026 14:11:40 +0800 Subject: [PATCH 04/11] chore: regenerate uv lockfiles after [cli] extra --- uv.lock | 8 +++++++- uv.vllm.lock | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/uv.lock b/uv.lock index e468608cc3..db1652123f 100644 --- a/uv.lock +++ b/uv.lock @@ -400,6 +400,10 @@ dependencies = [ ] [package.optional-dependencies] +cli = [ + { name = "colorlog", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'x86_64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, + { name = "pyyaml", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'x86_64' and sys_platform == 
'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, +] cuda = [ { name = "flash-linear-attention", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" }, { name = "kernels", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'x86_64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, @@ -482,6 +486,7 @@ requires-dist = [ { name = "claude-agent-sdk" }, { name = "colorama" }, { name = "colorlog" }, + { name = "colorlog", marker = "extra == 'cli'" }, { name = "cookiecutter", specifier = ">2.1.1" }, { name = "datasets", specifier = ">=3.0.0" }, { name = "daytona", marker = "extra == 'sandbox'", specifier = ">=0.167.0" }, @@ -534,6 +539,7 @@ requires-dist = [ { name = "python-debian", specifier = ">=0.1.49" }, { name = "python-dotenv" }, { name = "pyyaml" }, + { name = "pyyaml", marker = "extra == 'cli'" }, { name = "pyzmq" }, { name = "qwen-agent" }, { name = "ray", extras = ["default"] }, @@ -573,7 +579,7 @@ requires-dist = [ { name = "word2number" }, { name = "zstandard" }, ] -provides-extras = ["sglang", "tms", "kernels", "megatron", "cuda-train", "cuda", "sandbox"] +provides-extras = ["sglang", "tms", "kernels", "megatron", "cuda-train", "cuda", "sandbox", "cli"] [package.metadata.requires-dev] dev = [ diff --git a/uv.vllm.lock b/uv.vllm.lock index 9e9b587e61..dab9badc06 100644 --- a/uv.vllm.lock +++ b/uv.vllm.lock @@ -448,6 +448,10 @@ dependencies = [ ] [package.optional-dependencies] +cli = [ + { name = "colorlog", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'x86_64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, + { name = "pyyaml", marker = 
"(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'x86_64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, +] cuda = [ { name = "flash-linear-attention", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" }, { name = "kernels", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'x86_64' and sys_platform == 'darwin') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, @@ -528,6 +532,7 @@ requires-dist = [ { name = "claude-agent-sdk" }, { name = "colorama" }, { name = "colorlog" }, + { name = "colorlog", marker = "extra == 'cli'" }, { name = "cookiecutter", specifier = ">2.1.1" }, { name = "datasets", specifier = ">=3.0.0" }, { name = "daytona", marker = "extra == 'sandbox'", specifier = ">=0.167.0" }, @@ -579,6 +584,7 @@ requires-dist = [ { name = "python-debian", specifier = ">=0.1.49" }, { name = "python-dotenv" }, { name = "pyyaml" }, + { name = "pyyaml", marker = "extra == 'cli'" }, { name = "pyzmq" }, { name = "qwen-agent" }, { name = "ray", extras = ["default"] }, @@ -619,7 +625,7 @@ requires-dist = [ { name = "word2number" }, { name = "zstandard" }, ] -provides-extras = ["vllm", "tms", "kernels", "megatron", "cuda-train", "cuda", "sandbox"] +provides-extras = ["vllm", "tms", "kernels", "megatron", "cuda-train", "cuda", "sandbox", "cli"] [package.metadata.requires-dev] dev = [ From ec73c8fdb027d625bec598cee68ef9998a125fb9 Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Fri, 29 May 2026 14:15:25 +0800 Subject: [PATCH 05/11] fix: reformat --- areal/__init__.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/areal/__init__.py b/areal/__init__.py index 44d703638d..e6edb1a6a7 100644 --- a/areal/__init__.py +++ 
b/areal/__init__.py @@ -5,21 +5,25 @@ from .version import __version__ # noqa -_INFRA_NAMES = frozenset({ - "RolloutController", - "StalenessManager", - "TrainController", - "WorkflowExecutor", - "current_platform", - "workflow_context", -}) - -_TRAINER_NAMES = frozenset({ - "DPOTrainer", - "PPOTrainer", - "RWTrainer", - "SFTTrainer", -}) +_INFRA_NAMES = frozenset( + { + "RolloutController", + "StalenessManager", + "TrainController", + "WorkflowExecutor", + "current_platform", + "workflow_context", + } +) + +_TRAINER_NAMES = frozenset( + { + "DPOTrainer", + "PPOTrainer", + "RWTrainer", + "SFTTrainer", + } +) def __getattr__(name: str): From 9110c53bdf52272fb62c9aa29518739fae76bcde Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Fri, 29 May 2026 15:56:35 +0800 Subject: [PATCH 06/11] chore: ruff --fix on full repo --- areal/experimental/cli/commands/agent/__init__.py | 1 - areal/experimental/cli/commands/inf/__init__.py | 1 - areal/experimental/cli/commands/train/__init__.py | 1 - areal/experimental/cli/commands/weight_update/__init__.py | 1 - areal/experimental/cli/main.py | 4 +--- 5 files changed, 1 insertion(+), 7 deletions(-) diff --git a/areal/experimental/cli/commands/agent/__init__.py b/areal/experimental/cli/commands/agent/__init__.py index 3d8521f1e7..6b83926151 100644 --- a/areal/experimental/cli/commands/agent/__init__.py +++ b/areal/experimental/cli/commands/agent/__init__.py @@ -16,7 +16,6 @@ import argparse - _DESCRIPTION = """\ Operate an agent service: gateway + router + (worker, data-proxy) pairs. 
Session-centric: the primary unit of interaction is an agent session, diff --git a/areal/experimental/cli/commands/inf/__init__.py b/areal/experimental/cli/commands/inf/__init__.py index f860517890..71287a6fe1 100644 --- a/areal/experimental/cli/commands/inf/__init__.py +++ b/areal/experimental/cli/commands/inf/__init__.py @@ -12,7 +12,6 @@ import argparse - _DESCRIPTION = """\ Operate an inference service: gateway + router + optional model backends. diff --git a/areal/experimental/cli/commands/train/__init__.py b/areal/experimental/cli/commands/train/__init__.py index dab09229c5..94a87e1008 100644 --- a/areal/experimental/cli/commands/train/__init__.py +++ b/areal/experimental/cli/commands/train/__init__.py @@ -16,7 +16,6 @@ import argparse - _DESCRIPTION = """\ Submit and observe training jobs. Job-shaped (terminates), not service-shaped — the CLI wraps lifecycle only and does not choose the diff --git a/areal/experimental/cli/commands/weight_update/__init__.py b/areal/experimental/cli/commands/weight_update/__init__.py index b1e96c0e27..fa9de4eb44 100644 --- a/areal/experimental/cli/commands/weight_update/__init__.py +++ b/areal/experimental/cli/commands/weight_update/__init__.py @@ -21,7 +21,6 @@ import argparse - _DESCRIPTION = """\ Diagnose the weight-update service that bridges training and inference. diff --git a/areal/experimental/cli/main.py b/areal/experimental/cli/main.py index 1608a7a229..2bbd134b25 100644 --- a/areal/experimental/cli/main.py +++ b/areal/experimental/cli/main.py @@ -20,13 +20,11 @@ import argparse import sys -from areal.version import __version__ - from areal.experimental.cli.commands import agent as cmd_agent from areal.experimental.cli.commands import inf as cmd_inf from areal.experimental.cli.commands import train as cmd_train from areal.experimental.cli.commands import weight_update as cmd_weight_update - +from areal.version import __version__ _DESCRIPTION = """\ AReaL operator CLI for the v2 microservice architecture. 
From 74701ddd7827fd34d474a5a04df2ec1c06255a89 Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Wed, 3 Jun 2026 16:19:19 +0800 Subject: [PATCH 07/11] chore: lift training verbs to top level Drop the `train` namespace and expose `run / start / stop / ps / status / logs` as top-level verbs directly under `areal`. Service-side surfaces (`inf`, `agent`, `weight-update`) stay namespaced. Asymmetric on purpose: training is the primary use case and benefits from shorter invocation (`areal run` vs `areal train run`), while service operator commands are auxiliary and read better in a namespace. Each new top-level verb is a parser-only stub (silent `return 0`), matching the same convention as the existing namespace stubs. State directory for training jobs is `~/.areal/runs/`. --- areal/experimental/cli/commands/logs.py | 34 ++++++++++ areal/experimental/cli/commands/ps.py | 33 ++++++++++ areal/experimental/cli/commands/run.py | 51 ++++++++++++++ areal/experimental/cli/commands/start.py | 38 +++++++++++ areal/experimental/cli/commands/status.py | 33 ++++++++++ areal/experimental/cli/commands/stop.py | 33 ++++++++++ .../cli/commands/train/__init__.py | 50 -------------- areal/experimental/cli/main.py | 66 ++++++++++++------- 8 files changed, 266 insertions(+), 72 deletions(-) create mode 100644 areal/experimental/cli/commands/logs.py create mode 100644 areal/experimental/cli/commands/ps.py create mode 100644 areal/experimental/cli/commands/run.py create mode 100644 areal/experimental/cli/commands/start.py create mode 100644 areal/experimental/cli/commands/status.py create mode 100644 areal/experimental/cli/commands/stop.py delete mode 100644 areal/experimental/cli/commands/train/__init__.py diff --git a/areal/experimental/cli/commands/logs.py b/areal/experimental/cli/commands/logs.py new file mode 100644 index 0000000000..090822adfc --- /dev/null +++ b/areal/experimental/cli/commands/logs.py @@ -0,0 +1,34 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal logs`` — 
tail a training job's stdout/stderr (scaffold).""" + +from __future__ import annotations + +import argparse + +_DESCRIPTION = """\ +Tail a training job's combined stdout/stderr. + +NO BEHAVIOR YET. Reserves the `areal logs` command name. + +Planned flags: + run_name Name of the run (positional, required). + -f, --follow Stream new log lines as they arrive. + -n, --lines N Number of recent lines to print initially (default: 200). + +Log files live under ~/.areal/runs//. +""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "logs", + help="Tail a training job's stdout/stderr (scaffold).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/ps.py b/areal/experimental/cli/commands/ps.py new file mode 100644 index 0000000000..f3616b7d11 --- /dev/null +++ b/areal/experimental/cli/commands/ps.py @@ -0,0 +1,33 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal ps`` — list locally tracked training jobs (scaffold).""" + +from __future__ import annotations + +import argparse + +_DESCRIPTION = """\ +List locally tracked training jobs. + +NO BEHAVIOR YET. Reserves the `areal ps` command name. + +Planned flags: + --json Emit machine-readable JSON. + --all Include completed / failed runs (default: running only). + +State lives under ~/.areal/runs/. 
+""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "ps", + help="List locally tracked training jobs (scaffold).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/run.py b/areal/experimental/cli/commands/run.py new file mode 100644 index 0000000000..d9f7a26cdf --- /dev/null +++ b/areal/experimental/cli/commands/run.py @@ -0,0 +1,51 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal run`` — launch a training driver in the foreground (scaffold). + +Training is AReaL's primary use case, so the verbs that drive a training +job (`run`, `start`, `stop`, `ps`, `status`, `logs`) sit at the top +level instead of being nested under a `train` namespace. The service-side +operator surfaces (inference / agent / weight-update) stay namespaced. + +No verb behavior is implemented in this scaffold release; this module +only reserves the ``areal run`` command name. +""" + +from __future__ import annotations + +import argparse + +_DESCRIPTION = """\ +Launch a training driver in the foreground. + +NO BEHAVIOR YET. Reserves the `areal run` command name. + +Planned flags: + --config PATH Training YAML config (required). + --name NAME Override run name (default: derived from yaml). + --driver MOD:FUNC Override driver entry (default: yaml `driver:` field). + overrides... Hydra-style overrides forwarded to the driver. + +Companion verbs (also top-level): + areal start Spawn a detached driver process (background). + areal stop Signal a running job by name. + areal ps List locally tracked jobs. + areal status Status of one job. + areal logs Tail a job's combined stdout/stderr. + +State lives under ~/.areal/runs/. 
+""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "run", + help="Launch a training driver in the foreground (scaffold).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/start.py b/areal/experimental/cli/commands/start.py new file mode 100644 index 0000000000..1149f7a045 --- /dev/null +++ b/areal/experimental/cli/commands/start.py @@ -0,0 +1,38 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal start`` — spawn a detached training driver process (scaffold).""" + +from __future__ import annotations + +import argparse + +_DESCRIPTION = """\ +Spawn a detached training driver process (background). + +NO BEHAVIOR YET. Reserves the `areal start` command name. + +Planned flags: + --config PATH Training YAML config (required). + --name NAME Override run name. + --driver MOD:FUNC Override driver entry. + overrides... Hydra-style overrides forwarded to the driver. + +Use `areal run` for foreground execution. Use `areal ps`, `areal status`, +`areal logs`, and `areal stop` to manage the resulting background job. + +State lives under ~/.areal/runs/. 
+""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "start", + help="Spawn a detached training driver process (scaffold).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/status.py b/areal/experimental/cli/commands/status.py new file mode 100644 index 0000000000..416e2d02d8 --- /dev/null +++ b/areal/experimental/cli/commands/status.py @@ -0,0 +1,33 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal status`` — show status of one training job (scaffold).""" + +from __future__ import annotations + +import argparse + +_DESCRIPTION = """\ +Show status of one training job. + +NO BEHAVIOR YET. Reserves the `areal status` command name. + +Planned flags: + run_name Name of the run to inspect (positional, required). + --json Emit machine-readable JSON. + +State lives under ~/.areal/runs/. +""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "status", + help="Show status of one training job (scaffold).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/stop.py b/areal/experimental/cli/commands/stop.py new file mode 100644 index 0000000000..79b03cce02 --- /dev/null +++ b/areal/experimental/cli/commands/stop.py @@ -0,0 +1,33 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal stop`` — signal a running training job by name (scaffold).""" + +from __future__ import annotations + +import argparse + +_DESCRIPTION = """\ +Signal a running training job by name (SIGTERM, escalating to SIGKILL). + +NO BEHAVIOR YET. Reserves the `areal stop` command name. + +Planned flags: + run_name Name of the run to stop (positional, required). 
+ --timeout SECONDS Grace period before SIGKILL (default: 15). + +State lives under ~/.areal/runs/. +""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "stop", + help="Signal a running training job by name (scaffold).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.set_defaults(func=_handle) + + +def _handle(_: argparse.Namespace) -> int: + return 0 diff --git a/areal/experimental/cli/commands/train/__init__.py b/areal/experimental/cli/commands/train/__init__.py deleted file mode 100644 index 94a87e1008..0000000000 --- a/areal/experimental/cli/commands/train/__init__.py +++ /dev/null @@ -1,50 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 - -"""``areal train`` — training job submitter (scaffold). - -Wraps the launch lifecycle of a training driver. Unlike ``areal inf`` / -``areal agent`` (which manage long-running services), ``areal train`` -treats each run as a job: it terminates, and the CLI's job is purely -lifecycle wrapping. The scheduling decision (local / slurm / ray) stays -inside the driver, decided by ``config.scheduler.type`` — the CLI does -not pick a scheduler. - -No verbs are implemented in this scaffold release. -""" - -from __future__ import annotations - -import argparse - -_DESCRIPTION = """\ -Submit and observe training jobs. Job-shaped (terminates), not -service-shaped — the CLI wraps lifecycle only and does not choose the -scheduler (that decision lives in the driver via config.scheduler.type). - -NO VERBS IMPLEMENTED YET. This namespace currently only reserves the -`areal train ...` command surface. 
- -Planned verb surface (flag matrices live in the design discussion issue): - run run a driver in the foreground (small jobs, debugging) - start spawn a detached driver process (cluster jobs) - stop signal a running job by name - ps list locally tracked jobs - status status of one job - logs tail a job's combined stdout/stderr - -State lives under ~/.areal/train/. -""" - - -def add_parser(subparsers: argparse._SubParsersAction) -> None: - p = subparsers.add_parser( - "train", - help="Submit and observe training jobs (scaffold — no verbs yet).", - description=_DESCRIPTION, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - p.set_defaults(func=_handle) - - -def _handle(_: argparse.Namespace) -> int: - return 0 diff --git a/areal/experimental/cli/main.py b/areal/experimental/cli/main.py index 2bbd134b25..41e2e5d977 100644 --- a/areal/experimental/cli/main.py +++ b/areal/experimental/cli/main.py @@ -2,17 +2,19 @@ """Top-level entry point for the ``areal`` console-script. -This module wires the four sub-CLI namespaces (`inf`, `agent`, `train`, -`weight-update`) into a single argparse tree. Each namespace lives under -``areal/experimental/cli/commands//`` and exports an -``add_parser(subparsers)`` function; this file imports them and registers -them. No verb behavior is implemented at this level. - -The import path is kept deliberately light: only stdlib and the namespace -``__init__`` modules are touched here. Heavy dependencies (torch, ray, -megatron, sglang, vllm, fastapi, …) must never appear on the import path -that ``areal --help`` triggers — the invariant is locked by -``tests/experimental/test_cli_lightness.py``. +This module wires the top-level training verbs (`run`, `start`, `stop`, +`ps`, `status`, `logs`) and the three service namespaces (`inf`, `agent`, +`weight-update`) into a single argparse tree. 
+ +The shape is asymmetric on purpose: training is AReaL's primary use case, +so its verbs sit at the top level for ergonomic invocation (`areal run` +vs `areal train run`). Service-side operator commands stay namespaced +because they are auxiliary surfaces, not the main entry. + +The import path is kept deliberately light: only stdlib and the verb / +namespace modules are touched here. Verb implementations must defer +heavy imports (torch, ray, megatron, sglang, vllm, fastapi, ...) into +their ``_handle`` function bodies, never at module top level. """ from __future__ import annotations @@ -22,25 +24,38 @@ from areal.experimental.cli.commands import agent as cmd_agent from areal.experimental.cli.commands import inf as cmd_inf -from areal.experimental.cli.commands import train as cmd_train +from areal.experimental.cli.commands import logs as cmd_logs +from areal.experimental.cli.commands import ps as cmd_ps +from areal.experimental.cli.commands import run as cmd_run +from areal.experimental.cli.commands import start as cmd_start +from areal.experimental.cli.commands import status as cmd_status +from areal.experimental.cli.commands import stop as cmd_stop from areal.experimental.cli.commands import weight_update as cmd_weight_update from areal.version import __version__ _DESCRIPTION = """\ AReaL operator CLI for the v2 microservice architecture. -Each namespace drives one of the v2 services. Verbs land incrementally; -no verbs are implemented in this scaffold release — each namespace's ---help describes its planned surface and points at the design discussion. +Training is AReaL's primary use case and its verbs are top-level +(`areal run`, `areal start`, etc.). Service-side operator surfaces +(inference, agent, weight-update) live under their own namespaces. 
-Namespaces: +Top-level training verbs: + run Launch a training driver in the foreground + start Spawn a detached driver process (background) + stop Signal a running job by name + ps List locally tracked jobs + status Status of one job + logs Tail a job's combined stdout/stderr + +Service namespaces: inf Operate an inference service (gateway + router + models) agent Operate an agent service (gateway + router + sessions) - train Submit and observe training jobs weight-update Diagnose weight-sync state between train and inference -Run `areal --help` for what's planned (and what's available -today). State files for each namespace live under ~/.areal//. +Run `areal --help` for the planned surface of each verb. +Training state lives under ~/.areal/runs/. +Service state lives under ~/.areal//. """ @@ -56,13 +71,20 @@ def build_parser() -> argparse.ArgumentParser: version=f"areal {__version__}", ) subparsers = parser.add_subparsers( - dest="namespace", + dest="command", required=True, - metavar="NAMESPACE", + metavar="COMMAND", ) + # Top-level training verbs + cmd_run.add_parser(subparsers) + cmd_start.add_parser(subparsers) + cmd_stop.add_parser(subparsers) + cmd_ps.add_parser(subparsers) + cmd_status.add_parser(subparsers) + cmd_logs.add_parser(subparsers) + # Service namespaces cmd_inf.add_parser(subparsers) cmd_agent.add_parser(subparsers) - cmd_train.add_parser(subparsers) cmd_weight_update.add_parser(subparsers) return parser From 29fb793b8185b39f33a5c3d3f7ebf9a67cc8b90e Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Wed, 3 Jun 2026 22:43:16 +0800 Subject: [PATCH 08/11] feat(cli): implement areal run and areal inf run areal run: foreground training driver invoker. Resolves driver from --driver flag or yaml `driver:` field; forwards --config + Hydra overrides verbatim to the driver. Writes RunState to ~/.areal/runs/. areal inf run: detached v2 inference service launcher. 
Spawns gateway + router as detached subprocesses, polls /health, writes ServiceState to ~/.areal/inf/. CLI exits after the service is healthy. Both verbs lazy-import their implementations inside _handle so `areal --help` / `areal --help` stay light. --- .../experimental/cli/commands/inf/__init__.py | 29 +-- areal/experimental/cli/commands/inf/run.py | 86 +++++++ areal/experimental/cli/commands/run.py | 72 ++++-- areal/experimental/cli/gateway_client.py | 69 ++++++ areal/experimental/cli/inf_launcher.py | 232 ++++++++++++++++++ areal/experimental/cli/inf_state.py | 110 +++++++++ areal/experimental/cli/runner.py | 141 +++++++++++ areal/experimental/cli/state.py | 116 ++++----- 8 files changed, 752 insertions(+), 103 deletions(-) create mode 100644 areal/experimental/cli/commands/inf/run.py create mode 100644 areal/experimental/cli/gateway_client.py create mode 100644 areal/experimental/cli/inf_launcher.py create mode 100644 areal/experimental/cli/inf_state.py create mode 100644 areal/experimental/cli/runner.py diff --git a/areal/experimental/cli/commands/inf/__init__.py b/areal/experimental/cli/commands/inf/__init__.py index 71287a6fe1..c29cd57fab 100644 --- a/areal/experimental/cli/commands/inf/__init__.py +++ b/areal/experimental/cli/commands/inf/__init__.py @@ -1,29 +1,20 @@ # SPDX-License-Identifier: Apache-2.0 -"""``areal inf`` — inference service operator console (scaffold). - -Drives an inference service (gateway + router + optional model backends) -for day-to-day operator and debugging work. No verbs are implemented in -this scaffold release; this module only reserves the ``areal inf`` -command name and tells the user what is coming. -""" +"""``areal inf`` — inference service operator console.""" from __future__ import annotations import argparse + _DESCRIPTION = """\ Operate an inference service: gateway + router + optional model backends. -NO VERBS IMPLEMENTED YET. This namespace currently only reserves the -`areal inf ...` command surface. 
+Implemented verbs: + run Launch the gateway+router stack (detached). -Planned verb surface (flag matrices live in the design discussion issue): - run launch gateway + router (optionally with --model inline) - stop tear them down - status health for one service - ps list locally known services - logs show gateway / router / model logs +Planned (not yet implemented): + stop / status / ps / register / deregister / models / logs State lives under ~/.areal/inf/. """ @@ -32,12 +23,12 @@ def add_parser(subparsers: argparse._SubParsersAction) -> None: p = subparsers.add_parser( "inf", - help="Operate an inference service (scaffold — no verbs yet).", + help="Operate an inference service.", description=_DESCRIPTION, formatter_class=argparse.RawDescriptionHelpFormatter, ) - p.set_defaults(func=_handle) + sub = p.add_subparsers(dest="verb", required=True, metavar="VERB") + from areal.experimental.cli.commands.inf import run as cmd_run -def _handle(_: argparse.Namespace) -> int: - return 0 + cmd_run.add_parser(sub) diff --git a/areal/experimental/cli/commands/inf/run.py b/areal/experimental/cli/commands/inf/run.py new file mode 100644 index 0000000000..a5476902ca --- /dev/null +++ b/areal/experimental/cli/commands/inf/run.py @@ -0,0 +1,86 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""``areal inf run`` — launch the v2 inference service (detached).""" + +from __future__ import annotations + +import argparse +import time + + +_DESCRIPTION = """\ +Spawn the v2 inference gateway + router as detached subprocesses, wait +for HTTP /health, persist state under ~/.areal/inf/, and exit. + +The CLI process exits after the service is healthy; the gateway and +router keep running. Later commands (stop / status / ps / logs / ...) +reconcile via state + PID + HTTP. 
+""" + + +def add_parser(subparsers: argparse._SubParsersAction) -> None: + p = subparsers.add_parser( + "run", + help="Launch the inference service (detached).", + description=_DESCRIPTION, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + p.add_argument("--service", default="default", help="Service instance name.") + p.add_argument("--gateway-host", default="127.0.0.1") + p.add_argument("--gateway-port", type=int, default=8080) + p.add_argument("--router-host", default="127.0.0.1") + p.add_argument("--router-port", type=int, default=8081) + p.add_argument("--admin-api-key", default="areal-admin-key") + p.add_argument( + "--routing-strategy", default="round_robin", + choices=["round_robin", "least_busy"], + ) + p.add_argument("--poll-interval", type=float, default=5.0) + p.add_argument("--router-timeout", type=float, default=2.0) + p.add_argument("--forward-timeout", type=float, default=120.0) + p.add_argument( + "--log-level", default="info", + choices=["debug", "info", "warning", "error"], + ) + p.add_argument("--launch-timeout", type=float, default=30.0) + p.add_argument( + "--force", action="store_true", + help="Replace an existing healthy instance with the same name.", + ) + p.set_defaults(func=_handle) + + +def _handle(args: argparse.Namespace) -> int: + from areal.experimental.cli.inf_launcher import start_service + from areal.experimental.cli.inf_state import ( + get_current_service, + service_logs_dir, + set_current_service, + ) + + state = start_service( + name=args.service, + gateway_host=args.gateway_host, + gateway_port=args.gateway_port, + router_host=args.router_host, + router_port=args.router_port, + admin_api_key=args.admin_api_key, + routing_strategy=args.routing_strategy, + poll_interval=args.poll_interval, + router_timeout=args.router_timeout, + forward_timeout=args.forward_timeout, + log_level=args.log_level, + force=args.force, + launch_timeout=args.launch_timeout, + ) + + if get_current_service() is None: + 
set_current_service(state.name) + + logs = service_logs_dir(state.name) + print(f"Started service {state.name!r}.") + print(f" gateway: {state.gateway_url} (pid {state.gateway_pid})") + print(f" router: {state.router_url} (pid {state.router_pid})") + print(f" logs: {logs}") + print(f" started: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(state.created_at))}") + return 0 diff --git a/areal/experimental/cli/commands/run.py b/areal/experimental/cli/commands/run.py index d9f7a26cdf..a78133ee63 100644 --- a/areal/experimental/cli/commands/run.py +++ b/areal/experimental/cli/commands/run.py @@ -1,51 +1,69 @@ # SPDX-License-Identifier: Apache-2.0 -"""``areal run`` — launch a training driver in the foreground (scaffold). - -Training is AReaL's primary use case, so the verbs that drive a training -job (`run`, `start`, `stop`, `ps`, `status`, `logs`) sit at the top -level instead of being nested under a `train` namespace. The service-side -operator surfaces (inference / agent / weight-update) stay namespaced. - -No verb behavior is implemented in this scaffold release; this module -only reserves the ``areal run`` command name. -""" +"""``areal run`` — foreground training driver invoker.""" from __future__ import annotations import argparse +from pathlib import Path + _DESCRIPTION = """\ Launch a training driver in the foreground. -NO BEHAVIOR YET. Reserves the `areal run` command name. - -Planned flags: - --config PATH Training YAML config (required). - --name NAME Override run name (default: derived from yaml). - --driver MOD:FUNC Override driver entry (default: yaml `driver:` field). - overrides... Hydra-style overrides forwarded to the driver. +Resolve the driver entry from --driver or the yaml `driver:` field, then +invoke it in this process. Scheduler dispatch (local / slurm / ray) is +decided inside the driver based on config.scheduler.type. -Companion verbs (also top-level): - areal start Spawn a detached driver process (background). 
- areal stop Signal a running job by name. - areal ps List locally tracked jobs. - areal status Status of one job. - areal logs Tail a job's combined stdout/stderr. +Examples: + areal run --config experiments/grpo.yaml + areal run --config experiments/grpo.yaml --driver examples.math.gsm8k_rl:main + areal run --config experiments/grpo.yaml actor.lr=1e-5 +debug.foo=bar -State lives under ~/.areal/runs/. +Hydra overrides (key=value, +key=value, ~key) after the parsed flags are +forwarded verbatim to the driver. """ def add_parser(subparsers: argparse._SubParsersAction) -> None: p = subparsers.add_parser( "run", - help="Launch a training driver in the foreground (scaffold).", + help="Launch a training driver in the foreground.", description=_DESCRIPTION, formatter_class=argparse.RawDescriptionHelpFormatter, ) + p.add_argument("--config", required=True, type=Path) + p.add_argument( + "--name", default=None, + help="Override run name (default: / from yaml).", + ) + p.add_argument( + "--driver", default=None, + help="Driver entry 'module.path:func' (overrides yaml `driver:`).", + ) + p.add_argument( + "overrides", nargs=argparse.REMAINDER, + help="Hydra-style overrides forwarded to the driver.", + ) p.set_defaults(func=_handle) -def _handle(_: argparse.Namespace) -> int: - return 0 +def _handle(args: argparse.Namespace) -> int: + from areal.experimental.cli.runner import ( + resolve_driver, + resolve_name, + run_foreground, + ) + + config_path = args.config.expanduser().resolve() + if not config_path.exists(): + raise SystemExit(f"Config not found: {config_path}") + + driver = resolve_driver(config_path, cli_driver=args.driver) + name = resolve_name(config_path, cli_name=args.name) + return run_foreground( + name=name, + driver_spec=driver, + config_path=config_path, + overrides=args.overrides or [], + ) diff --git a/areal/experimental/cli/gateway_client.py b/areal/experimental/cli/gateway_client.py new file mode 100644 index 0000000000..eb733a2c20 --- /dev/null +++ 
b/areal/experimental/cli/gateway_client.py @@ -0,0 +1,69 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Minimal stdlib HTTP client for the inference gateway. + +Uses ``urllib.request`` so the CLI stays light. Only the endpoints the CLI +needs are wrapped; for ``areal inf run`` that's just ``GET /health``. +""" + +from __future__ import annotations + +import json +import socket +import urllib.error +import urllib.request +from dataclasses import dataclass +from typing import Any + + +class GatewayError(Exception): + pass + + +class GatewayUnreachable(GatewayError): + pass + + +class GatewayStatusError(GatewayError): + def __init__(self, status: int, body: str) -> None: + super().__init__(f"gateway returned HTTP {status}: {body[:200]}") + self.status = status + self.body = body + + +@dataclass +class GatewayClient: + url: str + admin_api_key: str | None = None + timeout: float = 5.0 + + def _request( + self, path: str, method: str = "GET", body: dict | None = None + ) -> Any: + full = self.url.rstrip("/") + path + headers = {"Accept": "application/json"} + data: bytes | None = None + if body is not None: + data = json.dumps(body).encode("utf-8") + headers["Content-Type"] = "application/json" + if self.admin_api_key: + headers["Authorization"] = f"Bearer {self.admin_api_key}" + + req = urllib.request.Request(full, data=data, headers=headers, method=method) + try: + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + raw = resp.read() + except urllib.error.HTTPError as e: + raw = e.read() if hasattr(e, "read") else b"" + raise GatewayStatusError(e.code, raw.decode("utf-8", "replace")) from e + except (urllib.error.URLError, socket.timeout, ConnectionError) as e: + raise GatewayUnreachable(f"gateway at {self.url} unreachable: {e}") from e + if not raw: + return None + try: + return json.loads(raw.decode("utf-8")) + except json.JSONDecodeError: + return raw.decode("utf-8", "replace") + + def health(self) -> dict: + return self._request("/health") diff 
--git a/areal/experimental/cli/inf_launcher.py b/areal/experimental/cli/inf_launcher.py new file mode 100644 index 0000000000..fc4c05d741 --- /dev/null +++ b/areal/experimental/cli/inf_launcher.py @@ -0,0 +1,232 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Process management for ``areal inf`` services. + +Spawns the v2 inference gateway and router as detached subprocesses, polls +the gateway's ``/health`` until ready, and writes ``ServiceState``. No +hidden supervisor — later commands reconcile via state + PID + HTTP. +""" + +from __future__ import annotations + +import os +import signal +import subprocess +import sys +import time +from pathlib import Path + +from areal.experimental.cli.gateway_client import GatewayClient, GatewayUnreachable +from areal.experimental.cli.inf_state import ( + ServiceState, + liveness_summary, + service_logs_dir, + service_state_path, +) +from areal.experimental.cli.state import pid_alive + + +HEALTH_POLL_INTERVAL_S = 0.5 + + +def _spawn(cmd: list[str], log_file: Path) -> subprocess.Popen: + log_file.parent.mkdir(parents=True, exist_ok=True) + lf = open(log_file, "wb", buffering=0) + return subprocess.Popen( + cmd, + stdin=subprocess.DEVNULL, + stdout=lf, + stderr=subprocess.STDOUT, + start_new_session=True, + env=os.environ.copy(), + ) + + +def _gateway_cmd( + *, + host: str, + port: int, + admin_api_key: str, + router_host: str, + router_port: int, + router_timeout: float, + forward_timeout: float, + log_level: str, +) -> list[str]: + return [ + sys.executable, "-m", "areal.experimental.inference_service.gateway", + "--host", host, + "--port", str(port), + "--admin-api-key", admin_api_key, + "--router-addr", f"http://{router_host}:{router_port}", + "--router-timeout", str(router_timeout), + "--forward-timeout", str(forward_timeout), + "--log-level", log_level, + ] + + +def _router_cmd( + *, + host: str, + port: int, + admin_api_key: str, + poll_interval: float, + routing_strategy: str, + log_level: str, +) -> list[str]: + return 
[ + sys.executable, "-m", "areal.experimental.inference_service.router", + "--host", host, + "--port", str(port), + "--admin-api-key", admin_api_key, + "--poll-interval", str(poll_interval), + "--routing-strategy", routing_strategy, + "--log-level", log_level, + ] + + +def _signal_pid(pid: int, sig: int) -> None: + try: + os.killpg(os.getpgid(pid), sig) + return + except ProcessLookupError: + return + except (PermissionError, OSError): + pass + try: + os.kill(pid, sig) + except ProcessLookupError: + pass + + +def _wait_dead(pids: list[int], deadline: float) -> bool: + while time.time() < deadline: + if not any(pid_alive(p) for p in pids): + return True + time.sleep(0.2) + return False + + +def _kill_state(state: ServiceState, grace: float) -> None: + pids = [state.gateway_pid, state.router_pid] + for p in pids: + if pid_alive(p): + _signal_pid(p, signal.SIGTERM) + if not _wait_dead(pids, time.time() + grace): + for p in pids: + if pid_alive(p): + _signal_pid(p, signal.SIGKILL) + + +def _refuse_if_active(name: str, force: bool) -> None: + p = service_state_path(name) + if not p.exists(): + return + try: + existing = ServiceState.load(name) + except (ValueError, FileNotFoundError, TypeError): + return + live = liveness_summary(existing) + healthy = False + if live["gateway_pid_alive"]: + try: + GatewayClient(existing.gateway_url, timeout=1.0).health() + healthy = True + except GatewayUnreachable: + healthy = False + if healthy and not force: + raise SystemExit( + f"Service {name!r} is already healthy (gateway={existing.gateway_url}). " + f"Use --force to replace it." 
+ ) + if force and (live["gateway_pid_alive"] or live["router_pid_alive"]): + _kill_state(existing, grace=5.0) + existing.remove() + + +def start_service( + *, + name: str, + gateway_host: str, + gateway_port: int, + router_host: str, + router_port: int, + admin_api_key: str, + routing_strategy: str = "round_robin", + poll_interval: float = 5.0, + router_timeout: float = 2.0, + forward_timeout: float = 120.0, + log_level: str = "info", + force: bool = False, + launch_timeout: float = 30.0, +) -> ServiceState: + _refuse_if_active(name, force=force) + + logs = service_logs_dir(name) + + router_proc = _spawn( + _router_cmd( + host=router_host, + port=router_port, + admin_api_key=admin_api_key, + poll_interval=poll_interval, + routing_strategy=routing_strategy, + log_level=log_level, + ), + logs / "router.log", + ) + + time.sleep(0.3) + + gateway_proc = _spawn( + _gateway_cmd( + host=gateway_host, + port=gateway_port, + admin_api_key=admin_api_key, + router_host=router_host if router_host not in ("0.0.0.0", "::") else "127.0.0.1", + router_port=router_port, + router_timeout=router_timeout, + forward_timeout=forward_timeout, + log_level=log_level, + ), + logs / "gateway.log", + ) + + state = ServiceState( + name=name, + gateway_host=gateway_host, + gateway_port=gateway_port, + router_host=router_host, + router_port=router_port, + gateway_pid=gateway_proc.pid, + router_pid=router_proc.pid, + admin_api_key=admin_api_key, + log_level=log_level, + routing_strategy=routing_strategy, + created_at=time.time(), + ) + + client = GatewayClient(state.gateway_url, admin_api_key=admin_api_key, timeout=1.5) + deadline = time.time() + launch_timeout + last_err: Exception | None = None + while time.time() < deadline: + if not pid_alive(gateway_proc.pid) or not pid_alive(router_proc.pid): + _kill_state(state, grace=2.0) + raise SystemExit( + f"Service {name!r} died during launch. Check logs under {logs}." 
+ ) + try: + client.health() + break + except GatewayUnreachable as e: + last_err = e + time.sleep(HEALTH_POLL_INTERVAL_S) + else: + _kill_state(state, grace=2.0) + raise SystemExit( + f"Service {name!r} did not become healthy within {launch_timeout:.0f}s " + f"(last error: {last_err}). Logs: {logs}" + ) + + state.save() + return state diff --git a/areal/experimental/cli/inf_state.py b/areal/experimental/cli/inf_state.py new file mode 100644 index 0000000000..3b0e741c6a --- /dev/null +++ b/areal/experimental/cli/inf_state.py @@ -0,0 +1,110 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""State files for ``areal inf`` services.""" + +from __future__ import annotations + +import json +import os +from dataclasses import asdict, dataclass, field +from pathlib import Path + +from areal.experimental.cli.state import areal_home, pid_alive + + +def inf_dir() -> Path: + d = areal_home() / "inf" + d.mkdir(parents=True, exist_ok=True) + return d + + +def services_dir() -> Path: + d = inf_dir() / "services" + d.mkdir(parents=True, exist_ok=True) + return d + + +def service_logs_dir(name: str) -> Path: + d = inf_dir() / "logs" / name + d.mkdir(parents=True, exist_ok=True) + return d + + +def current_service_file() -> Path: + return inf_dir() / "current-service" + + +def service_state_path(name: str) -> Path: + return services_dir() / f"{name}.json" + + +@dataclass +class ServiceState: + name: str + gateway_host: str + gateway_port: int + router_host: str + router_port: int + gateway_pid: int + router_pid: int + admin_api_key: str + log_level: str = "info" + routing_strategy: str = "round_robin" + created_at: float = 0.0 + extra: dict = field(default_factory=dict) + + @property + def gateway_url(self) -> str: + host = "127.0.0.1" if self.gateway_host in ("0.0.0.0", "::") else self.gateway_host + return f"http://{host}:{self.gateway_port}" + + @property + def router_url(self) -> str: + host = "127.0.0.1" if self.router_host in ("0.0.0.0", "::") else self.router_host + return 
f"http://{host}:{self.router_port}" + + def save(self) -> None: + p = service_state_path(self.name) + p.parent.mkdir(parents=True, exist_ok=True) + tmp = p.with_suffix(p.suffix + ".tmp") + with open(tmp, "w") as f: + json.dump(asdict(self), f, indent=2) + os.replace(tmp, p) + + @classmethod + def load(cls, name: str) -> ServiceState: + p = service_state_path(name) + if not p.exists(): + raise FileNotFoundError(f"No service state for {name!r} at {p}") + with open(p) as f: + return cls(**json.load(f)) + + def remove(self) -> None: + p = service_state_path(self.name) + if p.exists(): + p.unlink() + + +def liveness_summary(state: ServiceState) -> dict[str, bool]: + return { + "gateway_pid_alive": pid_alive(state.gateway_pid), + "router_pid_alive": pid_alive(state.router_pid), + } + + +def get_current_service() -> str | None: + p = current_service_file() + if not p.exists(): + return None + name = p.read_text().strip() + return name or None + + +def set_current_service(name: str | None) -> None: + p = current_service_file() + p.parent.mkdir(parents=True, exist_ok=True) + if name is None: + if p.exists(): + p.unlink() + return + p.write_text(name + "\n") diff --git a/areal/experimental/cli/runner.py b/areal/experimental/cli/runner.py new file mode 100644 index 0000000000..26515b0b53 --- /dev/null +++ b/areal/experimental/cli/runner.py @@ -0,0 +1,141 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Driver resolution and foreground execution for ``areal run``.""" + +from __future__ import annotations + +import importlib +import os +import sys +import time +from collections.abc import Callable +from pathlib import Path +from typing import Any + +import yaml + +from areal.experimental.cli.state import RunState, pid_alive, run_state_path + + +DriverFn = Callable[[list[str]], Any] + + +def _raw_yaml(config_path: Path) -> dict[str, Any]: + with open(config_path) as f: + data = yaml.safe_load(f) or {} + if not isinstance(data, dict): + raise SystemExit(f"Top-level of 
{config_path} must be a YAML mapping.") + return data + + +def _peek_driver(config_path: Path) -> str | None: + return _raw_yaml(config_path).get("driver") + + +def _peek_scheduler_type(config_path: Path) -> str | None: + sched = _raw_yaml(config_path).get("scheduler") or {} + return sched.get("type") if isinstance(sched, dict) else None + + +def _peek_name(config_path: Path) -> str | None: + raw = _raw_yaml(config_path) + exp = raw.get("experiment_name") + trial = raw.get("trial_name") + return f"{exp}/{trial}" if exp and trial else None + + +def _import_driver(spec: str) -> DriverFn: + if ":" not in spec: + raise SystemExit(f"Invalid driver {spec!r}; expected 'module.path:func'.") + mod_path, func_name = spec.split(":", 1) + try: + mod = importlib.import_module(mod_path) + except ImportError as e: + raise SystemExit(f"Cannot import driver module {mod_path!r}: {e}") from e + fn = getattr(mod, func_name, None) + if fn is None: + raise SystemExit(f"Module {mod_path!r} has no attribute {func_name!r}.") + if not callable(fn): + raise SystemExit(f"{spec!r} is not callable.") + return fn + + +def resolve_driver(config_path: Path, cli_driver: str | None) -> str: + if cli_driver: + return cli_driver + yaml_driver = _peek_driver(config_path) + if yaml_driver: + return yaml_driver + raise SystemExit( + f"No driver specified.\n" + f" Either add a `driver:` field to {config_path}:\n" + f" driver: examples.math.gsm8k_rl:main\n" + f" Or pass --driver on the command line:\n" + f" areal run --config {config_path} --driver examples.math.gsm8k_rl:main" + ) + + +def resolve_name(config_path: Path, cli_name: str | None) -> str: + if cli_name: + return cli_name + n = _peek_name(config_path) + if n: + return n + raise SystemExit( + f"No --name given and `experiment_name`/`trial_name` not both present in {config_path}." 
+ ) + + +def _refuse_if_active(name: str) -> None: + p = run_state_path(name) + if not p.exists(): + return + try: + existing = RunState.load(name) + except (FileNotFoundError, ValueError): + return + if pid_alive(existing.pid): + raise SystemExit( + f"Run {name!r} already active (pid={existing.pid}). " + f"Use `areal stop {name}` first." + ) + + +def run_foreground( + *, name: str, driver_spec: str, config_path: Path, overrides: list[str] +) -> int: + _refuse_if_active(name) + + argv = ["--config", str(config_path)] + list(overrides) + state = RunState( + name=name, + driver=driver_spec, + config_path=str(config_path), + pid=os.getpid(), + started_at=time.time(), + scheduler_type=_peek_scheduler_type(config_path), + overrides=list(overrides), + argv=argv, + ) + state.save() + + rc = 0 + try: + fn = _import_driver(driver_spec) + result = fn(argv) + if isinstance(result, int): + rc = result + except SystemExit as e: + if isinstance(e.code, int): + rc = e.code + elif e.code is not None: + print(str(e.code), file=sys.stderr) + rc = 1 + except BaseException: + state.status = "failed" + state.save() + raise + + state.status = "completed" if rc == 0 else "failed" + state.save() + return rc diff --git a/areal/experimental/cli/state.py b/areal/experimental/cli/state.py index 66a36a20cc..8f7192e5d1 100644 --- a/areal/experimental/cli/state.py +++ b/areal/experimental/cli/state.py @@ -2,54 +2,26 @@ """Cross-cutting state helpers shared by every sub-CLI. -The AReaL CLI persists a small amount of local state under ``~/.areal/`` -(overridable with ``$AREAL_HOME``). 
Each sub-CLI owns a subdirectory with -the same internal shape, so commands can find services and jobs across -invocations without a background daemon: - - ~/.areal/ - ├── inf/ - │ ├── current-service - │ ├── services/.json - │ └── logs// - ├── agent/ - │ ├── current-service - │ ├── services/.json - │ └── logs// - ├── train/ - │ ├── current-run - │ ├── runs/.json - │ └── logs// - └── weight-update/ - ├── current-service - ├── services/.json - └── logs// - -The helpers in this module are deliberately tiny — atomic file write, -PID liveness, and the ``~/.areal/`` resolver. Per-namespace dataclasses -(``ServiceState``, ``RunState``, etc.) live alongside the verb files -that introduce them, not here. - -This module must stay import-light. Do NOT add dependencies on torch, -aiohttp, fastapi, or any other heavy package — the lightness guard test -in ``tests/experimental/test_cli_lightness.py`` will reject the change. +Layout under ``$AREAL_HOME`` (default ``~/.areal``): + + runs/.json top-level training runs + runs/.log their captured stdout/stderr + inf/services/.json inference service instances + inf/logs// per-service logs + agent/services/.json agent service instances + weight-update/... diagnostics state """ from __future__ import annotations import json import os +from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Any def areal_home() -> Path: - """Return the root state directory (``$AREAL_HOME`` or ``~/.areal``). - - Creates the directory if it doesn't exist. Sub-CLIs derive their own - subdirectories from here; they should never bypass this function and - hardcode ``~/.areal`` themselves. - """ env = os.environ.get("AREAL_HOME") root = Path(env).expanduser() if env else Path.home() / ".areal" root.mkdir(parents=True, exist_ok=True) @@ -57,25 +29,12 @@ def areal_home() -> Path: def namespace_dir(namespace: str) -> Path: - """Return ``~/.areal//`` (created on demand). 
- - ``namespace`` is the on-disk directory name and follows the CLI - verb prefix (``inf``, ``agent``, ``train``, ``weight-update``). The - hyphenated form is used on disk to match the CLI surface exactly. - """ d = areal_home() / namespace d.mkdir(parents=True, exist_ok=True) return d def pid_alive(pid: int) -> bool: - """Cheap liveness probe: does *pid* still exist on this host? - - Uses ``kill(pid, 0)`` which signals nothing but raises ``ProcessLookupError`` - if the process is gone. Returns ``False`` for ``pid <= 0``. Note: a live - PID does not mean the service is healthy — pair this with an HTTP probe - when callers need real health status. - """ if pid <= 0: return False try: @@ -83,17 +42,11 @@ def pid_alive(pid: int) -> bool: except ProcessLookupError: return False except PermissionError: - # The process exists but is owned by another user. return True return True def atomic_write_text(path: Path, content: str) -> None: - """Write *content* to *path* atomically (write-temp + rename). - - Prevents readers from observing a partially written file when two CLI - invocations race on the same state file. 
- """ path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") with open(tmp, "w") as f: @@ -102,5 +55,54 @@ def atomic_write_text(path: Path, content: str) -> None: def atomic_write_json(path: Path, data: Any, *, indent: int = 2) -> None: - """Convenience wrapper: serialize *data* to JSON and write atomically.""" atomic_write_text(path, json.dumps(data, indent=indent) + "\n") + + +# ---- top-level training run state ---------------------------------------- + +def runs_dir() -> Path: + d = areal_home() / "runs" + d.mkdir(parents=True, exist_ok=True) + return d + + +def sanitize_name(name: str) -> str: + return name.replace("/", "__").replace(" ", "_") + + +def run_state_path(name: str) -> Path: + return runs_dir() / f"{sanitize_name(name)}.json" + + +def run_log_path(name: str) -> Path: + return runs_dir() / f"{sanitize_name(name)}.log" + + +@dataclass +class RunState: + name: str + driver: str + config_path: str + pid: int + started_at: float + status: str = "running" # running | stopped | completed | failed + log_path: str = "" + scheduler_type: str | None = None + overrides: list[str] = field(default_factory=list) + argv: list[str] = field(default_factory=list) + + def save(self) -> None: + atomic_write_json(run_state_path(self.name), asdict(self)) + + @classmethod + def load(cls, name: str) -> RunState: + p = run_state_path(name) + if not p.exists(): + raise FileNotFoundError(f"No run state for {name!r} at {p}") + with open(p) as f: + return cls(**json.load(f)) + + def remove(self) -> None: + p = run_state_path(self.name) + if p.exists(): + p.unlink() From 96ed7bdb6cf39b9eaeed191be5349d84fd305b6d Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Thu, 4 Jun 2026 14:50:17 +0800 Subject: [PATCH 09/11] revert(top-level): restore eager `from .infra import (...)` in areal/__init__.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The lazy `__getattr__` form introduced in d839c212 
broke an implicit cycle-breaker that v1 / production code relied on. Eager loading of `areal.infra` at `import areal` time pre-populates sys.modules so that `cli_args.py`'s top-level `from areal.engine.fsdp_utils.attn_impl import ...` chain (infra → engine_api → alloc_mode → cli_args) hits the cache and never tries to look up symbols on a partially-loaded cli_args. Without that pre-load, invoking `areal run --driver examples.math.gsm8k_rl:main` triggers the cycle the moment the driver imports `from areal.api.cli_args import GRPOConfig`: cannot import name 'ParallelStrategy' from partially initialized module 'areal.api.alloc_mode' (most likely due to a circular import) The lightness invariant the lazy form was originally protecting was removed in a525e340 (test_cli_lightness.py deletion), so the lazy form no longer earns its keep. Revert to main's eager form. A real fix (deferring the attn_impl / name_resolve imports in cli_args itself + lazy infra/__init__) is the proper long-term direction, but that touches areal/api/cli_args.py and areal/infra/__init__.py and is out of scope for the CLI scaffold PR. --- areal/__init__.py | 49 +++++++++++++++++------------------------------ 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/areal/__init__.py b/areal/__init__.py index e6edb1a6a7..42be212bdb 100644 --- a/areal/__init__.py +++ b/areal/__init__.py @@ -4,41 +4,28 @@ from .version import __version__ # noqa - -_INFRA_NAMES = frozenset( - { - "RolloutController", - "StalenessManager", - "TrainController", - "WorkflowExecutor", - "current_platform", - "workflow_context", - } -) - -_TRAINER_NAMES = frozenset( - { - "DPOTrainer", - "PPOTrainer", - "RWTrainer", - "SFTTrainer", - } +from .infra import ( + RolloutController, + StalenessManager, + TrainController, + WorkflowExecutor, + current_platform, + workflow_context, ) def __getattr__(name: str): - if name in _INFRA_NAMES: - from . 
import infra as _infra - - value = getattr(_infra, name) - globals()[name] = value - return value - if name in _TRAINER_NAMES: - from . import trainer as _trainer - - value = getattr(_trainer, name) - globals()[name] = value - return value + if name in ("DPOTrainer", "PPOTrainer", "RWTrainer", "SFTTrainer"): + from .trainer import DPOTrainer, PPOTrainer, RWTrainer, SFTTrainer + + _map = { + "DPOTrainer": DPOTrainer, + "PPOTrainer": PPOTrainer, + "RWTrainer": RWTrainer, + "SFTTrainer": SFTTrainer, + } + globals().update(_map) + return _map[name] raise AttributeError(f"module {__name__!r} has no attribute {name!r}") From 8934416fa408c62606d3bb902dbfcd7da689a314 Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Thu, 4 Jun 2026 15:18:01 +0800 Subject: [PATCH 10/11] feat(cli): detach areal run + heartbeat; drop scheduler_type peek and --name flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit areal run no longer runs the driver in the foreground. It now spawns the driver as a detached subprocess via a thin _exec.py wrapper, then exits. The wrapper: - updates RunState.last_heartbeat every 5 s on a background thread - converts SIGTERM into SystemExit so cleanup runs - writes final status + exit_code on driver exit Status semantics from RunState + heartbeat + PID liveness: status="running" + recent heartbeat + pid alive → healthy status="running" + stale heartbeat + pid alive → hung status="running" + pid dead → crashed status in {completed, failed, stopped} → terminal Other changes: - --name removed; the run name is always experiment_name/trial_name from the yaml (Hydra overrides experiment_name=X / trial_name=Y are honored by re-deriving from overrides). - _peek_scheduler_type and the RunState.scheduler_type field deleted; CLI does not own scheduler.type metadata (driver does internally). - RunState.argv deleted (redundant with config_path + overrides); last_heartbeat and exit_code added. 
--- areal/experimental/cli/_exec.py | 97 +++++++++++++++++++ areal/experimental/cli/commands/run.py | 30 +++--- areal/experimental/cli/runner.py | 126 ++++++++++++------------- areal/experimental/cli/state.py | 4 +- 4 files changed, 174 insertions(+), 83 deletions(-) create mode 100644 areal/experimental/cli/_exec.py diff --git a/areal/experimental/cli/_exec.py b/areal/experimental/cli/_exec.py new file mode 100644 index 0000000000..fa08cf2293 --- /dev/null +++ b/areal/experimental/cli/_exec.py @@ -0,0 +1,97 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Detached driver wrapper invoked by ``start_detached``. + +Spawned as ``python -m areal.experimental.cli._exec --name ... --driver +MOD:FUNC --config PATH -- ``. Updates RunState heartbeat on a +background thread, runs the driver in the main thread, writes final +status + exit code on exit. +""" + +from __future__ import annotations + +import argparse +import importlib +import os +import signal +import sys +import threading +import time +import traceback + +from areal.experimental.cli.state import RunState + + +HEARTBEAT_INTERVAL_S = 5.0 + + +def _heartbeat_loop(name: str, stop_event: threading.Event) -> None: + while not stop_event.is_set(): + try: + s = RunState.load(name) + s.last_heartbeat = time.time() + s.save() + except (FileNotFoundError, ValueError, OSError): + pass + stop_event.wait(HEARTBEAT_INTERVAL_S) + + +def _install_sigterm_handler() -> None: + def _handler(signum, frame): + raise SystemExit(143) + signal.signal(signal.SIGTERM, _handler) + + +def main() -> int: + p = argparse.ArgumentParser(prog="areal-exec", add_help=False) + p.add_argument("--name", required=True) + p.add_argument("--driver", required=True) + p.add_argument("--config", required=True) + p.add_argument("overrides", nargs=argparse.REMAINDER) + args = p.parse_args() + + overrides = args.overrides or [] + if overrides and overrides[0] == "--": + overrides = overrides[1:] + + state = RunState.load(args.name) + state.pid = os.getpid() + 
state.last_heartbeat = time.time() + state.save() + + _install_sigterm_handler() + + stop = threading.Event() + hb = threading.Thread(target=_heartbeat_loop, args=(args.name, stop), daemon=True) + hb.start() + + rc = 0 + try: + mod_path, func_name = args.driver.split(":", 1) + mod = importlib.import_module(mod_path) + fn = getattr(mod, func_name) + argv = ["--config", args.config, *overrides] + result = fn(argv) + if isinstance(result, int): + rc = result + except SystemExit as e: + rc = e.code if isinstance(e.code, int) else (1 if e.code else 0) + except BaseException: + traceback.print_exc(file=sys.stderr) + rc = 1 + finally: + stop.set() + try: + final = RunState.load(args.name) + final.status = "completed" if rc == 0 else "failed" + final.exit_code = rc + final.last_heartbeat = time.time() + final.save() + except Exception: + pass + + return rc + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/areal/experimental/cli/commands/run.py b/areal/experimental/cli/commands/run.py index a78133ee63..c1b3d311e5 100644 --- a/areal/experimental/cli/commands/run.py +++ b/areal/experimental/cli/commands/run.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 -"""``areal run`` — foreground training driver invoker.""" +"""``areal run`` — detached training driver launcher.""" from __future__ import annotations @@ -9,16 +9,21 @@ _DESCRIPTION = """\ -Launch a training driver in the foreground. +Launch a training driver in the background. Resolve the driver entry from --driver or the yaml `driver:` field, then -invoke it in this process. Scheduler dispatch (local / slurm / ray) is -decided inside the driver based on config.scheduler.type. +spawn it as a detached subprocess. The CLI returns immediately after the +child has been launched; the run is tracked under ~/.areal/runs/ via +state + heartbeat (refreshed every 5 s by the wrapper process). + +Run name is derived from `experiment_name`/`trial_name` in the yaml +(or matching Hydra overrides). 
The yaml MUST contain both, or the +overrides must supply them. Examples: areal run --config experiments/grpo.yaml areal run --config experiments/grpo.yaml --driver examples.math.gsm8k_rl:main - areal run --config experiments/grpo.yaml actor.lr=1e-5 +debug.foo=bar + areal run --config experiments/grpo.yaml actor.lr=1e-5 trial_name=lr-sweep-3 Hydra overrides (key=value, +key=value, ~key) after the parsed flags are forwarded verbatim to the driver. @@ -28,15 +33,11 @@ def add_parser(subparsers: argparse._SubParsersAction) -> None: p = subparsers.add_parser( "run", - help="Launch a training driver in the foreground.", + help="Launch a training driver in the background.", description=_DESCRIPTION, formatter_class=argparse.RawDescriptionHelpFormatter, ) p.add_argument("--config", required=True, type=Path) - p.add_argument( - "--name", default=None, - help="Override run name (default: / from yaml).", - ) p.add_argument( "--driver", default=None, help="Driver entry 'module.path:func' (overrides yaml `driver:`).", @@ -52,18 +53,19 @@ def _handle(args: argparse.Namespace) -> int: from areal.experimental.cli.runner import ( resolve_driver, resolve_name, - run_foreground, + start_detached, ) config_path = args.config.expanduser().resolve() if not config_path.exists(): raise SystemExit(f"Config not found: {config_path}") + overrides = args.overrides or [] driver = resolve_driver(config_path, cli_driver=args.driver) - name = resolve_name(config_path, cli_name=args.name) - return run_foreground( + name = resolve_name(config_path, overrides=overrides) + return start_detached( name=name, driver_spec=driver, config_path=config_path, - overrides=args.overrides or [], + overrides=overrides, ) diff --git a/areal/experimental/cli/runner.py b/areal/experimental/cli/runner.py index 26515b0b53..276876a4d9 100644 --- a/areal/experimental/cli/runner.py +++ b/areal/experimental/cli/runner.py @@ -1,23 +1,24 @@ # SPDX-License-Identifier: Apache-2.0 -"""Driver resolution and foreground 
execution for ``areal run``.""" +"""Driver resolution and detached launch for ``areal run``.""" from __future__ import annotations -import importlib import os +import subprocess import sys import time -from collections.abc import Callable from pathlib import Path from typing import Any import yaml -from areal.experimental.cli.state import RunState, pid_alive, run_state_path - - -DriverFn = Callable[[list[str]], Any] +from areal.experimental.cli.state import ( + RunState, + pid_alive, + run_log_path, + run_state_path, +) def _raw_yaml(config_path: Path) -> dict[str, Any]: @@ -32,32 +33,12 @@ def _peek_driver(config_path: Path) -> str | None: return _raw_yaml(config_path).get("driver") -def _peek_scheduler_type(config_path: Path) -> str | None: - sched = _raw_yaml(config_path).get("scheduler") or {} - return sched.get("type") if isinstance(sched, dict) else None - - -def _peek_name(config_path: Path) -> str | None: - raw = _raw_yaml(config_path) - exp = raw.get("experiment_name") - trial = raw.get("trial_name") - return f"{exp}/{trial}" if exp and trial else None - - -def _import_driver(spec: str) -> DriverFn: - if ":" not in spec: - raise SystemExit(f"Invalid driver {spec!r}; expected 'module.path:func'.") - mod_path, func_name = spec.split(":", 1) - try: - mod = importlib.import_module(mod_path) - except ImportError as e: - raise SystemExit(f"Cannot import driver module {mod_path!r}: {e}") from e - fn = getattr(mod, func_name, None) - if fn is None: - raise SystemExit(f"Module {mod_path!r} has no attribute {func_name!r}.") - if not callable(fn): - raise SystemExit(f"{spec!r} is not callable.") - return fn +def _override_value(overrides: list[str], key: str) -> str | None: + for o in reversed(overrides): + stripped = o.lstrip("+") + if stripped.startswith(f"{key}="): + return stripped.split("=", 1)[1] + return None def resolve_driver(config_path: Path, cli_driver: str | None) -> str: @@ -75,15 +56,18 @@ def resolve_driver(config_path: Path, cli_driver: str | None) 
-> str: ) -def resolve_name(config_path: Path, cli_name: str | None) -> str: - if cli_name: - return cli_name - n = _peek_name(config_path) - if n: - return n - raise SystemExit( - f"No --name given and `experiment_name`/`trial_name` not both present in {config_path}." - ) +def resolve_name(config_path: Path, overrides: list[str]) -> str: + raw = _raw_yaml(config_path) + exp = _override_value(overrides, "experiment_name") or raw.get("experiment_name") + trial = _override_value(overrides, "trial_name") or raw.get("trial_name") + if not (exp and trial): + raise SystemExit( + f"experiment_name and trial_name are required.\n" + f" Add them to {config_path}, or pass\n" + f" experiment_name= trial_name=\n" + f" as Hydra overrides." + ) + return f"{exp}/{trial}" def _refuse_if_active(name: str) -> None: @@ -101,41 +85,49 @@ def _refuse_if_active(name: str) -> None: ) -def run_foreground( +def start_detached( *, name: str, driver_spec: str, config_path: Path, overrides: list[str] ) -> int: _refuse_if_active(name) - argv = ["--config", str(config_path)] + list(overrides) + log_file = run_log_path(name) + log_file.parent.mkdir(parents=True, exist_ok=True) + + cmd = [ + sys.executable, "-m", "areal.experimental.cli._exec", + "--name", name, + "--driver", driver_spec, + "--config", str(config_path), + "--", + *overrides, + ] + + env = os.environ.copy() + env.setdefault("PYTHONUNBUFFERED", "1") + + with open(log_file, "wb", buffering=0) as lf: + proc = subprocess.Popen( + cmd, + stdin=subprocess.DEVNULL, + stdout=lf, + stderr=subprocess.STDOUT, + start_new_session=True, + env=env, + ) + state = RunState( name=name, driver=driver_spec, config_path=str(config_path), - pid=os.getpid(), + pid=proc.pid, started_at=time.time(), - scheduler_type=_peek_scheduler_type(config_path), + log_path=str(log_file), overrides=list(overrides), - argv=argv, + last_heartbeat=time.time(), ) state.save() - rc = 0 - try: - fn = _import_driver(driver_spec) - result = fn(argv) - if isinstance(result, 
int): - rc = result - except SystemExit as e: - if isinstance(e.code, int): - rc = e.code - elif e.code is not None: - print(str(e.code), file=sys.stderr) - rc = 1 - except BaseException: - state.status = "failed" - state.save() - raise - - state.status = "completed" if rc == 0 else "failed" - state.save() - return rc + print(f"Started run {name!r} (pid {proc.pid}).") + print(f" log: {log_file}") + print(f" state: {run_state_path(name)}") + return 0 diff --git a/areal/experimental/cli/state.py b/areal/experimental/cli/state.py index 8f7192e5d1..386cdf0798 100644 --- a/areal/experimental/cli/state.py +++ b/areal/experimental/cli/state.py @@ -87,9 +87,9 @@ class RunState: started_at: float status: str = "running" # running | stopped | completed | failed log_path: str = "" - scheduler_type: str | None = None overrides: list[str] = field(default_factory=list) - argv: list[str] = field(default_factory=list) + last_heartbeat: float = 0.0 + exit_code: int | None = None def save(self) -> None: atomic_write_json(run_state_path(self.name), asdict(self)) From 1d24a98eae9feb34dbf130a9f341333dfed3aa22 Mon Sep 17 00:00:00 2001 From: sitabulaixizawaluduo Date: Thu, 4 Jun 2026 15:34:08 +0800 Subject: [PATCH 11/11] feat(cli): make `areal run` attached (foreground); reuse wrapper for both modes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit K8s Job / Slurm sbatch / any orchestrator that infers task status from the entry process's lifecycle requires `areal run` to: - stay alive while the driver runs - exit with the driver's exit code (0 success, non-zero failure) The previous detached-default behaviour broke this contract: CLI exited immediately with 0, container engine marked Pod completed, killed the background driver. Fix: `areal run` now runs the driver in the CLI process. Detached mode moves to `areal start` (already a top-level verb stub in the scaffold). 
Implementation: extract `run_with_wrapper(name, driver, config, overrides)` from `_exec.main()`. Both modes call it: - `areal run` → CLI process calls it directly (attached) - `areal start` → CLI spawns subprocess running `_exec.main` which in turn calls `run_with_wrapper` (detached) Heartbeat thread, SIGTERM trap, and final-state write are identical in both modes — single implementation. --- areal/experimental/cli/_exec.py | 64 +++++++++++++++----------- areal/experimental/cli/commands/run.py | 56 +++++++++++++++------- areal/experimental/cli/runner.py | 4 +- 3 files changed, 79 insertions(+), 45 deletions(-) diff --git a/areal/experimental/cli/_exec.py b/areal/experimental/cli/_exec.py index fa08cf2293..413f411129 100644 --- a/areal/experimental/cli/_exec.py +++ b/areal/experimental/cli/_exec.py @@ -1,11 +1,13 @@ # SPDX-License-Identifier: Apache-2.0 -"""Detached driver wrapper invoked by ``start_detached``. +"""Driver execution wrapper. -Spawned as ``python -m areal.experimental.cli._exec --name ... --driver -MOD:FUNC --config PATH -- ``. Updates RunState heartbeat on a -background thread, runs the driver in the main thread, writes final -status + exit code on exit. +``run_with_wrapper`` runs a driver in the current process with a heartbeat +thread, SIGTERM trap, and final-state write. 
Used by both: + + - ``areal run`` (attached / foreground): CLI process is the wrapper + - ``areal start`` (detached / background): wrapper is a child process + spawned via ``python -m areal.experimental.cli._exec`` (entry: ``main``) """ from __future__ import annotations @@ -18,6 +20,7 @@ import threading import time import traceback +from pathlib import Path from areal.experimental.cli.state import RunState @@ -42,35 +45,24 @@ def _handler(signum, frame): signal.signal(signal.SIGTERM, _handler) -def main() -> int: - p = argparse.ArgumentParser(prog="areal-exec", add_help=False) - p.add_argument("--name", required=True) - p.add_argument("--driver", required=True) - p.add_argument("--config", required=True) - p.add_argument("overrides", nargs=argparse.REMAINDER) - args = p.parse_args() - - overrides = args.overrides or [] - if overrides and overrides[0] == "--": - overrides = overrides[1:] - - state = RunState.load(args.name) - state.pid = os.getpid() - state.last_heartbeat = time.time() - state.save() - +def run_with_wrapper( + name: str, + driver_spec: str, + config_path: str | Path, + overrides: list[str], +) -> int: _install_sigterm_handler() stop = threading.Event() - hb = threading.Thread(target=_heartbeat_loop, args=(args.name, stop), daemon=True) + hb = threading.Thread(target=_heartbeat_loop, args=(name, stop), daemon=True) hb.start() rc = 0 try: - mod_path, func_name = args.driver.split(":", 1) + mod_path, func_name = driver_spec.split(":", 1) mod = importlib.import_module(mod_path) fn = getattr(mod, func_name) - argv = ["--config", args.config, *overrides] + argv = ["--config", str(config_path), *overrides] result = fn(argv) if isinstance(result, int): rc = result @@ -82,7 +74,7 @@ def main() -> int: finally: stop.set() try: - final = RunState.load(args.name) + final = RunState.load(name) final.status = "completed" if rc == 0 else "failed" final.exit_code = rc final.last_heartbeat = time.time() @@ -93,5 +85,25 @@ def main() -> int: return rc +def main() 
-> int: + p = argparse.ArgumentParser(prog="areal-exec", add_help=False) + p.add_argument("--name", required=True) + p.add_argument("--driver", required=True) + p.add_argument("--config", required=True) + p.add_argument("overrides", nargs=argparse.REMAINDER) + args = p.parse_args() + + overrides = args.overrides or [] + if overrides and overrides[0] == "--": + overrides = overrides[1:] + + state = RunState.load(args.name) + state.pid = os.getpid() + state.last_heartbeat = time.time() + state.save() + + return run_with_wrapper(args.name, args.driver, args.config, overrides) + + if __name__ == "__main__": sys.exit(main()) diff --git a/areal/experimental/cli/commands/run.py b/areal/experimental/cli/commands/run.py index c1b3d311e5..caff5c1f29 100644 --- a/areal/experimental/cli/commands/run.py +++ b/areal/experimental/cli/commands/run.py @@ -1,39 +1,45 @@ # SPDX-License-Identifier: Apache-2.0 -"""``areal run`` — detached training driver launcher.""" +"""``areal run`` — attached (foreground) training driver launcher.""" from __future__ import annotations import argparse +import os +import sys +import time from pathlib import Path _DESCRIPTION = """\ -Launch a training driver in the background. +Launch a training driver in the current process (foreground / attached). -Resolve the driver entry from --driver or the yaml `driver:` field, then -spawn it as a detached subprocess. The CLI returns immediately after the -child has been launched; the run is tracked under ~/.areal/runs/ via -state + heartbeat (refreshed every 5 s by the wrapper process). +The CLI process IS the driver process — it stays alive until the driver +exits. Exit code reflects the driver outcome (0 = success, non-zero = +failure). This is the right entry for K8s Jobs / Slurm sbatch scripts / +any orchestrator that uses process lifecycle to detect task status. + +For "fire and forget" launches (login-node convenience), use `areal start` +instead, which spawns a detached background process. 
Run name is derived from `experiment_name`/`trial_name` in the yaml -(or matching Hydra overrides). The yaml MUST contain both, or the -overrides must supply them. +(Hydra overrides honored). The yaml MUST contain both, or the overrides +must supply them. + +A background heartbeat thread updates RunState.last_heartbeat every 5 s +so other CLIs can detect hung drivers via `areal status` (later stages). Examples: areal run --config experiments/grpo.yaml areal run --config experiments/grpo.yaml --driver examples.math.gsm8k_rl:main areal run --config experiments/grpo.yaml actor.lr=1e-5 trial_name=lr-sweep-3 - -Hydra overrides (key=value, +key=value, ~key) after the parsed flags are -forwarded verbatim to the driver. """ def add_parser(subparsers: argparse._SubParsersAction) -> None: p = subparsers.add_parser( "run", - help="Launch a training driver in the background.", + help="Launch a training driver in the foreground.", description=_DESCRIPTION, formatter_class=argparse.RawDescriptionHelpFormatter, ) @@ -50,11 +56,13 @@ def add_parser(subparsers: argparse._SubParsersAction) -> None: def _handle(args: argparse.Namespace) -> int: + from areal.experimental.cli._exec import run_with_wrapper from areal.experimental.cli.runner import ( + refuse_if_active, resolve_driver, resolve_name, - start_detached, ) + from areal.experimental.cli.state import RunState, run_state_path config_path = args.config.expanduser().resolve() if not config_path.exists(): @@ -63,9 +71,23 @@ def _handle(args: argparse.Namespace) -> int: overrides = args.overrides or [] driver = resolve_driver(config_path, cli_driver=args.driver) name = resolve_name(config_path, overrides=overrides) - return start_detached( + refuse_if_active(name) + + state = RunState( name=name, - driver_spec=driver, - config_path=config_path, - overrides=overrides, + driver=driver, + config_path=str(config_path), + pid=os.getpid(), + started_at=time.time(), + log_path="", + overrides=list(overrides), + last_heartbeat=time.time(), 
+ ) + state.save() + + print( + f"Run {name!r} starting (pid {os.getpid()}). " + f"State: {run_state_path(name)}", + file=sys.stderr, ) + return run_with_wrapper(name, driver, config_path, overrides) diff --git a/areal/experimental/cli/runner.py b/areal/experimental/cli/runner.py index 276876a4d9..2a31f16260 100644 --- a/areal/experimental/cli/runner.py +++ b/areal/experimental/cli/runner.py @@ -70,7 +70,7 @@ def resolve_name(config_path: Path, overrides: list[str]) -> str: return f"{exp}/{trial}" -def _refuse_if_active(name: str) -> None: +def refuse_if_active(name: str) -> None: p = run_state_path(name) if not p.exists(): return @@ -88,7 +88,7 @@ def _refuse_if_active(name: str) -> None: def start_detached( *, name: str, driver_spec: str, config_path: Path, overrides: list[str] ) -> int: - _refuse_if_active(name) + refuse_if_active(name) log_file = run_log_path(name) log_file.parent.mkdir(parents=True, exist_ok=True)