diff --git a/swe_af/app.py b/swe_af/app.py index e35b6f1..37aea81 100644 --- a/swe_af/app.py +++ b/swe_af/app.py @@ -45,6 +45,38 @@ app.include_router(router) +# --------------------------------------------------------------------------- +# Auto-inject scoped credentials into every router.harness call. +# +# The environment scout negotiates credentials with the user (via Hax) and +# stashes them in process-local memory keyed by the build's run_id. Without +# this wrapper, downstream reasoners would each have to explicitly call +# ``env=harness_env_for(router)`` at every harness call site (25+ places). +# Patching the Agent's bound ``harness`` method once means every reasoner +# — existing and future — automatically gets the negotiated credentials +# merged into the subprocess env, with zero per-call-site changes. +# +# Precedence: scoped credentials win over the inherited process env so a +# fresh scout token overrides any stale value carried by os.environ. +# Callers MAY still pass an explicit ``env=`` dict; we treat it as the +# base, then merge scoped creds on top. 
# ---------------------------------------------------------------------------
# Keep a handle on the unpatched harness so the wrapper below can delegate
# to it after preparing the environment.
_original_harness = app.harness


async def _harness_with_scoped_credentials(*args, env=None, **kwargs):
    """Delegate to the original harness with a credential-merged ``env``.

    Args:
        *args: Positional arguments forwarded untouched to the original
            harness.
        env: Optional environment mapping supplied by the caller. When it
            is ``None`` a copy of ``os.environ`` is used as the base;
            otherwise a copy of the supplied mapping is used, so the
            caller's object is never passed on directly.
        **kwargs: Keyword arguments forwarded untouched to the original
            harness.

    Returns:
        Whatever the original harness returns.
    """
    # Imported lazily inside the call, as in the original wrapper.
    from swe_af.hitl import inject_credentials_into_env  # noqa: PLC0415

    # Resolve the scope key from the app's current context; fall back to
    # an empty string when there is no context or no run_id on it.
    current_ctx = getattr(app, "ctx", None)
    scope_key = ""
    if current_ctx:
        scope_key = getattr(current_ctx, "run_id", None) or ""

    # Always work on a copy so neither os.environ nor the caller's dict is
    # handed to the injection helper directly.
    if env is None:
        starting_env = dict(os.environ)
    else:
        starting_env = dict(env)

    return await _original_harness(
        *args,
        env=inject_credentials_into_env(starting_env, scope_key),
        **kwargs,
    )


