diff --git a/DESIGN.md b/DESIGN.md index 680176e..224c7be 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -439,8 +439,40 @@ enabled for line-oriented doors such as Ether/Tele-Arena. It is appropriate when most commands are text lines submitted with Enter and the two-step `type_text` plus `press_key enter` pattern is just decision overhead. -A future campaign runner should compose activities into fair model-vs-model -schedules instead of replacing these activity runners. +`run-match` composes multiple activity states into one shared environment. Each +participant has a separate terminal session, model adapter, stateful provider +session, recent-step context, campaign memory, and per-agent trace. The +scheduler is policy-driven: `sequential`, `parallel_barrier`, `parallel_race`, +and `continuous` share fixed, seeded shuffle, and rotate order policies. Fixed +order preserves reproducibility, seeded shuffle reduces first-mover bias, and +rotate alternates first position without randomness. `parallel_barrier` splits +each step into a decision phase and a commit phase: active agents decide +concurrently, then actions are committed in the scheduled order. `parallel_race` +uses the scheduled order as launch order but commits actions as soon as model +decisions complete, making latency part of the competition. `continuous` keeps +one decision in flight per active agent and immediately requeues an agent after +its action commits, so faster models can take more initiative within the same +match wall-clock budget. Choose `parallel_barrier` when fairness matters more +than latency. The scheduler writes match events for match start/completion, +per-round or per-tick order, decision completion, each committed action, +disconnects, reconnect attempts, and final stop reasons, while the normal +activity traces remain the source of detailed prompts, actions, observations, +and memory updates. + +Melee runs are the same match abstraction with more participants and richer +configuration. A TOML or JSON match config should own the roster, scheduler +policy, reconnect policy, objective template, disabled action set, budgets, and +per-participant provider settings. `max_wall_seconds` is a match-level budget +shared by every participant; `max_decision_ticks` remains per participant. In +`continuous` mode, `max_rounds` is interpreted as the maximum number of queued +action decisions for the whole match rather than full all-agent rounds. A future +campaign runner should compose activities into longer fair model-vs-model +schedules instead of replacing these activity and match runners. + +Continuous mode does not emit `round_started` or `round_completed`, because +there are no all-agent rounds. Continuous scheduler events use `tick` for the +committed action count; `commit_order` also records `queued_tick` for the +decision request that produced the action. Memory is harness-owned: diff --git a/README.md b/README.md index dce0803..d1b5c6f 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,61 @@ Ether/Tele-Arena where normal commands are submitted with Enter. It keeps `submit_line` available while preserving `press_key` and `type_text` for single-key or partial-input prompts. +`run-match` runs several agents against the same BBS or door server. Each +participant gets its own terminal session, model adapter, stateful provider +session, recent-step context, campaign memory, and per-agent trace; the match +trace records match start/completion, per-round or per-tick order, actions, +disconnects, and reconnects. The default scheduler mode is `sequential`: agents +act one at a time in the chosen per-round order. `parallel_barrier` asks active +agents for decisions concurrently, then commits actions in the chosen order. +`parallel_race` also asks concurrently, but commits each action as soon as that +agent's decision is ready. `continuous` keeps one decision in flight per active +agent and immediately requeues that agent after each committed action; faster +models get more initiative by design. The default order is fixed CLI order, but +competitive runs can use seeded shuffle or rotating first-player order. For +example, a Claude-vs-Codex Tele-Arena smoke can use: + +```bash +uv run bbs-gym run-match \ + --host 127.0.0.1 \ + --port 3000 \ + --transport telnet \ + --telnet-enter lf \ + --no-agents-config \ + --activity bbs-door-line \ + --participant arena-codex:codex:gpt-5.5 \ + --participant arena-claude:claude:sonnet \ + --codex-stateful \ + --claude-stateful \ + --scheduler-mode sequential \ + --match-order shuffle \ + --match-seed 20260519 \ + --disconnect-policy reconnect \ + --disable-action hangup \ + --run-objective "Play Tele-Arena as {agent_id}. If asked for a character name, create or log in as {agent_id}. Stay connected; do not hang up or quit. Other active agents: {opponents}. Survive, gain experience and gold, buy and equip useful supplies, spend gold wisely, recover when hurt, find opponents, and defeat them when prepared." \ + --max-rounds 100 \ + --max-decision-ticks 100 +``` + +For larger melees, put the participant roster and scheduler settings in a +TOML or JSON file: + +```bash +uv run bbs-gym run-match --match-config examples/tele_arena_melee.toml +``` + +`examples/tele_arena_melee.toml` shows a Codex, Claude, and local +OpenAI-compatible model sharing one Tele-Arena server. Config files can set the +activity, transport, budgets, objective template, scheduler mode/order/seed, +disconnect policy, disabled actions, and per-participant provider settings. +Config values are treated as the match definition when `--match-config` is used. +For match runs, `--max-wall-seconds` is a match-level wall-clock budget shared +by all participants, while `--max-decision-ticks` is per participant. In +`continuous` mode, `--max-rounds` caps the number of queued action decisions for +the whole match instead of all-agent rounds. Continuous traces use `tick` +instead of `round` for scheduler events and do not emit `round_started` / +`round_completed` lifecycle events. + Use `--prompt-layout cache_friendly` when comparing local OpenAI-compatible servers with prefix caching. The default `timeline_first` layout preserves the existing trace-oriented prompt order; `cache_friendly` moves stable objectives, diff --git a/TELE_ARENA.md b/TELE_ARENA.md index 08f46bc..faa7cbd 100644 --- a/TELE_ARENA.md +++ b/TELE_ARENA.md @@ -277,6 +277,48 @@ bat, died, recovered in the temple, and continued until the 100-step budget. The same run used `submit_line` for most complete commands and had no action validation failures. +## 10. Let Two Agents Play A Match + +`run-match` opens one telnet session per participant. The default `sequential` +scheduler alternates one decision tick per active agent. `parallel_barrier` +collects decisions concurrently and commits them in the scheduled order; +`parallel_race` commits actions as model decisions finish. `continuous` keeps +one decision in flight per active agent and immediately requeues that agent +after each committed action, so faster models get more chances to act during the +same match wall-clock budget. Inline participant specs use +`agent_id:provider:model`; each participant still gets its own per-agent JSONL +trace and model state. + +```bash +uv run bbs-gym run-match \ + --host 127.0.0.1 \ + --port 3000 \ + --transport telnet \ + --telnet-enter lf \ + --no-agents-config \ + --activity bbs-door-line \ + --participant arena-codex:codex:gpt-5.5 \ + --participant arena-claude:claude:sonnet \ + --codex-stateful \ + --claude-stateful \ + --prompt-layout cache_friendly \ + --log-path runtime/logs/tele-arena-match.jsonl \ + --disable-action hangup \ + --run-objective "Play Tele-Arena as {agent_id}. If asked for a character name, create or log in as {agent_id}. Stay connected; do not hang up or quit. Other active agents: {opponents}. Survive, gain experience and gold, buy and equip useful supplies, spend gold wisely, recover when hurt, find opponents, and defeat them when prepared." \ + --max-rounds 100 \ + --max-decision-ticks 100 +``` + +The match trace goes to `runtime/logs/tele-arena-match.jsonl`. Per-agent traces +use the same stem, for example `tele-arena-match.arena-codex.jsonl` and +`tele-arena-match.arena-claude.jsonl`. + +For match runs, `--max-wall-seconds` is match-level. `--max-decision-ticks` +still applies per participant. In `continuous` mode, `--max-rounds` caps the +total queued action decisions for the whole match rather than full all-agent +rounds. Continuous traces use `tick` instead of `round` for scheduler events and +do not emit `round_started` / `round_completed` lifecycle events. + ## Notes - Use `--telnet-enter lf` for Ether. CR-only caused repeated delayed submits. @@ -286,5 +328,6 @@ validation failures. before Enter. - The wrapper is intentionally thin; pass any extra `bbs-gym run-activity` arguments after the wrapper arguments and they will be forwarded. -- The current objective is conservative. For more exploratory runs, override - `--run-objective`. +- Match-specific objectives should carry game strategy. Add + `--disable-action hangup` for competitive runs so agents cannot leave the + match with the harness-level hangup action. diff --git a/examples/tele_arena_activity.py b/examples/tele_arena_activity.py index 643dd47..8046e35 100644 --- a/examples/tele_arena_activity.py +++ b/examples/tele_arena_activity.py @@ -15,8 +15,9 @@ DEFAULT_RUN_OBJECTIVE = ( "Play Tele-Arena through this telnet session. If asked for a character name, create or log in as " - "ArenaCodex. Explore carefully, learn commands, survive fights, gain experience or gold, buy useful " - "starter supplies, and recover from mistakes." + "ArenaCodex. Stay connected unless the run objective explicitly says to leave. Survive fights, gain " + "experience and gold, buy and equip useful starter supplies, spend gold wisely, recover when hurt, and " + "keep making progress instead of quitting early." ) @@ -109,6 +110,8 @@ def build_bbs_gym_argv(args: argparse.Namespace) -> list[str]: cmd.extend(["--prompt-layout", args.prompt_layout]) if args.recent_steps_to_keep is not None: cmd.extend(["--recent-steps-to-keep", str(args.recent_steps_to_keep)]) + for disabled_action in args.disabled_actions: + cmd.extend(["--disable-action", disabled_action]) return cmd @@ -153,6 +156,7 @@ def parse_args(argv: list[str] | None = None) -> tuple[argparse.Namespace, list[ parser.add_argument("--prompt-mode", choices=["stateless_full", "stateful_delta"]) parser.add_argument("--prompt-layout", choices=["timeline_first", "cache_friendly"]) parser.add_argument("--recent-steps-to-keep", type=int) + parser.add_argument("--disable-action", dest="disabled_actions", action="append", default=["hangup"]) parser.add_argument("--max-decision-ticks", type=int, default=100) parser.add_argument("--max-wall-seconds", type=float, default=2400.0) parser.add_argument("--observe-timeout", type=float, default=8.0) diff --git a/examples/tele_arena_melee.toml b/examples/tele_arena_melee.toml new file mode 100644 index 0000000..9ba7179 --- /dev/null +++ b/examples/tele_arena_melee.toml @@ -0,0 +1,47 @@ +host = "127.0.0.1" +port = 3000 +transport = "telnet" +telnet_enter = "lf" +activity = "bbs-door-line" +prompt_layout = "cache_friendly" +recent_steps_to_keep = 5 +log_path = "runtime/logs/tele-arena-melee.jsonl" +disabled_actions = ["hangup"] +run_objective = "Play Tele-Arena as {agent_id}. If asked for a character name, create or log in as {agent_id}. Stay connected; do not hang up or quit. Other active agents: {opponents}. Survive, gain experience and gold, buy and equip useful supplies, spend gold wisely, recover when hurt, find opponents, and defeat them when prepared." + +[scheduler] +mode = "sequential" +order = "shuffle" +seed = 20260519 +disconnect_policy = "reconnect" +max_reconnects = 3 +reconnect_delay = 2.0 + +[budget] +# Match-level wall-clock cap; decision ticks remain per participant. +max_rounds = 2000 +max_decision_ticks = 2000 +max_wall_seconds = 86400 + +[[participants]] +agent_id = "ArenaCodex" +provider = "codex" +model = "gpt-5.5" +stateful = true +codex_session_file = "runtime/codex-sessions/tele-arena-melee-ArenaCodex.session" + +[[participants]] +agent_id = "ArenaClaude" +provider = "claude" +model = "sonnet" +stateful = true +claude_session_file = "runtime/claude-sessions/tele-arena-melee-ArenaClaude.session" + +[[participants]] +agent_id = "ArenaGemma" +provider = "openai-compatible" +model = "gemma4" +base_url = "http://localhost:8000/v1" +temperature = 0.6 +max_tokens = 4096 +response_filter = "gemma4" diff --git a/packages/bbs-gym/README.md b/packages/bbs-gym/README.md index f0e202c..62cfe84 100644 --- a/packages/bbs-gym/README.md +++ b/packages/bbs-gym/README.md @@ -4,5 +4,5 @@ BBS and door-game environments for LLM terminal agents. This package contains the BBS-specific layer from the Spree workspace: Synchronet connection wiring, account tooling, BBS/TW2 profiles, routed -activities, and the `bbs-gym` CLI. It depends on `tty-agent` for the reusable -terminal-agent runtime. +activities, scheduled multi-agent matches, and the `bbs-gym` CLI. It depends on +`tty-agent` for the reusable terminal-agent runtime. diff --git a/packages/bbs-gym/src/bbs_gym/cli.py b/packages/bbs-gym/src/bbs_gym/cli.py index 112a657..4bd7157 100644 --- a/packages/bbs-gym/src/bbs_gym/cli.py +++ b/packages/bbs-gym/src/bbs_gym/cli.py @@ -7,11 +7,13 @@ import os import subprocess import sys +import tomllib from dataclasses import replace from pathlib import Path from typing import Any from tty_agent.ansi import strip_ansi +from tty_agent.actions import DEFAULT_ALLOWED_ACTIONS from tty_agent.models import ( AnthropicAdapter, ClaudeCliAdapter, @@ -27,12 +29,22 @@ from .accounts import AccountConfigError, AgentRegistry, load_agent_registry from .activities import activity_profile from .env import BbsGym +from .match import ( + MatchParticipantRuntime, + MatchParticipantSpec, + MatchSchedulerConfig, + run_scheduled_match, +) from .profiles import BBS_PROFILE, TW2_PROFILE from .routing import ActivityRouteSet, activity_route_set, activity_route_set_names DEFAULT_AGENTS_CONFIG = Path("config/agents.local.json") DEFAULT_OPENAI_BASE_URL = "http://localhost:11434/v1" +DEFAULT_MATCH_OBJECTIVE = ( + "Play this shared terminal activity as {agent_id}. Other active agents in the match: {opponents}. " + "Explore, survive, improve your position, and interact with opponents when useful." +) def smoke(args: argparse.Namespace) -> int: @@ -179,8 +191,199 @@ def run_routed(args: argparse.Namespace) -> int: return 0 +def run_match(args: argparse.Namespace) -> int: + try: + _apply_match_config(args) + _validate_match_args(args) + registry = None if args.no_agents_config else load_agent_registry(args.agents_config, required=False) + specs = match_participant_specs(args) + participants = build_match_participants(args, specs, registry) + scheduler = build_match_scheduler_config(args) + except (AccountConfigError, ValueError) as exc: + print(str(exc), file=sys.stderr) + return 2 + + match_log_path = Path(args.log_path) + try: + with BbsGym( + host=args.host, + port=args.port, + rlogin_port=args.rlogin_port, + rlogin_terminal=args.rlogin_terminal, + transport=args.transport, + telnet_enter_sequence=args.telnet_enter, + agent_registry=registry, + ) as gym: + match_result = run_scheduled_match(gym, participants, scheduler, match_log_path) + except (OSError, AccountConfigError, ValueError) as exc: + print(f"connection failed: {exc}", file=sys.stderr) + return 1 + + summary = ", ".join( + f"{result.agent_id}:steps={len(result.steps)} stop={result.stop_reason}" + for _, result in match_result.results + ) + print( + f"match participants={len(match_result.results)} commit_count={match_result.commit_count} " + f"scheduler={scheduler.mode} {summary} log={match_log_path}" + ) + return 0 + + +def _apply_match_config(args: argparse.Namespace) -> None: + if getattr(args, "_match_config_applied", False): + return + args._match_config_applied = True + path = getattr(args, "match_config", None) + if not path: + return + + config = _load_match_config(Path(path)) + _set_config_values( + args, + config, + { + "host": "host", + "port": "port", + "rlogin_port": "rlogin_port", + "rlogin_terminal": "rlogin_terminal", + "transport": "transport", + "telnet_enter": "telnet_enter", + "agents_config": "agents_config", + "no_agents_config": "no_agents_config", + "activity": "activity", + "profile_objective": "profile_objective", + "run_objective": "run_objective", + "log_path": "log_path", + "observe_timeout": "observe_timeout", + "stable_ms": "stable_ms", + "byte_quiet_ms": "byte_quiet_ms", + "recent_steps_to_keep": "recent_steps_to_keep", + "model_error_retries": "model_error_retries", + "prompt_mode": "prompt_mode", + "prompt_layout": "prompt_layout", + "disabled_actions": "disabled_actions", + }, + ) + _set_config_values( + args, + _config_mapping(config, "scheduler"), + { + "mode": "scheduler_mode", + "order": "match_order", + "seed": "match_seed", + "disconnect_policy": "disconnect_policy", + "max_reconnects": "max_reconnects", + "reconnect_delay": "reconnect_delay", + "max_workers": "max_workers", + }, + ) + _set_config_values( + args, + _config_mapping(config, "budget"), + { + "max_rounds": "max_rounds", + "max_decision_ticks": "max_decision_ticks", + "max_wall_seconds": "max_wall_seconds", + }, + ) + if "participants" in config: + args._match_participants_config = _match_participant_specs_from_config(config["participants"]) + _validate_match_args(args) + + +def _load_match_config(path: Path) -> dict[str, Any]: + try: + if path.suffix == ".json": + data = json.loads(path.read_text(encoding="utf-8")) + else: + data = tomllib.loads(path.read_text(encoding="utf-8")) + except OSError as exc: + raise ValueError(f"could not read match config {path}: {exc}") from exc + except (json.JSONDecodeError, tomllib.TOMLDecodeError) as exc: + raise ValueError(f"invalid match config {path}: {exc}") from exc + if not isinstance(data, dict): + raise ValueError("match config root must be an object") + return data + + +def _set_config_values(args: argparse.Namespace, config: dict[str, Any], mapping: dict[str, str]) -> None: + for config_key, arg_key in mapping.items(): + if config_key in config: + setattr(args, arg_key, config[config_key]) + + +def _config_mapping(config: dict[str, Any], key: str) -> dict[str, Any]: + value = config.get(key) + if value is None: + return {} + if not isinstance(value, dict): + raise ValueError(f"match config field {key!r} must be an object") + return value + + +def _match_participant_specs_from_config(value: object) -> list[MatchParticipantSpec]: + if not isinstance(value, list): + raise ValueError("match config field 'participants' must be a list") + specs: list[MatchParticipantSpec] = [] + for item in value: + if not isinstance(item, dict): + raise ValueError("each match participant must be an object") + agent_id = _required_config_str(item, "agent_id", "match participant") + provider = _config_str(item, "provider") + model = _config_str(item, "model") + specs.append(MatchParticipantSpec(agent_id=agent_id, provider=provider, model=model, config=dict(item))) + return specs + + +def _validate_match_args(args: argparse.Namespace) -> None: + if args.scheduler_mode not in {"sequential", "parallel_race", "parallel_barrier", "continuous"}: + raise ValueError("scheduler_mode must be one of: sequential, parallel_race, parallel_barrier, continuous") + if args.match_order not in {"fixed", "shuffle", "rotate"}: + raise ValueError("match_order must be one of: fixed, shuffle, rotate") + if args.disconnect_policy not in {"stop", "reconnect"}: + raise ValueError("disconnect_policy must be one of: stop, reconnect") + if args.max_reconnects < 0: + raise ValueError("max_reconnects must be >= 0") + if args.reconnect_delay < 0: + raise ValueError("reconnect_delay must be >= 0") + if args.max_workers is not None and args.max_workers < 1: + raise ValueError("max_workers must be >= 1") + if args.max_rounds < 1: + raise ValueError("max_rounds must be >= 1") + if args.max_decision_ticks < 1: + raise ValueError("max_decision_ticks must be >= 1") + if args.max_wall_seconds <= 0: + raise ValueError("max_wall_seconds must be > 0") + + +def build_match_scheduler_config(args: argparse.Namespace) -> MatchSchedulerConfig: + _apply_match_config(args) + _validate_match_args(args) + return MatchSchedulerConfig( + mode=args.scheduler_mode, + order=args.match_order, + seed=args.match_seed, + disconnect_policy=args.disconnect_policy, + max_reconnects=args.max_reconnects, + reconnect_delay=args.reconnect_delay, + max_rounds=args.max_rounds, + max_decision_ticks=args.max_decision_ticks, + max_wall_seconds=args.max_wall_seconds, + max_workers=args.max_workers, + ) + + +def _required_config_str(config: dict[str, Any], key: str, owner: str) -> str: + value = config.get(key) + if not isinstance(value, str) or not value: + raise ValueError(f"{owner} field {key!r} must be a non-empty string") + return value + + def build_activity_profile(args: argparse.Namespace, registry: AgentRegistry | None = None) -> ActivityProfile: profile = activity_profile(args.activity, args.profile_objective) + profile = _profile_with_action_overrides(profile, args) overrides = build_profile_overrides(args, registry) return replace(profile, **overrides) if overrides else profile @@ -191,16 +394,89 @@ def build_activity_route_set(args: argparse.Namespace, registry: AgentRegistry | default_overrides = dict(overrides) if getattr(args, "profile_objective", None): default_overrides["objective"] = args.profile_objective - default_profile = ( - replace(route_set.default_profile, **default_overrides) if default_overrides else route_set.default_profile - ) + default_profile = _profile_with_action_overrides(route_set.default_profile, args) + default_profile = replace(default_profile, **default_overrides) if default_overrides else default_profile routes = tuple( - replace(route, profile=replace(route.profile, **overrides) if overrides else route.profile) + replace( + route, + profile=replace(_profile_with_action_overrides(route.profile, args), **overrides) + if overrides + else _profile_with_action_overrides(route.profile, args), + ) for route in route_set.routes ) return replace(route_set, default_profile=default_profile, routes=routes) +def _profile_with_action_overrides(profile: ActivityProfile, args: argparse.Namespace) -> ActivityProfile: + disabled_actions = _disabled_actions(args) + if not disabled_actions: + return profile + allowed_actions = frozenset( + action + for action in profile.action_policy.allowed_actions + if action not in disabled_actions + ) + return replace(profile, action_policy=replace(profile.action_policy, allowed_actions=allowed_actions)) + + +def _disabled_actions(args: argparse.Namespace) -> frozenset[str]: + values = getattr(args, "disabled_actions", []) or [] + if isinstance(values, str): + values = [values] + if not all(isinstance(value, str) for value in values): + raise ValueError("disabled_actions must be a list of action names") + disabled = frozenset(values) + unknown = disabled - (DEFAULT_ALLOWED_ACTIONS | {"send_raw"}) + if unknown: + raise ValueError(f"unknown disabled action(s): {', '.join(sorted(unknown))}") + return disabled + + +def match_participant_specs(args: argparse.Namespace) -> list[MatchParticipantSpec]: + _apply_match_config(args) + configured_specs = getattr(args, "_match_participants_config", None) + if configured_specs is not None: + specs = list(configured_specs) + else: + specs = [_parse_match_participant(value) for value in getattr(args, "participant", [])] + specs.extend(MatchParticipantSpec(agent_id=agent_id) for agent_id in getattr(args, "agent_id", [])) + if len(specs) < 2: + raise ValueError("run-match requires at least two --participant or --agent-id values") + + seen: set[str] = set() + for spec in specs: + if spec.agent_id in seen: + raise ValueError(f"duplicate match agent_id: {spec.agent_id}") + seen.add(spec.agent_id) + return specs + + +def build_match_participants( + args: argparse.Namespace, + specs: list[MatchParticipantSpec], + registry: AgentRegistry | None, +) -> list[MatchParticipantRuntime]: + participants: list[MatchParticipantRuntime] = [] + for spec in specs: + participant_args = _participant_args(args, spec) + opponents = [other.agent_id for other in specs if other.agent_id != spec.agent_id] + profile = build_activity_profile(participant_args, registry) + log_path = _agent_log_path(args.log_path, spec.agent_id) + objective = _format_match_objective(args.run_objective or DEFAULT_MATCH_OBJECTIVE, spec.agent_id, opponents) + runner = ActivityRunner(profile, log_path=log_path, run_objective=objective) + participants.append( + MatchParticipantRuntime( + spec=spec, + model=build_model(participant_args, registry), + model_metadata=build_model_metadata(participant_args, registry), + runner=runner, + log_path=log_path, + ) + ) + return participants + + def build_profile_overrides(args: argparse.Namespace, registry: AgentRegistry | None = None) -> dict[str, object]: record = registry.maybe_get(args.agent_id) if registry is not None else None model_config = record.model if record is not None else {} @@ -214,6 +490,8 @@ def build_profile_overrides(args: argparse.Namespace, registry: AgentRegistry | overrides["byte_quiet_ms"] = args.byte_quiet_ms if getattr(args, "recent_steps_to_keep", None) is not None: overrides["recent_steps_to_keep"] = args.recent_steps_to_keep + if getattr(args, "model_error_retries", None) is not None: + overrides["model_error_retries"] = args.model_error_retries if getattr(args, "prompt_mode", None) is not None: overrides["prompt_mode"] = args.prompt_mode elif provider == "codex" and _codex_stateful(args, model_config): @@ -225,6 +503,51 @@ def build_profile_overrides(args: argparse.Namespace, registry: AgentRegistry | return overrides +def _parse_match_participant(value: str) -> MatchParticipantSpec: + parts = value.split(":", 2) + if len(parts) == 1: + agent_id = parts[0].strip() + if not agent_id: + raise ValueError("match participant agent_id must not be empty") + return MatchParticipantSpec(agent_id=agent_id) + if len(parts) != 3: + raise ValueError("match participant must be agent_id or agent_id:provider:model") + agent_id, provider, model = (part.strip() for part in parts) + if not agent_id or not provider or not model: + raise ValueError("match participant must be agent_id or agent_id:provider:model") + return MatchParticipantSpec(agent_id=agent_id, provider=provider, model=model) + + +def _participant_args(args: argparse.Namespace, spec: MatchParticipantSpec) -> argparse.Namespace: + data = vars(args).copy() + data["agent_id"] = spec.agent_id + if spec.config is not None: + for key, value in spec.config.items(): + data[key.replace("-", "_")] = value + if spec.provider is not None: + data["provider"] = spec.provider + if spec.model is not None: + data["model"] = spec.model + if "stateful" in data: + provider = data.get("provider") + if provider == "codex": + data["codex_stateful"] = bool(data["stateful"]) + elif provider == "claude": + data["claude_stateful"] = bool(data["stateful"]) + return argparse.Namespace(**data) + + +def _format_match_objective(template: str, agent_id: str, opponents: list[str]) -> str: + return template.replace("{agent_id}", agent_id).replace("{opponents}", ", ".join(opponents) or "none") + + +def _agent_log_path(match_log_path: str | Path, agent_id: str) -> Path: + path = Path(match_log_path) + safe_agent = "".join(char if char.isalnum() or char in "-_." else "_" for char in agent_id) + suffix = path.suffix or ".jsonl" + return path.with_name(f"{path.stem}.{safe_agent}{suffix}") + + def build_model(args: argparse.Namespace, registry: AgentRegistry | None): record = registry.maybe_get(args.agent_id) if registry is not None else None model_config = record.model if record is not None else {} @@ -250,7 +573,7 @@ def build_model(args: argparse.Namespace, registry: AgentRegistry | None): model=args.model or _config_str(model_config, "model"), profile=getattr(args, "codex_profile", None) or _config_str(model_config, "profile"), executable=getattr(args, "codex_executable", None) or _config_str(model_config, "executable") or "codex", - timeout=_config_float(getattr(args, "codex_timeout", None), model_config, "timeout", 300.0), + timeout=_config_float(getattr(args, "codex_timeout", None), model_config, "timeout", 600.0), sandbox=getattr(args, "codex_sandbox", None) or _config_str(model_config, "sandbox") or "read-only", cwd=getattr(args, "codex_cwd", None) or _config_str(model_config, "cwd"), extra_args=_config_str_list(model_config, "extra_args") + (getattr(args, "codex_arg", []) or []), @@ -266,7 +589,7 @@ def build_model(args: argparse.Namespace, registry: AgentRegistry | None): model = ClaudeCliAdapter( model=args.model or _config_str(model_config, "model"), executable=getattr(args, "claude_executable", None) or _config_str(model_config, "executable") or "claude", - timeout=_config_float(getattr(args, "claude_timeout", None), model_config, "timeout", 300.0), + timeout=_config_float(getattr(args, "claude_timeout", None), model_config, "timeout", 600.0), cwd=getattr(args, "claude_cwd", None) or _config_str(model_config, "cwd"), extra_args=_config_str_list(model_config, "extra_args") + (getattr(args, "claude_arg", []) or []), stateful=_claude_stateful(args, model_config), @@ -333,7 +656,7 @@ def build_model_metadata(args: argparse.Namespace, registry: AgentRegistry | Non "executable": getattr(args, "codex_executable", None) or _config_str(model_config, "executable") or "codex", - "timeout": _config_float(getattr(args, "codex_timeout", None), model_config, "timeout", 300.0), + "timeout": _config_float(getattr(args, "codex_timeout", None), model_config, "timeout", 600.0), "sandbox": getattr(args, "codex_sandbox", None) or _config_str(model_config, "sandbox") or "read-only", "cwd": getattr(args, "codex_cwd", None) or _config_str(model_config, "cwd"), "extra_args": _config_str_list(model_config, "extra_args") + (getattr(args, "codex_arg", []) or []), @@ -352,7 +675,7 @@ def build_model_metadata(args: argparse.Namespace, registry: AgentRegistry | Non "executable": getattr(args, "claude_executable", None) or _config_str(model_config, "executable") or "claude", - "timeout": _config_float(getattr(args, "claude_timeout", None), model_config, "timeout", 300.0), + "timeout": _config_float(getattr(args, "claude_timeout", None), model_config, "timeout", 600.0), "cwd": getattr(args, "claude_cwd", None) or _config_str(model_config, "cwd"), "extra_args": _config_str_list(model_config, "extra_args") + (getattr(args, "claude_arg", []) or []), "stateful": _claude_stateful(args, model_config), @@ -516,10 +839,10 @@ def _claude_bare(args: argparse.Namespace, model_config: dict[str, Any]) -> bool def _claude_tools(args: argparse.Namespace, model_config: dict[str, Any]) -> str | None: value = getattr(args, "claude_tools", None) if value is not None: - return value + return value or None if "tools" in model_config: return _config_str(model_config, "tools") - return "" + return None def _without_empty_values(data: dict[str, object | None]) -> dict[str, object]: @@ -603,7 +926,8 @@ def main(argv: list[str] | None = None) -> int: run_parser.add_argument("--claude-session-id") run_parser.add_argument("--claude-session-file") run_parser.add_argument( - "--claude-permission-mode", choices=["acceptEdits", "auto", "bypassPermissions", "default", "dontAsk", "plan"] + "--claude-permission-mode", + choices=["acceptEdits", "auto", "bypassPermissions", "default", "dontAsk", "plan"], ) run_parser.add_argument("--claude-tools") run_parser.add_argument("--claude-bare", action="store_true") @@ -622,8 +946,16 @@ def main(argv: list[str] | None = None) -> int: run_parser.add_argument("--stable-ms", type=int) run_parser.add_argument("--byte-quiet-ms", type=int) run_parser.add_argument("--recent-steps-to-keep", type=int) + run_parser.add_argument("--model-error-retries", type=int) run_parser.add_argument("--prompt-mode", choices=["stateless_full", "stateful_delta"]) run_parser.add_argument("--prompt-layout", choices=["timeline_first", "cache_friendly"]) + run_parser.add_argument( + "--disable-action", + dest="disabled_actions", + action="append", + default=[], + help="remove an action from the activity schema for this run; repeatable, e.g. --disable-action hangup", + ) run_parser.add_argument("--log-path", default="runtime/logs/activity.jsonl") run_parser.set_defaults(func=run_activity) @@ -683,11 +1015,131 @@ def main(argv: list[str] | None = None) -> int: routed_parser.add_argument("--stable-ms", type=int) routed_parser.add_argument("--byte-quiet-ms", type=int) routed_parser.add_argument("--recent-steps-to-keep", type=int) + routed_parser.add_argument("--model-error-retries", type=int) routed_parser.add_argument("--prompt-mode", choices=["stateless_full", "stateful_delta"]) routed_parser.add_argument("--prompt-layout", choices=["timeline_first", "cache_friendly"]) + routed_parser.add_argument( + "--disable-action", + dest="disabled_actions", + action="append", + default=[], + help="remove an action from the activity schema for this run; repeatable, e.g. --disable-action hangup", + ) routed_parser.add_argument("--log-path", default="runtime/logs/routed-activity.jsonl") routed_parser.set_defaults(func=run_routed) + match_parser = subparsers.add_parser("run-match", help="run a scheduled multi-agent BBS activity") + match_parser.add_argument("--host", default="127.0.0.1") + match_parser.add_argument("--match-config", help="TOML or JSON file describing a multi-agent match") + match_parser.add_argument("--port", type=int, default=2323) + match_parser.add_argument("--rlogin-port", type=int, default=2513) + match_parser.add_argument("--rlogin-terminal", default="ansi") + match_parser.add_argument("--transport", choices=["telnet", "rlogin"], default="telnet") + match_parser.add_argument("--telnet-enter", choices=["cr", "lf", "crlf"], default="cr") + match_parser.add_argument("--agents-config", default=str(DEFAULT_AGENTS_CONFIG)) + match_parser.add_argument( + "--no-agents-config", + action="store_true", + help="ignore the agent registry; useful for standalone telnet games with inline participants", + ) + match_parser.add_argument( + "--participant", + action="append", + default=[], + help="match participant as agent_id or agent_id:provider:model; repeat for each player", + ) + match_parser.add_argument( + "--agent-id", + action="append", + default=[], + help="agent id loaded from --agents-config; repeat for each player", + ) + match_parser.add_argument("--provider", choices=["openai-compatible", "anthropic", "claude", "codex", "scripted"]) + match_parser.add_argument("--base-url") + match_parser.add_argument("--api-key") + match_parser.add_argument("--no-anthropic-cache", action="store_true") + match_parser.add_argument("--model") + match_parser.add_argument("--scripted-response", action="append", default=[]) + match_parser.add_argument("--temperature", type=float) + match_parser.add_argument("--max-tokens", type=int) + match_parser.add_argument("--response-filter", choices=["auto", "default", "gemma4", "none"]) + match_parser.add_argument("--codex-profile") + match_parser.add_argument("--codex-executable") + match_parser.add_argument("--codex-timeout", type=float) + match_parser.add_argument("--codex-sandbox", choices=["read-only", "workspace-write", "danger-full-access"]) + match_parser.add_argument("--codex-cwd") + match_parser.add_argument("--codex-arg", action="append", default=[]) + match_parser.add_argument("--codex-stateful", action="store_true") + match_parser.add_argument("--codex-session-id") + match_parser.add_argument("--codex-session-file") + match_parser.add_argument("--claude-executable") + match_parser.add_argument("--claude-timeout", type=float) + match_parser.add_argument("--claude-cwd") + match_parser.add_argument("--claude-arg", action="append", default=[]) + match_parser.add_argument("--claude-stateful", action="store_true") + match_parser.add_argument("--claude-session-id") + match_parser.add_argument("--claude-session-file") + match_parser.add_argument( + "--claude-permission-mode", + choices=["acceptEdits", "auto", "bypassPermissions", "default", "dontAsk", "plan"], + ) + match_parser.add_argument("--claude-tools") + match_parser.add_argument("--claude-bare", action="store_true") + match_parser.add_argument("--activity", default="bbs-door-line") + match_parser.add_argument( + "--profile-objective", + help="override the selected profile's built-in objective", + ) + match_parser.add_argument( + "--run-objective", + help="match objective template; supports {agent_id} and {opponents}", + ) + match_parser.add_argument( + "--max-rounds", + type=int, + default=50, + help="maximum scheduled rounds; in continuous mode this is the maximum queued action count", + ) + match_parser.add_argument( + "--max-decision-ticks", + type=int, + default=50, + help="maximum committed decision ticks per participant", + ) + match_parser.add_argument( + "--max-wall-seconds", + type=float, + default=600.0, + help="match-level wall-clock budget shared by all participants", + ) + match_parser.add_argument( + "--scheduler-mode", + choices=["sequential", "parallel_race", "parallel_barrier", "continuous"], + default="sequential", + ) + match_parser.add_argument("--match-order", choices=["fixed", "shuffle", "rotate"], default="fixed") + match_parser.add_argument("--match-seed", type=int) + match_parser.add_argument("--disconnect-policy", choices=["stop", "reconnect"], default="stop") + match_parser.add_argument("--max-reconnects", type=int, default=3) + match_parser.add_argument("--reconnect-delay", type=float, default=2.0) + match_parser.add_argument("--max-workers", type=int) + match_parser.add_argument("--observe-timeout", type=float) + match_parser.add_argument("--stable-ms", type=int) + match_parser.add_argument("--byte-quiet-ms", type=int) + match_parser.add_argument("--recent-steps-to-keep", type=int) + match_parser.add_argument("--model-error-retries", type=int) + match_parser.add_argument("--prompt-mode", choices=["stateless_full", "stateful_delta"]) + match_parser.add_argument("--prompt-layout", choices=["timeline_first", "cache_friendly"]) + match_parser.add_argument( + "--disable-action", + dest="disabled_actions", + action="append", + default=[], + help="remove an action from every participant's activity schema; repeatable, e.g. --disable-action hangup", + ) + match_parser.add_argument("--log-path", default="runtime/logs/match.jsonl") + match_parser.set_defaults(func=run_match) + accounts_parser = subparsers.add_parser("accounts", help="manage BBS agent account registry") accounts_subparsers = accounts_parser.add_subparsers(dest="accounts_command", required=True) diff --git a/packages/bbs-gym/src/bbs_gym/match.py b/packages/bbs-gym/src/bbs_gym/match.py new file mode 100644 index 0000000..9c76ac8 --- /dev/null +++ b/packages/bbs-gym/src/bbs_gym/match.py @@ -0,0 +1,780 @@ +"""Scheduled multi-agent match orchestration. + +Log events use ``round`` as the scheduled round number for ``sequential``, +``parallel_barrier``, and ``parallel_race``. In ``continuous`` mode there are no +all-agent rounds, so continuous scheduler events use ``tick`` instead. +Continuous ``commit_order`` events also include ``queued_tick`` for the original +decision request that produced the committed action. +""" + +from __future__ import annotations + +import json +import random +import time +from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Literal, Protocol, runtime_checkable + +from tty_agent.runner import ActivityBudget, ActivityResult, ActivityRunner, PreparedActivityStep + +from .accounts import AccountConfigError +from .env import BbsGym + +MatchSchedulerMode = Literal["sequential", "parallel_race", "parallel_barrier", "continuous"] +MatchOrder = Literal["fixed", "shuffle", "rotate"] +DisconnectPolicy = Literal["stop", "reconnect"] + + +@dataclass(frozen=True) +class MatchParticipantSpec: + agent_id: str + provider: str | None = None + model: str | None = None + config: dict[str, Any] | None = None + + +@dataclass +class MatchParticipantRuntime: + spec: MatchParticipantSpec + model: object + model_metadata: dict[str, object] + runner: ActivityRunner + log_path: Path + # Runtime accounting mutated by the scheduler while a match is active. + reconnects: int = 0 + + +@dataclass(frozen=True) +class MatchSchedulerConfig: + """Scheduler settings for a shared multi-agent match. + + ``parallel_race`` and ``continuous`` intentionally turn model decision + latency into initiative. Use ``parallel_barrier`` when all models should + decide concurrently but commit in a fair scheduled order. + """ + + mode: MatchSchedulerMode = "sequential" + order: MatchOrder = "fixed" + seed: int | None = None + disconnect_policy: DisconnectPolicy = "stop" + max_reconnects: int = 3 + reconnect_delay: float = 2.0 + max_rounds: int = 50 + max_decision_ticks: int = 50 + max_wall_seconds: float = 600.0 + max_workers: int | None = None + + +@dataclass(frozen=True) +class MatchRunResult: + commit_count: int + results: list[tuple[MatchParticipantRuntime, ActivityResult]] + + +@runtime_checkable +class ClosableAgent(Protocol): + def close(self) -> None: ... + + +def run_scheduled_match( + gym: BbsGym, + participants: list[MatchParticipantRuntime], + scheduler: MatchSchedulerConfig, + match_log_path: Path, +) -> MatchRunResult: + match_started_at = time.monotonic() + _write_match_started(match_log_path, participants, scheduler) + states: list[tuple[MatchParticipantRuntime, Any]] = [] + scheduler_count = 0 + try: + for participant in participants: + agent = gym.connect(participant.spec.agent_id, model_metadata=participant.model_metadata) + state = participant.runner.start_state( + agent, + participant.model, + ActivityBudget( + max_decision_ticks=scheduler.max_decision_ticks, + max_wall_seconds=scheduler.max_wall_seconds, + started_at=match_started_at, + ), + ) + states.append((participant, state)) + + if scheduler.mode == "sequential": + scheduler_count = _run_sequential_match(gym, states, scheduler, match_log_path, match_started_at) + elif scheduler.mode == "parallel_barrier": + scheduler_count = _run_parallel_barrier_match(gym, states, scheduler, match_log_path, match_started_at) + elif scheduler.mode == "parallel_race": + scheduler_count = _run_parallel_race_match(gym, states, scheduler, match_log_path, match_started_at) + elif scheduler.mode == "continuous": + scheduler_count = _run_continuous_match(gym, states, scheduler, match_log_path, match_started_at) + else: + raise ValueError(f"unknown match scheduler mode: {scheduler.mode}") + + if _match_time_exhausted(scheduler, match_started_at): + _mark_active_states_completed(states, "match_wall_seconds") + elif scheduler_count >= scheduler.max_rounds: + for _, state in states: + if not state.completed: + state.stop_reason = _match_limit_stop_reason(scheduler) + state.completed = True + + results = [(participant, participant.runner.finish_state(state)) for participant, state in states] + except Exception as exc: + _write_match_completed( + match_log_path, + scheduler_count, + 0, + [], + scheduler, + match_started_at, + clean_exit=False, + error=str(exc), + ) + raise + + commit_count = _commit_count(results) + _write_match_completed( + match_log_path, + scheduler_count, + commit_count, + results, + scheduler, + match_started_at, + ) + return MatchRunResult( + commit_count=commit_count, + results=results, + ) + + +def match_round_order( + states: list[tuple[MatchParticipantRuntime, Any]], + order: str, + rng: random.Random, + round_number: int, +) -> list[tuple[MatchParticipantRuntime, Any]]: + active = [(participant, state) for participant, state in states if not state.completed] + if order == "fixed": + return active + if order == "shuffle": + shuffled = list(active) + rng.shuffle(shuffled) + return shuffled + if order == "rotate": + if not active: + return [] + offset = (round_number - 1) % len(active) + return active[offset:] + active[:offset] + raise ValueError(f"unknown match order: {order}") + + +def handle_match_disconnect( + gym: BbsGym, + participant: MatchParticipantRuntime, + state: Any, + scheduler: MatchSchedulerConfig, + match_log_path: Path, + round_number: int | None, + tick: int | None = None, +) -> None: + _write_match_event( + match_log_path, + { + "type": "participant_disconnected", + **_event_clock(round_number, tick), + "agent_id": participant.spec.agent_id, + "reconnects": participant.reconnects, + "disconnect_policy": scheduler.disconnect_policy, + "timestamp": time.time(), + }, + ) + if scheduler.disconnect_policy == "stop": + return + + _close_agent(state.agent) + for attempt in range(participant.reconnects + 1, scheduler.max_reconnects + 1): + if scheduler.reconnect_delay: + time.sleep(scheduler.reconnect_delay) + try: + state.agent = gym.connect(participant.spec.agent_id, model_metadata=participant.model_metadata) + except (OSError, AccountConfigError, ValueError) as exc: + participant.reconnects = attempt + _write_match_event( + match_log_path, + { + "type": "participant_reconnect_failed", + **_event_clock(round_number, tick), + "agent_id": participant.spec.agent_id, + "attempt": attempt, + "error": str(exc), + "timestamp": time.time(), + }, + ) + continue + + participant.reconnects = attempt + state.completed = False + state.stop_reason = "" + _write_match_event( + match_log_path, + { + "type": "participant_reconnected", + **_event_clock(round_number, tick), + "agent_id": participant.spec.agent_id, + "attempt": attempt, + "timestamp": time.time(), + }, + ) + return + + state.stop_reason = "disconnect_reconnect_failed" + state.completed = True + + +def write_match_event(path: Path, event: dict[str, Any]) -> None: + _write_match_event(path, event) + + +def _run_sequential_match( + gym: BbsGym, + states: list[tuple[MatchParticipantRuntime, Any]], + scheduler: MatchSchedulerConfig, + match_log_path: Path, + match_started_at: float, +) -> int: + rng = random.Random(scheduler.seed) + round_number = 0 + while ( + round_number < scheduler.max_rounds + and any(not state.completed for _, state in states) + and not _match_time_exhausted(scheduler, match_started_at) + ): + round_number += 1 + scheduled_states = match_round_order(states, scheduler.order, rng, round_number) + _write_round_started(match_log_path, round_number, scheduled_states, scheduler) + _write_match_event( + match_log_path, + { + "type": "commit_order", + "round": round_number, + "order": [participant.spec.agent_id for participant, _ in scheduled_states], + "match_order": scheduler.order, + "match_seed": scheduler.seed, + "timestamp": time.time(), + }, + ) + for participant, state in scheduled_states: + if _match_time_exhausted(scheduler, match_started_at): + break + started_at = time.monotonic() + _write_agent_step_started(match_log_path, round_number, participant, phase="started") + step = participant.runner.run_step(state) + _write_match_event( + match_log_path, + { + "type": "agent_step_completed", + "round": round_number, + "agent_id": participant.spec.agent_id, + "elapsed_seconds": time.monotonic() - started_at, + "timestamp": time.time(), + }, + ) + _write_agent_step_event(match_log_path, round_number, participant, state, step) + if state.completed and state.stop_reason == "disconnected": + handle_match_disconnect(gym, participant, state, scheduler, match_log_path, round_number) + _write_round_completed(match_log_path, round_number, states) + return round_number + + +def _run_parallel_barrier_match( + gym: BbsGym, + states: list[tuple[MatchParticipantRuntime, Any]], + scheduler: MatchSchedulerConfig, + match_log_path: Path, + match_started_at: float, +) -> int: + rng = random.Random(scheduler.seed) + round_number = 0 + while ( + round_number < scheduler.max_rounds + and any(not state.completed for _, state in states) + and not _match_time_exhausted(scheduler, match_started_at) + ): + round_number += 1 + scheduled_states = match_round_order(states, scheduler.order, rng, round_number) + _write_round_started(match_log_path, round_number, scheduled_states, scheduler) + preparations = _prepare_parallel_steps(scheduled_states, scheduler, match_log_path, round_number) + _write_match_event( + match_log_path, + { + "type": "commit_order", + "round": round_number, + "order": [participant.spec.agent_id for participant, _ in scheduled_states], + "match_order": scheduler.order, + "match_seed": scheduler.seed, + "commit_policy": "barrier_order", + "timestamp": time.time(), + }, + ) + for participant, state in scheduled_states: + prepared = preparations.get(participant.spec.agent_id) + _commit_parallel_step(gym, participant, state, prepared, scheduler, match_log_path, round_number) + _write_round_completed(match_log_path, round_number, states) + return round_number + + +def _run_parallel_race_match( + gym: BbsGym, + states: list[tuple[MatchParticipantRuntime, Any]], + scheduler: MatchSchedulerConfig, + match_log_path: Path, + match_started_at: float, +) -> int: + rng = random.Random(scheduler.seed) + round_number = 0 + while ( + round_number < scheduler.max_rounds + and any(not state.completed for _, state in states) + and not _match_time_exhausted(scheduler, match_started_at) + ): + round_number += 1 + scheduled_states = match_round_order(states, scheduler.order, rng, round_number) + _write_round_started(match_log_path, round_number, scheduled_states, scheduler) + commit_order: list[str] = [] + futures: dict[Future[PreparedActivityStep | None], tuple[MatchParticipantRuntime, Any, float]] = {} + with ThreadPoolExecutor(max_workers=_max_workers(scheduler, len(scheduled_states))) as executor: + for participant, state in scheduled_states: + _write_agent_step_started(match_log_path, round_number, participant, phase="queued") + futures[executor.submit(participant.runner.prepare_step, state)] = ( + participant, + state, + time.monotonic(), + ) + for future in as_completed(futures): + participant, state, started_at = futures[future] + prepared = _future_preparation(future, participant, state, match_log_path, round_number) + _write_agent_decision_completed( + match_log_path, + round_number, + participant, + elapsed_seconds=time.monotonic() - started_at, + ) + commit_order.append(participant.spec.agent_id) + _commit_parallel_step(gym, participant, state, prepared, scheduler, match_log_path, round_number) + _write_match_event( + match_log_path, + { + "type": "commit_order", + "round": round_number, + "order": commit_order, + "match_order": scheduler.order, + "match_seed": scheduler.seed, + "commit_policy": "race_completion", + "timestamp": time.time(), + }, + ) + _write_round_completed(match_log_path, round_number, states) + return round_number + + +def _run_continuous_match( + gym: BbsGym, + states: list[tuple[MatchParticipantRuntime, Any]], + scheduler: MatchSchedulerConfig, + match_log_path: Path, + match_started_at: float, +) -> int: + rng = random.Random(scheduler.seed) + initial_order = match_round_order(states, scheduler.order, rng, 1) + committed_ticks = 0 + queued_ticks = 0 + futures: dict[Future[PreparedActivityStep | None], tuple[MatchParticipantRuntime, Any, float, int]] = {} + + def queue_step( + executor: ThreadPoolExecutor, + participant: MatchParticipantRuntime, + state: Any, + ) -> None: + nonlocal queued_ticks + queued_ticks += 1 + _write_agent_step_started(match_log_path, None, participant, phase="queued", tick=queued_ticks) + futures[executor.submit(participant.runner.prepare_step, state)] = ( + participant, + state, + time.monotonic(), + queued_ticks, + ) + + with ThreadPoolExecutor(max_workers=_max_workers(scheduler, len(initial_order))) as executor: + for participant, state in initial_order: + if queued_ticks < scheduler.max_rounds and not _match_time_exhausted(scheduler, match_started_at): + queue_step(executor, participant, state) + + while futures and not _match_time_exhausted(scheduler, match_started_at): + done, _ = wait( + futures, + timeout=_match_wall_seconds_remaining(scheduler, match_started_at), + return_when=FIRST_COMPLETED, + ) + if not done: + break + for future in done: + participant, state, started_at, queued_tick = futures.pop(future) + prepared = _future_preparation(future, participant, state, match_log_path, None, tick=queued_tick) + _write_agent_decision_completed( + match_log_path, + None, + participant, + elapsed_seconds=time.monotonic() - started_at, + tick=queued_tick, + ) + committed_ticks += 1 + _write_match_event( + match_log_path, + { + "type": "commit_order", + "tick": committed_ticks, + "order": [participant.spec.agent_id], + "match_order": scheduler.order, + "match_seed": scheduler.seed, + "commit_policy": "continuous_completion", + "queued_tick": queued_tick, + "timestamp": time.time(), + }, + ) + _commit_parallel_step( + gym, + participant, + state, + prepared, + scheduler, + match_log_path, + None, + tick=committed_ticks, + ) + if ( + not state.completed + and state.budget.remaining() + and queued_ticks < scheduler.max_rounds + and not _match_time_exhausted(scheduler, match_started_at) + ): + queue_step(executor, participant, state) + return committed_ticks + + +def _prepare_parallel_steps( + scheduled_states: list[tuple[MatchParticipantRuntime, Any]], + scheduler: MatchSchedulerConfig, + match_log_path: Path, + round_number: int, +) -> dict[str, PreparedActivityStep | None]: + preparations: dict[str, PreparedActivityStep | None] = {} + futures: dict[Future[PreparedActivityStep | None], tuple[MatchParticipantRuntime, Any, float]] = {} + with ThreadPoolExecutor(max_workers=_max_workers(scheduler, len(scheduled_states))) as executor: + for participant, state in scheduled_states: + _write_agent_step_started(match_log_path, round_number, participant, phase="queued") + futures[executor.submit(participant.runner.prepare_step, state)] = ( + participant, + state, + time.monotonic(), + ) + for future in as_completed(futures): + participant, state, started_at = futures[future] + preparations[participant.spec.agent_id] = _future_preparation( + future, + participant, + state, + match_log_path, + round_number, + ) + _write_agent_decision_completed( + match_log_path, + round_number, + participant, + elapsed_seconds=time.monotonic() - started_at, + ) + return preparations + + +def _future_preparation( + future: Future[PreparedActivityStep | None], + participant: MatchParticipantRuntime, + state: Any, + match_log_path: Path, + round_number: int | None, + tick: int | None = None, +) -> PreparedActivityStep | None: + try: + return future.result() + except Exception as exc: + state.stop_reason = "scheduler_error" + state.completed = True + _write_match_event( + match_log_path, + { + "type": "agent_step_failed", + **_event_clock(round_number, tick), + "agent_id": participant.spec.agent_id, + "error": str(exc), + "timestamp": time.time(), + }, + ) + return None + + +def _commit_parallel_step( + gym: BbsGym, + participant: MatchParticipantRuntime, + state: Any, + prepared: PreparedActivityStep | None, + scheduler: MatchSchedulerConfig, + match_log_path: Path, + round_number: int | None, + tick: int | None = None, +) -> None: + started_at = time.monotonic() + step = participant.runner.commit_prepared_step(state, prepared) + _write_match_event( + match_log_path, + { + "type": "agent_step_completed", + **_event_clock(round_number, tick), + "agent_id": participant.spec.agent_id, + "elapsed_seconds": time.monotonic() - started_at, + "timestamp": time.time(), + }, + ) + _write_agent_step_event(match_log_path, round_number, participant, state, step, tick=tick) + if state.completed and state.stop_reason == "disconnected": + handle_match_disconnect(gym, participant, state, scheduler, match_log_path, round_number, tick=tick) + + +def _max_workers(scheduler: MatchSchedulerConfig, active_count: int) -> int: + if active_count <= 0: + return 1 + if scheduler.max_workers is None: + return active_count + return max(1, min(scheduler.max_workers, active_count)) + + +def _match_wall_seconds_remaining(scheduler: MatchSchedulerConfig, match_started_at: float) -> float: + return max(0.0, scheduler.max_wall_seconds - (time.monotonic() - match_started_at)) + + +def _match_time_exhausted(scheduler: MatchSchedulerConfig, match_started_at: float) -> bool: + return _match_wall_seconds_remaining(scheduler, match_started_at) <= 0 + + +def _mark_active_states_completed(states: list[tuple[MatchParticipantRuntime, Any]], stop_reason: str) -> None: + for _, state in states: + if not state.completed: + state.stop_reason = stop_reason + state.completed = True + + +def _match_limit_stop_reason(scheduler: MatchSchedulerConfig) -> str: + return "match_ticks" if scheduler.mode == "continuous" else "match_rounds" + + +def _commit_count(results: list[tuple[MatchParticipantRuntime, ActivityResult]]) -> int: + return sum(len(result.steps) for _, result in results) + + +def _event_clock(round_number: int | None, tick: int | None = None) -> dict[str, int]: + event: dict[str, int] = {} + if round_number is not None: + event["round"] = round_number + if tick is not None: + event["tick"] = tick + return event + + +def _write_round_started( + match_log_path: Path, + round_number: int, + scheduled_states: list[tuple[MatchParticipantRuntime, Any]], + scheduler: MatchSchedulerConfig, +) -> None: + _write_match_event( + match_log_path, + { + "type": "round_started", + "round": round_number, + "agents": [participant.spec.agent_id for participant, _ in scheduled_states], + "scheduler_mode": scheduler.mode, + "match_order": scheduler.order, + "match_seed": scheduler.seed, + "timestamp": time.time(), + }, + ) + + +def _write_match_started( + match_log_path: Path, + participants: list[MatchParticipantRuntime], + scheduler: MatchSchedulerConfig, +) -> None: + _write_match_event( + match_log_path, + { + "type": "match_started", + "scheduler": _scheduler_event_dict(scheduler), + "participants": [ + { + "agent_id": participant.spec.agent_id, + "provider": participant.spec.provider, + "model": participant.spec.model, + "model_metadata": participant.model_metadata, + "agent_log_path": str(participant.log_path), + } + for participant in participants + ], + "timestamp": time.time(), + }, + ) + + +def _write_match_completed( + match_log_path: Path, + scheduler_count: int, + commit_count: int, + results: list[tuple[MatchParticipantRuntime, ActivityResult]], + scheduler: MatchSchedulerConfig, + match_started_at: float, + *, + clean_exit: bool = True, + error: str = "", +) -> None: + _write_match_event( + match_log_path, + { + "type": "match_completed", + "clean_exit": clean_exit, + "commit_count": commit_count, + "elapsed_seconds": time.monotonic() - match_started_at, + "error": error, + "scheduler": _scheduler_event_dict(scheduler), + "scheduler_count": scheduler_count, + "scheduler_count_unit": "ticks" if scheduler.mode == "continuous" else "rounds", + "results": [ + { + "agent_id": result.agent_id, + "steps": len(result.steps), + "stop_reason": result.stop_reason, + "activity": result.activity, + "agent_log_path": str(participant.log_path), + } + for participant, result in results + ], + "timestamp": time.time(), + }, + ) + + +def _scheduler_event_dict(scheduler: MatchSchedulerConfig) -> dict[str, Any]: + return { + "mode": scheduler.mode, + "order": scheduler.order, + "seed": scheduler.seed, + "disconnect_policy": scheduler.disconnect_policy, + "max_reconnects": scheduler.max_reconnects, + "reconnect_delay": scheduler.reconnect_delay, + "max_rounds": scheduler.max_rounds, + "max_decision_ticks": scheduler.max_decision_ticks, + "max_wall_seconds": scheduler.max_wall_seconds, + "max_workers": scheduler.max_workers, + } + + +def _write_agent_step_started( + match_log_path: Path, + round_number: int | None, + participant: MatchParticipantRuntime, + *, + phase: Literal["queued", "started"], + tick: int | None = None, +) -> None: + _write_match_event( + match_log_path, + { + "type": "agent_step_started", + "phase": phase, + **_event_clock(round_number, tick), + "agent_id": participant.spec.agent_id, + "timestamp": time.time(), + }, + ) + + +def _write_agent_decision_completed( + match_log_path: Path, + round_number: int | None, + participant: MatchParticipantRuntime, + elapsed_seconds: float, + tick: int | None = None, +) -> None: + _write_match_event( + match_log_path, + { + "type": "agent_decision_completed", + **_event_clock(round_number, tick), + "agent_id": participant.spec.agent_id, + "elapsed_seconds": elapsed_seconds, + "timestamp": time.time(), + }, + ) + + +def _write_round_completed( + match_log_path: Path, + round_number: int, + states: list[tuple[MatchParticipantRuntime, Any]], +) -> None: + _write_match_event( + match_log_path, + { + "type": "round_completed", + "round": round_number, + "active_agents": [participant.spec.agent_id for participant, state in states if not state.completed], + "timestamp": time.time(), + }, + ) + + +def _write_agent_step_event( + match_log_path: Path, + round_number: int | None, + participant: MatchParticipantRuntime, + state: Any, + step: Any, + tick: int | None = None, +) -> None: + _write_match_event( + match_log_path, + { + "type": "agent_step", + **_event_clock(round_number, tick), + "agent_id": participant.spec.agent_id, + "step": step.step if step is not None else None, + "completed": state.completed, + "stop_reason": state.stop_reason if state.completed else "", + "active_profile": (state.active_profile.name if state.active_profile is not None else ""), + "action": step.action if step is not None else None, + "agent_log_path": str(participant.log_path), + "timestamp": step.timestamp if step is not None else None, + }, + ) + + +def _write_match_event(path: Path, event: dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(event, sort_keys=True) + "\n") + + +def _close_agent(agent: object) -> None: + if isinstance(agent, ClosableAgent): + agent.close() diff --git a/packages/tty-agent/src/tty_agent/__init__.py b/packages/tty-agent/src/tty_agent/__init__.py index da1fb7c..cb76f33 100644 --- a/packages/tty-agent/src/tty_agent/__init__.py +++ b/packages/tty-agent/src/tty_agent/__init__.py @@ -6,6 +6,7 @@ "ActivityBudget", "ActivityProfile", "ActivityRoute", + "ActivityRunState", "ActivityRunner", "AnthropicAdapter", "ClaudeCliAdapter", @@ -16,6 +17,8 @@ "InputModalityProfile", "InputModeRule", "JsonMemoryStore", + "ModelError", + "ModelTimeoutError", "Observation", "ObservationHints", "OpenAICompatibleAdapter", @@ -45,13 +48,22 @@ from .ansi import strip_ansi from .hints import InputModalityProfile, InputModeRule, ObservationHints from .memory import JsonMemoryStore -from .models import AnthropicAdapter, ClaudeCliAdapter, CodexCliAdapter, OpenAICompatibleAdapter, ScriptedModelAdapter +from .models import ( + AnthropicAdapter, + ClaudeCliAdapter, + CodexCliAdapter, + ModelError, + ModelTimeoutError, + OpenAICompatibleAdapter, + ScriptedModelAdapter, +) from .prompt_modules import GENERIC_TERMINAL_MODULES, FullScreenModule, PromptModule, PromptRenderContext from .profiles import EMPTY_PROFILE, SHELL_PROFILE, TEXT_ADVENTURE_PROFILE, PromptProfile from .runner import ( ActivityBudget, ActivityProfile, ActivityRoute, + ActivityRunState, ActivityRunner, PromptLayout, PromptMode, diff --git a/packages/tty-agent/src/tty_agent/models.py b/packages/tty-agent/src/tty_agent/models.py index 1b5b180..8ba1195 100644 --- a/packages/tty-agent/src/tty_agent/models.py +++ b/packages/tty-agent/src/tty_agent/models.py @@ -28,6 +28,27 @@ def to_dict(self) -> dict[str, str]: return {"role": self.role, "content": self.content} +class ModelError(RuntimeError): + """Raised when a model provider fails before producing a parseable response.""" + + def __init__( + self, + message: str, + *, + command: list[str] | tuple[str, ...] = (), + stdout: str | bytes | None = "", + stderr: str | bytes | None = "", + ) -> None: + super().__init__(message) + self.command = tuple(command) + self.stdout = _subprocess_text(stdout) + self.stderr = _subprocess_text(stderr) + + +class ModelTimeoutError(ModelError): + """Raised when a model provider command times out.""" + + @dataclass(frozen=True) class DecisionPrompt: system: str @@ -284,7 +305,7 @@ def __init__( model: str | None = None, profile: str | None = None, executable: str = "codex", - timeout: float = 300.0, + timeout: float = 600.0, sandbox: str = "read-only", cwd: str | Path | None = None, extra_args: list[str] | None = None, @@ -327,16 +348,34 @@ def chat(self, messages: list[ModelMessage]) -> str: check=False, ) except subprocess.TimeoutExpired as exc: - raise RuntimeError(f"codex exec timed out after {self.timeout:g}s") from exc + stdout = _subprocess_text(exc.stdout or exc.output) + stderr = _subprocess_text(exc.stderr) + detail = _command_failure_detail(stdout, stderr) + raise ModelTimeoutError( + f"codex exec timed out after {self.timeout:g}s: {detail}", + command=command, + stdout=stdout, + stderr=stderr, + ) from exc except OSError as exc: - raise RuntimeError(f"failed to run codex executable {self.executable!r}: {exc}") from exc + raise ModelError(f"failed to run codex executable {self.executable!r}: {exc}", command=command) from exc if result.returncode != 0: detail = _command_failure_detail(result.stdout, result.stderr) - raise RuntimeError(f"codex exec failed with exit code {result.returncode}: {detail}") + raise ModelError( + f"codex exec failed with exit code {result.returncode}: {detail}", + command=command, + stdout=result.stdout, + stderr=result.stderr, + ) if self.stateful and self.session_id is None: self.session_id = _extract_codex_session_id(result.stdout) if self.session_id is None: - raise RuntimeError("codex exec did not report a session id in --json output") + raise ModelError( + "codex exec did not report a session id in --json output", + command=command, + stdout=result.stdout, + stderr=result.stderr, + ) self._write_session_file() if output_path.exists(): output = output_path.read_text(encoding="utf-8").strip() @@ -408,14 +447,14 @@ def __init__( self, model: str | None = None, executable: str = "claude", - timeout: float = 300.0, + timeout: float = 600.0, cwd: str | Path | None = None, extra_args: list[str] | None = None, stateful: bool = False, session_id: str | None = None, session_file: str | Path | None = None, permission_mode: str | None = "dontAsk", - tools: str | None = "", + tools: str | None = None, bare: bool = False, name: str | None = None, output_filters: tuple[OutputFilter, ...] | None = None, @@ -452,18 +491,36 @@ def chat(self, messages: list[ModelMessage]) -> str: check=False, ) except subprocess.TimeoutExpired as exc: - raise RuntimeError(f"claude -p timed out after {self.timeout:g}s") from exc + stdout = _subprocess_text(exc.stdout or exc.output) + stderr = _subprocess_text(exc.stderr) + detail = _command_failure_detail(stdout, stderr) + raise ModelTimeoutError( + f"claude -p timed out after {self.timeout:g}s: {detail}", + command=command, + stdout=stdout, + stderr=stderr, + ) from exc except OSError as exc: - raise RuntimeError(f"failed to run claude executable {self.executable!r}: {exc}") from exc + raise ModelError(f"failed to run claude executable {self.executable!r}: {exc}", command=command) from exc if result.returncode != 0: detail = _command_failure_detail(result.stdout, result.stderr) - raise RuntimeError(f"claude -p failed with exit code {result.returncode}: {detail}") + raise ModelError( + f"claude -p failed with exit code {result.returncode}: {detail}", + command=command, + stdout=result.stdout, + stderr=result.stderr, + ) output, session_id = _parse_claude_json_result(result.stdout) if self.stateful and self.session_id is None: self.session_id = session_id if self.session_id is None: - raise RuntimeError("claude -p did not report a session id in JSON output") + raise ModelError( + "claude -p did not report a session id in JSON output", + command=command, + stdout=result.stdout, + stderr=result.stderr, + ) self._write_session_file() return output or result.stdout.strip() @@ -486,7 +543,7 @@ def _command(self) -> list[str]: command.extend(["--model", self.model]) if self.permission_mode: command.extend(["--permission-mode", self.permission_mode]) - if self.tools is not None: + if self.tools and self.tools.strip(): command.extend(["--tools", self.tools]) command.extend(self.extra_args) return command @@ -614,6 +671,14 @@ def _claude_prompt_text(messages: list[ModelMessage]) -> str: return "\n".join(lines).rstrip() + "\n" +def _subprocess_text(value: str | bytes | None) -> str: + if value is None: + return "" + if isinstance(value, bytes): + return value.decode("utf-8", errors="replace") + return value + + def _command_failure_detail(stdout: str, stderr: str) -> str: detail = "\n".join(part for part in (stderr.strip(), stdout.strip()) if part) if not detail: diff --git a/packages/tty-agent/src/tty_agent/runner.py b/packages/tty-agent/src/tty_agent/runner.py index 2e8bd41..0be1ca4 100644 --- a/packages/tty-agent/src/tty_agent/runner.py +++ b/packages/tty-agent/src/tty_agent/runner.py @@ -12,7 +12,15 @@ from .actions import Action, ActionError, ActionPolicy, render_action_schema from .hints import InputModalityProfile, ObservationHints from .memory import JsonMemoryStore -from .models import CompactionPrompt, DecisionPrompt, MemoryCommitPrompt, ModelAdapter, SessionSummary +from .models import ( + CompactionPrompt, + DecisionPrompt, + MemoryCommitPrompt, + MemoryPatch, + ModelAdapter, + ModelError, + SessionSummary, +) from .prompt_modules import ( GENERIC_TERMINAL_MODULES, PROMPT_MODULES_SCHEMA_VERSION, @@ -88,6 +96,7 @@ class ActivityProfile: compact_every_steps: int = 20 compact_recent_chars: int = 12_000 invalid_json_retries: int = 1 + model_error_retries: int = 1 include_model_responses_in_context: bool = False prompt_mode: PromptMode = "stateless_full" prompt_layout: PromptLayout = "timeline_first" @@ -155,6 +164,37 @@ class ActivityRoute: reason: str = "" +@dataclass +class ActivityRunState: + agent: TerminalAgent + model: ModelAdapter + budget: ActivityBudget + agent_id: str + campaign_memory: dict[str, Any] + session_summary: SessionSummary = field(default_factory=SessionSummary) + recent_steps: list[StepRecord] = field(default_factory=list) + all_steps: list[StepRecord] = field(default_factory=list) + stop_reason: str = "budget" + last_observation: Observation | None = None + previous_observation: Observation | None = None + last_action_for_hints: Action | None = None + active_profile: ActivityProfile | None = None + decision_prompts_sent: dict[str, int] = field(default_factory=dict) + completed: bool = False + + +@dataclass +class PreparedActivityStep: + observation: Observation | None = None + active_profile: ActivityProfile | None = None + route_events: list[dict[str, Any]] = field(default_factory=list) + prompt: DecisionPrompt | None = None + prompt_module_results: list[PromptModuleResult] = field(default_factory=list) + action: Action | None = None + validation: dict[str, Any] = field(default_factory=dict) + terminal_step: StepRecord | None = None + + class ActivityRunner: def __init__( self, @@ -171,180 +211,270 @@ def __init__( def run(self, agent: TerminalAgent, model: ModelAdapter, budget: ActivityBudget | None = None) -> ActivityResult: return self._run(agent, model, budget, stop_on_completion=True) - def _run( + def start_state( self, agent: TerminalAgent, model: ModelAdapter, budget: ActivityBudget | None = None, + ) -> ActivityRunState: + agent_id = getattr(agent, "agent_id", "agent") + return ActivityRunState( + agent=agent, + model=model, + budget=budget or ActivityBudget(), + agent_id=agent_id, + campaign_memory=self.memory_store.load(agent_id), + active_profile=self.profile, + ) + + def run_step( + self, + state: ActivityRunState, profile_selector: Callable[[Observation, ActivityProfile], tuple[ActivityProfile, list[dict[str, Any]]]] | None = None, stop_on_completion: bool = True, - ) -> ActivityResult: - budget = budget or ActivityBudget() - agent_id = getattr(agent, "agent_id", "agent") - campaign_memory = self.memory_store.load(agent_id) - session_summary = SessionSummary() - recent_steps: list[StepRecord] = [] - all_steps: list[StepRecord] = [] - stop_reason = "budget" - last_observation: Observation | None = None - previous_observation: Observation | None = None - last_action_for_hints: Action | None = None - active_profile = self.profile - decision_prompts_sent: dict[str, int] = {} - - while budget.remaining(): - try: - observation = agent.observe_turn( - timeout=active_profile.observe_timeout, - stable_ms=active_profile.stable_ms, - byte_quiet_ms=active_profile.byte_quiet_ms, - poll_interval=active_profile.poll_interval, - prompt_fast_path=active_profile.prompt_fast_path, - ) - except SessionDisconnected: - stop_reason = "disconnected" - break - - route_events: list[dict[str, Any]] = [] - if profile_selector is not None: - selected_profile, route_events = profile_selector(observation, active_profile) - active_profile = selected_profile - self.profile = active_profile - last_observation = observation - - if stop_on_completion and active_profile.should_exit(observation, None, budget): - stop_reason = "profile_complete" - step = self._terminal_step_record( - step_number=len(all_steps) + 1, + ) -> StepRecord | None: + prepared = self.prepare_step( + state, + profile_selector=profile_selector, + stop_on_completion=stop_on_completion, + ) + return self.commit_prepared_step(state, prepared, stop_on_completion=stop_on_completion) + + def prepare_step( + self, + state: ActivityRunState, + profile_selector: Callable[[Observation, ActivityProfile], tuple[ActivityProfile, list[dict[str, Any]]]] + | None = None, + stop_on_completion: bool = True, + ) -> PreparedActivityStep | None: + if state.completed: + return None + if not state.budget.remaining(): + state.stop_reason = "budget" + state.completed = True + return None + + active_profile = state.active_profile or self.profile + try: + observation = state.agent.observe_turn( + timeout=active_profile.observe_timeout, + stable_ms=active_profile.stable_ms, + byte_quiet_ms=active_profile.byte_quiet_ms, + poll_interval=active_profile.poll_interval, + prompt_fast_path=active_profile.prompt_fast_path, + ) + except SessionDisconnected: + state.stop_reason = "disconnected" + state.completed = True + return None + + route_events: list[dict[str, Any]] = [] + if profile_selector is not None: + selected_profile, route_events = profile_selector(observation, active_profile) + active_profile = selected_profile + state.active_profile = active_profile + state.last_observation = observation + + if stop_on_completion and active_profile.should_exit(observation, None, state.budget): + state.stop_reason = "profile_complete" + state.completed = True + return PreparedActivityStep( + terminal_step=self._record_terminal_step( + state, observation=observation, - budget=budget, - stop_reason=stop_reason, + stop_reason=state.stop_reason, active_profile=active_profile, events=route_events, ) - all_steps.append(step) - recent_steps.append(step) - self._write_step(step) - break - - if not budget.remaining(): - stop_reason = "budget" - step = self._terminal_step_record( - step_number=len(all_steps) + 1, + ) + + if not state.budget.remaining(): + state.stop_reason = "budget" + state.completed = True + return PreparedActivityStep( + terminal_step=self._record_terminal_step( + state, observation=observation, - budget=budget, - stop_reason=stop_reason, + stop_reason=state.stop_reason, active_profile=active_profile, events=route_events, ) - all_steps.append(step) - recent_steps.append(step) - self._write_step(step) - break - - if self._should_compact(all_steps, recent_steps): - session_summary = self._compact(model, session_summary, recent_steps, observation) - recent_steps = recent_steps[-self.profile.recent_steps_to_keep:] - - hints = ObservationHints.from_observation( - observation=observation, - previous_observation=previous_observation, - last_action=last_action_for_hints, - modality_profile=self.profile.input_modality_profile, - ) - prompt_module_results = self._prompt_module_results( - agent_id=agent_id, - observation=observation, - hints=hints, - campaign_memory=campaign_memory, - session_summary=session_summary, - recent_steps=recent_steps, - budget=budget, ) - profile_prompt_count = decision_prompts_sent.get(active_profile.name, 0) - prompt_stage = self._prompt_stage(profile_prompt_count) - prompt = self._build_decision_prompt( - agent_id=agent_id, - campaign_memory=campaign_memory, - session_summary=session_summary, - recent_steps=recent_steps, - budget=budget, - prompt_module_results=prompt_module_results, - prompt_stage=prompt_stage, + + if self._should_compact(active_profile, state.all_steps, state.recent_steps): + state.session_summary = self._compact( + active_profile, + state.model, + state.session_summary, + state.recent_steps, + observation, ) + state.recent_steps = state.recent_steps[-active_profile.recent_steps_to_keep:] - action, validation = self._decide_with_retry(model, prompt) - decision_prompts_sent[active_profile.name] = profile_prompt_count + 1 - executed_action = action - execution: dict[str, Any] = {} - if action is None: - budget.record_validation_failure() + hints = ObservationHints.from_observation( + observation=observation, + previous_observation=state.previous_observation, + last_action=state.last_action_for_hints, + modality_profile=active_profile.input_modality_profile, + ) + prompt_module_results = self._prompt_module_results( + active_profile, + agent_id=state.agent_id, + observation=observation, + hints=hints, + campaign_memory=state.campaign_memory, + session_summary=state.session_summary, + recent_steps=state.recent_steps, + budget=state.budget, + ) + profile_prompt_count = state.decision_prompts_sent.get(active_profile.name, 0) + prompt_stage = self._prompt_stage(active_profile, profile_prompt_count) + prompt = self._build_decision_prompt( + active_profile, + agent_id=state.agent_id, + campaign_memory=state.campaign_memory, + session_summary=state.session_summary, + recent_steps=state.recent_steps, + budget=state.budget, + prompt_module_results=prompt_module_results, + prompt_stage=prompt_stage, + ) - budget.consume_tick() + action, validation = self._decide_with_retry(active_profile, state.model, prompt) + state.decision_prompts_sent[active_profile.name] = profile_prompt_count + 1 + return PreparedActivityStep( + observation=observation, + active_profile=active_profile, + route_events=route_events, + prompt=prompt, + prompt_module_results=prompt_module_results, + action=action, + validation=validation, + ) - if action is not None: - try: - execution = self._execution_record(agent.act_action(action)) - except ActionError as exc: - executed_action = None - budget.record_validation_failure() - validation = self._execution_error_validation(validation, str(exc)) - - step = StepRecord( - step=budget.decision_ticks, - observation=observation.as_dict(), - prompt={ - "system": prompt.system, - "user": prompt.user, - "mode": prompt.mode, - "stage": prompt.stage, - "layout": active_profile.prompt_layout, - }, - action=action.to_dict() if action else None, - validation=validation, - execution=execution, - budget=budget.to_dict(), - prompt_modules=prompt_module_trace(prompt_module_results), - active_profile=active_profile.name, - run_objective=self.run_objective, - events=route_events, + def commit_prepared_step( + self, + state: ActivityRunState, + prepared: PreparedActivityStep | None, + stop_on_completion: bool = True, + ) -> StepRecord | None: + if prepared is None: + return None + if prepared.terminal_step is not None: + return prepared.terminal_step + if prepared.observation is None or prepared.active_profile is None or prepared.prompt is None: + return None + + observation = prepared.observation + active_profile = prepared.active_profile + prompt = prepared.prompt + action = prepared.action + validation = prepared.validation + route_events = prepared.route_events + prompt_module_results = prepared.prompt_module_results + executed_action = action + execution: dict[str, Any] = {} + if action is None: + state.budget.record_validation_failure() + + state.budget.consume_tick() + + if action is not None: + try: + execution = self._execution_record(state.agent.act_action(action)) + except ActionError as exc: + executed_action = None + state.budget.record_validation_failure() + validation = self._execution_error_validation(validation, str(exc)) + + step = StepRecord( + step=state.budget.decision_ticks, + observation=observation.as_dict(), + prompt={ + "system": prompt.system, + "user": prompt.user, + "mode": prompt.mode, + "stage": prompt.stage, + "layout": active_profile.prompt_layout, + }, + action=action.to_dict() if action else None, + validation=validation, + execution=execution, + budget=state.budget.to_dict(), + prompt_modules=prompt_module_trace(prompt_module_results), + active_profile=active_profile.name, + run_objective=self.run_objective, + events=route_events, + ) + state.all_steps.append(step) + state.recent_steps.append(step) + state.recent_steps = state.recent_steps[-active_profile.recent_steps_to_keep:] + self._write_step(step) + state.previous_observation = observation + state.last_action_for_hints = executed_action + + if state.budget.too_many_validation_failures(): + state.stop_reason = "validation_failures" + state.completed = True + elif executed_action and executed_action.action == "hangup": + state.stop_reason = "hangup" + state.completed = True + elif stop_on_completion and active_profile.should_exit(observation, None, state.budget): + state.stop_reason = "profile_complete" + state.completed = True + elif not state.budget.remaining(): + state.stop_reason = "budget" + state.completed = True + return step + + def finish_state(self, state: ActivityRunState) -> ActivityResult: + if self._has_decision_steps(state.all_steps) and state.last_observation is not None: + active_profile = state.active_profile or self.profile + patch = self._commit_memory( + active_profile, + state.model, + state.campaign_memory, + state.session_summary, + state.recent_steps, + state.last_observation, ) - all_steps.append(step) - recent_steps.append(step) - recent_steps = recent_steps[-self.profile.recent_steps_to_keep:] - self._write_step(step) - previous_observation = observation - last_action_for_hints = executed_action - - if budget.too_many_validation_failures(): - stop_reason = "validation_failures" - break - if executed_action and executed_action.action == "hangup": - stop_reason = "hangup" - break - if stop_on_completion and active_profile.should_exit(observation, None, budget): - stop_reason = "profile_complete" - break - if not budget.remaining(): - stop_reason = "budget" - break - - if self._has_decision_steps(all_steps) and last_observation is not None: - patch = self._commit_memory(model, campaign_memory, session_summary, recent_steps, last_observation) - self.memory_store.save_patch(agent_id, patch) + self.memory_store.save_patch(state.agent_id, patch) + active_profile = state.active_profile or self.profile return ActivityResult( - activity=self.profile.name, - agent_id=agent_id, - steps=all_steps, - session_summary=session_summary, - stop_reason=stop_reason, + activity=active_profile.name, + agent_id=state.agent_id, + steps=state.all_steps, + session_summary=state.session_summary, + stop_reason=state.stop_reason, run_objective=self.run_objective, ) + def _run( + self, + agent: TerminalAgent, + model: ModelAdapter, + budget: ActivityBudget | None = None, + profile_selector: Callable[[Observation, ActivityProfile], tuple[ActivityProfile, list[dict[str, Any]]]] + | None = None, + stop_on_completion: bool = True, + ) -> ActivityResult: + state = self.start_state(agent, model, budget) + while not state.completed and state.budget.remaining(): + self.run_step( + state, + profile_selector=profile_selector, + stop_on_completion=stop_on_completion, + ) + if not state.completed and not state.budget.remaining(): + state.stop_reason = "budget" + state.completed = True + return self.finish_state(state) + def _build_decision_prompt( self, + profile: ActivityProfile, agent_id: str, campaign_memory: dict[str, Any], session_summary: SessionSummary, @@ -353,8 +483,9 @@ def _build_decision_prompt( prompt_module_results: list[PromptModuleResult], prompt_stage: PromptStage, ) -> DecisionPrompt: - if self.profile.prompt_mode == "stateful_delta" and prompt_stage == "delta": + if profile.prompt_mode == "stateful_delta" and prompt_stage == "delta": return self._build_stateful_delta_prompt( + profile, agent_id=agent_id, session_summary=session_summary, recent_steps=recent_steps, @@ -362,6 +493,7 @@ def _build_decision_prompt( prompt_module_results=prompt_module_results, ) return self._build_stateless_full_prompt( + profile, agent_id=agent_id, campaign_memory=campaign_memory, session_summary=session_summary, @@ -373,6 +505,7 @@ def _build_decision_prompt( def _build_stateless_full_prompt( self, + profile: ActivityProfile, agent_id: str, campaign_memory: dict[str, Any], session_summary: SessionSummary, @@ -381,9 +514,10 @@ def _build_stateless_full_prompt( prompt_module_results: list[PromptModuleResult], prompt_stage: PromptStage, ) -> DecisionPrompt: - system = self._build_full_system_prompt(prompt_stage) - if self.profile.prompt_layout == "cache_friendly": + system = self._build_full_system_prompt(profile, prompt_stage) + if profile.prompt_layout == "cache_friendly": user = self._build_cache_friendly_user_prompt( + profile, agent_id=agent_id, campaign_memory=campaign_memory, session_summary=session_summary, @@ -393,6 +527,7 @@ def _build_stateless_full_prompt( ) else: user = self._build_timeline_first_user_prompt( + profile, agent_id=agent_id, campaign_memory=campaign_memory, session_summary=session_summary, @@ -400,26 +535,27 @@ def _build_stateless_full_prompt( budget=budget, prompt_module_results=prompt_module_results, ) - return DecisionPrompt(system=system, user=user, mode=self.profile.prompt_mode, stage=prompt_stage) + return DecisionPrompt(system=system, user=user, mode=profile.prompt_mode, stage=prompt_stage) - def _build_full_system_prompt(self, prompt_stage: PromptStage) -> str: + def _build_full_system_prompt(self, profile: ActivityProfile, prompt_stage: PromptStage) -> str: system_parts = [ "You are controlling an interactive terminal session.", "You may make mistakes and recover from them.", "Return only a JSON action object.", - render_action_schema(self.profile.action_policy), + render_action_schema(profile.action_policy), ] - if self.profile.prompt_mode == "stateful_delta" and prompt_stage == "bootstrap": + if profile.prompt_mode == "stateful_delta" and prompt_stage == "bootstrap": system_parts.append( "This is the stateful session bootstrap. Future prompts may omit stable instructions, campaign " "memory, and full recent-step history; keep this context active across resumed calls." ) - if self.profile.system_guidance: - system_parts.append(f"Activity-specific guidance:\n{self.profile.system_guidance}") + if profile.system_guidance: + system_parts.append(f"Activity-specific guidance:\n{profile.system_guidance}") return "\n".join(system_parts) def _build_timeline_first_user_prompt( self, + profile: ActivityProfile, agent_id: str, campaign_memory: dict[str, Any], session_summary: SessionSummary, @@ -429,14 +565,14 @@ def _build_timeline_first_user_prompt( ) -> str: module_text = render_prompt_modules(prompt_module_results) return "\n\n".join( - self._objective_prompt_lines() + self._objective_prompt_lines(profile) + [ f"Agent: {agent_id}", - f"Activity: {self.profile.name}", + f"Activity: {profile.name}", f"Budget: {json.dumps(budget.to_dict(), sort_keys=True)}", f"Campaign memory: {json.dumps(campaign_memory, indent=2, sort_keys=True)}", f"Session summary: {self._summary_text(session_summary)}", - f"Recent steps:\n{self._recent_steps_text(recent_steps)}", + f"Recent steps:\n{self._recent_steps_text(profile, recent_steps)}", "---", f"Current step: {budget.decision_ticks + 1}", module_text, @@ -446,6 +582,7 @@ def _build_timeline_first_user_prompt( def _build_cache_friendly_user_prompt( self, + profile: ActivityProfile, agent_id: str, campaign_memory: dict[str, Any], session_summary: SessionSummary, @@ -455,13 +592,13 @@ def _build_cache_friendly_user_prompt( ) -> str: stable_module_text = render_prompt_modules(prompt_module_results, levels=STATIC_PROMPT_MODULE_LEVELS) tactical_module_text = render_prompt_modules(prompt_module_results, levels=TACTICAL_PROMPT_MODULE_LEVELS) - sections = self._objective_prompt_lines() + [ + sections = self._objective_prompt_lines(profile) + [ f"Agent: {agent_id}", - f"Activity: {self.profile.name}", + f"Activity: {profile.name}", stable_module_text, f"Campaign memory: {json.dumps(campaign_memory, indent=2, sort_keys=True)}", f"Session summary: {self._summary_text(session_summary)}", - f"Recent steps:\n{self._recent_steps_text(recent_steps)}", + f"Recent steps:\n{self._recent_steps_text(profile, recent_steps)}", "---", f"Current step: {budget.decision_ticks + 1}", f"Budget: {json.dumps(budget.to_dict(), sort_keys=True)}", @@ -472,6 +609,7 @@ def _build_cache_friendly_user_prompt( def _build_stateful_delta_prompt( self, + profile: ActivityProfile, agent_id: str, session_summary: SessionSummary, recent_steps: list[StepRecord], @@ -488,13 +626,13 @@ def _build_stateful_delta_prompt( ) module_text = render_prompt_modules(prompt_module_results) user = "\n\n".join( - self._objective_prompt_lines(reminder=True) + self._objective_prompt_lines(profile, reminder=True) + [ f"Agent: {agent_id}", - f"Activity: {self.profile.name}", + f"Activity: {profile.name}", f"Budget: {json.dumps(budget.to_dict(), sort_keys=True)}", f"Session summary update: {self._summary_text(session_summary)}", - f"Previous step:\n{self._previous_step_delta_text(recent_steps)}", + f"Previous step:\n{self._previous_step_delta_text(profile, recent_steps)}", "---", f"Current step: {budget.decision_ticks + 1}", module_text, @@ -502,23 +640,25 @@ def _build_stateful_delta_prompt( "Return exactly one JSON action.", ] ) - return DecisionPrompt(system=system, user=user, mode=self.profile.prompt_mode, stage="delta") + return DecisionPrompt(system=system, user=user, mode=profile.prompt_mode, stage="delta") - def _objective_prompt_lines(self, *, reminder: bool = False) -> list[str]: + def _objective_prompt_lines(self, profile: ActivityProfile | None = None, *, reminder: bool = False) -> list[str]: + profile = profile or self.profile suffix = " reminder" if reminder else "" lines: list[str] = [] if self.run_objective: lines.append(f"Run objective{suffix}: {self.run_objective}") - lines.append(f"Profile objective{suffix}: {self.profile.objective}") + lines.append(f"Profile objective{suffix}: {profile.objective}") return lines - def _prompt_stage(self, decision_prompts_sent: int) -> PromptStage: - if self.profile.prompt_mode == "stateful_delta": + def _prompt_stage(self, profile: ActivityProfile, decision_prompts_sent: int) -> PromptStage: + if profile.prompt_mode == "stateful_delta": return "bootstrap" if decision_prompts_sent == 0 else "delta" return "full" def _prompt_module_results( self, + profile: ActivityProfile, agent_id: str, observation: Observation, hints: ObservationHints, @@ -529,8 +669,8 @@ def _prompt_module_results( ) -> list[PromptModuleResult]: context = PromptRenderContext( agent_id=agent_id, - activity_name=self.profile.name, - objective=self.profile.objective, + activity_name=profile.name, + objective=profile.objective, observation=observation, hints=hints, recent_steps=tuple(recent_steps), @@ -539,33 +679,106 @@ def _prompt_module_results( budget=budget, run_objective=self.run_objective, ) - return collect_prompt_module_results(self.profile.prompt_modules, context) + return collect_prompt_module_results(profile.prompt_modules, context) - def _decide_with_retry(self, model: ModelAdapter, prompt: DecisionPrompt) -> tuple[Action | None, dict[str, Any]]: + def _decide_with_retry( + self, + profile: ActivityProfile, + model: ModelAdapter, + prompt: DecisionPrompt, + ) -> tuple[Action | None, dict[str, Any]]: invalid_responses: list[dict[str, str]] = [] - try: - action = model.decide(prompt, self.profile.action_policy) - return action, {"accepted": True, "notes": [], "model_response": self._model_response_record(model)} - except ActionError as first_exc: - first_error = str(first_exc) - invalid_responses.append(self._invalid_response_record(model, "initial", first_error)) + model_errors: list[dict[str, Any]] = [] + + action, first_error, attempt_errors = self._try_model_decision(profile, model, prompt, "initial") + model_errors.extend(attempt_errors) + if action is not None: + return action, self._accepted_validation(model, model_errors=model_errors) + if first_error is None: + return None, self._model_error_validation(model_errors) + invalid_responses.append(self._invalid_response_record(model, "initial", first_error)) retry_prompt = prompt - for retry in range(1, self.profile.invalid_json_retries + 1): + for retry in range(1, profile.invalid_json_retries + 1): retry_prompt = self._build_retry_prompt(prompt, first_error, retry) + action, retry_error, attempt_errors = self._try_model_decision( + profile, + model, + retry_prompt, + f"repair-{retry}", + ) + model_errors.extend(attempt_errors) + if action is not None: + return action, self._accepted_validation( + model, + notes=[f"repaired_after_error: {first_error}"], + invalid_responses=invalid_responses, + model_errors=model_errors, + ) + if retry_error is None: + return None, self._model_error_validation(model_errors, invalid_responses=invalid_responses) + first_error = retry_error + invalid_responses.append(self._invalid_response_record(model, f"repair-{retry}", first_error)) + + validation: dict[str, Any] = { + "accepted": False, + "notes": [first_error], + "invalid_responses": invalid_responses, + } + if model_errors: + validation["model_errors"] = model_errors + return None, validation + + def _try_model_decision( + self, + profile: ActivityProfile, + model: ModelAdapter, + prompt: DecisionPrompt, + stage: str, + ) -> tuple[Action | None, str | None, list[dict[str, Any]]]: + model_errors: list[dict[str, Any]] = [] + for provider_attempt in range(0, profile.model_error_retries + 1): try: - action = model.decide(retry_prompt, self.profile.action_policy) - return action, { - "accepted": True, - "notes": [f"repaired_after_error: {first_error}"], - "model_response": self._model_response_record(model), - "invalid_responses": invalid_responses, - } - except ActionError as retry_exc: - first_error = str(retry_exc) - invalid_responses.append(self._invalid_response_record(model, f"repair-{retry}", first_error)) - - return None, {"accepted": False, "notes": [first_error], "invalid_responses": invalid_responses} + return model.decide(prompt, profile.action_policy), None, model_errors + except ModelError as exc: + model_errors.append(self._model_error_record(exc, stage, provider_attempt)) + except ActionError as exc: + return None, str(exc), model_errors + return None, None, model_errors + + def _accepted_validation( + self, + model: ModelAdapter, + notes: list[str] | None = None, + invalid_responses: list[dict[str, str]] | None = None, + model_errors: list[dict[str, Any]] | None = None, + ) -> dict[str, Any]: + validation: dict[str, Any] = { + "accepted": True, + "notes": list(notes or []), + "model_response": self._model_response_record(model), + } + if invalid_responses: + validation["invalid_responses"] = invalid_responses + if model_errors: + validation["model_errors"] = model_errors + validation["notes"].append("recovered_after_model_error") + return validation + + def _model_error_validation( + self, + model_errors: list[dict[str, Any]], + invalid_responses: list[dict[str, str]] | None = None, + ) -> dict[str, Any]: + last_error = model_errors[-1]["message"] if model_errors else "model provider failed" + validation: dict[str, Any] = { + "accepted": False, + "notes": [f"model_error: {last_error}"], + "model_errors": model_errors, + } + if invalid_responses: + validation["invalid_responses"] = invalid_responses + return validation def _execution_error_validation(self, validation: dict[str, Any], error: str) -> dict[str, Any]: notes = list(validation.get("notes", [])) @@ -602,6 +815,27 @@ def _terminal_step_record( events=list(events or []), ) + def _record_terminal_step( + self, + state: ActivityRunState, + observation: Observation, + stop_reason: str, + active_profile: ActivityProfile | None = None, + events: list[dict[str, Any]] | None = None, + ) -> StepRecord: + step = self._terminal_step_record( + step_number=len(state.all_steps) + 1, + observation=observation, + budget=state.budget, + stop_reason=stop_reason, + active_profile=active_profile, + events=events, + ) + state.all_steps.append(step) + state.recent_steps.append(step) + self._write_step(step) + return step + def _execution_record(self, result: ActionExecution) -> dict[str, Any]: return result.to_dict() @@ -626,6 +860,7 @@ def _build_retry_prompt(self, prompt: DecisionPrompt, error: str, attempt: int) def _compact( self, + profile: ActivityProfile, model: ModelAdapter, session_summary: SessionSummary, recent_steps: list[StepRecord], @@ -639,19 +874,30 @@ def _compact( "be a list of strings." ), user="\n\n".join( - self._objective_prompt_lines() + self._objective_prompt_lines(profile) + [ f"Previous summary:\n{self._summary_text(session_summary)}", - f"Steps to summarize:\n{self._recent_steps_text(recent_steps)}", + f"Steps to summarize:\n{self._recent_steps_text(profile, recent_steps)}", f"Current screen:\n{observation.model_text}", "Preserve observed facts, failed commands, exact error messages, and unresolved goals.", ] ), ) - return model.compact(prompt) + try: + return model.compact(prompt) + except ModelError as exc: + return SessionSummary( + current_state=session_summary.current_state, + last_error=f"compaction_model_error: {exc}", + open_subgoals=session_summary.open_subgoals, + discovered_facts=session_summary.discovered_facts, + failed_actions=session_summary.failed_actions, + strategy_notes=session_summary.strategy_notes, + ) def _commit_memory( self, + profile: ActivityProfile, model: ModelAdapter, campaign_memory: dict[str, Any], session_summary: SessionSummary, @@ -661,17 +907,20 @@ def _commit_memory( prompt = MemoryCommitPrompt( system="Return a JSON memory patch for durable campaign memory.", user="\n\n".join( - self._objective_prompt_lines() + self._objective_prompt_lines(profile) + [ f"Existing campaign memory:\n{json.dumps(campaign_memory, indent=2, sort_keys=True)}", f"Session summary:\n{self._summary_text(session_summary)}", - f"Recent steps:\n{self._recent_steps_text(recent_steps)}", + f"Recent steps:\n{self._recent_steps_text(profile, recent_steps)}", f"Final screen:\n{observation.model_text}", "Return JSON with durable_facts, strategy_notes, open_tasks, and errors_to_avoid when applicable.", ] ), ) - return model.commit_memory(prompt) + try: + return model.commit_memory(prompt) + except ModelError: + return MemoryPatch() def _summary_text(self, summary: SessionSummary) -> str: return "(empty)" if summary.is_empty() else json.dumps(summary.to_dict(), indent=2, sort_keys=True) @@ -686,6 +935,17 @@ def _invalid_response_record(self, model: ModelAdapter, attempt: str, error: str ) return record + def _model_error_record(self, error: ModelError, stage: str, provider_attempt: int) -> dict[str, Any]: + return { + "stage": stage, + "provider_attempt": provider_attempt, + "type": error.__class__.__name__, + "message": self._truncate(str(error), 2_000), + "command": list(error.command), + "stdout": self._truncate(error.stdout, 2_000), + "stderr": self._truncate(error.stderr, 2_000), + } + def _model_response_record(self, model: ModelAdapter) -> dict[str, str]: raw = getattr(model, "last_response", "") parsed = getattr(model, "last_parsed_response", raw) @@ -703,13 +963,18 @@ def _truncate(self, text: str, limit: int) -> str: return text return text[:limit] + f"...[truncated {len(text) - limit} chars]" - def _should_compact(self, all_steps: list[StepRecord], recent_steps: list[StepRecord]) -> bool: - every = self.profile.compact_every_steps + def _should_compact( + self, + profile: ActivityProfile, + all_steps: list[StepRecord], + recent_steps: list[StepRecord], + ) -> bool: + every = profile.compact_every_steps if every > 0 and len(all_steps) > 0 and len(all_steps) % every == 0: return True - return self._steps_char_count(recent_steps) >= self.profile.compact_recent_chars + return self._steps_char_count(profile, recent_steps) >= profile.compact_recent_chars - def _steps_char_count(self, steps: list[StepRecord]) -> int: + def _steps_char_count(self, profile: ActivityProfile, steps: list[StepRecord]) -> int: total = 0 for step in steps: obs = step.observation @@ -720,53 +985,53 @@ def _steps_char_count(self, steps: list[StepRecord]) -> int: if isinstance(value, str): total += len(value) action = json.dumps(step.action or {}, sort_keys=True) - validation = json.dumps(self._validation_for_context(step.validation), sort_keys=True) + validation = json.dumps(self._validation_for_context(profile, step.validation), sort_keys=True) total += len(action) + len(validation) return total - def _recent_steps_text(self, steps: list[StepRecord]) -> str: + def _recent_steps_text(self, profile: ActivityProfile, steps: list[StepRecord]) -> str: if not steps: return "(none)" lines = [] for index, step in enumerate(steps): after_text = "Current terminal observation below." if index + 1 < len(steps): - after_text = self._observation_effect_text(steps[index + 1].observation) - lines.append(self._step_context_text(step, after_text)) + after_text = self._observation_effect_text(profile, steps[index + 1].observation) + lines.append(self._step_context_text(profile, step, after_text)) return "\n\n".join(lines) - def _previous_step_delta_text(self, steps: list[StepRecord]) -> str: + def _previous_step_delta_text(self, profile: ActivityProfile, steps: list[StepRecord]) -> str: if not steps: return "(none)" step = steps[-1] - return self._step_context_text(step, "Current terminal observation below.") + return self._step_context_text(profile, step, "Current terminal observation below.") - def _step_context_text(self, step: StepRecord, after_text: str) -> str: + def _step_context_text(self, profile: ActivityProfile, step: StepRecord, after_text: str) -> str: action = step.action or {"action": "terminal_observation" if self._is_terminal_step(step) else "invalid"} - validation = self._validation_for_context(step.validation) + validation = self._validation_for_context(profile, step.validation) return "\n".join( [ f"Step {step.step}", - f"Observed before action:\n{self._screen_tail(step.observation.get('model_text', ''))}", + f"Observed before action:\n{self._screen_tail(profile, step.observation.get('model_text', ''))}", f"Action chosen:\n{json.dumps(action, sort_keys=True)}", f"Validation: {json.dumps(validation, sort_keys=True)}", f"Observed after action:\n{after_text}", ] ) - def _screen_tail(self, screen: Any) -> str: - if not isinstance(screen, str) or self.profile.screen_tail_chars <= 0: + def _screen_tail(self, profile: ActivityProfile, screen: Any) -> str: + if not isinstance(screen, str) or profile.screen_tail_chars <= 0: return "" - return screen[-self.profile.screen_tail_chars:] + return screen[-profile.screen_tail_chars:] - def _observation_effect_text(self, observation: dict[str, Any]) -> str: - new_text = self._screen_tail(observation.get("new_text", "")) + def _observation_effect_text(self, profile: ActivityProfile, observation: dict[str, Any]) -> str: + new_text = self._screen_tail(profile, observation.get("new_text", "")) if new_text: return new_text - return self._screen_tail(observation.get("model_text", "")) + return self._screen_tail(profile, observation.get("model_text", "")) - def _validation_for_context(self, validation: dict[str, Any]) -> dict[str, Any]: - if self.profile.include_model_responses_in_context: + def _validation_for_context(self, profile: ActivityProfile, validation: dict[str, Any]) -> dict[str, Any]: + if profile.include_model_responses_in_context: return validation context: dict[str, Any] = {} diff --git a/tests/test_cli.py b/tests/test_cli.py index 4886045..0b57942 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,7 +1,22 @@ import argparse from bbs_gym.accounts import AgentRecord, AgentRegistry -from bbs_gym.cli import build_activity_profile, build_activity_route_set, build_model, build_model_metadata +from bbs_gym.cli import ( + build_match_scheduler_config, + build_activity_profile, + build_activity_route_set, + build_match_participants, + build_model, + build_model_metadata, + match_participant_specs, +) +from bbs_gym.match import ( + MatchParticipantRuntime, + MatchParticipantSpec, + MatchSchedulerConfig, + handle_match_disconnect, + match_round_order, +) from tty_agent.models import ClaudeCliAdapter, CodexCliAdapter, OpenAICompatibleAdapter @@ -223,6 +238,7 @@ def test_build_activity_profile_uses_stateful_delta_for_stateful_codex(): prompt_mode=None, prompt_layout=None, codex_stateful=False, + disabled_actions=[], ) profile = build_activity_profile(args, registry) @@ -257,6 +273,7 @@ def test_build_activity_profile_uses_stateful_delta_for_stateful_claude(): prompt_layout=None, codex_stateful=False, claude_stateful=False, + disabled_actions=[], ) profile = build_activity_profile(args, registry) @@ -274,6 +291,7 @@ def test_build_activity_profile_applies_named_profile_overrides(): recent_steps_to_keep=5, prompt_mode="stateful_delta", prompt_layout="cache_friendly", + disabled_actions=[], ) profile = build_activity_profile(args) @@ -288,6 +306,28 @@ def test_build_activity_profile_applies_named_profile_overrides(): assert profile.prompt_layout == "cache_friendly" +def test_build_activity_profile_can_disable_actions(): + args = argparse.Namespace( + activity="bbs-door-line", + profile_objective=None, + agent_id="agent", + provider=None, + observe_timeout=None, + stable_ms=None, + byte_quiet_ms=None, + recent_steps_to_keep=None, + prompt_mode=None, + prompt_layout=None, + codex_stateful=False, + disabled_actions=["hangup"], + ) + + profile = build_activity_profile(args) + + assert "hangup" not in profile.action_policy.allowed_actions + assert "submit_line" in profile.action_policy.allowed_actions + + def test_build_activity_route_set_applies_profile_overrides(): args = argparse.Namespace( agent_id="agent-001", @@ -301,6 +341,7 @@ def test_build_activity_route_set_applies_profile_overrides(): prompt_mode="stateful_delta", prompt_layout="cache_friendly", codex_stateful=False, + disabled_actions=[], ) route_set = build_activity_route_set(args) @@ -318,3 +359,265 @@ def test_build_activity_route_set_applies_profile_overrides(): assert route_set.routes[0].profile.recent_steps_to_keep == 5 assert route_set.routes[0].profile.prompt_mode == "stateful_delta" assert route_set.routes[0].profile.prompt_layout == "cache_friendly" + + +def test_match_participant_specs_parse_inline_provider_and_model(): + args = argparse.Namespace( + match_config=None, + participant=["codex-blue:codex:gpt-5.5", "claude-red:claude:sonnet"], + agent_id=[], + ) + + specs = match_participant_specs(args) + + assert [spec.agent_id for spec in specs] == ["codex-blue", "claude-red"] + assert specs[0].provider == "codex" + assert specs[0].model == "gpt-5.5" + assert specs[1].provider == "claude" + assert specs[1].model == "sonnet" + + +def test_build_match_participants_formats_objectives_and_logs(tmp_path): + args = argparse.Namespace( + match_config=None, + participant=["codex-blue:scripted:unused", "claude-red:scripted:unused"], + agent_id=[], + provider=None, + scripted_response=['{"action": "wait", "arguments": {}}'], + model=None, + base_url=None, + api_key=None, + temperature=None, + max_tokens=None, + response_filter=None, + no_anthropic_cache=False, + activity="bbs-door-line", + profile_objective=None, + run_objective="{agent_id} should find {opponents}", + observe_timeout=None, + stable_ms=None, + byte_quiet_ms=None, + recent_steps_to_keep=None, + prompt_mode=None, + prompt_layout=None, + codex_stateful=False, + claude_stateful=False, + disabled_actions=[], + log_path=str(tmp_path / "match.jsonl"), + ) + + participants = build_match_participants(args, match_participant_specs(args), registry=None) + + assert [participant.spec.agent_id for participant in participants] == ["codex-blue", "claude-red"] + assert participants[0].runner.run_objective == "codex-blue should find claude-red" + assert participants[1].runner.run_objective == "claude-red should find codex-blue" + assert participants[0].runner.profile.name == "bbs-door-line" + assert participants[0].log_path == tmp_path / "match.codex-blue.jsonl" + assert participants[1].log_path == tmp_path / "match.claude-red.jsonl" + + +def test_match_config_toml_supplies_scheduler_budget_and_participants(tmp_path): + config_path = tmp_path / "melee.toml" + config_path.write_text( + """ +activity = "bbs-door-line" +transport = "telnet" +telnet_enter = "lf" +run_objective = "Play as {agent_id}; opponents: {opponents}" +disabled_actions = ["hangup"] +log_path = "runtime/logs/melee.jsonl" + +[scheduler] +mode = "sequential" +order = "shuffle" +seed = 17 +disconnect_policy = "reconnect" +max_reconnects = 2 +reconnect_delay = 0.0 +max_workers = 4 + +[budget] +max_rounds = 250 +max_decision_ticks = 125 +max_wall_seconds = 3600 + +[[participants]] +agent_id = "codex-blue" +provider = "codex" +model = "gpt-5.5" +stateful = true +codex_session_file = "runtime/codex-blue.session" + +[[participants]] +agent_id = "gemma-green" +provider = "openai-compatible" +model = "gemma4" +base_url = "http://localhost:8000/v1" +temperature = 0.6 +""".strip() + + "\n", + encoding="utf-8", + ) + args = argparse.Namespace( + match_config=str(config_path), + participant=[], + agent_id=[], + provider=None, + scripted_response=[], + model=None, + base_url=None, + api_key=None, + temperature=None, + max_tokens=None, + response_filter=None, + no_anthropic_cache=False, + codex_profile=None, + codex_executable=None, + codex_timeout=None, + codex_sandbox=None, + codex_cwd=None, + codex_arg=[], + codex_stateful=False, + codex_session_id=None, + codex_session_file=None, + claude_stateful=False, + activity="tw2-game", + profile_objective=None, + run_objective=None, + observe_timeout=None, + stable_ms=None, + byte_quiet_ms=None, + recent_steps_to_keep=None, + model_error_retries=None, + prompt_mode=None, + prompt_layout=None, + disabled_actions=[], + log_path=str(tmp_path / "default.jsonl"), + max_rounds=50, + max_decision_ticks=50, + max_wall_seconds=600.0, + scheduler_mode="sequential", + match_order="fixed", + match_seed=None, + disconnect_policy="stop", + max_reconnects=3, + reconnect_delay=2.0, + max_workers=None, + host="127.0.0.1", + port=2323, + rlogin_port=2513, + rlogin_terminal="ansi", + transport="telnet", + telnet_enter="cr", + agents_config="config/agents.local.json", + no_agents_config=False, + ) + + specs = match_participant_specs(args) + participants = build_match_participants(args, specs, registry=None) + + assert args.match_order == "shuffle" + assert args.scheduler_mode == "sequential" + assert args.match_seed == 17 + assert args.disconnect_policy == "reconnect" + assert args.max_reconnects == 2 + assert args.reconnect_delay == 0.0 + assert args.max_workers == 4 + assert args.max_rounds == 250 + assert args.max_decision_ticks == 125 + assert args.max_wall_seconds == 3600 + assert args.disabled_actions == ["hangup"] + assert [spec.agent_id for spec in specs] == ["codex-blue", "gemma-green"] + assert "hangup" not in participants[0].runner.profile.action_policy.allowed_actions + assert isinstance(participants[0].model, CodexCliAdapter) + assert participants[0].model.stateful is True + assert str(participants[0].model.session_file) == "runtime/codex-blue.session" + assert isinstance(participants[1].model, OpenAICompatibleAdapter) + assert participants[1].model.base_url == "http://localhost:8000/v1" + assert participants[1].model.temperature == 0.6 + scheduler = build_match_scheduler_config(args) + assert scheduler == MatchSchedulerConfig( + mode="sequential", + order="shuffle", + seed=17, + disconnect_policy="reconnect", + max_reconnects=2, + reconnect_delay=0.0, + max_rounds=250, + max_decision_ticks=125, + max_wall_seconds=3600, + max_workers=4, + ) + + +def test_match_round_order_fixed_shuffle_and_rotate_are_deterministic(): + states = [ + (argparse.Namespace(spec=argparse.Namespace(agent_id="a")), argparse.Namespace(completed=False)), + (argparse.Namespace(spec=argparse.Namespace(agent_id="b")), argparse.Namespace(completed=False)), + (argparse.Namespace(spec=argparse.Namespace(agent_id="c")), argparse.Namespace(completed=False)), + ] + + assert [participant.spec.agent_id for participant, _ in match_round_order(states, "fixed", random_rng(3), 1)] == [ + "a", + "b", + "c", + ] + assert [participant.spec.agent_id for participant, _ in match_round_order(states, "rotate", random_rng(3), 2)] == [ + "b", + "c", + "a", + ] + assert [participant.spec.agent_id for participant, _ in match_round_order(states, "shuffle", random_rng(3), 1)] == [ + "b", + "c", + "a", + ] + + +def random_rng(seed: int): + import random + + return random.Random(seed) + + +def test_handle_match_disconnect_reconnects_and_logs(tmp_path): + class FakeAgent: + def __init__(self, agent_id: str) -> None: + self.agent_id = agent_id + self.closed = False + + def close(self) -> None: + self.closed = True + + class FakeGym: + def __init__(self) -> None: + self.connected = [] + + def connect(self, agent_id, model_metadata=None): + self.connected.append((agent_id, model_metadata)) + return FakeAgent(agent_id) + + old_agent = FakeAgent("arena-codex") + state = argparse.Namespace(agent=old_agent, completed=True, stop_reason="disconnected") + participant = MatchParticipantRuntime( + spec=MatchParticipantSpec("arena-codex", "codex", "gpt-5.5"), + model=object(), + model_metadata={"provider": "codex"}, + runner=object(), + log_path=tmp_path / "agent.jsonl", + ) + scheduler = MatchSchedulerConfig(disconnect_policy="reconnect", max_reconnects=2, reconnect_delay=0.0) + match_log = tmp_path / "match.jsonl" + gym = FakeGym() + + handle_match_disconnect(gym, participant, state, scheduler, match_log, round_number=7) + + assert old_agent.closed is True + assert state.completed is False + assert state.stop_reason == "" + assert state.agent.agent_id == "arena-codex" + assert participant.reconnects == 1 + assert gym.connected == [("arena-codex", {"provider": "codex"})] + events = [line for line in match_log.read_text(encoding="utf-8").splitlines() if line] + assert '"type": "participant_disconnected"' in events[0] + assert '"type": "participant_reconnected"' in events[1] diff --git a/tests/test_match.py b/tests/test_match.py new file mode 100644 index 0000000..3f02aa5 --- /dev/null +++ b/tests/test_match.py @@ -0,0 +1,346 @@ +import json +import time +from pathlib import Path + +from bbs_gym.match import ( + MatchParticipantRuntime, + MatchParticipantSpec, + MatchSchedulerConfig, + run_scheduled_match, +) +from tty_agent.actions import Action +from tty_agent.agent import ActionExecution +from tty_agent.models import ScriptedModelAdapter +from tty_agent.runner import ActivityProfile, ActivityRunner +from tty_agent.terminal import Observation +from tty_agent.transports.base import SessionDisconnected + + +class FakeAgent: + def __init__(self, agent_id: str) -> None: + self.agent_id = agent_id + self.actions = [] + self.closed = False + + def observe_turn(self, **_kwargs): + return Observation( + agent_id=self.agent_id, + pretty_screen="Command:", + model_text="Command:", + new_text="Command:", + cursor=(0, 8), + stable_ms=300, + byte_quiet_ms=0, + matched_prompt="menu-choice", + ready_reason="stable", + profile="test", + transcript_path=Path(f"runtime/transcripts/{self.agent_id}.raw"), + transcript_byte_start=0, + transcript_byte_end=8, + bytes_read=8, + timed_out=False, + timestamp=0.0, + metadata={}, + ) + + def act_action(self, action: Action) -> ActionExecution: + self.actions.append(action) + return ActionExecution() + + def close(self) -> None: + self.closed = True + + +class FakeGym: + def __init__(self) -> None: + self.agents = {} + + def connect(self, agent_id: str, model_metadata=None): + del model_metadata + agent = FakeAgent(agent_id) + self.agents[agent_id] = agent + return agent + + +class DisconnectingAgent(FakeAgent): + def observe_turn(self, **_kwargs): + raise SessionDisconnected + + +class DisconnectGym: + def __init__(self, disconnecting: set[str], fail_reconnects: set[str] | None = None) -> None: + self.disconnecting = set(disconnecting) + self.fail_reconnects = set(fail_reconnects or set()) + self.connect_attempts: dict[str, int] = {} + + def connect(self, agent_id: str, model_metadata=None): + del model_metadata + attempts = self.connect_attempts.get(agent_id, 0) + 1 + self.connect_attempts[agent_id] = attempts + if attempts > 1 and agent_id in self.fail_reconnects: + raise OSError("reconnect failed") + if attempts == 1 and agent_id in self.disconnecting: + return DisconnectingAgent(agent_id) + return FakeAgent(agent_id) + + +def test_run_scheduled_match_sequential_writes_order_and_steps(tmp_path): + participants = [ + participant("alpha", tmp_path), + participant("bravo", tmp_path), + ] + match_log = tmp_path / "match.jsonl" + + result = run_scheduled_match( + FakeGym(), + participants, + MatchSchedulerConfig( + mode="sequential", + order="fixed", + max_rounds=1, + max_decision_ticks=5, + max_wall_seconds=60, + ), + match_log, + ) + + assert result.commit_count == 2 + assert [activity.agent_id for _, activity in result.results] == ["alpha", "bravo"] + assert [activity.stop_reason for _, activity in result.results] == ["match_rounds", "match_rounds"] + assert [len(activity.steps) for _, activity in result.results] == [1, 1] + events = [json.loads(line) for line in match_log.read_text(encoding="utf-8").splitlines()] + assert [event["type"] for event in events] == [ + "match_started", + "round_started", + "commit_order", + "agent_step_started", + "agent_step_completed", + "agent_step", + "agent_step_started", + "agent_step_completed", + "agent_step", + "round_completed", + "match_completed", + ] + assert events[0]["scheduler"]["mode"] == "sequential" + assert events[2]["order"] == ["alpha", "bravo"] + assert [event["phase"] for event in events if event["type"] == "agent_step_started"] == ["started", "started"] + assert events[-1]["commit_count"] == 2 + assert events[-1]["scheduler_count"] == 1 + assert events[-1]["scheduler_count_unit"] == "rounds" + assert events[-1]["results"][0]["stop_reason"] == "match_rounds" + + +def test_run_scheduled_match_parallel_barrier_commits_configured_order(tmp_path): + participants = [ + participant("alpha", tmp_path, delay=0.03), + participant("bravo", tmp_path, delay=0.0), + ] + match_log = tmp_path / "barrier.jsonl" + + run_scheduled_match( + FakeGym(), + participants, + MatchSchedulerConfig( + mode="parallel_barrier", + order="fixed", + max_rounds=1, + max_decision_ticks=5, + max_wall_seconds=60, + ), + match_log, + ) + + events = [json.loads(line) for line in match_log.read_text(encoding="utf-8").splitlines()] + assert _event(events, "commit_order")["order"] == ["alpha", "bravo"] + assert _agent_step_order(events) == ["alpha", "bravo"] + assert {event["phase"] for event in events if event["type"] == "agent_step_started"} == {"queued"} + assert {event["agent_id"] for event in events if event["type"] == "agent_decision_completed"} == { + "alpha", + "bravo", + } + + +def test_run_scheduled_match_parallel_race_commits_completion_order(tmp_path): + participants = [ + participant("alpha", tmp_path, delay=0.05), + participant("bravo", tmp_path, delay=0.0), + ] + match_log = tmp_path / "race.jsonl" + + run_scheduled_match( + FakeGym(), + participants, + MatchSchedulerConfig( + mode="parallel_race", + order="fixed", + max_rounds=1, + max_decision_ticks=5, + max_wall_seconds=60, + ), + match_log, + ) + + events = [json.loads(line) for line in match_log.read_text(encoding="utf-8").splitlines()] + assert _event(events, "commit_order")["order"] == ["bravo", "alpha"] + assert _agent_step_order(events) == ["bravo", "alpha"] + + +def test_run_scheduled_match_continuous_requeues_fast_agents(tmp_path): + participants = [ + participant("alpha", tmp_path, delay=0.1), + participant("bravo", tmp_path, delay=0.0), + ] + match_log = tmp_path / "continuous.jsonl" + + result = run_scheduled_match( + FakeGym(), + participants, + MatchSchedulerConfig( + mode="continuous", + order="fixed", + max_rounds=3, + max_decision_ticks=5, + max_wall_seconds=60, + ), + match_log, + ) + + events = [json.loads(line) for line in match_log.read_text(encoding="utf-8").splitlines()] + assert result.commit_count == 3 + assert _agent_step_order(events).count("bravo") >= 2 + assert {event["phase"] for event in events if event["type"] == "agent_step_started"} == {"queued"} + assert not any(event["type"] in {"round_started", "round_completed"} for event in events) + continuous_commits = [event for event in events if event["type"] == "commit_order"] + assert [event["tick"] for event in continuous_commits] == [1, 2, 3] + assert all("round" not in event for event in continuous_commits) + assert all( + event.get("commit_policy") == "continuous_completion" + for event in events + if event["type"] == "commit_order" + ) + completed = events[-1] + assert completed["commit_count"] == 3 + assert completed["scheduler_count"] == 3 + assert completed["scheduler_count_unit"] == "ticks" + + +def test_run_scheduled_match_continuous_handles_midflight_disconnect(tmp_path): + participants = [ + participant("alpha", tmp_path), + participant("bravo", tmp_path), + ] + match_log = tmp_path / "continuous-disconnect.jsonl" + + result = run_scheduled_match( + DisconnectGym(disconnecting={"alpha"}), + participants, + MatchSchedulerConfig( + mode="continuous", + order="fixed", + disconnect_policy="stop", + max_rounds=3, + max_decision_ticks=5, + max_wall_seconds=60, + ), + match_log, + ) + + stops = {activity.agent_id: activity.stop_reason for _, activity in result.results} + assert stops["alpha"] == "disconnected" + assert stops["bravo"] == "match_ticks" + events = [json.loads(line) for line in match_log.read_text(encoding="utf-8").splitlines()] + disconnected = _event(events, "participant_disconnected") + assert disconnected["agent_id"] == "alpha" + assert "tick" in disconnected + assert "round" not in disconnected + assert _agent_step_order(events).count("alpha") == 1 + + +def test_run_scheduled_match_disconnect_stop_policy_marks_agent_disconnected(tmp_path): + participants = [ + participant("alpha", tmp_path), + participant("bravo", tmp_path), + ] + match_log = tmp_path / "disconnect-stop.jsonl" + + result = run_scheduled_match( + DisconnectGym(disconnecting={"alpha"}), + participants, + MatchSchedulerConfig( + mode="sequential", + disconnect_policy="stop", + max_rounds=1, + max_decision_ticks=5, + max_wall_seconds=60, + ), + match_log, + ) + + stops = {activity.agent_id: activity.stop_reason for _, activity in result.results} + assert stops["alpha"] == "disconnected" + assert stops["bravo"] == "match_rounds" + events = [json.loads(line) for line in match_log.read_text(encoding="utf-8").splitlines()] + assert _event(events, "participant_disconnected")["agent_id"] == "alpha" + assert not any(event["type"] == "participant_reconnected" for event in events) + + +def test_run_scheduled_match_reconnect_policy_gives_up_after_failures(tmp_path): + participants = [ + participant("alpha", tmp_path), + participant("bravo", tmp_path), + ] + match_log = tmp_path / "disconnect-reconnect.jsonl" + gym = DisconnectGym(disconnecting={"alpha"}, fail_reconnects={"alpha"}) + + result = run_scheduled_match( + gym, + participants, + MatchSchedulerConfig( + mode="sequential", + disconnect_policy="reconnect", + max_reconnects=2, + reconnect_delay=0.0, + max_rounds=1, + max_decision_ticks=5, + max_wall_seconds=60, + ), + match_log, + ) + + stops = {activity.agent_id: activity.stop_reason for _, activity in result.results} + assert stops["alpha"] == "disconnect_reconnect_failed" + assert gym.connect_attempts["alpha"] == 3 + events = [json.loads(line) for line in match_log.read_text(encoding="utf-8").splitlines()] + failed = [event for event in events if event["type"] == "participant_reconnect_failed"] + assert [event["attempt"] for event in failed] == [1, 2] + assert events[-1]["type"] == "match_completed" + + +def _event(events: list[dict], event_type: str) -> dict: + return next(event for event in events if event["type"] == event_type) + + +def _agent_step_order(events: list[dict]) -> list[str]: + return [event["agent_id"] for event in events if event["type"] == "agent_step"] + + +class DelayedScriptedModelAdapter(ScriptedModelAdapter): + def __init__(self, responses: list[str], delay: float) -> None: + super().__init__(responses) + self.delay = delay + + def decide(self, prompt, policy=None): + if self.delay: + time.sleep(self.delay) + return super().decide(prompt, policy) + + +def participant(agent_id: str, tmp_path: Path, delay: float = 0.0) -> MatchParticipantRuntime: + return MatchParticipantRuntime( + spec=MatchParticipantSpec(agent_id, "scripted", "unused"), + model=DelayedScriptedModelAdapter(['{"action": "wait", "arguments": {}}'], delay), + model_metadata={"provider": "scripted"}, + runner=ActivityRunner(ActivityProfile(name="test", objective="test"), log_path=tmp_path / f"{agent_id}.jsonl"), + log_path=tmp_path / f"{agent_id}.jsonl", + ) diff --git a/tests/test_models.py b/tests/test_models.py index dbb281e..5c4ea23 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,3 +1,5 @@ +import subprocess + from tty_agent.models import ( AnthropicAdapter, ClaudeCliAdapter, @@ -6,6 +8,7 @@ DecisionPrompt, MemoryCommitPrompt, ModelMessage, + ModelTimeoutError, OpenAICompatibleAdapter, SessionSummary, TextChatAdapter, @@ -381,13 +384,33 @@ def fake_run(command, input, text, capture_output, timeout, cwd, check): assert "--bare" not in captured["command"] assert captured["command"][captured["command"].index("--model") + 1] == "claude-sonnet-4-6" assert captured["command"][captured["command"].index("--permission-mode") + 1] == "dontAsk" - assert captured["command"][captured["command"].index("--tools") + 1] == "" + assert "--tools" not in captured["command"] assert captured["command"][-1] == "--debug" assert captured["timeout"] == 42.0 assert "SYSTEM MESSAGE:\nsystem schema" in captured["input"] assert "USER MESSAGE:\ncurrent screen" in captured["input"] +def test_claude_cli_adapter_includes_non_empty_tools(monkeypatch): + captured = {} + + class Result: + returncode = 0 + stderr = "" + stdout = '{"result":"{\\"action\\": \\"wait\\", \\"arguments\\": {}}"}' + + def fake_run(command, *_args, **_kwargs): + captured["command"] = command + return Result() + + monkeypatch.setattr("tty_agent.models.subprocess.run", fake_run) + adapter = ClaudeCliAdapter(tools="Bash,Read") + + adapter.decide(DecisionPrompt("system schema", "current screen")) + + assert captured["command"][captured["command"].index("--tools") + 1] == "Bash,Read" + + def test_claude_cli_adapter_resumes_stateful_session(monkeypatch, tmp_path): commands = [] session_id = "11111111-2222-3333-4444-555555555555" @@ -439,3 +462,28 @@ def fake_run(*_args, **_kwargs): assert "stderr detail" in str(exc) else: raise AssertionError("expected RuntimeError") + + +def test_claude_cli_adapter_timeout_includes_stdout_and_stderr(monkeypatch): + def fake_run(command, input, text, capture_output, timeout, cwd, check): + del input, text, capture_output, cwd, check + raise subprocess.TimeoutExpired( + command, + timeout, + output=b"partial stdout", + stderr=b"partial stderr", + ) + + monkeypatch.setattr("tty_agent.models.subprocess.run", fake_run) + adapter = ClaudeCliAdapter(timeout=12.0) + + try: + adapter.chat([ModelMessage("user", "screen")]) + except ModelTimeoutError as exc: + assert "claude -p timed out after 12s" in str(exc) + assert "partial stdout" in str(exc) + assert "partial stderr" in str(exc) + assert exc.stdout == "partial stdout" + assert exc.stderr == "partial stderr" + else: + raise AssertionError("expected ModelTimeoutError") diff --git a/tests/test_runner.py b/tests/test_runner.py index ce35510..8bc09f1 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -6,7 +6,7 @@ from tty_agent.actions import Action, ActionError, ActionPolicy from tty_agent.agent import ActionExecution from tty_agent.memory import JsonMemoryStore -from tty_agent.models import ScriptedModelAdapter +from tty_agent.models import ModelTimeoutError, ScriptedModelAdapter from tty_agent.prompt_modules import GENERIC_TERMINAL_MODULES, StaticPromptModule from tty_agent.runner import ActivityBudget, ActivityProfile, ActivityRoute, ActivityRunner, RoutedActivityRunner from tty_agent.terminal import Observation @@ -170,6 +170,64 @@ def test_activity_runner_sends_actions_and_logs_memory(tmp_path): assert (tmp_path / "steps.jsonl").read_text(encoding="utf-8").count("\n") == 2 +def test_activity_runner_can_step_state_incrementally(tmp_path): + agent = FakeAgent() + model = ScriptedModelAdapter( + [ + '{"action": "submit_line", "arguments": {"text": "look"}}', + '{"action": "hangup", "arguments": {}}', + '{"durable_facts": ["Stepped manually."]}', + ] + ) + memory = JsonMemoryStore(tmp_path / "memory") + runner = ActivityRunner( + ActivityProfile(name="bbs-menu", objective="test stepping"), + memory_store=memory, + log_path=tmp_path / "steps.jsonl", + ) + state = runner.start_state(agent, model, ActivityBudget(max_decision_ticks=5)) + + first = runner.run_step(state) + second = runner.run_step(state) + result = runner.finish_state(state) + + assert first is not None + assert second is not None + assert first.step == 1 + assert second.step == 2 + assert state.completed is True + assert result.stop_reason == "hangup" + assert [action.action for action in agent.actions] == ["submit_line", "hangup"] + assert memory.load("agent-001") == {"durable_facts": ["Stepped manually."]} + + +def test_activity_runner_can_prepare_then_commit_step(tmp_path): + agent = FakeAgent() + model = ScriptedModelAdapter(['{"action": "submit_line", "arguments": {"text": "look"}}']) + runner = ActivityRunner( + ActivityProfile(name="bbs-menu", objective="test split stepping"), + memory_store=JsonMemoryStore(tmp_path / "memory"), + log_path=tmp_path / "steps.jsonl", + ) + state = runner.start_state(agent, model, ActivityBudget(max_decision_ticks=5)) + + prepared = runner.prepare_step(state) + + assert prepared is not None + assert prepared.action is not None + assert prepared.action.action == "submit_line" + assert agent.actions == [] + assert state.budget.decision_ticks == 0 + + step = runner.commit_prepared_step(state, prepared) + + assert step is not None + assert step.step == 1 + assert [action.action for action in agent.actions] == ["submit_line"] + assert state.budget.decision_ticks == 1 + assert (tmp_path / "steps.jsonl").read_text(encoding="utf-8").count("\n") == 1 + + def test_activity_runner_counts_invalid_model_actions(tmp_path): agent = FakeAgent() model = ScriptedModelAdapter(["not json", "still not json"]) @@ -521,6 +579,77 @@ def test_activity_runner_logs_action_execution_errors_without_crashing(tmp_path) assert "action_error" in result.steps[0].validation["notes"][0] +def test_activity_runner_retries_model_timeout_without_crashing(tmp_path): + class TimeoutThenWaitModel(ScriptedModelAdapter): + def __init__(self): + super().__init__( + [ + '{"action": "wait", "arguments": {}}', + '{"action": "hangup", "arguments": {}}', + '{"durable_facts": ["Recovered from provider timeout."]}', + ] + ) + self.calls = 0 + + def decide(self, prompt, policy=None): + self.calls += 1 + if self.calls == 1: + raise ModelTimeoutError( + "claude -p timed out after 600s: partial stderr", + command=["claude", "-p"], + stdout="partial stdout", + stderr="partial stderr", + ) + return super().decide(prompt, policy) + + agent = FakeAgent() + model = TimeoutThenWaitModel() + runner = ActivityRunner( + ActivityProfile(name="bbs-menu", objective="test provider retry", model_error_retries=1), + memory_store=JsonMemoryStore(tmp_path / "memory"), + ) + + result = runner.run(agent, model, ActivityBudget(max_decision_ticks=5)) + + assert result.stop_reason == "hangup" + assert result.steps[0].validation["accepted"] is True + assert "recovered_after_model_error" in result.steps[0].validation["notes"] + model_error = result.steps[0].validation["model_errors"][0] + assert model_error["type"] == "ModelTimeoutError" + assert model_error["stdout"] == "partial stdout" + assert model_error["stderr"] == "partial stderr" + assert model_error["command"] == ["claude", "-p"] + + +def test_activity_runner_records_model_timeout_failure_without_crashing(tmp_path): + class AlwaysTimeoutModel(ScriptedModelAdapter): + def __init__(self): + super().__init__([]) + + def decide(self, _prompt, _policy=None): + raise ModelTimeoutError( + "claude -p timed out after 600s: partial stderr", + command=["claude", "-p"], + stdout="partial stdout", + stderr="partial stderr", + ) + + agent = FakeAgent() + runner = ActivityRunner( + ActivityProfile(name="bbs-menu", objective="test provider failure", model_error_retries=1), + memory_store=JsonMemoryStore(tmp_path / "memory"), + ) + + result = runner.run(agent, AlwaysTimeoutModel(), ActivityBudget(max_decision_ticks=5, max_validation_failures=1)) + + assert result.stop_reason == "validation_failures" + assert result.steps[0].action is None + assert result.steps[0].validation["accepted"] is False + assert result.steps[0].validation["model_errors"][0]["stdout"] == "partial stdout" + assert result.steps[0].validation["model_errors"][1]["stderr"] == "partial stderr" + assert "model_error:" in result.steps[0].validation["notes"][0] + + def test_activity_runner_logs_raw_and_filtered_model_responses(tmp_path): agent = FakeAgent() model = ScriptedModelAdapter( diff --git a/tests/test_tele_arena_example.py b/tests/test_tele_arena_example.py index ceb26a7..29b1498 100644 --- a/tests/test_tele_arena_example.py +++ b/tests/test_tele_arena_example.py @@ -31,6 +31,7 @@ def test_tele_arena_example_defaults_to_bbs_door_line_with_lf_enter(): assert _option(argv, "--provider") == "codex" assert _option(argv, "--model") == "gpt-5.5" assert _option(argv, "--run-objective") == DEFAULT_RUN_OBJECTIVE + assert _option(argv, "--disable-action") == "hangup" def test_tele_arena_example_can_select_safe_activity():