Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,15 @@ FSWE_GRADER_API_KEY=your-grader-api-key-here
# standard OPENAI_API_KEY is used as a last resort. Useful when
# agent and grader share the same API.
# OPENAI_API_KEY=sk-...


# Inference smoke driver (inference.py)
# Only FSWE_SPACE_URL is required. Pi inside the Space already has the
# FSWE_AGENT_* / FSWE_GRADER_* keys, so inference.py does not need them.

FSWE_SPACE_URL=https://your-space-host.hf.space

# Optional knobs (defaults shown):
# MAX_STEPS=4
# TASK_COUNT=1
# MESSAGE_TIMEOUT=900
86 changes: 86 additions & 0 deletions .github/workflows/validate-inference.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
name: HF Spaces — Validate Inference

# End-to-end smoke: open a WebSocket session to each deployed Space, run a
# real episode through the pi harness (which calls our MCP tools internally),
# and emit the hackathon-standard [START]/[STEP]/[END] log format.
#
# MANUAL TRIGGER ONLY. Each run costs ~$0.15-0.45 in HF Router tokens and
# 3-10 min wall time per Space, so we do NOT chain this off main pushes.
# Run via the Actions UI ("Run workflow") before submission to confirm the
# full agent loop is working against the live Spaces.

on:
  workflow_dispatch:
    inputs:
      tasks:
        description: 'Comma-separated task slugs (notebook,postgres,type-checker,libexpat-to-x86asm)'
        required: false
        default: 'notebook,postgres,type-checker,libexpat-to-x86asm'

permissions:
  contents: read

jobs:
  # Convert the comma-separated `tasks` input into a JSON array so the
  # inference matrix only covers the Spaces the caller asked for. Without
  # this job the input would be accepted by the UI but silently ignored.
  plan:
    runs-on: ubuntu-latest
    timeout-minutes: 5
    outputs:
      tasks: ${{ steps.split.outputs.tasks }}
    steps:
      - name: Split task list
        id: split
        env:
          # Passed through env (not inlined into the script) so the value is
          # never interpreted as shell. Falls back to the full list if the
          # input is cleared.
          TASKS: ${{ inputs.tasks || 'notebook,postgres,type-checker,libexpat-to-x86asm' }}
        run: |
          tasks_json=$(printf '%s' "${TASKS}" | jq -Rc 'split(",") | map(gsub("^\\s+|\\s+$"; "")) | map(select(length > 0))')
          if [ -z "${tasks_json}" ] || [ "${tasks_json}" = "[]" ]; then
            echo "::error::No task slugs found in input '${TASKS}'."
            exit 1
          fi
          echo "tasks=${tasks_json}" >> "$GITHUB_OUTPUT"
          echo "Selected tasks: ${tasks_json}"

  inference:
    needs: plan
    runs-on: ubuntu-latest
    timeout-minutes: 20
    strategy:
      fail-fast: false
      matrix:
        task: ${{ fromJSON(needs.plan.outputs.tasks) }}
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.13"

      - name: Install uv
        uses: astral-sh/setup-uv@v5

      - name: Install dependencies
        run: uv sync

      - name: Resolve Space URL
        id: url
        env:
          HF_OWNER: ${{ vars.HF_OWNER }}
          TASK: ${{ matrix.task }}
        run: |
          if [ -z "${HF_OWNER}" ]; then
            echo "::error::HF_OWNER is empty. Configure repository variable HF_OWNER."
            exit 1
          fi
          url="https://${HF_OWNER}-frontier-swe-${TASK}.hf.space"
          echo "FSWE_SPACE_URL=${url}" >> "$GITHUB_ENV"
          echo "Space URL: ${url}"

      - name: Wait for Space /health
        run: |
          delay=10
          max_total=300
          elapsed=0
          while [ "$elapsed" -lt "$max_total" ]; do
            code=$(curl -sS -o /dev/null --max-time 15 -w '%{http_code}' "${FSWE_SPACE_URL}/health" || echo 000)
            echo "probe ${FSWE_SPACE_URL}/health -> ${code} (elapsed ${elapsed}s)"
            if [ "$code" = "200" ]; then exit 0; fi
            sleep "$delay"
            elapsed=$((elapsed + delay))
          done
          echo "::error::Space ${FSWE_SPACE_URL} did not become healthy within ${max_total}s"
          exit 1

      - name: Run inference smoke
        env:
          PYTHONPATH: ${{ github.workspace }}
          # Pi inside the Space holds its own agent + grader credentials via
          # secrets propagated through sync-hf-spaces. inference.py does not
          # need to know them — it only drives /reset and /step over WS.
          MAX_STEPS: "4"
          TASK_COUNT: "1"
          # Pi inside the Space runs an internal multi-turn LLM loop on each
          # /step. Observed latency: 17s warm, up to 425s under HF Router
          # cold-start. 900s gives ~2x headroom over the worst observed.
          MESSAGE_TIMEOUT: "900"
        run: |
          uv run python inference.py
220 changes: 220 additions & 0 deletions inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
"""Frontier SWE OpenEnv — inference smoke driver.

Drives a real LLM-backed episode against a deployed HF Space and emits a
``[START] / [STEP] / [END]`` log format on stdout.

The Space ships a pi harness behind ``/step`` that holds its own LLM
client and runs a multi-turn loop inside the container. This script keeps
a WebSocket session open, sends a natural-language nudge per outer step,
and reads back the resulting observation. One [STEP] line therefore
corresponds to one outer turn that may have triggered several internal
pi/LLM actions; it is not one LLM tool call per [STEP]. Pi is the agent
we train against in production, so this driver mirrors that path rather
than orchestrating an LLM externally.

A successful [END] line means an LLM ran an episode end-to-end against
the live Space and produced a reward. There are no protocol-only or
state-only fallbacks hidden in this script; the workflow's
``Wait for Space /health`` step is a precondition gate, not a substitute.

Env vars
========
FSWE_SPACE_URL (required) live Space URL
TASK_NAME log label (default: parsed from FSWE_SPACE_URL)
BENCHMARK log label (default: frontier-swe-openenv)
MAX_STEPS outer step budget per episode (default: 4)
TASK_COUNT episodes per run (default: 1)
MESSAGE_TIMEOUT WS recv() timeout, seconds (default: 900)
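FSWE_AGENT_MODEL         model label for the [START] line (default: pi-harness)
MIN_SUBMISSION_SCORE / MAX_SUBMISSION_SCORE
                         open-interval clamps for [END] score (defaults: 0.01 / 0.99)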
"""

from __future__ import annotations

import asyncio
import math
import os
import re
import sys
import time
import traceback
from typing import Any
from urllib.parse import urlparse

from frontier_swe_env.client import FrontierSweEnv
from frontier_swe_env.models import FrontierSweAction


# Runtime configuration, read once at import time.
#
# Every variable uses the ``os.getenv(NAME) or DEFAULT`` form so that a
# variable which is present but empty (``MAX_STEPS=`` in a .env file, a blank
# CI variable) falls back to its default instead of crashing ``int("")`` /
# ``float("")`` or producing an empty log label.
SPACE_URL = (os.getenv("FSWE_SPACE_URL") or "").rstrip("/")
TASK_NAME = os.getenv("TASK_NAME") or ""
BENCHMARK = os.getenv("BENCHMARK") or "frontier-swe-openenv"
MODEL_NAME = os.getenv("FSWE_AGENT_MODEL") or "pi-harness"
# Step and episode budgets are floored at 1 so a run always does some work.
MAX_STEPS = max(1, int(os.getenv("MAX_STEPS") or "4"))
TASK_COUNT = max(1, int(os.getenv("TASK_COUNT") or "1"))
MESSAGE_TIMEOUT = float(os.getenv("MESSAGE_TIMEOUT") or "900")
MIN_SUBMISSION_SCORE = float(os.getenv("MIN_SUBMISSION_SCORE") or "0.01")
MAX_SUBMISSION_SCORE = float(os.getenv("MAX_SUBMISSION_SCORE") or "0.99")

# Default per-step nudge — pi reads this and decides what tools to call.
NUDGE = (
    "Make incremental progress on the task. "
    "If you have not submitted a plan yet, call submit_plan with one or two "
    "small subtasks now. Otherwise, call submit_subtask on the current "
    "subtask to record progress. Then call get_status. "
    "Keep responses brief; do not edit large amounts of code."
)


def _single_line(value: Any) -> str:
    """Render *value* as text that fits on one log line.

    The value is stringified, every run of whitespace (including newlines
    and tabs) is collapsed to a single space, and leading/trailing
    whitespace is removed.
    """
    text = str(value)
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()


def _clamp_open(score: float) -> float:
    """Clamp to the open interval (0, 1) per hackathon submission spec.

    The bounds come from ``MIN_SUBMISSION_SCORE`` / ``MAX_SUBMISSION_SCORE``
    (in either order) but are never allowed outside ``[0.01, 0.99]``. If the
    configured bounds collapse to an empty or single-point range, the
    ``0.01`` / ``0.99`` defaults are used instead.

    Args:
        score: Raw score; anything ``float()`` accepts.

    Returns:
        The score limited to ``[lo, hi]``. A NaN score returns ``lo``:
        NaN compares false against everything, so it would otherwise pass
        through ``min``/``max`` unchanged and be reported as ``nan``.
    """
    lo = max(0.01, min(MIN_SUBMISSION_SCORE, MAX_SUBMISSION_SCORE))
    hi = min(0.99, max(MIN_SUBMISSION_SCORE, MAX_SUBMISSION_SCORE))
    if hi <= lo:
        lo, hi = 0.01, 0.99
    value = float(score)
    if math.isnan(value):
        return lo
    return min(max(value, lo), hi)


def log_start(task: str, env_label: str, model: str) -> None:
    """Print the ``[START]`` line that opens an episode's log.

    Each field is flattened to a single line before printing, and stdout is
    flushed immediately.
    """
    fields = (
        f"task={_single_line(task)}",
        f"env={_single_line(env_label)}",
        f"model={_single_line(model)}",
    )
    print("[START] " + " ".join(fields), flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None:
    """Print one ``[STEP]`` line for an outer step.

    ``reward`` is shown with two decimals, ``done`` as lowercase
    ``true``/``false``, and a missing or empty ``error`` as the literal
    ``null``. stdout is flushed immediately.
    """
    if error:
        err_val = _single_line(error)
    else:
        err_val = "null"
    done_flag = str(done).lower()
    line = " ".join(
        [
            f"[STEP] step={step}",
            f"action={_single_line(action)}",
            f"reward={reward:.2f}",
            f"done={done_flag}",
            f"error={err_val}",
        ]
    )
    print(line, flush=True)


def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None:
    """Print the ``[END]`` line that closes an episode's log.

    The score is clamped through ``_clamp_open`` before formatting; the
    per-step rewards are written as a comma-separated list with two
    decimals each. stdout is flushed immediately.
    """
    success_flag = str(success).lower()
    clamped_score = _clamp_open(score)
    rewards_str = ",".join([format(r, ".2f") for r in rewards])
    line = (
        f"[END] success={success_flag} steps={steps} "
        f"score={clamped_score:.2f} rewards={rewards_str}"
    )
    print(line, flush=True)


def _infer_task_label(space_url: str) -> str:
    """Pull the task slug from the Space hostname.

    An explicit ``TASK_NAME`` always wins. Otherwise the hostname is matched
    against ``<owner>-frontier-swe-<task>.hf.space`` and ``<task>`` is
    returned. The owner part may itself contain hyphens (``my-org``), so it
    is matched lazily up to the first ``-frontier-swe-`` marker rather than
    being restricted to hyphen-free text.

    Args:
        space_url: Full URL of the Space, including the scheme.

    Returns:
        The task slug; the bare hostname when it does not follow the naming
        scheme; or ``"unknown"`` when no hostname can be parsed.
    """
    if TASK_NAME:
        return TASK_NAME
    host = urlparse(space_url).hostname or ""
    m = re.match(r".+?-frontier-swe-(.+)\.hf\.space$", host)
    return m.group(1) if m else host or "unknown"


def _episode_score(obs: Any, frozen_scores: dict[str, float], rewards: list[float]) -> float:
    """Choose the best available score signal for the episode.

    Preference order:
      1. ``obs.episode_reward`` when the attribute exists and is not None
      2. the arithmetic mean of the ``frozen_scores`` values, if any
      3. the most recent entry in ``rewards``
      4. ``0.0`` when none of the above is available
    """
    final_reward = getattr(obs, "episode_reward", None)
    if final_reward is None:
        if frozen_scores:
            total = sum(frozen_scores.values())
            return total / len(frozen_scores)
        return rewards[-1] if rewards else 0.0
    return float(final_reward)


async def run_episode(env: FrontierSweEnv, episode_idx: int) -> tuple[bool, int, float, list[float]]:
    """Run one episode against the Space and log a ``[STEP]`` line per step.

    The environment is reset, then ``NUDGE`` is sent up to ``MAX_STEPS``
    times, stopping early as soon as a step reports ``done``.

    Args:
        env: Connected environment client.
        episode_idx: Episode number supplied by the caller. It is accepted
            for interface stability but not used inside this function.

    Returns:
        ``(success, steps, score, rewards)`` where ``steps`` is the number
        of steps actually taken, ``score`` comes from ``_episode_score`` on
        the last observation, and ``success`` is true when the score is
        positive or the last observation carries any frozen scores.
    """
    rewards: list[float] = []

    reset_result = await env.reset()
    # Kept so scoring still has an observation if the loop were not to run.
    last_obs: Any = reset_result.observation

    for step in range(1, MAX_STEPS + 1):
        # Monotonic clock: a wall-clock adjustment during a long step must
        # not distort (or make negative) the reported latency.
        started = time.monotonic()
        result = await env.step(FrontierSweAction(message=NUDGE))
        elapsed = time.monotonic() - started

        obs = result.observation
        last_obs = obs
        # A missing (None) reward is recorded as 0.0.
        reward = float(result.reward or 0.0)
        rewards.append(reward)

        action_summary = (
            f'phase={obs.phase} '
            f'subtask={getattr(obs, "current_subtask", None)} '
            f'plan_score={getattr(obs, "plan_score", None)} '
            f'elapsed={elapsed:.1f}s'
        )
        log_step(
            step=step,
            action=action_summary,
            reward=reward,
            done=result.done,
            error=None,
        )

        if result.done:
            break

    frozen = getattr(last_obs, "frozen_scores", {}) or {}
    score = _episode_score(last_obs, frozen, rewards)
    success = score > 0.0 or bool(frozen)
    return success, len(rewards), score, rewards


async def async_main() -> None:
    """Drive ``TASK_COUNT`` episodes against the configured Space.

    Prints a ``[PREFLIGHT]`` line, opens one environment session, and for
    each episode emits ``[START]``, runs it, then emits ``[END]``.

    Raises:
        SystemExit: with a message when ``FSWE_SPACE_URL`` is not set, or
            with status 1 (chained to the original error) when anything
            inside the session raises. In the latter case the error type,
            message, Space URL and traceback are written to stderr first.
    """
    if not SPACE_URL:
        raise SystemExit("FSWE_SPACE_URL must be set to the live Space URL")

    task_label = _infer_task_label(SPACE_URL)
    preflight = (
        f"[PREFLIGHT] space={SPACE_URL} task={task_label} "
        f"max_steps={MAX_STEPS} task_count={TASK_COUNT} "
        f"message_timeout_s={MESSAGE_TIMEOUT}"
    )
    print(preflight, flush=True)

    try:
        session = FrontierSweEnv(
            base_url=SPACE_URL,
            message_timeout_s=MESSAGE_TIMEOUT,
        )
        async with session as env:
            for ep_idx in range(1, TASK_COUNT + 1):
                log_start(
                    task=f"{task_label}:run{ep_idx}",
                    env_label=BENCHMARK,
                    model=MODEL_NAME,
                )
                outcome = await run_episode(env, ep_idx)
                success, steps, score, rewards = outcome
                log_end(success=success, steps=steps, score=score, rewards=rewards)
    except Exception as exc:
        # Top-level boundary: report everything on stderr, then exit non-zero.
        print(
            f"[ERROR] type={type(exc).__name__} message={exc}",
            file=sys.stderr,
            flush=True,
        )
        print(f"[ERROR] FSWE_SPACE_URL={SPACE_URL}", file=sys.stderr, flush=True)
        traceback.print_exc(file=sys.stderr)
        raise SystemExit(1) from exc


def main() -> None:
    """Synchronous entry point: run ``async_main`` on a fresh event loop."""
    coroutine = async_main()
    asyncio.run(coroutine)


# Run the driver only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Loading