# Install the wrapper in place of the app's harness attribute.
app.harness = _harness_with_scoped_credentials
- default_branch = cfg.github_pr_base or "main" - app.note( - f"Repo already exists at {repo_path} — resetting to origin/{default_branch}", - tags=["build", "clone", "reset"], - ) - - # Remove stale worktrees on disk before touching branches - worktrees_dir = os.path.join(repo_path, ".worktrees") - if os.path.isdir(worktrees_dir): - import shutil - shutil.rmtree(worktrees_dir, ignore_errors=True) - subprocess.run( - ["git", "worktree", "prune"], - cwd=repo_path, capture_output=True, text=True, - ) + # Scope key for the in-memory credentials store negotiated by the + # environment scout. Shared by every reasoner in this build (run_id + # propagates through every app.call). Cleared in the `finally` below + # so even an exception leaves no secrets in process memory. + _scope_id = (getattr(app.ctx, "run_id", None) if app.ctx else None) or "" - # Fetch latest remote state - fetch = subprocess.run( - ["git", "fetch", "origin"], - cwd=repo_path, capture_output=True, text=True, - ) - if fetch.returncode != 0: - app.note(f"git fetch failed: {fetch.stderr.strip()}", tags=["build", "clone", "error"]) + try: - # Force-checkout default branch (handles dirty working tree from crashed builds) - subprocess.run( - ["git", "checkout", "-f", default_branch], - cwd=repo_path, capture_output=True, text=True, - ) - reset = subprocess.run( - ["git", "reset", "--hard", f"origin/{default_branch}"], - cwd=repo_path, capture_output=True, text=True, - ) - if reset.returncode != 0: - # Hard reset failed — nuke and re-clone as last resort - app.note( - f"Reset to origin/{default_branch} failed — re-cloning", - tags=["build", "clone", "reclone"], - ) - import shutil - shutil.rmtree(repo_path, ignore_errors=True) + # Clone if repo_url is set and target doesn't exist yet + git_dir = os.path.join(repo_path, ".git") + if cfg.repo_url and not os.path.exists(git_dir): + app.note(f"Cloning {cfg.repo_url} → {repo_path}", tags=["build", "clone"]) os.makedirs(repo_path, exist_ok=True) clone_result = 
subprocess.run( ["git", "clone", cfg.repo_url, repo_path], - capture_output=True, text=True, + capture_output=True, + text=True, ) if clone_result.returncode != 0: err = clone_result.stderr.strip() - raise RuntimeError(f"git re-clone failed: {err}") - else: - # Ensure repo_path exists even when no repo_url is provided (fresh init case) - # This is needed because planning agents may need to read the repo in parallel with git_init - os.makedirs(repo_path, exist_ok=True) + app.note(f"Clone failed (exit {clone_result.returncode}): {err}", tags=["build", "clone", "error"]) + raise RuntimeError(f"git clone failed (exit {clone_result.returncode}): {err}") + elif cfg.repo_url and os.path.exists(git_dir): + # Repo already exists at this build-scoped path (unlikely but handle gracefully). + # Reset to remote default branch for a clean baseline. + default_branch = cfg.github_pr_base or "main" + app.note( + f"Repo already exists at {repo_path} — resetting to origin/{default_branch}", + tags=["build", "clone", "reset"], + ) - if execute_fn_target: - cfg.execute_fn_target = execute_fn_target - if permission_mode: - cfg.permission_mode = permission_mode - if enable_learning: - cfg.enable_learning = True - if max_turns > 0: - cfg.agent_max_turns = max_turns - - # Resolve runtime + flat model config once for this build. 
- resolved = cfg.resolved_models() - - # Compute absolute artifacts directory path for logging - abs_artifacts_dir = os.path.join(os.path.abspath(repo_path), artifacts_dir) - - # Multi-repo path: clone all repos concurrently - manifest: WorkspaceManifest | None = None - if len(cfg.repos) > 1: - app.note( - f"Cloning {len(cfg.repos)} repos concurrently", - tags=["build", "clone", "multi-repo"], - ) - manifest = await _clone_repos(cfg, abs_artifacts_dir) - # Use primary repo as the canonical repo_path - repo_path = manifest.primary_repo.absolute_path - app.note( - f"Multi-repo workspace ready: {manifest.workspace_root}", - tags=["build", "clone", "multi-repo", "complete"], - ) + # Remove stale worktrees on disk before touching branches + worktrees_dir = os.path.join(repo_path, ".worktrees") + if os.path.isdir(worktrees_dir): + import shutil + shutil.rmtree(worktrees_dir, ignore_errors=True) + subprocess.run( + ["git", "worktree", "prune"], + cwd=repo_path, capture_output=True, text=True, + ) - # 1. 
PLAN + GIT INIT (concurrent — no data dependency between them) - app.note("Phase 1: Planning + Git init (parallel)", tags=["build", "parallel"]) + # Fetch latest remote state + fetch = subprocess.run( + ["git", "fetch", "origin"], + cwd=repo_path, capture_output=True, text=True, + ) + if fetch.returncode != 0: + app.note(f"git fetch failed: {fetch.stderr.strip()}", tags=["build", "clone", "error"]) - plan_coro = app.call( - f"{NODE_ID}.plan", - goal=goal, - repo_path=repo_path, - artifacts_dir=artifacts_dir, - additional_context=additional_context, - max_review_iterations=cfg.max_review_iterations, - pm_model=resolved["pm_model"], - architect_model=resolved["architect_model"], - tech_lead_model=resolved["tech_lead_model"], - sprint_planner_model=resolved["sprint_planner_model"], - issue_writer_model=resolved["issue_writer_model"], - permission_mode=cfg.permission_mode, - ai_provider=cfg.ai_provider, - workspace_manifest=manifest.model_dump() if manifest else None, - ) + # Force-checkout default branch (handles dirty working tree from crashed builds) + subprocess.run( + ["git", "checkout", "-f", default_branch], + cwd=repo_path, capture_output=True, text=True, + ) + reset = subprocess.run( + ["git", "reset", "--hard", f"origin/{default_branch}"], + cwd=repo_path, capture_output=True, text=True, + ) + if reset.returncode != 0: + # Hard reset failed — nuke and re-clone as last resort + app.note( + f"Reset to origin/{default_branch} failed — re-cloning", + tags=["build", "clone", "reclone"], + ) + import shutil + shutil.rmtree(repo_path, ignore_errors=True) + os.makedirs(repo_path, exist_ok=True) + clone_result = subprocess.run( + ["git", "clone", cfg.repo_url, repo_path], + capture_output=True, text=True, + ) + if clone_result.returncode != 0: + err = clone_result.stderr.strip() + raise RuntimeError(f"git re-clone failed: {err}") + else: + # Ensure repo_path exists even when no repo_url is provided (fresh init case) + # This is needed because planning agents may need 
to read the repo in parallel with git_init + os.makedirs(repo_path, exist_ok=True) - # Git init with retry logic - MAX_GIT_INIT_RETRIES = cfg.git_init_max_retries - git_init = None - previous_error = None - raw_plan = None + if execute_fn_target: + cfg.execute_fn_target = execute_fn_target + if permission_mode: + cfg.permission_mode = permission_mode + if enable_learning: + cfg.enable_learning = True + if max_turns > 0: + cfg.agent_max_turns = max_turns - for attempt in range(1, MAX_GIT_INIT_RETRIES + 1): - app.note( - f"Git init attempt {attempt}/{MAX_GIT_INIT_RETRIES}" - + (f" (previous error: {previous_error})" if previous_error else ""), - tags=["build", "git_init", "retry"], - ) + # Resolve runtime + flat model config once for this build. + resolved = cfg.resolved_models() - git_init_coro = app.call( - f"{NODE_ID}.run_git_init", - repo_path=repo_path, + # Compute absolute artifacts directory path for logging + abs_artifacts_dir = os.path.join(os.path.abspath(repo_path), artifacts_dir) + + # Multi-repo path: clone all repos concurrently + manifest: WorkspaceManifest | None = None + if len(cfg.repos) > 1: + app.note( + f"Cloning {len(cfg.repos)} repos concurrently", + tags=["build", "clone", "multi-repo"], + ) + manifest = await _clone_repos(cfg, abs_artifacts_dir) + # Use primary repo as the canonical repo_path + repo_path = manifest.primary_repo.absolute_path + app.note( + f"Multi-repo workspace ready: {manifest.workspace_root}", + tags=["build", "clone", "multi-repo", "complete"], + ) + + # 1. 
PLAN + GIT INIT (concurrent — no data dependency between them) + app.note("Phase 1: Planning + Git init (parallel)", tags=["build", "parallel"]) + + plan_coro = app.call( + f"{NODE_ID}.plan", goal=goal, - artifacts_dir=abs_artifacts_dir, - model=resolved["git_model"], + repo_path=repo_path, + artifacts_dir=artifacts_dir, + additional_context=additional_context, + max_review_iterations=cfg.max_review_iterations, + pm_model=resolved["pm_model"], + architect_model=resolved["architect_model"], + tech_lead_model=resolved["tech_lead_model"], + sprint_planner_model=resolved["sprint_planner_model"], + issue_writer_model=resolved["issue_writer_model"], permission_mode=cfg.permission_mode, ai_provider=cfg.ai_provider, - previous_error=previous_error, - build_id=build_id, + workspace_manifest=manifest.model_dump() if manifest else None, ) - # Run planning only on first attempt, then just git_init on retries - if attempt == 1: - raw_plan, raw_git = await asyncio.gather(plan_coro, git_init_coro) - else: - raw_git = await git_init_coro - - # git_init failures are non-fatal — unwrap but don't raise - try: - git_init = _unwrap(raw_git, "run_git_init") - except RuntimeError: - git_init = raw_git if isinstance(raw_git, dict) else {"success": False, "error_message": str(raw_git)} + # Git init with retry logic + MAX_GIT_INIT_RETRIES = cfg.git_init_max_retries + git_init = None + previous_error = None + raw_plan = None - if git_init.get("success"): + for attempt in range(1, MAX_GIT_INIT_RETRIES + 1): app.note( - f"Git init succeeded on attempt {attempt}", - tags=["build", "git_init", "success"], + f"Git init attempt {attempt}/{MAX_GIT_INIT_RETRIES}" + + (f" (previous error: {previous_error})" if previous_error else ""), + tags=["build", "git_init", "retry"], ) - break - else: - previous_error = git_init.get("error_message", "unknown error") - app.note( - f"Git init attempt {attempt} failed: {previous_error}", - tags=["build", "git_init", "failed"], + + git_init_coro = app.call( + 
f"{NODE_ID}.run_git_init", + repo_path=repo_path, + goal=goal, + artifacts_dir=abs_artifacts_dir, + model=resolved["git_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + previous_error=previous_error, + build_id=build_id, ) - if attempt == MAX_GIT_INIT_RETRIES: + # Run planning only on first attempt, then just git_init on retries + if attempt == 1: + raw_plan, raw_git = await asyncio.gather(plan_coro, git_init_coro) + else: + raw_git = await git_init_coro + + # git_init failures are non-fatal — unwrap but don't raise + try: + git_init = _unwrap(raw_git, "run_git_init") + except RuntimeError: + git_init = raw_git if isinstance(raw_git, dict) else {"success": False, "error_message": str(raw_git)} + + if git_init.get("success"): + app.note( + f"Git init succeeded on attempt {attempt}", + tags=["build", "git_init", "success"], + ) + break + else: + previous_error = git_init.get("error_message", "unknown error") app.note( - f"Git init failed after {MAX_GIT_INIT_RETRIES} attempts — " - "proceeding without git workflow", - tags=["build", "git_init", "exhausted"], + f"Git init attempt {attempt} failed: {previous_error}", + tags=["build", "git_init", "failed"], ) - # Brief delay before retry (except on last attempt) - if attempt < MAX_GIT_INIT_RETRIES: - await asyncio.sleep(cfg.git_init_retry_delay) - - # Unwrap plan result (should have been set on first attempt) - plan_result = _unwrap(raw_plan, "plan") - - git_config = None - if git_init.get("success"): - git_config = { - "integration_branch": git_init["integration_branch"], - "original_branch": git_init["original_branch"], - "initial_commit_sha": git_init["initial_commit_sha"], - "mode": git_init["mode"], - "remote_url": git_init.get("remote_url", ""), - "remote_default_branch": git_init.get("remote_default_branch", ""), - } - app.note( - f"Git init: mode={git_init['mode']}, branch={git_init['integration_branch']}", - tags=["build", "git_init", "complete"], - ) - else: - app.note( - f"Git 
init failed: {git_init.get('error_message', 'unknown')} — " - "proceeding without git workflow", - tags=["build", "git_init", "error"], - ) + if attempt == MAX_GIT_INIT_RETRIES: + app.note( + f"Git init failed after {MAX_GIT_INIT_RETRIES} attempts — " + "proceeding without git workflow", + tags=["build", "git_init", "exhausted"], + ) - # 1.5 APPROVAL CHECKPOINT — pause for human plan review when HAX_API_KEY is set. - # SWE-AF posts the plan to hax-sdk and pauses on the control plane until - # the reviewer responds. On request_changes, re-runs Architect → Tech Lead - # → Sprint Planner with the feedback and re-requests approval, bounded by - # cfg.max_plan_revision_iterations. - _hax_api_key = os.environ.get("HAX_API_KEY", "").strip() - execution_id = app.ctx.execution_id if app.ctx else "" - if _hax_api_key and execution_id: - import json as _json - from hax import HaxClient - - hax_client = HaxClient( - api_key=_hax_api_key, - base_url=os.environ.get("HAX_SDK_URL", "http://localhost:3000") + "/api/v1", - ) - cp_base_url = (app.agentfield_server or "http://localhost:8080").rstrip("/") - approval_state_path = os.path.join(abs_artifacts_dir, "approval_state.json") - os.makedirs(os.path.dirname(approval_state_path), exist_ok=True) - revision_history: list[dict] = [] + # Brief delay before retry (except on last attempt) + if attempt < MAX_GIT_INIT_RETRIES: + await asyncio.sleep(cfg.git_init_retry_delay) + + # Unwrap plan result (should have been set on first attempt) + plan_result = _unwrap(raw_plan, "plan") - for revision_iter in range(cfg.max_plan_revision_iterations + 1): + git_config = None + if git_init.get("success"): + git_config = { + "integration_branch": git_init["integration_branch"], + "original_branch": git_init["original_branch"], + "initial_commit_sha": git_init["initial_commit_sha"], + "mode": git_init["mode"], + "remote_url": git_init.get("remote_url", ""), + "remote_default_branch": git_init.get("remote_default_branch", ""), + } app.note( - f"Phase 
1.5: Requesting plan approval (iteration {revision_iter})", - tags=["build", "approval"], + f"Git init: mode={git_init['mode']}, branch={git_init['integration_branch']}", + tags=["build", "git_init", "complete"], + ) + else: + app.note( + f"Git init failed: {git_init.get('error_message', 'unknown')} — " + "proceeding without git workflow", + tags=["build", "git_init", "error"], ) - plan_summary, prd_md, arch_md, issues_for_template = ( - _format_plan_for_approval(plan_result) + # 1.5 APPROVAL CHECKPOINT — pause for human plan review when HAX_API_KEY is set. + # SWE-AF posts the plan to hax-sdk and pauses on the control plane until + # the reviewer responds. On request_changes, re-runs Architect → Tech Lead + # → Sprint Planner with the feedback and re-requests approval, bounded by + # cfg.max_plan_revision_iterations. + _hax_api_key = os.environ.get("HAX_API_KEY", "").strip() + execution_id = app.ctx.execution_id if app.ctx else "" + if _hax_api_key and execution_id: + import json as _json + from hax import HaxClient + + hax_client = HaxClient( + api_key=_hax_api_key, + base_url=os.environ.get("HAX_SDK_URL", "http://localhost:3000") + "/api/v1", ) + cp_base_url = (app.agentfield_server or "http://localhost:8080").rstrip("/") + approval_state_path = os.path.join(abs_artifacts_dir, "approval_state.json") + os.makedirs(os.path.dirname(approval_state_path), exist_ok=True) + revision_history: list[dict] = [] - title = "SWE-AF Plan Review" - if revision_iter > 0: - title = f"SWE-AF Plan Review (Revision {revision_iter})" - - hax_payload = { - "planSummary": plan_summary, - "issues": issues_for_template, - "architecture": arch_md, - "prd": prd_md, - "metadata": { - "repoUrl": cfg.repo_url, - "goalDescription": goal, - "agentNodeId": NODE_ID, - "executionId": execution_id, - }, - "revisionNumber": revision_iter, - "revisionHistory": revision_history, - } + for revision_iter in range(cfg.max_plan_revision_iterations + 1): + app.note( + f"Phase 1.5: Requesting plan approval 
(iteration {revision_iter})", + tags=["build", "approval"], + ) - hax_create_kwargs: dict = { - "type": "plan-review-v2", - "title": title, - "description": "Review the proposed implementation plan before execution begins", - "payload": hax_payload, - "webhook_url": f"{cp_base_url}/api/v1/webhooks/approval-response", - "expires_in_seconds": cfg.approval_expires_in_hours * 3600, - } - approval_user_id = os.environ.get("AGENTFIELD_APPROVAL_USER_ID", "") - if approval_user_id: - hax_create_kwargs["user_id"] = approval_user_id - - hax_request = await _create_hax_request_with_timeout( - hax_client=hax_client, - hax_create_kwargs=hax_create_kwargs, - revision_iter=revision_iter, - ) + plan_summary, prd_md, arch_md, issues_for_template = ( + _format_plan_for_approval(plan_result) + ) - with open(approval_state_path, "w") as _fp: - _json.dump({ - "decision": "pending", - "feedback": "", - "request_id": hax_request.id, - "request_url": hax_request.url, - "revision_number": revision_iter, - }, _fp, indent=2) - - approval_result = await app.pause( - approval_request_id=hax_request.id, - approval_request_url=hax_request.url, - expires_in_hours=cfg.approval_expires_in_hours, - ) + title = "SWE-AF Plan Review" + if revision_iter > 0: + title = f"SWE-AF Plan Review (Revision {revision_iter})" + + hax_payload = { + "planSummary": plan_summary, + "issues": issues_for_template, + "architecture": arch_md, + "prd": prd_md, + "metadata": { + "repoUrl": cfg.repo_url, + "goalDescription": goal, + "agentNodeId": NODE_ID, + "executionId": execution_id, + }, + "revisionNumber": revision_iter, + "revisionHistory": revision_history, + } - with open(approval_state_path, "w") as _fp: - _json.dump({ - "decision": approval_result.decision, - "feedback": approval_result.feedback, - "request_id": approval_result.approval_request_id, - "request_url": hax_request.url, - "revision_number": revision_iter, - "revision_history": revision_history, - }, _fp, indent=2) - - if approval_result.approved: - 
app.note( - "Plan approved — proceeding to execution", - tags=["build", "approval", "approved"], + hax_create_kwargs: dict = { + "type": "plan-review-v2", + "title": title, + "description": "Review the proposed implementation plan before execution begins", + "payload": hax_payload, + "webhook_url": f"{cp_base_url}/api/v1/webhooks/approval-response", + "expires_in_seconds": cfg.approval_expires_in_hours * 3600, + } + approval_user_id = os.environ.get("AGENTFIELD_APPROVAL_USER_ID", "") + if approval_user_id: + hax_create_kwargs["user_id"] = approval_user_id + + hax_request = await _create_hax_request_with_timeout( + hax_client=hax_client, + hax_create_kwargs=hax_create_kwargs, + revision_iter=revision_iter, ) - break - if approval_result.changes_requested: - if revision_iter >= cfg.max_plan_revision_iterations: + with open(approval_state_path, "w") as _fp: + _json.dump({ + "decision": "pending", + "feedback": "", + "request_id": hax_request.id, + "request_url": hax_request.url, + "revision_number": revision_iter, + }, _fp, indent=2) + + approval_result = await app.pause( + approval_request_id=hax_request.id, + approval_request_url=hax_request.url, + expires_in_hours=cfg.approval_expires_in_hours, + ) + + with open(approval_state_path, "w") as _fp: + _json.dump({ + "decision": approval_result.decision, + "feedback": approval_result.feedback, + "request_id": approval_result.approval_request_id, + "request_url": hax_request.url, + "revision_number": revision_iter, + "revision_history": revision_history, + }, _fp, indent=2) + + if approval_result.approved: app.note( - f"Max plan revision iterations ({cfg.max_plan_revision_iterations}) reached", - tags=["build", "approval", "exhausted"], + "Plan approved — proceeding to execution", + tags=["build", "approval", "approved"], ) - return BuildResult( - plan_result=plan_result, - dag_state={}, - success=False, - summary=f"Plan revision limit reached after {revision_iter + 1} iterations", - ).model_dump() - - 
revision_history.append({ - "iteration": revision_iter, - "feedback": approval_result.feedback, - }) + break - app.note( - f"Changes requested (iteration {revision_iter}): " - f"{approval_result.feedback[:200]}", - tags=["build", "approval", "request_changes"], - ) + if approval_result.changes_requested: + if revision_iter >= cfg.max_plan_revision_iterations: + app.note( + f"Max plan revision iterations ({cfg.max_plan_revision_iterations}) reached", + tags=["build", "approval", "exhausted"], + ) + return BuildResult( + plan_result=plan_result, + dag_state={}, + success=False, + summary=f"Plan revision limit reached after {revision_iter + 1} iterations", + ).model_dump() + + revision_history.append({ + "iteration": revision_iter, + "feedback": approval_result.feedback, + }) - # Re-plan with the reviewer feedback. Skip PM (PRD/scope is fixed) - # and re-run Architect → Tech Lead loop → Sprint Planner. - arch = _unwrap(await app.call( - f"{NODE_ID}.run_architect", - prd=plan_result.get("prd", {}), - repo_path=repo_path, - artifacts_dir=artifacts_dir, - feedback=approval_result.feedback, - model=resolved["architect_model"], - permission_mode=cfg.permission_mode, - ai_provider=cfg.ai_provider, - workspace_manifest=manifest.model_dump() if manifest else None, - ), "run_architect (human revision)") + app.note( + f"Changes requested (iteration {revision_iter}): " + f"{approval_result.feedback[:200]}", + tags=["build", "approval", "request_changes"], + ) - review = None - for tl_iter in range(cfg.max_review_iterations + 1): - review = _unwrap(await app.call( - f"{NODE_ID}.run_tech_lead", + # Re-plan with the reviewer feedback. Skip PM (PRD/scope is fixed) + # and re-run Architect → Tech Lead loop → Sprint Planner. 
+ arch = _unwrap(await app.call( + f"{NODE_ID}.run_architect", prd=plan_result.get("prd", {}), repo_path=repo_path, artifacts_dir=artifacts_dir, - revision_number=tl_iter, - model=resolved["tech_lead_model"], + feedback=approval_result.feedback, + model=resolved["architect_model"], permission_mode=cfg.permission_mode, ai_provider=cfg.ai_provider, workspace_manifest=manifest.model_dump() if manifest else None, - ), "run_tech_lead") - if review["approved"]: - break - if tl_iter < cfg.max_review_iterations: - arch = _unwrap(await app.call( - f"{NODE_ID}.run_architect", + ), "run_architect (human revision)") + + review = None + for tl_iter in range(cfg.max_review_iterations + 1): + review = _unwrap(await app.call( + f"{NODE_ID}.run_tech_lead", prd=plan_result.get("prd", {}), repo_path=repo_path, artifacts_dir=artifacts_dir, - feedback=review["feedback"], - model=resolved["architect_model"], + revision_number=tl_iter, + model=resolved["tech_lead_model"], permission_mode=cfg.permission_mode, ai_provider=cfg.ai_provider, workspace_manifest=manifest.model_dump() if manifest else None, - ), "run_architect (tech lead revision)") - - if review and not review["approved"]: - review = ReviewResult( - approved=True, - feedback=review["feedback"], - scope_issues=review.get("scope_issues", []), - complexity_assessment=review.get("complexity_assessment", "appropriate"), - summary=review["summary"] + " [auto-approved after max iterations]", - ).model_dump() - - sprint_result = _unwrap(await app.call( - f"{NODE_ID}.run_sprint_planner", - prd=plan_result.get("prd", {}), - architecture=arch, - repo_path=repo_path, - artifacts_dir=artifacts_dir, - model=resolved["sprint_planner_model"], - permission_mode=cfg.permission_mode, - ai_provider=cfg.ai_provider, - workspace_manifest=manifest.model_dump() if manifest else None, - ), "run_sprint_planner (revision)") - - plan_result = { - **plan_result, - "architecture": arch, - "review": review, - "issues": sprint_result["issues"], - "rationale": 
sprint_result["rationale"], - } - continue + ), "run_tech_lead") + if review["approved"]: + break + if tl_iter < cfg.max_review_iterations: + arch = _unwrap(await app.call( + f"{NODE_ID}.run_architect", + prd=plan_result.get("prd", {}), + repo_path=repo_path, + artifacts_dir=artifacts_dir, + feedback=review["feedback"], + model=resolved["architect_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + workspace_manifest=manifest.model_dump() if manifest else None, + ), "run_architect (tech lead revision)") + + if review and not review["approved"]: + review = ReviewResult( + approved=True, + feedback=review["feedback"], + scope_issues=review.get("scope_issues", []), + complexity_assessment=review.get("complexity_assessment", "appropriate"), + summary=review["summary"] + " [auto-approved after max iterations]", + ).model_dump() - # Terminal: rejected, expired, or error - reason = approval_result.feedback or approval_result.decision - app.note( - f"Plan {approval_result.decision} by human reviewer: {reason}", - tags=["build", "approval", approval_result.decision], - ) - return BuildResult( - plan_result=plan_result, - dag_state={}, - success=False, - summary=f"Plan {approval_result.decision}: {reason}", - ).model_dump() + sprint_result = _unwrap(await app.call( + f"{NODE_ID}.run_sprint_planner", + prd=plan_result.get("prd", {}), + architecture=arch, + repo_path=repo_path, + artifacts_dir=artifacts_dir, + model=resolved["sprint_planner_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + workspace_manifest=manifest.model_dump() if manifest else None, + ), "run_sprint_planner (revision)") + + plan_result = { + **plan_result, + "architecture": arch, + "review": review, + "issues": sprint_result["issues"], + "rationale": sprint_result["rationale"], + } + continue + + # Terminal: rejected, expired, or error + reason = approval_result.feedback or approval_result.decision + app.note( + f"Plan {approval_result.decision} by 
human reviewer: {reason}", + tags=["build", "approval", approval_result.decision], + ) + return BuildResult( + plan_result=plan_result, + dag_state={}, + success=False, + summary=f"Plan {approval_result.decision}: {reason}", + ).model_dump() - # 2. EXECUTE - exec_config = cfg.to_execution_config_dict() + # 2. EXECUTE + exec_config = cfg.to_execution_config_dict() - dag_result = _unwrap(await app.call( - f"{NODE_ID}.execute", - plan_result=plan_result, - repo_path=repo_path, - execute_fn_target=cfg.execute_fn_target, - config=exec_config, - git_config=git_config, - build_id=build_id, - workspace_manifest=manifest.model_dump() if manifest else None, - ), "execute") - - # Refresh manifest with git_init_result populated by _init_all_repos() in - # the DAG executor. Must happen before the verify/fix loop which can - # overwrite dag_result with fix-execution results (no workspace_manifest). - if manifest and dag_result.get("workspace_manifest"): - manifest = WorkspaceManifest(**dag_result["workspace_manifest"]) - - # 3. 
VERIFY - verification = None - for cycle in range(cfg.max_verify_fix_cycles + 1): - app.note(f"Verification cycle {cycle}", tags=["build", "verify"]) - verification = _unwrap(await app.call( - f"{NODE_ID}.run_verifier", - prd=plan_result["prd"], + dag_result = _unwrap(await app.call( + f"{NODE_ID}.execute", + plan_result=plan_result, repo_path=repo_path, - artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), - completed_issues=[r for r in dag_result.get("completed_issues", [])], - failed_issues=[r for r in dag_result.get("failed_issues", [])], - skipped_issues=dag_result.get("skipped_issues", []), - model=resolved["verifier_model"], - permission_mode=cfg.permission_mode, - ai_provider=cfg.ai_provider, + execute_fn_target=cfg.execute_fn_target, + config=exec_config, + git_config=git_config, + build_id=build_id, workspace_manifest=manifest.model_dump() if manifest else None, - ), "run_verifier") + ), "execute") + + # Refresh manifest with git_init_result populated by _init_all_repos() in + # the DAG executor. Must happen before the verify/fix loop which can + # overwrite dag_result with fix-execution results (no workspace_manifest). + if manifest and dag_result.get("workspace_manifest"): + manifest = WorkspaceManifest(**dag_result["workspace_manifest"]) + + # 3. 
VERIFY + verification = None + for cycle in range(cfg.max_verify_fix_cycles + 1): + app.note(f"Verification cycle {cycle}", tags=["build", "verify"]) + verification = _unwrap(await app.call( + f"{NODE_ID}.run_verifier", + prd=plan_result["prd"], + repo_path=repo_path, + artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), + completed_issues=[r for r in dag_result.get("completed_issues", [])], + failed_issues=[r for r in dag_result.get("failed_issues", [])], + skipped_issues=dag_result.get("skipped_issues", []), + model=resolved["verifier_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + workspace_manifest=manifest.model_dump() if manifest else None, + ), "run_verifier") - if verification.get("passed", False) or cycle >= cfg.max_verify_fix_cycles: - break + if verification.get("passed", False) or cycle >= cfg.max_verify_fix_cycles: + break - # Verification failed — generate targeted fix issues - failed_criteria = [ - c for c in verification.get("criteria_results", []) - if not c.get("passed", True) - ] + # Verification failed — generate targeted fix issues + failed_criteria = [ + c for c in verification.get("criteria_results", []) + if not c.get("passed", True) + ] - if not failed_criteria: - app.note("Verification failed but no specific criteria failures found", tags=["build", "verify"]) - break + if not failed_criteria: + app.note("Verification failed but no specific criteria failures found", tags=["build", "verify"]) + break - app.note( - f"Verification failed ({len(failed_criteria)} criteria), " - f"{cfg.max_verify_fix_cycles - cycle} fix cycles remaining", - tags=["build", "verify", "retry"], - ) + app.note( + f"Verification failed ({len(failed_criteria)} criteria), " + f"{cfg.max_verify_fix_cycles - cycle} fix cycles remaining", + tags=["build", "verify", "retry"], + ) - # Generate fix issues from failed criteria - fix_result = _unwrap(await app.call( - f"{NODE_ID}.generate_fix_issues", - 
failed_criteria=failed_criteria, - dag_state=dag_result, - prd=plan_result["prd"], - artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), - model=resolved["verifier_model"], - permission_mode=cfg.permission_mode, - ai_provider=cfg.ai_provider, - workspace_manifest=manifest.model_dump() if manifest else None, - ), "generate_fix_issues") - - fix_issues = fix_result.get("fix_issues", []) - fix_debt = fix_result.get("debt_items", []) - - # Record unfixable criteria as debt - for debt in fix_debt: - dag_result.setdefault("accumulated_debt", []).append({ - "type": "unmet_acceptance_criterion", - "criterion": debt.get("criterion", ""), - "reason": debt.get("reason", ""), - "severity": debt.get("severity", "high"), - }) - - if fix_issues: - # Build a mini plan from fix issues and execute them - fix_plan = { - "prd": plan_result["prd"], - "architecture": plan_result.get("architecture", {}), - "review": plan_result.get("review", {}), - "issues": fix_issues, - "levels": [[fi.get("name", f"fix-{i}") for i, fi in enumerate(fix_issues)]], - "file_conflicts": [], - "artifacts_dir": plan_result.get("artifacts_dir", artifacts_dir), - "rationale": f"Fix issues for verification cycle {cycle + 1}", - } - dag_result = _unwrap(await app.call( - f"{NODE_ID}.execute", - plan_result=fix_plan, - repo_path=repo_path, - config=exec_config, - git_config=git_config, + # Generate fix issues from failed criteria + fix_result = _unwrap(await app.call( + f"{NODE_ID}.generate_fix_issues", + failed_criteria=failed_criteria, + dag_state=dag_result, + prd=plan_result["prd"], + artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), + model=resolved["verifier_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, workspace_manifest=manifest.model_dump() if manifest else None, - ), "execute_fixes") - continue # Re-verify - else: - app.note("No fixable issues generated — accepting with debt", tags=["build", "verify"]) - break - - success = 
verification.get("passed", False) if verification else False - completed = len(dag_result.get("completed_issues", [])) - total = len(dag_result.get("all_issues", [])) + ), "generate_fix_issues") + + fix_issues = fix_result.get("fix_issues", []) + fix_debt = fix_result.get("debt_items", []) + + # Record unfixable criteria as debt + for debt in fix_debt: + dag_result.setdefault("accumulated_debt", []).append({ + "type": "unmet_acceptance_criterion", + "criterion": debt.get("criterion", ""), + "reason": debt.get("reason", ""), + "severity": debt.get("severity", "high"), + }) - app.note( - f"Build {'succeeded' if success else 'completed with issues'}: " - f"{completed}/{total} issues, verification={'passed' if success else 'failed'}", - tags=["build", "complete"], - ) + if fix_issues: + # Build a mini plan from fix issues and execute them + fix_plan = { + "prd": plan_result["prd"], + "architecture": plan_result.get("architecture", {}), + "review": plan_result.get("review", {}), + "issues": fix_issues, + "levels": [[fi.get("name", f"fix-{i}") for i, fi in enumerate(fix_issues)]], + "file_conflicts": [], + "artifacts_dir": plan_result.get("artifacts_dir", artifacts_dir), + "rationale": f"Fix issues for verification cycle {cycle + 1}", + } + dag_result = _unwrap(await app.call( + f"{NODE_ID}.execute", + plan_result=fix_plan, + repo_path=repo_path, + config=exec_config, + git_config=git_config, + workspace_manifest=manifest.model_dump() if manifest else None, + ), "execute_fixes") + continue # Re-verify + else: + app.note("No fixable issues generated — accepting with debt", tags=["build", "verify"]) + break - # Capture plan docs before finalize cleans up .artifacts/ - _plan_dir = os.path.join( - plan_result.get("artifacts_dir", ""), "plan" - ) - prd_markdown = "" - architecture_markdown = "" - for _name, _var in [("prd.md", "prd_markdown"), ("architecture.md", "architecture_markdown")]: - _fpath = os.path.join(_plan_dir, _name) - if os.path.isfile(_fpath): - try: - with 
open(_fpath, "r", encoding="utf-8") as _f: - if _var == "prd_markdown": - prd_markdown = _f.read() - else: - architecture_markdown = _f.read() - except OSError: - pass + success = verification.get("passed", False) if verification else False + completed = len(dag_result.get("completed_issues", [])) + total = len(dag_result.get("all_issues", [])) - # 3b. FINALIZE — clean up repo artifacts before PR - if manifest and len(manifest.repos) > 1: - # Multi-repo: finalize each repo individually app.note( - f"Phase 3b: Multi-repo finalization ({len(manifest.repos)} repos)", - tags=["build", "finalize", "multi-repo"], + f"Build {'succeeded' if success else 'completed with issues'}: " + f"{completed}/{total} issues, verification={'passed' if success else 'failed'}", + tags=["build", "complete"], + ) + + # Capture plan docs before finalize cleans up .artifacts/ + _plan_dir = os.path.join( + plan_result.get("artifacts_dir", ""), "plan" ) - for ws_repo in manifest.repos: + prd_markdown = "" + architecture_markdown = "" + for _name, _var in [("prd.md", "prd_markdown"), ("architecture.md", "architecture_markdown")]: + _fpath = os.path.join(_plan_dir, _name) + if os.path.isfile(_fpath): + try: + with open(_fpath, "r", encoding="utf-8") as _f: + if _var == "prd_markdown": + prd_markdown = _f.read() + else: + architecture_markdown = _f.read() + except OSError: + pass + + # 3b. 
FINALIZE — clean up repo artifacts before PR + if manifest and len(manifest.repos) > 1: + # Multi-repo: finalize each repo individually + app.note( + f"Phase 3b: Multi-repo finalization ({len(manifest.repos)} repos)", + tags=["build", "finalize", "multi-repo"], + ) + for ws_repo in manifest.repos: + try: + finalize_result = _unwrap(await app.call( + f"{NODE_ID}.run_repo_finalize", + repo_path=ws_repo.absolute_path, + artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), + model=resolved["git_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + ), f"run_repo_finalize ({ws_repo.repo_name})") + if finalize_result.get("success"): + app.note( + f"Repo finalized ({ws_repo.repo_name}): {finalize_result.get('summary', '')}", + tags=["build", "finalize", "complete"], + ) + else: + app.note( + f"Repo finalize incomplete ({ws_repo.repo_name}): {finalize_result.get('summary', '')}", + tags=["build", "finalize", "warning"], + ) + except Exception as e: + app.note( + f"Repo finalize failed for {ws_repo.repo_name} (non-blocking): {e}", + tags=["build", "finalize", "error"], + ) + else: + # Single-repo: existing finalize logic + app.note("Phase 3b: Repo finalization", tags=["build", "finalize"]) try: finalize_result = _unwrap(await app.call( f"{NODE_ID}.run_repo_finalize", - repo_path=ws_repo.absolute_path, + repo_path=repo_path, artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), model=resolved["git_model"], permission_mode=cfg.permission_mode, ai_provider=cfg.ai_provider, - ), f"run_repo_finalize ({ws_repo.repo_name})") + ), "run_repo_finalize") if finalize_result.get("success"): app.note( - f"Repo finalized ({ws_repo.repo_name}): {finalize_result.get('summary', '')}", + f"Repo finalized: {finalize_result.get('summary', '')}", tags=["build", "finalize", "complete"], ) else: app.note( - f"Repo finalize incomplete ({ws_repo.repo_name}): {finalize_result.get('summary', '')}", + f"Repo finalize incomplete: 
{finalize_result.get('summary', '')}", tags=["build", "finalize", "warning"], ) except Exception as e: app.note( - f"Repo finalize failed for {ws_repo.repo_name} (non-blocking): {e}", + f"Repo finalize failed (non-blocking): {e}", tags=["build", "finalize", "error"], ) - else: - # Single-repo: existing finalize logic - app.note("Phase 3b: Repo finalization", tags=["build", "finalize"]) - try: - finalize_result = _unwrap(await app.call( - f"{NODE_ID}.run_repo_finalize", - repo_path=repo_path, - artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), - model=resolved["git_model"], - permission_mode=cfg.permission_mode, - ai_provider=cfg.ai_provider, - ), "run_repo_finalize") - if finalize_result.get("success"): - app.note( - f"Repo finalized: {finalize_result.get('summary', '')}", - tags=["build", "finalize", "complete"], - ) - else: - app.note( - f"Repo finalize incomplete: {finalize_result.get('summary', '')}", - tags=["build", "finalize", "warning"], - ) - except Exception as e: - app.note( - f"Repo finalize failed (non-blocking): {e}", - tags=["build", "finalize", "error"], - ) - # 4. PUSH & DRAFT PR (if repo has a remote and PR creation is enabled) - pr_results: list[RepoPRResult] = [] - ci_gate_results: list[dict] = [] - build_summary = ( - f"{'Success' if success else 'Partial'}: {completed}/{total} issues completed" - + (f", verification: {verification.get('summary', '')}" if verification else "") - ) + # 4. 
PUSH & DRAFT PR (if repo has a remote and PR creation is enabled) + pr_results: list[RepoPRResult] = [] + ci_gate_results: list[dict] = [] + build_summary = ( + f"{'Success' if success else 'Partial'}: {completed}/{total} issues completed" + + (f", verification: {verification.get('summary', '')}" if verification else "") + ) - if manifest and len(manifest.repos) > 1: - # Multi-repo: one PR per repo where create_pr=True - app.note("Phase 4: Multi-repo Push + PRs", tags=["build", "github_pr", "multi-repo"]) - for ws_repo in manifest.repos: - if not ws_repo.create_pr or not cfg.enable_github_pr: - continue - repo_git_init = ws_repo.git_init_result or {} - repo_remote_url = repo_git_init.get("remote_url", "") or ws_repo.repo_url - if not repo_remote_url: - continue - repo_integration_branch = repo_git_init.get("integration_branch", "") - if not repo_integration_branch: - continue - repo_base_branch = ( - cfg.github_pr_base - or repo_git_init.get("remote_default_branch", "") - or "main" - ) - try: - pr_r = _unwrap(await app.call( - f"{NODE_ID}.run_github_pr", - repo_path=ws_repo.absolute_path, - integration_branch=repo_integration_branch, - base_branch=repo_base_branch, - goal=goal, - build_summary=build_summary, - completed_issues=[ - r for r in dag_result.get("completed_issues", []) - if not r.get("repo_name") or r.get("repo_name") == ws_repo.repo_name - ], - accumulated_debt=dag_result.get("accumulated_debt", []), - artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), - model=resolved["git_model"], - permission_mode=cfg.permission_mode, - ai_provider=cfg.ai_provider, - ), "run_github_pr") - pr_results.append(RepoPRResult( - repo_name=ws_repo.repo_name, - repo_url=ws_repo.repo_url, - success=pr_r.get("success", False), - pr_url=pr_r.get("pr_url", ""), - pr_number=pr_r.get("pr_number", 0), - error_message=pr_r.get("error_message", ""), - )) - if pr_r.get("pr_url"): - app.note( - f"PR created for {ws_repo.repo_name}: {pr_r.get('pr_url')}", - tags=["build", 
"github_pr", "complete"], - ) - if cfg.check_ci and pr_r.get("pr_number"): - gate = await _run_ci_gate( - repo_path=ws_repo.absolute_path, - pr_number=pr_r.get("pr_number", 0), - pr_url=pr_r.get("pr_url", ""), - integration_branch=repo_integration_branch, - base_branch=repo_base_branch, - cfg=cfg, - resolved_models=resolved, - goal=goal, - completed_issues=[ - r for r in dag_result.get("completed_issues", []) - if not r.get("repo_name") or r.get("repo_name") == ws_repo.repo_name - ], - ) - ci_gate_results.append({ - "repo_name": ws_repo.repo_name, - **gate, - }) - except Exception as e: - pr_results.append(RepoPRResult( - repo_name=ws_repo.repo_name, - repo_url=ws_repo.repo_url, - success=False, - error_message=str(e), - )) - app.note( - f"PR creation failed for {ws_repo.repo_name}: {e}", - tags=["build", "github_pr", "error"], + if manifest and len(manifest.repos) > 1: + # Multi-repo: one PR per repo where create_pr=True + app.note("Phase 4: Multi-repo Push + PRs", tags=["build", "github_pr", "multi-repo"]) + for ws_repo in manifest.repos: + if not ws_repo.create_pr or not cfg.enable_github_pr: + continue + repo_git_init = ws_repo.git_init_result or {} + repo_remote_url = repo_git_init.get("remote_url", "") or ws_repo.repo_url + if not repo_remote_url: + continue + repo_integration_branch = repo_git_init.get("integration_branch", "") + if not repo_integration_branch: + continue + repo_base_branch = ( + cfg.github_pr_base + or repo_git_init.get("remote_default_branch", "") + or "main" ) - else: - # Single-repo: existing PR logic, wrap result in RepoPRResult - remote_url = git_config.get("remote_url", "") if git_config else "" - if remote_url and cfg.enable_github_pr: - app.note("Phase 4: Push + PR", tags=["build", "github_pr"]) - base_branch = ( - cfg.github_pr_base - or (git_config.get("remote_default_branch") if git_config else "") - or "main" - ) - pr_url = "" - try: - pr_result = _unwrap(await app.call( - f"{NODE_ID}.run_github_pr", - repo_path=repo_path, - 
integration_branch=git_config["integration_branch"], - base_branch=base_branch, - goal=goal, - build_summary=build_summary, - completed_issues=dag_result.get("completed_issues", []), - accumulated_debt=dag_result.get("accumulated_debt", []), - artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), - model=resolved["git_model"], - permission_mode=cfg.permission_mode, - ai_provider=cfg.ai_provider, - ), "run_github_pr") - pr_url = pr_result.get("pr_url", "") - if pr_url: - app.note(f"PR created: {pr_url}", tags=["build", "github_pr", "complete"]) - - # Programmatically append plan docs to PR body - if prd_markdown or architecture_markdown: - try: - current_body = subprocess.run( - ["gh", "pr", "view", str(pr_result.get("pr_number", 0)), - "--json", "body", "--jq", ".body"], - cwd=repo_path, capture_output=True, text=True, check=True, - ).stdout.strip() - - plan_sections = "\n\n---\n" - if prd_markdown: - plan_sections += ( - "\n
📋 PRD (Product Requirements Document)" - "\n\n" - + prd_markdown - + "\n\n
\n" - ) - if architecture_markdown: - plan_sections += ( - "\n
🏗️ Architecture\n\n" - + architecture_markdown - + "\n\n
\n" - ) - - new_body = current_body + plan_sections - - subprocess.run( - ["gh", "pr", "edit", str(pr_result.get("pr_number", 0)), - "--body", new_body], - cwd=repo_path, capture_output=True, text=True, check=True, - ) - app.note( - "Plan docs appended to PR body", - tags=["build", "github_pr", "plan_docs"], - ) - except subprocess.CalledProcessError as e: - app.note( - f"Failed to append plan docs to PR (non-fatal): {e}", - tags=["build", "github_pr", "plan_docs", "warning"], + try: + pr_r = _unwrap(await app.call( + f"{NODE_ID}.run_github_pr", + repo_path=ws_repo.absolute_path, + integration_branch=repo_integration_branch, + base_branch=repo_base_branch, + goal=goal, + build_summary=build_summary, + completed_issues=[ + r for r in dag_result.get("completed_issues", []) + if not r.get("repo_name") or r.get("repo_name") == ws_repo.repo_name + ], + accumulated_debt=dag_result.get("accumulated_debt", []), + artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), + model=resolved["git_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + ), "run_github_pr") + pr_results.append(RepoPRResult( + repo_name=ws_repo.repo_name, + repo_url=ws_repo.repo_url, + success=pr_r.get("success", False), + pr_url=pr_r.get("pr_url", ""), + pr_number=pr_r.get("pr_number", 0), + error_message=pr_r.get("error_message", ""), + )) + if pr_r.get("pr_url"): + app.note( + f"PR created for {ws_repo.repo_name}: {pr_r.get('pr_url')}", + tags=["build", "github_pr", "complete"], + ) + if cfg.check_ci and pr_r.get("pr_number"): + gate = await _run_ci_gate( + repo_path=ws_repo.absolute_path, + pr_number=pr_r.get("pr_number", 0), + pr_url=pr_r.get("pr_url", ""), + integration_branch=repo_integration_branch, + base_branch=repo_base_branch, + cfg=cfg, + resolved_models=resolved, + goal=goal, + completed_issues=[ + r for r in dag_result.get("completed_issues", []) + if not r.get("repo_name") or r.get("repo_name") == ws_repo.repo_name + ], ) - else: + 
ci_gate_results.append({ + "repo_name": ws_repo.repo_name, + **gate, + }) + except Exception as e: + pr_results.append(RepoPRResult( + repo_name=ws_repo.repo_name, + repo_url=ws_repo.repo_url, + success=False, + error_message=str(e), + )) app.note( - f"PR creation failed: {pr_result.get('error_message', 'unknown')}", + f"PR creation failed for {ws_repo.repo_name}: {e}", tags=["build", "github_pr", "error"], ) - if pr_url: - pr_results.append(RepoPRResult( - repo_name=_repo_name_from_url(cfg.repo_url) if cfg.repo_url else "repo", - repo_url=cfg.repo_url, - success=True, - pr_url=pr_url, - pr_number=pr_result.get("pr_number", 0), - )) - if cfg.check_ci and pr_result.get("pr_number"): - gate = await _run_ci_gate( - repo_path=repo_path, - pr_number=pr_result.get("pr_number", 0), - pr_url=pr_url, - integration_branch=git_config["integration_branch"], - base_branch=base_branch, - cfg=cfg, - resolved_models=resolved, - goal=goal, - completed_issues=dag_result.get("completed_issues", []), + else: + # Single-repo: existing PR logic, wrap result in RepoPRResult + remote_url = git_config.get("remote_url", "") if git_config else "" + if remote_url and cfg.enable_github_pr: + app.note("Phase 4: Push + PR", tags=["build", "github_pr"]) + base_branch = ( + cfg.github_pr_base + or (git_config.get("remote_default_branch") if git_config else "") + or "main" + ) + pr_url = "" + try: + pr_result = _unwrap(await app.call( + f"{NODE_ID}.run_github_pr", + repo_path=repo_path, + integration_branch=git_config["integration_branch"], + base_branch=base_branch, + goal=goal, + build_summary=build_summary, + completed_issues=dag_result.get("completed_issues", []), + accumulated_debt=dag_result.get("accumulated_debt", []), + artifacts_dir=plan_result.get("artifacts_dir", artifacts_dir), + model=resolved["git_model"], + permission_mode=cfg.permission_mode, + ai_provider=cfg.ai_provider, + ), "run_github_pr") + pr_url = pr_result.get("pr_url", "") + if pr_url: + app.note(f"PR created: {pr_url}", 
tags=["build", "github_pr", "complete"]) + + # Programmatically append plan docs to PR body + if prd_markdown or architecture_markdown: + try: + current_body = subprocess.run( + ["gh", "pr", "view", str(pr_result.get("pr_number", 0)), + "--json", "body", "--jq", ".body"], + cwd=repo_path, capture_output=True, text=True, check=True, + ).stdout.strip() + + plan_sections = "\n\n---\n" + if prd_markdown: + plan_sections += ( + "\n
📋 PRD (Product Requirements Document)" + "\n\n" + + prd_markdown + + "\n\n
\n" + ) + if architecture_markdown: + plan_sections += ( + "\n
🏗️ Architecture\n\n" + + architecture_markdown + + "\n\n
\n" + ) + + new_body = current_body + plan_sections + + subprocess.run( + ["gh", "pr", "edit", str(pr_result.get("pr_number", 0)), + "--body", new_body], + cwd=repo_path, capture_output=True, text=True, check=True, + ) + app.note( + "Plan docs appended to PR body", + tags=["build", "github_pr", "plan_docs"], + ) + except subprocess.CalledProcessError as e: + app.note( + f"Failed to append plan docs to PR (non-fatal): {e}", + tags=["build", "github_pr", "plan_docs", "warning"], + ) + else: + app.note( + f"PR creation failed: {pr_result.get('error_message', 'unknown')}", + tags=["build", "github_pr", "error"], ) - ci_gate_results.append({ - "repo_name": ( - _repo_name_from_url(cfg.repo_url) - if cfg.repo_url else "repo" - ), - **gate, - }) - except Exception as e: - app.note(f"PR creation failed: {e}", tags=["build", "github_pr", "error"]) + if pr_url: + pr_results.append(RepoPRResult( + repo_name=_repo_name_from_url(cfg.repo_url) if cfg.repo_url else "repo", + repo_url=cfg.repo_url, + success=True, + pr_url=pr_url, + pr_number=pr_result.get("pr_number", 0), + )) + if cfg.check_ci and pr_result.get("pr_number"): + gate = await _run_ci_gate( + repo_path=repo_path, + pr_number=pr_result.get("pr_number", 0), + pr_url=pr_url, + integration_branch=git_config["integration_branch"], + base_branch=base_branch, + cfg=cfg, + resolved_models=resolved, + goal=goal, + completed_issues=dag_result.get("completed_issues", []), + ) + ci_gate_results.append({ + "repo_name": ( + _repo_name_from_url(cfg.repo_url) + if cfg.repo_url else "repo" + ), + **gate, + }) + except Exception as e: + app.note(f"PR creation failed: {e}", tags=["build", "github_pr", "error"]) + + # 5. WORKSPACE CLEANUP (non-blocking) + if manifest and manifest.workspace_root: + try: + import shutil + shutil.rmtree(manifest.workspace_root, ignore_errors=True) + app.note( + f"Workspace cleaned up: {manifest.workspace_root}", + tags=["build", "cleanup"], + ) + except Exception: + pass # non-blocking - # 5. 
WORKSPACE CLEANUP (non-blocking) - if manifest and manifest.workspace_root: - try: - import shutil - shutil.rmtree(manifest.workspace_root, ignore_errors=True) - app.note( - f"Workspace cleaned up: {manifest.workspace_root}", - tags=["build", "cleanup"], - ) - except Exception: - pass # non-blocking + return BuildResult( + plan_result=plan_result, + dag_state=dag_result, + verification=verification, + success=success, + summary=f"{'Success' if success else 'Partial'}: {completed}/{total} issues completed" + + (f", verification: {verification.get('summary', '')}" if verification else ""), + pr_results=pr_results, + ci_gate_results=ci_gate_results, + ).model_dump() - return BuildResult( - plan_result=plan_result, - dag_state=dag_result, - verification=verification, - success=success, - summary=f"{'Success' if success else 'Partial'}: {completed}/{total} issues completed" - + (f", verification: {verification.get('summary', '')}" if verification else ""), - pr_results=pr_results, - ci_gate_results=ci_gate_results, - ).model_dump() + finally: + if _scope_id: + from swe_af.hitl import clear_scoped_credentials # noqa: PLC0415 + clear_scoped_credentials(_scope_id) @app.reasoner() @@ -1334,6 +1379,25 @@ async def plan( workspace_manifest=workspace_manifest, ), "run_product_manager") + # 1.5. Environment Scout — negotiate scoped credentials with the user + # before architecture begins. Only engages when HAX is enabled (auto- + # skipped at the reasoner level when HAX_API_KEY is unset). The scout + # stashes negotiated values directly in the in-memory store keyed by + # run_id; subsequent reasoners pull them via get_scoped_credentials. + # No-op when HAX is disabled. 
+ if os.environ.get("HAX_API_KEY", "").strip(): + app.note("Phase 1.5: Environment Scout", tags=["pipeline", "scout"]) + _unwrap(await app.call( + f"{NODE_ID}.run_environment_scout", + prd=prd, + repo_path=repo_path, + artifacts_dir=artifacts_dir, + model=pm_model, + permission_mode=permission_mode, + ai_provider=ai_provider, + workspace_manifest=workspace_manifest, + ), "run_environment_scout") + # 2. Architect designs the solution app.note("Phase 2: Architect", tags=["pipeline", "architect"]) arch = _unwrap(await app.call( diff --git a/swe_af/hitl/__init__.py b/swe_af/hitl/__init__.py index 8e7a6de..eeb6365 100644 --- a/swe_af/hitl/__init__.py +++ b/swe_af/hitl/__init__.py @@ -10,6 +10,19 @@ format_prior_user_responses, request_user_input_and_pause, ) +from swe_af.hitl.credentials_store import ( + clear_scoped_credentials, + get_scoped_credentials, + inject_credentials_into_env, + store_scoped_credentials, +) +from swe_af.hitl.scout_schema import ScoutResult +from swe_af.hitl.services import ( + KNOWN_SERVICES, + ServiceCredentialSpec, + detect_services_from_repo, + known_service_summary_for_prompt, +) from swe_af.hitl.wrapper import AskUserBudget, run_with_ask_user __all__ = [ @@ -17,10 +30,19 @@ "AskUserFormField", "AskUserResponse", "AskUserBudget", + "KNOWN_SERVICES", + "ScoutResult", + "ServiceCredentialSpec", "approval_webhook_url", "build_form_builder", "build_hax_client_from_env", + "clear_scoped_credentials", + "detect_services_from_repo", "format_prior_user_responses", + "get_scoped_credentials", + "inject_credentials_into_env", + "known_service_summary_for_prompt", "request_user_input_and_pause", "run_with_ask_user", + "store_scoped_credentials", ] diff --git a/swe_af/hitl/credentials_store.py b/swe_af/hitl/credentials_store.py new file mode 100644 index 0000000..484516b --- /dev/null +++ b/swe_af/hitl/credentials_store.py @@ -0,0 +1,96 @@ +"""Process-local, execution-scoped store for credentials the scout negotiates. 
+ +Why a module-level dict instead of ``BuildConfig`` or ``app.memory``: + +* ``BuildConfig`` is serialized through ``to_execution_config_dict()`` and + passed to ``execute()`` via ``app.call``. The control plane logs all + ``app.call`` input data, which would persist the credentials. +* ``app.memory`` (scope=``run``) is synced to the control plane DB by design + — also persists. +* Filesystem under ``artifacts_dir`` is written to disk and archived. + +The scout's negotiation produces credentials that should *only* live in the +agent process's memory for the duration of the build, then be cleared. A +module-level dict keyed by execution_id is the simplest way to achieve that +while keeping concurrent builds (which share the Python process) isolated. + +Security boundary: + +* Values are never logged. +* Values are never written to disk. +* Values are not serialized through ``app.call`` (use this store from inside + the receiving reasoner, not as a kwarg). +* The build()'s ``finally`` block MUST call ``clear_scoped_credentials`` — + every error path included. +""" + +from __future__ import annotations + +import threading + +# Module-level. Keyed by execution_id (each build has its own). +_STORE: dict[str, dict[str, str]] = {} +_LOCK = threading.Lock() + + +def store_scoped_credentials(execution_id: str, creds: dict[str, str]) -> None: + """Replace the stored credentials for ``execution_id`` with ``creds``. + + Filters out None/empty values so a partially-filled mega-form (user skipped + some fields) doesn't surface as empty env vars to downstream subprocesses + (which can be confusing — "is the env set or not?"). 
+ """ + if not execution_id: + return + filtered = { + k: v + for k, v in (creds or {}).items() + if isinstance(v, str) and v.strip() + } + with _LOCK: + if filtered: + _STORE[execution_id] = filtered + else: + _STORE.pop(execution_id, None) + + +def get_scoped_credentials(execution_id: str) -> dict[str, str]: + """Return a *copy* of the stored credentials for ``execution_id``. + + Returns an empty dict if nothing is stored — callers should treat that as + "no credentials negotiated; rely on os.environ only". + """ + if not execution_id: + return {} + with _LOCK: + stored = _STORE.get(execution_id) + return dict(stored) if stored else {} + + +def clear_scoped_credentials(execution_id: str) -> None: + """Remove credentials for ``execution_id`` from process memory.""" + if not execution_id: + return + with _LOCK: + _STORE.pop(execution_id, None) + + +def inject_credentials_into_env( + base_env: dict[str, str] | None, execution_id: str +) -> dict[str, str]: + """Return a NEW env dict = ``base_env`` ∪ scoped credentials. + + Scoped credentials WIN over ``base_env`` so a freshly-minted token from + the scout overrides any stale value already in os.environ (e.g. an + expired RAILWAY_TOKEN from a previous build). + + Callers should use this immediately before each ``router.harness(...)`` + call, passing the result as the ``env=`` kwarg. The base is normally + ``dict(os.environ)`` so the subprocess still inherits everything the + parent has — we only ADD/override the scoped creds. + """ + merged: dict[str, str] = dict(base_env or {}) + creds = get_scoped_credentials(execution_id) + if creds: + merged.update(creds) + return merged diff --git a/swe_af/hitl/scout_schema.py b/swe_af/hitl/scout_schema.py new file mode 100644 index 0000000..503f9ea --- /dev/null +++ b/swe_af/hitl/scout_schema.py @@ -0,0 +1,59 @@ +"""Structured output schema for ``run_environment_scout``. 
+ +The scout is a two-pass reasoner driven by ``run_with_ask_user``: + +* Pass 1 (no ``prior_user_responses``): scan the repo, populate + ``detected_services`` and ``ask_user_form`` with one optional text field + per service. ``scoped_credentials`` stays empty. +* Pass 2 (after the user submits): take the values from + ``prior_user_responses[-1]['values']`` and surface them as + ``scoped_credentials``. ``ask_user_form`` is cleared. + +If no services are detected on pass 1, the scout returns +``ask_user_form=None`` immediately and the wrapper short-circuits — no pause, +no second pass. +""" + +from __future__ import annotations + +from pydantic import BaseModel, Field + +from swe_af.hitl.ask_user import AskUserForm +from swe_af.hitl.services import ServiceCredentialSpec + + +class ScoutResult(BaseModel): + """Structured output the scout LLM emits.""" + + detected_services: list[ServiceCredentialSpec] = Field( + default_factory=list, + description=( + "Third-party services the scout believes the PRD work touches. " + "On pass 1 this matches the form's fields one-for-one." + ), + ) + scoped_credentials: dict[str, str] = Field( + default_factory=dict, + description=( + "Populated on pass 2 ONLY. Keys are env var names (matching " + "ServiceCredentialSpec.env_var_name); values are the secrets the " + "user provided. Must NOT be logged or persisted." + ), + ) + skipped_services: list[str] = Field( + default_factory=list, + description=( + "Env var names the user explicitly left blank (informed opt-out). " + "Surfaced so downstream code can warn early if a critical " + "credential is missing." + ), + ) + summary: str = Field( + default="", + description=( + "One-line summary the scout writes — e.g. 'Negotiated 2 " + "credentials: RAILWAY_TOKEN, SENTRY_AUTH_TOKEN. User skipped: " + "DATADOG_API_KEY.' Safe to log; never includes secret values." 
+ ), + ) + ask_user_form: AskUserForm | None = None diff --git a/swe_af/hitl/services.py b/swe_af/hitl/services.py new file mode 100644 index 0000000..46db874 --- /dev/null +++ b/swe_af/hitl/services.py @@ -0,0 +1,169 @@ +"""Knowledge base of common third-party services + how to mint a scoped token. + +Used by ``run_environment_scout`` to recognize signal files in a repo (e.g. +``railway.toml``, ``fly.toml``, ``sentry.properties``) and ask the user for +the matching scoped credential. The LLM inside the scout reasoner consumes +``KNOWN_SERVICES`` as a hint list; ``detect_services_from_repo`` provides a +deterministic pre-pass that the LLM can build on. +""" + +from __future__ import annotations + +import os +from typing import Iterable + +from pydantic import BaseModel, Field + + +class ServiceCredentialSpec(BaseModel): + """One row in the knowledge base; also returned by the scout.""" + + service_name: str = Field(description="Human-readable service name shown to the user.") + env_var_name: str = Field( + description=( + "Env var the build expects (becomes the ask_user_form field id). " + "Match what the service's CLI / SDK looks for by default." + ) + ) + mint_url: str = Field( + description=( + "URL where the user mints a scoped/temporary token. Surfaced in " + "the form description so the user can click through and paste back." + ) + ) + permissions_hint: str = Field( + description=( + "Short hint on what scope / TTL to request when minting. Shown to " + "the user in the form so they don't over-grant access." + ) + ) + signal_files: list[str] = Field( + default_factory=list, + description=( + "Glob-ish filenames whose presence in the repo strongly implies " + "this service is in use. Used by detect_services_from_repo." + ), + ) + evidence_template: str = Field( + default="", + description=( + "Sentence template explaining WHY the build needs this credential, " + "used in the form description. Use {signal} as the placeholder." 
# Static knowledge base of third-party services the scout knows how to ask
# for. Each entry names the env var that carries the credential, where the
# user can mint a token, what scope/expiry to request, and which repo paths
# hint that the service is in use.
#
# Order is significant: detect_services_from_repo walks this list front to
# back and reports hits in the same order.
#
# ``{signal}`` in ``evidence_template`` is a placeholder for the matched
# signal path. NOTE(review): the substitution site is not in this module;
# confirm how the template is rendered before changing the brace style.
KNOWN_SERVICES: list[ServiceCredentialSpec] = [
    ServiceCredentialSpec(
        service_name="Railway",
        env_var_name="RAILWAY_TOKEN",
        mint_url="https://railway.com/account/tokens",
        permissions_hint="Project token, read-only if possible, set expiry to 1 day.",
        signal_files=["railway.toml", "railway.json", ".railway/config.json"],
        evidence_template="Saw {signal} — build likely needs Railway access to deploy or query services.",
    ),
    ServiceCredentialSpec(
        service_name="Fly.io",
        env_var_name="FLY_API_TOKEN",
        mint_url="https://fly.io/user/personal_access_tokens",
        permissions_hint="Deploy token scoped to this app, 1-day expiry.",
        signal_files=["fly.toml", "fly.io.toml", ".fly/config.toml"],
        evidence_template="Saw {signal} — build may need Fly.io access for deploys.",
    ),
    ServiceCredentialSpec(
        service_name="Vercel",
        env_var_name="VERCEL_TOKEN",
        mint_url="https://vercel.com/account/tokens",
        permissions_hint="Scope to this team only, 1-day expiry.",
        signal_files=["vercel.json", ".vercel/project.json"],
        evidence_template="Saw {signal} — build may need Vercel access.",
    ),
    ServiceCredentialSpec(
        service_name="Supabase",
        env_var_name="SUPABASE_ACCESS_TOKEN",
        mint_url="https://supabase.com/dashboard/account/tokens",
        permissions_hint="Personal access token, 1-day expiry — required only if migrations or schema changes are part of the work.",
        # The last entry is a directory; directories count as a signal too.
        signal_files=["supabase/config.toml", "supabase/.gitignore", "supabase/migrations"],
        evidence_template="Saw {signal} — Supabase project detected.",
    ),
    ServiceCredentialSpec(
        service_name="Sentry",
        env_var_name="SENTRY_AUTH_TOKEN",
        mint_url="https://sentry.io/settings/account/api/auth-tokens/",
        permissions_hint="Auth token scoped to project:read + project:releases, 1-day expiry.",
        signal_files=["sentry.properties", ".sentryclirc", "sentry.io.json"],
        evidence_template="Saw {signal} — Sentry integration detected.",
    ),
    ServiceCredentialSpec(
        service_name="Datadog",
        env_var_name="DATADOG_API_KEY",
        mint_url="https://app.datadoghq.com/organization-settings/api-keys",
        permissions_hint="Application API key (NOT a client token), restricted to read scopes if possible.",
        signal_files=["datadog.yaml", ".datadog/conf.yaml"],
        evidence_template="Saw {signal} — Datadog integration detected.",
    ),
    ServiceCredentialSpec(
        service_name="GitHub",
        env_var_name="GH_TOKEN",
        mint_url="https://github.com/settings/personal-access-tokens/new",
        permissions_hint="Fine-grained PAT scoped to THIS repo only, repo:contents+pull-requests, 1-day expiry.",
        signal_files=[".github/workflows", "CODEOWNERS"],
        evidence_template="Saw {signal} — work likely needs GitHub API beyond what gh CLI provides anonymously.",
    ),
    # The two SDK-only services below have no signal files, so the static
    # scan can never report them; their templates carry no placeholder.
    ServiceCredentialSpec(
        service_name="OpenAI",
        env_var_name="OPENAI_API_KEY",
        mint_url="https://platform.openai.com/api-keys",
        permissions_hint="Restricted API key with low usage cap; 1-day expiry.",
        signal_files=[],  # Detected via dependency manifests, not signal files.
        evidence_template="Project depends on the OpenAI SDK.",
    ),
    ServiceCredentialSpec(
        service_name="Anthropic",
        env_var_name="ANTHROPIC_API_KEY",
        mint_url="https://console.anthropic.com/settings/keys",
        permissions_hint="Restricted API key, set monthly spend limit, 1-day expiry.",
        signal_files=[],
        evidence_template="Project depends on the Anthropic SDK.",
    ),
]
def detect_services_from_repo(repo_path: str) -> list[ServiceCredentialSpec]:
    """Return the ``KNOWN_SERVICES`` entries that have a signal path in the repo.

    This is the deterministic pre-pass that runs ahead of the LLM scout. The
    result is only a hint: the scout keeps the final say on which credentials
    to request, because it can weigh PRD context a static scan cannot see.

    Args:
        repo_path: Root directory of the repository to inspect.

    Returns:
        The matching specs, in ``KNOWN_SERVICES`` order, each at most once.
        Empty when ``repo_path`` is blank or is not an existing directory;
        a missing path never raises.

    Notes:
        * Every ``signal_files`` entry is joined onto ``repo_path`` and tested
          for existence as written: no glob expansion, no recursive search.
        * A signal may be a file or a directory; either counts as a hit.
    """
    if not (repo_path and os.path.isdir(repo_path)):
        return []

    def _has_signal(spec: ServiceCredentialSpec) -> bool:
        # any() stops at the first existing path, so one hit per service is enough.
        return any(
            os.path.exists(os.path.join(repo_path, relative))
            for relative in spec.signal_files
        )

    return [spec for spec in KNOWN_SERVICES if _has_signal(spec)]


def known_service_summary_for_prompt(specs: Iterable[ServiceCredentialSpec]) -> str:
    """Format service specs as a markdown bullet list for embedding in a prompt.

    Each spec becomes one ``- **name** — ...`` line carrying its env var,
    signal paths, mint URL and permissions hint. Specs without signal paths
    show ``(no static signal)``. Lines are newline-joined with no trailing
    newline; an empty iterable yields an empty string.
    """

    def _bullet(spec: ServiceCredentialSpec) -> str:
        quoted = [f"`{path}`" for path in spec.signal_files]
        signals = ", ".join(quoted) if quoted else "(no static signal)"
        return (
            f"- **{spec.service_name}** — env `{spec.env_var_name}`; "
            f"signals: {signals}; mint at {spec.mint_url}; "
            f"hint: {spec.permissions_hint}"
        )

    return "\n".join(_bullet(spec) for spec in specs)
# System prompt for the Environment Scout reasoner (handed to the harness as
# ``system_prompt`` by run_environment_scout). It covers both passes: pass 1
# builds the credential form, pass 2 turns the user's answers into
# ``scoped_credentials``.
#
# The opening and closing backslashes suppress the leading and trailing
# newline of the triple-quoted literal.
#
# The field-label template names its placeholder explicitly
# ("<Service> token"). With the placeholder missing the instruction read
# `" token"`, which contradicted its own "Railway token" example.
SYSTEM_PROMPT = """\
You are an Environment Scout. The build pipeline runs autonomously, and you
have a one-time chance — before the architect designs the solution — to
negotiate any third-party credentials the build will need.

