Commit 03bf20c
[autorevert] implement actions layer and logging (#7169)
This pull request:

* introduces a final "Signal Actions" layer (responsible for executing side effects of processed Signals, like restarts and reverts)
* changes the main entry point for the PyTorch auto-revert Lambda to use the new signals-based autorevert flow by default
* for observability, two CH tables are added:
  * `autorevert_events_v2`
  * `autorevert_state`

See [the spec](https://github.com/pytorch/test-infra/blob/ff2645443aafb0209d7f546302a5c09d8243cb31/aws/lambda/pytorch-auto-revert/SIGNAL_ACTIONS.md) for more details.

### Testing

Tested locally (only restart & state logging):

```
HOURS=18 WORKFLOWS=Lint,trunk,pull,inductor python -m pytorch_auto_revert
INFO:root:[v2] Start: workflows=Lint,trunk,pull,inductor hours=18 repo=pytorch/pytorch dry_run=False
INFO:root:[v2] Run timestamp (CH log ts) = 2025-09-16T15:51:18.656175
INFO:pytorch_auto_revert.signal_extraction_datasource:[extract] Fetching jobs: repo=pytorch/pytorch workflows=Lint,trunk,pull,inductor lookback=18h
INFO:pytorch_auto_revert.signal_extraction_datasource:[extract] Jobs fetched: 6738 rows in 45.70s
INFO:pytorch_auto_revert.signal_extraction_datasource:[extract] Fetching tests for 414 job_ids (20 failed jobs) in batches
INFO:pytorch_auto_revert.signal_extraction_datasource:[extract] Test batch 1/1 (size=414)
INFO:pytorch_auto_revert.signal_extraction_datasource:[extract] Tests fetched: 231 rows for 414 job_ids in 1.95s
INFO:root:[v2] Extracted 19 signals
INFO:root:[v2][signal] wf=trunk key=inductor/test_cudagraph_trees.py::test_graph_partition outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=trunk key=test_transformers.py::test_fused_sdp_priority_order_use_compile_False_cuda outcome=Ineligible(reason=<IneligibleReason.NO_SUCCESSES: 'no_successes'>, message='no successful commits present in window')
INFO:root:[v2][signal] wf=trunk key=export/test_hop.py::test_retrace_export_local_map_hop_simple_cuda_float32 outcome=Ineligible(reason=<IneligibleReason.NO_SUCCESSES: 'no_successes'>, message='no successful commits present in window')
INFO:root:[v2][signal] wf=trunk key=inductor/test_cudagraph_trees_expandable_segments.py::test_forward_backward_not_called_backend_inductor outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=trunk key=export/test_hop.py::test_pre_dispatch_export_local_map_hop_simple_cuda_float32 outcome=Ineligible(reason=<IneligibleReason.NO_SUCCESSES: 'no_successes'>, message='no successful commits present in window')
INFO:root:[v2][signal] wf=trunk key=export/test_hop.py::test_serialize_export_local_map_hop_simple_cuda_float32 outcome=Ineligible(reason=<IneligibleReason.NO_SUCCESSES: 'no_successes'>, message='no successful commits present in window')
INFO:root:[v2][signal] wf=trunk key=export/test_hop.py::test_aot_export_local_map_hop_simple_cuda_float32 outcome=Ineligible(reason=<IneligibleReason.NO_SUCCESSES: 'no_successes'>, message='no successful commits present in window')
INFO:root:[v2][signal] wf=trunk key=inductor/test_cudagraph_trees_expandable_segments.py::test_graph_partition outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=trunk key=inductor/test_cudagraph_trees.py::test_forward_backward_not_called_backend_inductor outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=trunk key=distributed/tensor/debug/test_debug_mode.py::test_debug_mode_backward outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=Lint key=lintrunner-noclang / linux-job outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=pull key=linux-jammy-py3.10-clang12 / test outcome=Ineligible(reason=<IneligibleReason.FLAKY: 'flaky'>, message='signal is flaky (mixed outcomes on same commit)')
INFO:root:[v2][signal] wf=trunk key=win-vs2022-cpu-py3 / test outcome=Ineligible(reason=<IneligibleReason.FLAKY: 'flaky'>, message='signal is flaky (mixed outcomes on same commit)')
INFO:root:[v2][signal] wf=inductor key=unit-test / inductor-test / test outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=trunk key=win-vs2022-cpu-py3 / build outcome=RestartCommits(commit_shas={'814338826e0b5cd065f8278c4b9487f13e16a5c7'})
INFO:root:[v2][signal] wf=inductor key=inductor-cpu-test / test outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=trunk key=win-vs2022-cuda12.6-py3 / build outcome=RestartCommits(commit_shas={'814338826e0b5cd065f8278c4b9487f13e16a5c7'})
INFO:root:[v2][signal] wf=inductor key=unit-test / inductor-cpu-build / build outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2][signal] wf=pull key=linux-jammy-py3.13-clang12 / test outcome=Ineligible(reason=<IneligibleReason.FIXED: 'fixed'>, message='signal appears recovered at head')
INFO:root:[v2] Candidate action groups: 1
INFO:root:[v2][action] preparing to execute ActionGroup(type='restart', commit_sha='814338826e0b5cd065f8278c4b9487f13e16a5c7', workflow_target='trunk', sources=[SignalMetadata(workflow_name='trunk', key='win-vs2022-cpu-py3 / build'), SignalMetadata(workflow_name='trunk', key='win-vs2022-cuda12.6-py3 / build')])
INFO:root:[v2][action] restart: skipping pacing (delta_sec=-24852)
INFO:root:[v2] Executed action groups: 0
INFO:root:[v2] State logged
```
1 parent e181662 commit 03bf20c

File tree

10 files changed: 806 additions & 8 deletions
aws/lambda/pytorch-auto-revert/SIGNAL_ACTIONS.md

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
# Signal Actions Layer

This document specifies the Actions layer that consumes extracted Signals with their processing outcomes, decides what to do (restart/revert/none), executes allowed actions, and logs actions. Logging the full state of the run is handled by a separate run-state logger module (see Interfaces).

## Overview

- Inputs (provided by integration code):
  - Run parameters: `repo_full_name`, `workflows`, `lookback_hours`, `dry_run`.
  - A list of pairs: `List[Tuple[Signal, SignalProcOutcome]]`, where `SignalProcOutcome = Union[AutorevertPattern, RestartCommits, Ineligible]`.
- Decisions: per-signal outcome mapped to a concrete action (see the sketch after this list):
  - `AutorevertPattern` → record a global revert intent for the suspected commit
  - `RestartCommits` → restart workflow(s) for specific `(workflow, commit)` pairs
  - `Ineligible` → no action
- Side effects: workflow restarts (non-dry-run only); append-only logging of actions in ClickHouse.
- Idempotence and dedup: enforced via ClickHouse lookups before acting and logging.
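For illustration, a minimal Python sketch of this outcome-to-action mapping. The stub dataclasses stand in for the real outcome types in the signal module, and `decide_action` with its string return values is hypothetical:

```python
# Sketch only: the stubs below stand in for the real AutorevertPattern /
# RestartCommits / Ineligible types from pytorch_auto_revert.signal.
from dataclasses import dataclass, field
from typing import Set, Union


@dataclass
class AutorevertPattern:  # stub; the real type carries more fields
    suspected_commit: str


@dataclass
class RestartCommits:  # stub
    commit_shas: Set[str] = field(default_factory=set)


@dataclass
class Ineligible:  # stub
    reason: str = ""
    message: str = ""


SignalProcOutcome = Union[AutorevertPattern, RestartCommits, Ineligible]


def decide_action(outcome: SignalProcOutcome) -> str:
    """Map a processing outcome to the action kind named above."""
    if isinstance(outcome, AutorevertPattern):
        return "revert"   # record-only revert intent for the suspected commit
    if isinstance(outcome, RestartCommits):
        return "restart"  # restart workflow(s) for the listed commits
    return "none"         # Ineligible: nothing to execute or log


assert decide_action(Ineligible(reason="fixed")) == "none"
```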
## Run Context

Immutable run-scoped metadata shared by all actions in the same run (a dataclass sketch follows this list):

- `ts`: DateTime captured at run start; identical across all rows inserted for the run
- `repo_full_name`: e.g., `pytorch/pytorch`
- `workflows`: list of workflow display names
- `lookback_hours`: window used for extraction
- `dry_run`: bool
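A plausible shape for this context as a frozen dataclass. Field names follow the list above, but the real `RunContext` lives in `signal_extraction_types` and may differ in detail:

```python
# Assumed shape only; the shipped RunContext is defined in
# pytorch_auto_revert.signal_extraction_types.
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List


@dataclass(frozen=True)
class RunContext:
    ts: datetime            # captured once at run start; shared by all rows
    repo_full_name: str     # e.g. "pytorch/pytorch"
    workflows: List[str]    # workflow display names
    lookback_hours: int     # extraction window
    dry_run: bool


ctx = RunContext(
    ts=datetime.now(timezone.utc),
    repo_full_name="pytorch/pytorch",
    workflows=["Lint", "trunk", "pull", "inductor"],
    lookback_hours=18,
    dry_run=True,
)
```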
## Action Semantics

- `revert` (record-only):
  - Scope: global per `commit_sha` across all workflows and signals
  - Dedup: if a non-dry-run `revert` exists for the same `repo` and `commit_sha`, do not log another

- `restart` (execute + log):
  - Scope: per `(workflow, commit_sha)` pair
  - Caps: up to 2 non-dry-run restarts total for the pair
  - Pacing: skip if the most recent non-dry-run restart was within 15 minutes before `ts` (caps and pacing are sketched after this list)
  - No extra GitHub-side “already restarted” guard; rely on ClickHouse logs for dedup/caps

- `none`:
  - Not logged in `autorevert_events_v2` (only actions taken are logged)

- Multiple signals targeting the same workflow/commit are coalesced in-memory, then deduped again via ClickHouse checks.
- Dry-run behavior:
  - Simulate restarts (no dispatch), log actions with `dry_run=1`
  - Dry-run rows do not count toward caps/pacing or revert dedup criteria
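A minimal sketch of the caps/pacing gate for restarts described above; the function and its inputs are illustrative, not the shipped implementation:

```python
# Gate a restart for one (workflow, commit_sha) pair, per the rules above:
# cap of 2 non-dry-run restarts, and 15-minute pacing against the run ts.
from datetime import datetime, timedelta
from typing import List

RESTART_CAP = 2
PACING = timedelta(minutes=15)


def restart_allowed(prior_restart_ts: List[datetime], ts: datetime) -> bool:
    """prior_restart_ts: timestamps of earlier non-dry-run restarts for the pair."""
    if len(prior_restart_ts) >= RESTART_CAP:
        return False  # cap reached for this (workflow, commit_sha)
    if prior_restart_ts and ts - max(prior_restart_ts) < PACING:
        return False  # paced out: most recent restart too close to ts
    return True
```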
## ClickHouse Logging

Two tables, sharing the same `ts` per CLI/lambda run.

### `autorevert_events_v2`

- Purpose: record actions taken during a run & dedup/cap against prior actions (an illustrative lookup follows this section)
- Notable columns:
  - `ts` DateTime — run timestamp
  - `workflows` Array(String) — workflows involved in this action
    - restart: a single-element array with the target workflow
    - revert: one or more workflows whose signals contributed
  - `source_signal_keys` Array(String) — signal keys that contributed to this action
  - `failed` UInt8 DEFAULT 0 — marks a failed attempt (e.g., restart dispatch failed)
  - `notes` String DEFAULT '' — optional free-form metadata

### `autorevert_state` (separate module)

- Purpose: persist the HUD-like state for the whole run for auditability
- Notable columns:
  - `ts` DateTime — run timestamp (matches `autorevert_events_v2.ts`)
  - `state` String — JSON-encoded model of the HUD grid and outcomes
  - `params` String DEFAULT '' — optional, free-form
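For illustration, how a dedup/cap lookup against `autorevert_events_v2` might look with `clickhouse_connect` via the project's `CHCliFactory`. Note the `action`, `commit_sha`, and `dry_run` column names, and the `misc` database prefix for this table, are assumptions; only the columns listed above are confirmed by this spec:

```python
# Illustrative only: assumes autorevert_events_v2 lives in `misc` and also
# has `action`, `commit_sha`, and `dry_run` columns (not listed in this spec).
from pytorch_auto_revert.clickhouse_client_helper import CHCliFactory


def prior_restart_count(workflow: str, commit_sha: str) -> int:
    """Count prior non-dry-run restarts for a (workflow, commit) pair."""
    res = CHCliFactory().client.query(
        """
        SELECT count()
        FROM misc.autorevert_events_v2
        WHERE action = 'restart'
          AND dry_run = 0
          AND has(workflows, {workflow:String})
          AND commit_sha = {sha:String}
        """,
        parameters={"workflow": workflow, "sha": commit_sha},
    )
    return int(res.result_rows[0][0])
```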
## Processing Flow

1. Create `RunContext` and capture `ts` at start (integration).
2. Provide the Actions layer with: `(run params, List[Tuple[Signal, SignalProcOutcome]])`.
3. Transform and group the list into coalesced action groups (reusable method; sketched after this list):
   - Revert groups: `(action=revert, commit_sha, sources: List[SignalMetadata(workflow, key)])`
   - Restart groups: `(action=restart, commit_sha, workflow_target, sources: List[SignalMetadata(workflow, key)])`
4. For each group, consult `autorevert_events_v2` (non-dry-run rows) to enforce dedup rules:
   - Reverts: skip if any prior recorded `revert` exists for `commit_sha`
   - Restarts: skip if ≥2 prior restarts exist for `(workflow_target, commit_sha)`; skip if the latest is within 15 minutes of `ts`
5. Execute eligible actions:
   - Restart: if not `dry_run`, dispatch and capture success/failure in `notes`
   - Revert: record only
6. Insert one `autorevert_events_v2` row per executed group with aggregated `workflows` and `source_signal_keys` (dry-run rows use `dry_run=1`).
7. Separately (integration), build the full run state and call the run-state logger to write a single `autorevert_state` row with the same `ts`.
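A sketch of the grouping in step 3. The `ActionGroup`/`SignalMetadata` shapes follow the `[v2][action]` log line in the commit message, while the grouping helper itself is illustrative:

```python
# Coalesce restart requests so one ActionGroup covers all signals that
# point at the same (workflow, commit_sha) pair.
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class SignalMetadata:
    workflow_name: str
    key: str


@dataclass
class ActionGroup:
    type: str                       # "revert" or "restart"
    commit_sha: str
    sources: List[SignalMetadata] = field(default_factory=list)
    workflow_target: str = ""       # set for restarts only


def group_restarts(
    wanted: List[Tuple[str, str, SignalMetadata]],  # (workflow, sha, source)
) -> List[ActionGroup]:
    buckets: Dict[Tuple[str, str], List[SignalMetadata]] = defaultdict(list)
    for workflow, sha, source in wanted:
        buckets[(workflow, sha)].append(source)
    return [
        ActionGroup("restart", sha, sources, workflow_target=wf)
        for (wf, sha), sources in buckets.items()
    ]
```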

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py

Lines changed: 41 additions & 7 deletions
```diff
@@ -10,6 +10,7 @@
 from .clickhouse_client_helper import CHCliFactory
 from .github_client_helper import GHClientFactory
 from .testers.autorevert import autorevert_checker
+from .testers.autorevert_v2 import autorevert_v2
 from .testers.hud import run_hud
 from .testers.restart_checker import workflow_restart_checker
 
@@ -71,9 +72,10 @@ def get_opts() -> argparse.Namespace:
     # no subcommand runs the lambda flow
     subparsers = parser.add_subparsers(dest="subcommand")
 
-    # autorevert-checker subcommand
+    # autorevert-checker subcommand (new default; legacy behind a flag)
     workflow_parser = subparsers.add_parser(
-        "autorevert-checker", help="Analyze workflows looking for autorevert patterns"
+        "autorevert-checker",
+        help="Analyze workflows for autorevert using Signals (default), or legacy via flag",
     )
     workflow_parser.add_argument(
         "workflows",
@@ -84,6 +86,11 @@ def get_opts() -> argparse.Namespace:
     workflow_parser.add_argument(
         "--hours", type=int, default=48, help="Lookback window in hours (default: 48)"
     )
+    workflow_parser.add_argument(
+        "--repo-full-name",
+        default=os.environ.get("REPO_FULL_NAME", "pytorch/pytorch"),
+        help="Full repo name to filter by (owner/repo).",
+    )
     workflow_parser.add_argument(
         "--verbose",
         "-v",
@@ -105,6 +112,11 @@ def get_opts() -> argparse.Namespace:
         action="store_true",
         help="Ignore common errors in autorevert patterns (e.g., 'No tests found')",
     )
+    workflow_parser.add_argument(
+        "--legacy-autorevert",
+        action="store_true",
+        help="Run the legacy autorevert behavior instead of the new Signals-based flow",
+    )
 
     # workflow-restart-checker subcommand
     workflow_restart_parser = subparsers.add_parser(
@@ -189,15 +201,37 @@ def main(*args, **kwargs) -> None:
     )
 
     if opts.subcommand is None:
-        autorevert_checker(
+        # New default without subcommand: run v2 using env defaults
+        autorevert_v2(
             os.environ.get("WORKFLOWS", "Lint,trunk,pull,inductor").split(","),
-            do_restart=True,
-            do_revert=True,
             hours=int(os.environ.get("HOURS", 16)),
-            verbose=True,
+            repo_full_name=os.environ.get("REPO_FULL_NAME", "pytorch/pytorch"),
             dry_run=opts.dry_run,
-            ignore_common_errors=True,
+            do_restart=True,
+            do_revert=True,
         )
+    elif opts.subcommand == "autorevert-checker":
+        if getattr(opts, "legacy_autorevert", False):
+            # Legacy behavior behind flag
+            autorevert_checker(
+                opts.workflows,
+                do_restart=opts.do_restart,
+                do_revert=opts.do_revert,
+                hours=opts.hours,
+                verbose=opts.verbose,
+                dry_run=opts.dry_run,
+                ignore_common_errors=opts.ignore_common_errors,
+            )
+        else:
+            # New default behavior under the same subcommand
+            autorevert_v2(
+                opts.workflows,
+                hours=opts.hours,
+                repo_full_name=opts.repo_full_name,
+                dry_run=opts.dry_run,
+                do_restart=opts.do_restart,
+                do_revert=opts.do_revert,
+            )
     elif opts.subcommand == "autorevert-checker":
         autorevert_checker(
             opts.workflows,
```
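The new default path boils down to a single call. Mirroring it directly, with the signature exactly as used in this diff and example argument values (the import path assumes the package is importable as `pytorch_auto_revert`):

```python
# Mirrors what main() does when no subcommand is given; dry_run=True makes
# this safe to experiment with (no workflow dispatches are executed).
from pytorch_auto_revert.testers.autorevert_v2 import autorevert_v2

autorevert_v2(
    ["Lint", "trunk", "pull", "inductor"],
    hours=18,
    repo_full_name="pytorch/pytorch",
    dry_run=True,
    do_restart=True,
    do_revert=True,
)
```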

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/clickhouse_client_helper.py

Lines changed: 15 additions & 0 deletions
```diff
@@ -1,5 +1,6 @@
 import logging
 import threading
+from datetime import datetime, timezone
 
 import clickhouse_connect
 
@@ -85,3 +86,17 @@ def connection_test(self) -> bool:
         except Exception as e:
             self._logger.warning(f"Connection test failed: {e}")
             return False
+
+
+# ---- Utilities ----
+def ensure_utc_datetime(dt: datetime) -> datetime:
+    """Coerce a datetime to timezone-aware UTC.
+
+    - If naive, assume it represents UTC and set tzinfo accordingly
+    - If aware, convert to UTC
+    """
+    if not isinstance(dt, datetime):
+        return dt  # type: ignore[return-value]
+    if dt.tzinfo is None:
+        return dt.replace(tzinfo=timezone.utc)
+    return dt.astimezone(timezone.utc)
```
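A quick check of the helper's two branches (import path per this file; the timestamps are example values):

```python
from datetime import datetime, timedelta, timezone

from pytorch_auto_revert.clickhouse_client_helper import ensure_utc_datetime

# Naive input: assumed to already represent UTC, tzinfo is attached.
print(ensure_utc_datetime(datetime(2025, 9, 16, 15, 51, 18)))
# -> 2025-09-16 15:51:18+00:00

# Aware input: converted to UTC.
est = datetime(2025, 9, 16, 10, 51, 18, tzinfo=timezone(timedelta(hours=-5)))
print(ensure_utc_datetime(est))
# -> 2025-09-16 15:51:18+00:00
```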
Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
```python
from __future__ import annotations

import json
from typing import Dict, Iterable, List, Tuple, Union

from .clickhouse_client_helper import CHCliFactory
from .signal import AutorevertPattern, Ineligible, RestartCommits, Signal
from .signal_extraction_types import RunContext


SignalProcOutcome = Union[AutorevertPattern, RestartCommits, Ineligible]


class RunStateLogger:
    """Serialize the run’s HUD-like state and insert a single row into misc.autorevert_state.

    The state JSON captures:
    - commits (newest→older) and minimal started_at timestamps per commit
    - per-signal columns with outcome, human notes, ineligible details, and per-commit events
    - run metadata (repo, workflows, lookback_hours, ts, dry_run)
    """

    def _build_state_json(
        self,
        *,
        repo: str,
        ctx: RunContext,
        pairs: Iterable[Tuple[Signal, SignalProcOutcome]],
    ) -> str:
        """Build a compact JSON string describing the run’s HUD-like grid and outcomes."""
        pairs_list = list(pairs)
        signals: List[Signal] = [s for s, _ in pairs_list]

        # Collect commit order (newest → older) across signals
        commits: List[str] = []
        seen = set()
        for s in signals:
            for c in s.commits:
                if c.head_sha not in seen:
                    seen.add(c.head_sha)
                    commits.append(c.head_sha)

        # Compute minimal started_at per commit (for timestamp context)
        commit_times: Dict[str, str] = {}
        for sha in commits:
            tmin_iso: str | None = None
            for s in signals:
                # find commit in this signal
                sc = next((cc for cc in s.commits if cc.head_sha == sha), None)
                if not sc or not sc.events:
                    continue
                # events are sorted oldest first
                t = sc.events[0].started_at
                ts_iso = t.isoformat()
                if tmin_iso is None or ts_iso < tmin_iso:
                    tmin_iso = ts_iso
            if tmin_iso is not None:
                commit_times[sha] = tmin_iso

        # Build columns with outcomes, notes, and per-commit events
        cols = []
        for sig, outcome in pairs_list:
            if isinstance(outcome, AutorevertPattern):
                oc = "revert"
                note = (
                    f"Pattern: newer fail {len(outcome.newer_failing_commits)}; "
                    f"suspect {outcome.suspected_commit[:7]} vs baseline {outcome.older_successful_commit[:7]}"
                )
                ineligible = None
            elif isinstance(outcome, RestartCommits):
                oc = "restart"
                if outcome.commit_shas:
                    short = ", ".join(sorted(s[:7] for s in outcome.commit_shas))
                    note = f"Suggest restart: {short}"
                else:
                    note = "Suggest restart: <none>"
                ineligible = None
            else:
                oc = "ineligible"
                note = f"Ineligible: {outcome.reason.value}"
                if outcome.message:
                    note += f" — {outcome.message}"
                ineligible = {
                    "reason": outcome.reason.value,
                    "message": outcome.message,
                }

            # Per-commit events for this signal
            cells: Dict[str, List[Dict]] = {}
            for c in sig.commits:
                evs = []
                for e in c.events:
                    ev = {
                        "status": e.status.value,
                        "started_at": e.started_at.isoformat(),
                        "name": e.name,
                    }
                    if e.ended_at:
                        ev["ended_at"] = e.ended_at.isoformat()
                    evs.append(ev)
                if evs:
                    cells[c.head_sha] = evs

            col = {
                "workflow": sig.workflow_name,
                "key": sig.key,
                "outcome": oc,
                "note": note,
                "cells": cells,
            }
            if ineligible is not None:
                col["ineligible"] = ineligible
            cols.append(col)

        doc = {
            "commits": commits,
            "commit_times": commit_times,
            "columns": cols,
            "meta": {
                "repo": repo,
                "workflows": ctx.workflows,
                "lookback_hours": ctx.lookback_hours,
                "ts": ctx.ts.isoformat(),
                "dry_run": ctx.dry_run,
            },
        }
        return json.dumps(doc, separators=(",", ":"))

    def insert_state(
        self,
        *,
        ctx: RunContext,
        pairs: Iterable[Tuple[Signal, SignalProcOutcome]],
        params: str = "",
    ) -> None:
        """Insert one state row into misc.autorevert_state for this run context."""
        state_json = self._build_state_json(
            repo=ctx.repo_full_name, ctx=ctx, pairs=list(pairs)
        )
        cols = [
            "ts",
            "repo",
            "state",
            "dry_run",
            "workflows",
            "lookback_hours",
            "params",
        ]
        data = [
            [
                ctx.ts,
                ctx.repo_full_name,
                state_json,
                1 if ctx.dry_run else 0,
                ctx.workflows,
                int(ctx.lookback_hours),
                params or "",
            ]
        ]
        CHCliFactory().client.insert(
            table="autorevert_state", data=data, column_names=cols, database="misc"
        )
```
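For reference, the shape of the `state` document this produces. The keys match `_build_state_json` above, but the values here are invented for illustration (abridged SHAs, example timestamps, and an assumed `"failure"` status value):

```python
# Abridged example of the serialized run state; values are illustrative only.
state = {
    "commits": ["8143388...", "e181662..."],  # newest -> older, across signals
    "commit_times": {"8143388...": "2025-09-16T09:58:00"},
    "columns": [
        {
            "workflow": "trunk",
            "key": "win-vs2022-cpu-py3 / build",
            "outcome": "restart",  # or "revert" / "ineligible"
            "note": "Suggest restart: 8143388",
            "cells": {
                "8143388...": [
                    {
                        "status": "failure",  # e.status.value; value assumed
                        "started_at": "2025-09-16T09:58:00",
                        "name": "build",
                    }
                ]
            },
            # "ineligible": {"reason": ..., "message": ...}  # ineligible only
        }
    ],
    "meta": {
        "repo": "pytorch/pytorch",
        "workflows": ["Lint", "trunk", "pull", "inductor"],
        "lookback_hours": 18,
        "ts": "2025-09-16T15:51:18",
        "dry_run": False,
    },
}
```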