## Your responsibilities

1. **Read the PRD** to understand what the build is actually doing.
2. **Read the repo** to identify which third-party services it integrates with.
   Look at config files (`railway.toml`, `fly.toml`, `vercel.json`,
   `sentry.properties`, `supabase/config.toml`, etc.), dependency manifests
   (`package.json`, `pyproject.toml`, `requirements*.txt`, `go.mod`,
   `Cargo.toml`), CI workflows (`.github/workflows/`), and Dockerfiles.
3. **Decide which detected services actually need credentials for THIS work.**
   Project uses Sentry but PRD never touches alerts/releases? Don't ask.
   Project uses Railway and PRD adds a new endpoint that queries the DB? Ask.
4. **Build a single mega-form** with one OPTIONAL text field per service.
   Field `id` = the env var name the service's CLI/SDK expects.
   `label` = "<Service> token" (e.g. "Railway token").
   `description` = brief evidence ("Saw railway.toml; need to query staging DB")
   PLUS the mint URL PLUS the permissions hint.
   `required` = false (user can skip any field; informed opt-out).
   `type` = "input" (NEVER "textarea" for secrets — fixed-height input pill).
5. **Return a one-line summary** describing what you negotiated and what was
   skipped. NEVER include the secret values in the summary.

## When NOT to ask

- The PRD work is purely local (no network calls, no deploys, no schema
  changes against a managed service).
- A service is detected but the work doesn't touch it.
- You've already asked once — `prior_user_responses` is populated. Use those
  values; do NOT re-ask the same questions.

## Pass 2 — after the user submits

When `prior_user_responses` is non-empty, you are being re-invoked with the
user's answers. DO:

- Set `scoped_credentials` to the dict of submitted values, filtering blanks.
- Set `skipped_services` to the env var names the user left blank.
- Set `ask_user_form` to `null`.
- Set `summary` to a one-line description (env var names only, never values).
- Leave `detected_services` as the list you produced on pass 1, so the
  audit trail is preserved.

## Security

- NEVER log, write to a file, or include a secret value in `summary`,
  `detected_services`, or anywhere outside `scoped_credentials`.
- ALWAYS use `type: "input"` for credential fields, never `textarea`.
- The credentials you negotiate live in process memory only — they will
  never reach git history, build artifacts, or workflow logs.\
"""
def environment_scout_task_prompt(
    *,
    prd: dict,
    repo_path: str,
    workspace_manifest: WorkspaceManifest | None = None,
    prior_user_responses: list[dict] | None = None,
    known_services: list[ServiceCredentialSpec] | None = None,
) -> str:
    """Assemble the task prompt for a single scout invocation.

    The prompt is a newline-joined sequence of sections: optional workspace
    context, optional prior user responses, the repository path, the PRD
    (description, must-have, nice-to-have, acceptance criteria; empty parts
    are omitted), the service knowledge base, and finally the task
    instructions.

    Args:
        prd: PRD mapping; only ``validated_description``, ``must_have``,
            ``nice_to_have`` and ``acceptance_criteria`` are read.
        repo_path: Repository root the scout is told to inspect.
        workspace_manifest: Passed to ``workspace_context_block``; its output
            is included only when truthy.
        prior_user_responses: Earlier form answers. When truthy, the task
            section switches to the "re-invoked after submit" instructions.
        known_services: Knowledge-base override; a missing or empty value
            falls back to ``KNOWN_SERVICES``.

    Returns:
        The complete prompt text.
    """
    catalogue = known_services if known_services else KNOWN_SERVICES
    parts: list[str] = []

    # Leading context blocks: each helper's output is kept only when truthy.
    for context_block in (
        workspace_context_block(workspace_manifest),
        format_prior_user_responses(prior_user_responses),
    ):
        if context_block:
            parts.append(context_block)

    parts += [
        "## Repository",
        f"`{repo_path}`",
        "Inspect this tree to confirm which services are actually in use.",
        "\n## PRD",
    ]

    summary_text = prd.get("validated_description", "") or ""
    if summary_text:
        parts.append(f"**Description:** {summary_text}")

    # The three list-valued PRD parts share one rendering: heading + bullets.
    for heading, prd_key in (
        ("**Must-have:**", "must_have"),
        ("**Nice-to-have:**", "nice_to_have"),
        ("**Acceptance criteria:**", "acceptance_criteria"),
    ):
        entries = prd.get(prd_key, []) or []
        if entries:
            parts.append(heading)
            parts.extend(f" - {item}" for item in entries)

    parts += [
        "\n## Known services (knowledge base)",
        (
            "Use these as a starting point. You MAY add services not in this list "
            "if you see clear evidence in the repo; in that case, pick a sensible "
            "`env_var_name` matching the service's CLI / SDK convention."
        ),
        known_service_summary_for_prompt(catalogue),
        "\n## Your task",
    ]

    if prior_user_responses:
        # Pass 2: the user already answered; convert answers to credentials.
        instructions = (
            "You are being re-invoked AFTER the user submitted the form. "
            "Take the values from `prior_user_responses` above, surface them "
            "as `scoped_credentials` (filtering blanks), set `ask_user_form` "
            "to null, write a brief `summary`, and return."
        )
    else:
        # Pass 1: decide what is needed and build the form.
        instructions = (
            "1. Read the PRD and inspect the repo.\n"
            "2. Decide which credentials are GENUINELY required for this work.\n"
            "3. If none: set `detected_services=[]`, `ask_user_form=null`, "
            "`summary='No third-party credentials required for this work.'`, return.\n"
            "4. Otherwise: populate `detected_services` AND construct a single "
            "`ask_user_form` whose `fields` list has one `input` field per "
            "detected service. Field `id` MUST equal the service's "
            "`env_var_name`. Mark all fields `required: false`. Include the "
            "mint URL in each field's `description` so the user can click "
            "through."
        )
    parts.append(instructions)

    return "\n".join(parts)
@router.reasoner()
async def run_environment_scout(
    prd: dict,
    repo_path: str,
    artifacts_dir: str = ".artifacts",
    model: str = "sonnet",
    max_turns: int = DEFAULT_AGENT_MAX_TURNS,
    permission_mode: str = "",
    ai_provider: str = "claude",
    workspace_manifest: dict | None = None,
    prior_user_responses: list[dict] | None = None,
) -> dict:
    """Negotiate scoped third-party credentials with the user before architecture.

    Runs once between PM and Architect. Reads the PRD + repo, identifies
    third-party services the build will need to talk to, and asks the user
    via a single Hax form for scoped/temporary tokens.

    The negotiated values are NOT returned. This reasoner stores them itself
    with ``store_scoped_credentials(scope_id, creds)``, where ``scope_id`` is
    ``router.ctx.run_id`` (falling back to ``root_workflow_id``), and strips
    ``scoped_credentials`` from the returned dict because reasoner return
    values are logged by the control plane.

    Args:
        prd: PRD dict forwarded to the task-prompt builder.
        repo_path: Repository root; also the harness working directory.
        artifacts_dir: Artifacts directory name, resolved under ``repo_path``.
        model: Model name passed to the harness.
        max_turns: Turn limit passed to the harness.
        permission_mode: Harness permission mode; ``""`` is sent as ``None``.
        ai_provider: Runtime name, mapped via ``runtime_to_harness_adapter``.
        workspace_manifest: Optional ``WorkspaceManifest`` field dict.
        prior_user_responses: Optional earlier form answers used to seed the
            first invocation.

    Returns:
        ``ScoutResult.model_dump()`` without the ``scoped_credentials`` key.

    No credentials are stored when:
        * HAX is disabled (``build_hax_client_from_env`` returns None)
        * The LLM decides no credentials are needed (e.g. purely local PRD)
        * The user opts out by leaving every form field blank
        * The scout result cannot be parsed
        * The execution context exposes neither ``run_id`` nor
          ``root_workflow_id`` (a warning note is emitted in that case)
    """
    router.note("Environment scout starting", tags=["scout", "start"])

    base = os.path.join(os.path.abspath(repo_path), artifacts_dir)
    _ensure_paths(base)  # ensure paths exist; we don't write artifacts here

    from swe_af.execution.schemas import WorkspaceManifest  # noqa: PLC0415
    from swe_af.hitl import (  # noqa: PLC0415
        AskUserBudget,
        ScoutResult,
        approval_webhook_url,
        build_hax_client_from_env,
        run_with_ask_user,
        store_scoped_credentials,
    )
    from swe_af.prompts.environment_scout import (  # noqa: PLC0415
        SYSTEM_PROMPT,
        environment_scout_task_prompt,
    )

    ws_manifest = (
        WorkspaceManifest(**workspace_manifest) if workspace_manifest else None
    )
    provider = runtime_to_harness_adapter(ai_provider)

    async def _invoke_scout(prior_user_responses: list[dict] | None) -> ScoutResult | None:
        # One scout pass. run_with_ask_user calls this again with the user's
        # answers appended to prior_user_responses once the form is submitted.
        # NOTE(review): on that second pass the submitted values travel through
        # the task prompt (format_prior_user_responses) — confirm neither that
        # helper nor harness logging persists them.
        task_prompt = environment_scout_task_prompt(
            prd=prd,
            repo_path=repo_path,
            workspace_manifest=ws_manifest,
            prior_user_responses=prior_user_responses,
        )
        result = await router.harness(
            prompt=task_prompt,
            schema=ScoutResult,
            provider=provider,
            model=model,
            max_turns=max_turns,
            tools=["Read", "Glob", "Grep", "Bash"],
            permission_mode=permission_mode or None,
            system_prompt=SYSTEM_PROMPT,
            cwd=repo_path,
        )
        check_fatal_harness_error(result)
        return result.parsed

    # Copy so the wrapper never mutates the caller's list.
    initial_prior = list(prior_user_responses or [])
    parsed = await run_with_ask_user(
        reasoner_fn=_invoke_scout,
        reasoner_kwargs={"prior_user_responses": initial_prior},
        app=router,
        hax_client=build_hax_client_from_env(),
        budget=AskUserBudget(remaining=2),
        webhook_url=approval_webhook_url(router),
        note_label="environment_scout",
    )

    if parsed is None:
        router.note(
            "Scout produced no parseable result — proceeding without credentials",
            tags=["scout", "fallback"],
        )
        return ScoutResult(
            summary="Scout produced no parseable result; proceeding without credentials.",
        ).model_dump(exclude={"scoped_credentials"})

    # Stash credentials in the process-local in-memory store under the build's
    # run_id (shared across every reasoner in this build). This MUST happen
    # before we strip them out of the return value — otherwise nothing
    # downstream has a way to retrieve them.
    # NOTE(review): the harness wrapper in swe_af/app.py looks credentials up
    # by ctx.run_id only, so values stored under the root_workflow_id fallback
    # would not be auto-injected — confirm the fallback is intended.
    ctx = getattr(router, "ctx", None)
    scope_id = (
        getattr(ctx, "run_id", None) or getattr(ctx, "root_workflow_id", None) or ""
    )
    creds = parsed.scoped_credentials
    creds_count = len(creds)
    if creds:
        if scope_id:
            store_scoped_credentials(scope_id, creds)
        else:
            # Without a scope id there is no key to store under. Say so
            # explicitly (count only, never values) instead of dropping the
            # user's tokens silently.
            router.note(
                f"Scout negotiated {creds_count} credential(s) but the execution "
                "context has no run_id/root_workflow_id — nothing was stored, so "
                "downstream harness calls will not receive them",
                tags=["scout", "warning"],
            )

    skipped_count = len(parsed.skipped_services)
    router.note(
        f"Scout complete: {creds_count} credential(s) negotiated, "
        f"{skipped_count} skipped",
        tags=["scout", "complete"],
    )
    # SAFETY: scoped_credentials is EXCLUDED from the returned dict. The
    # control plane logs reasoner return values, so any value in this dict is
    # persisted. The credentials live only in process memory via the store
    # call above; downstream reasoners retrieve them with
    # get_scoped_credentials using the same scope_id (router.ctx.run_id).
    return parsed.model_dump(exclude={"scoped_credentials"})
from __future__ import annotations

import threading
from unittest.mock import AsyncMock, MagicMock

import pytest

from swe_af.hitl.ask_user import AskUserForm, AskUserFormField
from swe_af.hitl.credentials_store import (
    _STORE,
    clear_scoped_credentials,
    get_scoped_credentials,
    inject_credentials_into_env,
    store_scoped_credentials,
)
from swe_af.hitl.scout_schema import ScoutResult
from swe_af.hitl.services import (
    KNOWN_SERVICES,
    ServiceCredentialSpec,
    detect_services_from_repo,
    known_service_summary_for_prompt,
)
from swe_af.hitl.wrapper import AskUserBudget, run_with_ask_user


# ---------------------------------------------------------------------------
# services.py — detection
# ---------------------------------------------------------------------------


def test_known_services_inventory_covers_baseline():
    """We promised at least 8 services in the spec — pin that here."""
    assert len(KNOWN_SERVICES) >= 8
    names = {s.service_name for s in KNOWN_SERVICES}
    assert {"Railway", "Fly.io", "Vercel", "Supabase", "Sentry"}.issubset(names)


def test_detect_services_returns_empty_for_missing_path():
    """A blank or nonexistent repo path yields an empty list, not an error."""
    assert detect_services_from_repo("") == []
    assert detect_services_from_repo("/this/path/does/not/exist") == []


def test_detect_services_finds_railway_and_sentry(tmp_path):
    """Signal files at the repo root are matched to their services."""
    (tmp_path / "railway.toml").write_text("[deploy]\n")
    (tmp_path / "sentry.properties").write_text("dsn=foo\n")
    found = detect_services_from_repo(str(tmp_path))
    names = [s.service_name for s in found]
    assert "Railway" in names
    assert "Sentry" in names


def test_detect_services_signal_can_be_directory(tmp_path):
    """A signal file may actually be a directory (e.g. supabase/migrations)."""
    (tmp_path / "supabase").mkdir()
    (tmp_path / "supabase" / "migrations").mkdir()
    found = detect_services_from_repo(str(tmp_path))
    assert "Supabase" in [s.service_name for s in found]


def test_known_service_summary_for_prompt_is_markdown_bullets():
    """The prompt summary is a bullet list carrying env var and mint URL."""
    out = known_service_summary_for_prompt(KNOWN_SERVICES[:2])
    assert out.startswith("- **")
    assert "env `" in out
    assert "mint at " in out


# ---------------------------------------------------------------------------
# credentials_store.py — round-trip, filtering, isolation, thread safety
# ---------------------------------------------------------------------------


@pytest.fixture(autouse=True)
def _wipe_store():
    """Each test starts and ends with an empty store."""
    # autouse: applies to every test in this module, including the
    # services/scout tests that never touch the store.
    _STORE.clear()
    yield
    _STORE.clear()


def test_store_and_get_round_trip():
    """A stored mapping is returned unchanged for the same build id."""
    store_scoped_credentials("build-A", {"RAILWAY_TOKEN": "secret"})
    assert get_scoped_credentials("build-A") == {"RAILWAY_TOKEN": "secret"}


def test_store_filters_blank_and_none_values():
    """Empty, whitespace-only and None values are dropped on store."""
    store_scoped_credentials(
        "build-A",
        {"RAILWAY_TOKEN": "ok", "EMPTY": "", "WHITESPACE": " ", "NONE": None},  # type: ignore[dict-item]
    )
    got = get_scoped_credentials("build-A")
    assert got == {"RAILWAY_TOKEN": "ok"}


def test_store_isolation_between_builds():
    """The same env var under two build ids keeps two independent values."""
    store_scoped_credentials("build-A", {"RAILWAY_TOKEN": "a"})
    store_scoped_credentials("build-B", {"RAILWAY_TOKEN": "b"})
    assert get_scoped_credentials("build-A") == {"RAILWAY_TOKEN": "a"}
    assert get_scoped_credentials("build-B") == {"RAILWAY_TOKEN": "b"}


def test_clear_only_removes_the_specified_build():
    """Clearing one build id leaves other builds' credentials in place."""
    store_scoped_credentials("build-A", {"RAILWAY_TOKEN": "a"})
    store_scoped_credentials("build-B", {"RAILWAY_TOKEN": "b"})
    clear_scoped_credentials("build-A")
    assert get_scoped_credentials("build-A") == {}
    assert get_scoped_credentials("build-B") == {"RAILWAY_TOKEN": "b"}


def test_get_returns_copy_not_reference():
    """Mutating the returned dict must not affect the stored value."""
    store_scoped_credentials("build-A", {"RAILWAY_TOKEN": "a"})
    got = get_scoped_credentials("build-A")
    got["RAILWAY_TOKEN"] = "tampered"
    got["NEW_VAR"] = "injected"
    assert get_scoped_credentials("build-A") == {"RAILWAY_TOKEN": "a"}


def test_concurrent_writes_isolate_by_execution_id():
    """Two threads writing under different keys must not race each other."""
    def writer(name: str, value: str):
        for _ in range(100):
            store_scoped_credentials(name, {"TOKEN": value})

    t1 = threading.Thread(target=writer, args=("build-A", "a"))
    t2 = threading.Thread(target=writer, args=("build-B", "b"))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    assert get_scoped_credentials("build-A") == {"TOKEN": "a"}
    assert get_scoped_credentials("build-B") == {"TOKEN": "b"}


def test_inject_credentials_returns_new_dict():
    """Scoped values override the base env without mutating the base dict."""
    base = {"PATH": "/usr/bin", "RAILWAY_TOKEN": "stale"}
    store_scoped_credentials("build-A", {"RAILWAY_TOKEN": "fresh"})
    merged = inject_credentials_into_env(base, "build-A")
    assert merged == {"PATH": "/usr/bin", "RAILWAY_TOKEN": "fresh"}
    # base must be untouched.
    assert base == {"PATH": "/usr/bin", "RAILWAY_TOKEN": "stale"}


def test_inject_credentials_no_scope_returns_base_only():
    """An empty scope id injects nothing but still returns a fresh dict."""
    base = {"PATH": "/usr/bin"}
    merged = inject_credentials_into_env(base, "")
    assert merged == base
    assert merged is not base  # still a copy


def test_inject_credentials_empty_base_works():
    """A None base env is treated as empty."""
    store_scoped_credentials("build-A", {"RAILWAY_TOKEN": "x"})
    merged = inject_credentials_into_env(None, "build-A")
    assert merged == {"RAILWAY_TOKEN": "x"}


# ---------------------------------------------------------------------------
# scout_schema.py + wrapper closure round-trip
# ---------------------------------------------------------------------------


def _scout_result_pass1(spec_form: AskUserForm) -> ScoutResult:
    """Build a pass-1 result: one detected service and a form to show."""
    return ScoutResult(
        detected_services=[
            ServiceCredentialSpec(
                service_name="Railway",
                env_var_name="RAILWAY_TOKEN",
                mint_url="https://example",
                permissions_hint="hint",
                signal_files=["railway.toml"],
            )
        ],
        scoped_credentials={},
        skipped_services=[],
        ask_user_form=spec_form,
    )


def _scout_result_pass2(values: dict[str, str]) -> ScoutResult:
    """Build a pass-2 result: credentials filled in and no further form."""
    return ScoutResult(
        detected_services=[],
        scoped_credentials=values,
        skipped_services=[],
        summary=f"Got {len(values)} credential(s).",
        ask_user_form=None,
    )


def _approval_result(values: dict[str, str]):
    """Fake the object ``app.pause`` resolves to after the form is approved."""
    obj = MagicMock()
    obj.decision = "approved"
    obj.feedback = ""
    obj.raw_response = {"values": values}
    return obj


def _silent_app():
    """Return an app double whose ``note`` is a no-op and ``pause`` is awaitable."""
    app = MagicMock()
    app.note = MagicMock()
    app.pause = AsyncMock()
    return app


@pytest.mark.asyncio
async def test_scout_closure_pass1_emits_form_pass2_emits_credentials():
    """Two-pass dance: scout asks once, gets the answers, returns the dict."""
    form = AskUserForm(
        title="Pick credentials",
        fields=[
            AskUserFormField(
                id="RAILWAY_TOKEN",
                type="input",
                label="Railway token",
                required=False,
            ),
        ],
    )
    # side_effect order is the contract: first await -> pass 1, second -> pass 2.
    reasoner = AsyncMock(
        side_effect=[
            _scout_result_pass1(form),
            _scout_result_pass2({"RAILWAY_TOKEN": "rt_xxx"}),
        ]
    )
    hax = MagicMock()
    hax.create_request = MagicMock(return_value=MagicMock(id="r1", url="u"))
    app = _silent_app()
    app.pause.return_value = _approval_result({"RAILWAY_TOKEN": "rt_xxx"})

    parsed = await run_with_ask_user(
        reasoner_fn=reasoner,
        reasoner_kwargs={"prior_user_responses": []},
        app=app,
        hax_client=hax,
        budget=AskUserBudget(remaining=3),
    )

    assert isinstance(parsed, ScoutResult)
    assert parsed.scoped_credentials == {"RAILWAY_TOKEN": "rt_xxx"}
    assert parsed.ask_user_form is None
    assert reasoner.await_count == 2

    # The second invocation should have received the prior values.
    second_call_kwargs = reasoner.await_args_list[1].kwargs
    prior = second_call_kwargs["prior_user_responses"]
    assert len(prior) == 1
    assert prior[0]["values"] == {"RAILWAY_TOKEN": "rt_xxx"}


@pytest.mark.asyncio
async def test_scout_closure_skips_pause_when_no_services_detected():
    """If the LLM judges no credentials needed, the wrapper short-circuits."""
    reasoner = AsyncMock(
        return_value=ScoutResult(
            detected_services=[],
            ask_user_form=None,
            summary="No third-party credentials needed.",
        )
    )
    app = _silent_app()

    parsed = await run_with_ask_user(
        reasoner_fn=reasoner,
        reasoner_kwargs={"prior_user_responses": []},
        app=app,
        hax_client=MagicMock(),
        budget=AskUserBudget(remaining=3),
    )

    assert parsed.scoped_credentials == {}
    reasoner.assert_awaited_once()
    app.pause.assert_not_called()
# ---------------------------------------------------------------------------
# Schema serialisation — scoped_credentials must NEVER leak through model_dump
# when excluded.
# ---------------------------------------------------------------------------


def test_scout_result_model_dump_can_exclude_scoped_credentials():
    """Excluding ``scoped_credentials`` on dump removes the key entirely.

    The reasoner applies this exclusion at its return boundary so the secret
    never reaches the control-plane workflow_execution row; a plain dump
    still carries the field for callers that ask for it.
    """
    secret_payload = {"RAILWAY_TOKEN": "secret-do-not-log"}
    result = ScoutResult(
        detected_services=[],
        scoped_credentials=secret_payload,
        summary="ok",
    )

    redacted = result.model_dump(exclude={"scoped_credentials"})
    assert "scoped_credentials" not in redacted
    assert redacted["summary"] == "ok"

    # The full dump still has it (caller's choice).
    unredacted = result.model_dump()
    assert unredacted["scoped_credentials"] == {"RAILWAY_TOKEN": "secret-do-not-log"}