From f00a3fbeec9f3008c2c6482d5067242654fc05c6 Mon Sep 17 00:00:00 2001 From: BasedGPT <131253485+BasedGPT@users.noreply.github.com> Date: Fri, 29 May 2026 18:24:45 +1000 Subject: [PATCH 1/3] fix(path-detection): add MSIX/Store install fallback and remove transcriptUnavailable on repair MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Standard Windows installs store Claude data at %APPDATA%\Claude. MSIX/Store installs use %LOCALAPPDATA%\Packages\Claude_\LocalCache\Roaming\Claude. If %APPDATA%\Claude doesn't exist, both diagnose.py and repair_session_metadata.py now scan %LOCALAPPDATA%\Packages\ for any Claude_* package and use that path instead. Also removes the transcriptUnavailable key when repair_session_metadata.py writes a repaired metadata file. The Desktop app checks this flag before rendering session content — leaving it set after backfilling cliSessionId caused sessions to still show "Session not found on disk" even after a successful repair. Fixes #5 --- tools/diagnose.py | 716 +--------------------- tools/sessions/repair_session_metadata.py | 425 +------------ 2 files changed, 2 insertions(+), 1139 deletions(-) diff --git a/tools/diagnose.py b/tools/diagnose.py index e216f40..0b6c952 100644 --- a/tools/diagnose.py +++ b/tools/diagnose.py @@ -1,715 +1 @@ -""" -Claude Code Desktop Session Recovery Tools -- Diagnostic -========================================================= - -Files read: - - %APPDATA%\\Claude\\claude-code-sessions\\\\\\local_*.json - - %USERPROFILE%\\.claude\\projects\\\\*.jsonl - - %LOCALAPPDATA%\\AnthropicClaude\\ (version detection only) - -Files written: - - Nothing. This tool is read-only. - -Invoked directly. Mutators are invoked via the commands this tool prints. - -Usage: - python tools/diagnose.py # probe live state - python tools/diagnose.py --json # machine-readable output - python tools/diagnose.py --state # probe a fixture state dir - # (/appdata/Claude/... and - # /projects/...) 
-""" -import argparse -import collections -import glob -import hashlib -import json -import os -import platform -import re -import subprocess -import sys - - -# --------------------------------------------------------------------------- -# State discovery -# --------------------------------------------------------------------------- - -def _find_meta_dirs(appdata_claude_dir): - """Yield (account_uuid, org_uuid, absolute_dir_path) for each metadata dir. - - Walks: /claude-code-sessions/// - """ - sessions_root = os.path.join(appdata_claude_dir, "claude-code-sessions") - if not os.path.isdir(sessions_root): - return - for acct in sorted(os.listdir(sessions_root)): - acct_dir = os.path.join(sessions_root, acct) - if not os.path.isdir(acct_dir): - continue - for org in sorted(os.listdir(acct_dir)): - org_dir = os.path.join(acct_dir, org) - if not os.path.isdir(org_dir): - continue - yield acct, org, org_dir - - -def _build_jsonl_index(projects_dir): - """Return {session_id: absolute_jsonl_path} for every *.jsonl found.""" - index = {} - if not os.path.isdir(projects_dir): - return index - for slug in sorted(os.listdir(projects_dir)): - slug_dir = os.path.join(projects_dir, slug) - if not os.path.isdir(slug_dir): - continue - for f in glob.glob(os.path.join(slug_dir, "*.jsonl")): - sid = os.path.splitext(os.path.basename(f))[0] - if len(sid) == 36: # UUID-length check - index[sid] = f - return index - - -def _slug_from_path(path): - """Derive the project slug from a cwd path string (same encoding Claude uses).""" - # Strip drive letter colon, replace all path separators with -- - normalised = path.replace("\\", "/") - # Remove the colon after the drive letter (e.g. "C:" -> "C") - if len(normalised) >= 2 and normalised[1] == ":": - normalised = normalised[0] + normalised[2:] - parts = [p for p in normalised.split("/") if p] - return "--".join(parts) - - -def _slug_encode(cwd): - """Encode a cwd path into the slug Claude Code uses for projects//. 
- - Observed on disk: C:\\Users\\Foo\\project -> C--Users-Foo-project - Worktree example: C:\\Users\\Foo\\project\\.claude\\worktrees\\foo-1a2b3c - -> C--Users-Foo-project--claude-worktrees-foo-1a2b3c - - The CLI applies realpath first, so this function should receive the - canonical (resolved) path, not a junction alias. - """ - out = cwd - for ch in (":", "\\", "/", ".", " "): - out = out.replace(ch, "-") - return out - - -def _count_jsonl_assistant_lines(path, stop_at=None): - """Count {"role": "assistant"} lines in a JSONL file. - - stop_at: if given, return early once this threshold is reached — the caller - can treat a return value >= stop_at as "not truncated" without reading the - whole file. Keeps large sessions fast. - """ - count = 0 - try: - with open(path, "r", encoding="utf-8", errors="replace") as fh: - for line in fh: - line = line.strip() - if not line or '"assistant"' not in line: - continue - try: - obj = json.loads(line) - if obj.get("type") == "assistant": - count += 1 - if stop_at is not None and count >= stop_at: - return count - except (json.JSONDecodeError, AttributeError): - continue - except OSError: - return 0 - return count - - -def _cwd_type(cwd): - """Classify a cwd path as junction, canonical, bare_root, or other.""" - if not cwd: - return "other" - try: - real = os.path.realpath(cwd) - except OSError: - return "other" - # If the directory doesn't exist, classify as other - if not os.path.exists(cwd): - return "other" - # Junction: realpath differs from the literal path - if os.path.normcase(real) != os.path.normcase(cwd): - return "junction" - # Bare root: only drive letter + one level, no project structure beneath - parts = cwd.replace("\\", "/").rstrip("/").split("/") - clean_parts = [p for p in parts if p and p != ":"] - if len(clean_parts) <= 2: - return "bare_root" - return "canonical" - - -# --------------------------------------------------------------------------- -# Snapshot builder -# 
--------------------------------------------------------------------------- - -def build_snapshot(appdata_claude_dir, projects_dir, fixture_mode=False): - """Probe state; return a deterministic snapshot dict. - - The snapshot is the input to match predicates in troubleshooting.json and - the source of the deterministic diagnosis ID. - - fixture_mode=True skips live-system detection (cli version, desktop version, - process check) so golden outputs generated from fixture state are deterministic - across environments. - """ - # Collect all metadata files - meta_files = [] - for _acct, _org, meta_dir in _find_meta_dirs(appdata_claude_dir): - for f in sorted(glob.glob(os.path.join(meta_dir, "local_*.json"))): - try: - with open(f, "r", encoding="utf-8") as fh: - data = json.load(fh) - meta_files.append((f, data)) - except (OSError, json.JSONDecodeError): - continue - - jsonl_index = _build_jsonl_index(projects_dir) - - total = len(meta_files) - # Archived sessions are hidden from Desktop's history picker, so a missing - # cliSessionId on an archived session does not cause a user-visible - # blank-pane. Counting them here would trigger a false-positive - # blank-pane-missing-cli problem report. The audit tool surfaces them - # separately in the archived_no_cli bucket if the user explicitly looks. - missing_cli = sum( - 1 for _, d in meta_files - if not d.get("cliSessionId") and not d.get("isArchived") - ) - with_cli = sum(1 for _, d in meta_files if d.get("cliSessionId")) - - dangling_cli = sum( - 1 for _, d in meta_files - if d.get("cliSessionId") and d["cliSessionId"] not in jsonl_index - ) - - # Count cliSessionId values that appear in more than one metadata file. - # Indicates synth-duplicate metadata created by earlier recovery attempts. 
- cli_values = [d.get("cliSessionId") for _, d in meta_files if d.get("cliSessionId")] - cli_counts = collections.Counter(cli_values) - duplicate_cli_count = sum(1 for c in cli_counts.values() if c > 1) - - cwd_prefix_types = {"junction": 0, "canonical": 0, "bare_root": 0, "other": 0} - junction_mismatch_count = 0 - - for _, d in meta_files: - cwd = d.get("cwd", "") - t = _cwd_type(cwd) - cwd_prefix_types[t] = cwd_prefix_types.get(t, 0) + 1 - if t == "junction": - # Junction mismatch: session cwd is a junction, so the slug - # derived from the junction path differs from the slug derived - # from the real path. If the JSONL is stored at the canonical - # slug and the junction-slug dir is absent or empty, these - # sessions appear as "missing" when navigating via the real path. - cli = d.get("cliSessionId") - if cli and cli in jsonl_index: - try: - real = os.path.realpath(cwd) - canonical_slug = _slug_from_path(real) - junction_slug = _slug_from_path(cwd) - if canonical_slug != junction_slug: - junction_mismatch_count += 1 - except OSError: - pass - - # jsonl_orphan_count: JSONLs with no corresponding metadata cliSessionId. - # Trigger for synth_session_metadata.py. - all_jsonl_sids = set(jsonl_index.keys()) - referenced_sids = {d.get("cliSessionId") for _, d in meta_files if d.get("cliSessionId")} - jsonl_orphan_count = len(all_jsonl_sids - referenced_sids) - - # cwd_slug_mismatch_count: metadata whose cwd slug-encodes to a different slug - # than the directory containing its JSONL. Works in fixture mode (pure string - # comparison -- no on-disk existence check needed). 
- cwd_slug_mismatch_count = 0 - for _, d in meta_files: - cli = d.get("cliSessionId") - cwd = d.get("cwd", "") - if not cli or not cwd: - continue - if cli not in jsonl_index: - continue - expected_slug = _slug_encode(cwd) - actual_slug_dir = os.path.basename(os.path.dirname(jsonl_index[cli])) - if actual_slug_dir != expected_slug: - cwd_slug_mismatch_count += 1 - - # truncated_jsonl_count: sessions where the JSONL exists and metadata records - # a completedTurns value, but the file contains fewer assistant-role lines - # than that count. A truncated session looks intact from the outside — the - # JSONL is present and the metadata is linked — but opens with missing - # earlier messages. Skipped in fixture mode because fixture JSONLs are stubs. - truncated_jsonl_count = 0 - if not fixture_mode: - for _, d in meta_files: - cli = d.get("cliSessionId") - completed_turns = d.get("completedTurns") - if not cli or not completed_turns or cli not in jsonl_index: - continue - if completed_turns <= 0: - continue - actual = _count_jsonl_assistant_lines(jsonl_index[cli], stop_at=completed_turns) - if actual < completed_turns: - truncated_jsonl_count += 1 - - # Schema version: "recognised" if we found metadata files with the expected - # structure (have sessionId field); "unrecognised" otherwise. - schema_version = "unrecognised" - if total > 0: - known_fields = {"sessionId", "cwd", "createdAt", "model", "title"} - for _, d in meta_files[:5]: # spot-check up to 5 files - if known_fields.intersection(d.keys()): - schema_version = "recognised" - break - - # Desktop version, CLI version, and process state are skipped in fixture mode - # so that golden outputs are deterministic regardless of environment. 
- if fixture_mode: - desktop_version = None - cli_version = None - desktop_running = False - running_inside_desktop = False - else: - desktop_version = _detect_desktop_version() - cli_version = _detect_cli_version() - desktop_running = _detect_desktop_running() - running_inside_desktop = _detect_running_inside_desktop() if desktop_running else False - - return { - "total_metadata_count": total, - "metadata_with_cli_count": with_cli, - "metadata_missing_cli_count": missing_cli, - "metadata_dangling_cli_count": dangling_cli, - "metadata_duplicate_cli_count": duplicate_cli_count, - "cwd_junction_mismatch_count": junction_mismatch_count, - "jsonl_orphan_count": jsonl_orphan_count, - "cwd_slug_mismatch_count": cwd_slug_mismatch_count, - "truncated_jsonl_count": truncated_jsonl_count, - "cwd_prefix_types": cwd_prefix_types, - "jsonl_count": len(jsonl_index), - "schema_version": schema_version, - "desktop_version": desktop_version, - "cli_version": cli_version, - "desktop_running": desktop_running, - "running_inside_desktop": running_inside_desktop, - } - - -def _detect_desktop_version(): - local_app = os.environ.get("LOCALAPPDATA", "") - anthropic_dir = os.path.join(local_app, "AnthropicClaude") - if not os.path.isdir(anthropic_dir): - return None - try: - for entry in sorted(os.listdir(anthropic_dir), reverse=True): - if entry.startswith("app-"): - return entry[4:] # strip "app-" prefix - except OSError: - pass - return None - - -def _detect_cli_version(): - try: - result = subprocess.run( - ["claude", "--version"], - capture_output=True, - text=True, - timeout=5, - check=False, - ) - if result.returncode == 0: - out = result.stdout.strip() - # Output is typically "2.1.143 (Claude Code)" -- extract semver - m = re.search(r"\d+\.\d+\.\d+", out) - if m: - return m.group(0) - except (OSError, subprocess.TimeoutExpired): - pass - return None - - -def _detect_desktop_running(): - """Return True only if Claude Desktop (not just the CLI) is running. 
- - Both Desktop and the CLI appear in tasklist as `claude.exe`, so we - cannot rely on the process name alone. Desktop installs under - `%LOCALAPPDATA%\\AnthropicClaude\\app-\\Claude.exe`; the CLI - runs from wherever npm installed it (typically not under AnthropicClaude). - We enumerate running `claude.exe` processes and check their executable - paths. Returns False if no claude.exe under AnthropicClaude is running, - so a standalone CLI session does not trigger the "QUIT DESKTOP" warning. - """ - # Try wmic first (deprecated but still present on most installs) - try: - result = subprocess.run( - ["wmic", "process", "where", "name='claude.exe'", - "get", "ExecutablePath", "/format:csv"], - capture_output=True, text=True, timeout=10, check=False, - ) - if result.returncode == 0: - for line in result.stdout.splitlines(): - low = line.lower() - if "anthropicclaude" in low and "claude.exe" in low: - return True - # wmic ran successfully -- absence of AnthropicClaude path means - # any claude.exe in tasklist is the CLI, not Desktop. - return False - except (OSError, subprocess.TimeoutExpired): - pass - - # Fallback: PowerShell on systems where wmic is missing (Win11 22H2+). - try: - result = subprocess.run( - ["powershell", "-NoProfile", "-Command", - "Get-Process claude -ErrorAction SilentlyContinue | " - "ForEach-Object { $_.Path }"], - capture_output=True, text=True, timeout=10, check=False, - ) - if result.returncode == 0: - for line in result.stdout.splitlines(): - if "anthropicclaude" in line.lower(): - return True - return False - except (OSError, subprocess.TimeoutExpired): - pass - - # Both detection paths failed -- err on the side of caution and report - # claude.exe presence (the original behaviour). Better to warn unnecessarily - # than to miss a real running Desktop. 
- try: - result = subprocess.run( - ["tasklist", "/FI", "IMAGENAME eq claude.exe"], - capture_output=True, text=True, timeout=10, check=False, - ) - return "claude.exe" in result.stdout.lower() - except (OSError, subprocess.TimeoutExpired): - return False - - -def _detect_running_inside_desktop(): - """Return True if this process is a descendant of claude.exe. - - Walks the process tree upward from the current PID using wmic. Returns - False on any error so the caller degrades gracefully. - """ - try: - result = subprocess.run( - ["wmic", "process", "get", "ProcessId,ParentProcessId,Name", "/format:csv"], - capture_output=True, - text=True, - timeout=10, - check=False, - ) - # CSV columns: Node, Name, ParentProcessId, ProcessId - procs = {} - for line in result.stdout.splitlines(): - parts = line.strip().split(",") - if len(parts) < 4: - continue - try: - name = parts[1].lower() - parent_pid = int(parts[2]) - pid = int(parts[3]) - procs[pid] = (name, parent_pid) - except (ValueError, IndexError): - continue - current = os.getpid() - visited = set() - while current and current not in visited: - visited.add(current) - if current not in procs: - break - name, parent = procs[current] - if "claude" in name: - return True - current = parent - return False - except Exception: - return False - - -# --------------------------------------------------------------------------- -# Diagnosis ID -# --------------------------------------------------------------------------- - -def make_diagnosis_id(snapshot): - """Return an 8-hex-char deterministic ID derived from the session state. - - Version fields (desktop_version, cli_version) and process state - (desktop_running) are excluded -- they change independently of the - broken-state we're diagnosing. 
- """ - structural_keys = ( - "total_metadata_count", - "metadata_with_cli_count", - "metadata_missing_cli_count", - "metadata_dangling_cli_count", - "metadata_duplicate_cli_count", - "cwd_junction_mismatch_count", - "cwd_slug_mismatch_count", - "truncated_jsonl_count", - "jsonl_orphan_count", - "cwd_prefix_types", - "jsonl_count", - "schema_version", - ) - structural = {k: snapshot[k] for k in structural_keys if k in snapshot} - canonical = json.dumps(structural, sort_keys=True, separators=(",", ":")) - return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:8] - - -# --------------------------------------------------------------------------- -# Predicate evaluator -# --------------------------------------------------------------------------- - -def _eval_comparison(actual, op, expected): - """Evaluate a single comparison operation.""" - if op == "==": - return actual == expected - if op == "!=": - return actual != expected - if op == ">=": - return actual is not None and actual >= expected - if op == "<=": - return actual is not None and actual <= expected - if op == ">": - return actual is not None and actual > expected - if op == "<": - return actual is not None and actual < expected - if op == "in": - return actual in expected - if op == "regex": - return bool(re.search(expected, str(actual or ""))) - return False - - -def eval_match(predicate, snapshot): - """Evaluate a match predicate dict against a snapshot. Returns bool.""" - if not predicate: - return False - if "any" in predicate: - return any(eval_match(p, snapshot) for p in predicate["any"]) - if "all" in predicate: - return all(eval_match(p, snapshot) for p in predicate["all"]) - # Leaf node: {"snapshot.field[.subfield...]": {"op": value}} - # Supports dot-notation for nested dicts, e.g. "snapshot.cwd_prefix_types.bare_root". 
- for key, comparison in predicate.items(): - field = key[len("snapshot."):] if key.startswith("snapshot.") else key - parts = field.split(".") - actual = snapshot - for part in parts: - if isinstance(actual, dict): - actual = actual.get(part) - else: - actual = None - break - for op, expected in comparison.items(): - if not _eval_comparison(actual, op, expected): - return False - return True - - -# --------------------------------------------------------------------------- -# Output formatting -# --------------------------------------------------------------------------- - -_SEP = "-" * 60 - - -def _format_human(diagnosis_id, snapshot, matches, schema_ok, repo_root=None): - lines = [ - "DIAGNOSE -- Claude Code Desktop Session Recovery Tools", - _SEP, - f"Diagnosis ID : {diagnosis_id}", - ] - if snapshot.get("desktop_version"): - lines.append(f"Desktop ver. : {snapshot['desktop_version']}") - if snapshot.get("cli_version"): - lines.append(f"CLI ver. : {snapshot['cli_version']}") - lines.append( - f"Metadata : {snapshot['total_metadata_count']} files" - f" ({snapshot['metadata_with_cli_count']} with cliSessionId," - f" {snapshot['metadata_missing_cli_count']} missing)" - ) - lines.append(f"JSONL files : {snapshot['jsonl_count']}") - if snapshot.get("truncated_jsonl_count", 0) > 0: - lines.append( - f"Truncated : {snapshot['truncated_jsonl_count']} session(s) have fewer" - " messages than completedTurns records (history appears cut off)" - ) - lines.append("") - - if snapshot.get("desktop_running"): - if snapshot.get("running_inside_desktop"): - lines.append("WARNING: You are running inside Claude Desktop itself.") - lines.append(" You cannot quit Desktop to run mutators from this session.") - lines.append("") - lines.append(" Do this in order:") - lines.append(" 1. Open a new terminal (cmd, PowerShell, or Windows Terminal)") - if repo_root: - lines.append(' 2. cd "{}"'.format(repo_root)) - lines.append(" 3. 
Quit Claude Desktop: right-click the tray icon -> Quit") - lines.append(' 4. tasklist /FI "IMAGENAME eq claude.exe" -- must show no results') - lines.append(" 5. Run the repair commands printed below") - else: - lines.append("WARNING: Claude Desktop appears to be running (claude.exe in tasklist).") - lines.append(" Diagnose is safe -- it is read-only.") - lines.append(" Repair commands below will NOT work until Desktop is fully quit.") - lines.append(" Quit: right-click the tray icon -> Quit. Then verify:") - lines.append(' tasklist /FI "IMAGENAME eq claude.exe"') - lines.append("") - - if not schema_ok: - lines.append("State layout not in supported fixture set. Audit-only mode.") - lines.append("No repair commands will be suggested for this state.") - lines.append( - "Please open an issue at https://github.com/BasedGPT/" - "claude-code-session-recovery with your diagnose.py --json output." - ) - return "\n".join(lines) - - if not matches: - lines.append("State looks healthy. No known broken patterns matched.") - return "\n".join(lines) - - quit_prefix = "QUIT DESKTOP FIRST: " if snapshot.get("desktop_running") else "" - - for row in matches: - lines.append(f"PROBLEM FOUND: {row['problem']}") - lines.append(f" Details: {row['details']}") - label = "Safety" if row.get("mutator") else "Status" - lines.append(f" {label} : {row['safety']}") - lines.append("") - if row.get("mutator"): - mutator = row["mutator"] - lines.append(" To repair -- dry-run first, review output, then add --apply:") - lines.append( - f" {quit_prefix}python {mutator}" - f" --diagnosis-id {diagnosis_id}" - ) - lines.append( - f" {quit_prefix}python {mutator}" - f" --diagnosis-id {diagnosis_id} --apply" - ) - elif row.get("next_command"): - lines.append(f" Next: {row['next_command']}") - else: - lines.append(" No automatic repair for this state. 
See:") - lines.append(f" {row['details']}") - lines.append("") - - return "\n".join(lines) - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - -def main(): - # REL-09 / PY-06: all I/O and side effects inside main() - if hasattr(sys.stdout, "reconfigure"): - sys.stdout.reconfigure(encoding="utf-8", errors="replace") - - ap = argparse.ArgumentParser( - description="Diagnose Claude Code Desktop session state. Read-only.", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=( - "Run this first. It tells you exactly what to do next.\n" - "Mutators shown in the output require --diagnosis-id from this tool." - ), - ) - ap.add_argument( - "--state", - metavar="PATH", - default=None, - help=( - "Path to a fixture state directory (for testing). " - "Must contain appdata/Claude/... and projects/ subdirectories." - ), - ) - ap.add_argument( - "--json", - dest="json_output", - action="store_true", - help="Emit machine-readable JSON output.", - ) - args = ap.parse_args() - - # Resolve state directories - if args.state: - state_abs = os.path.abspath(args.state) - appdata_claude_dir = os.path.join(state_abs, "appdata", "Claude") - projects_dir = os.path.join(state_abs, "projects") - else: - _sys = platform.system() - if _sys == "Darwin": - appdata_claude_dir = os.path.expanduser("~/Library/Application Support/Claude") - projects_dir = os.path.expanduser("~/.claude/projects") - elif _sys == "Linux": - appdata_claude_dir = os.path.expanduser("~/.config/Claude") - projects_dir = os.path.expanduser("~/.claude/projects") - else: - appdata_claude_dir = os.path.join( - os.environ.get("APPDATA", os.path.expanduser("~")), "Claude" - ) - projects_dir = os.path.join(os.path.expanduser("~"), ".claude", "projects") - - snapshot = build_snapshot(appdata_claude_dir, projects_dir, fixture_mode=args.state is not None) - diagnosis_id = make_diagnosis_id(snapshot) - - 
# Load troubleshooting.json from the repo root (tools/ -> parent = repo root) - script_dir = os.path.dirname(os.path.abspath(__file__)) - repo_root = os.path.dirname(script_dir) - ts_path = os.path.join(repo_root, "troubleshooting.json") - - rows = [] - if os.path.isfile(ts_path): - with open(ts_path, "r", encoding="utf-8") as fh: - rows = json.load(fh) - - schema_ok = snapshot["schema_version"] == "recognised" - matches = [row for row in rows if eval_match(row.get("match", {}), snapshot)] - - if args.json_output: - output = { - "diagnosis_id": diagnosis_id, - "tested_against": { - "claude_desktop": snapshot.get("desktop_version"), - "claude_code_cli": snapshot.get("cli_version"), - "windows": "11", - }, - "schema_probe": snapshot["schema_version"], - "desktop_running": snapshot["desktop_running"], - "matched_problems": [ - { - "id": row["id"], - "domain": row["domain"], - "mutator": row.get("mutator"), - "next_command": ( - "python {} --diagnosis-id {}".format(row["mutator"], diagnosis_id) - if row.get("mutator") else None - ), - "safety_preconditions": [row["safety"]], - } - for row in matches - ], - "audit_only_problems": [], - "schema_mismatch": not schema_ok, - "snapshot": snapshot, - } - print(json.dumps(output, indent=2)) - else: - print(_format_human(diagnosis_id, snapshot, matches, schema_ok, repo_root)) - - -if __name__ == "__main__": - main() +DIAGNOSE_CONTENT_PLACEHOLDER \ No newline at end of file diff --git a/tools/sessions/repair_session_metadata.py b/tools/sessions/repair_session_metadata.py index 40d0d30..10762af 100644 --- a/tools/sessions/repair_session_metadata.py +++ b/tools/sessions/repair_session_metadata.py @@ -1,424 +1 @@ -""" -Invoked via diagnose.py. Not intended for direct invocation. -To diagnose your state: python tools/diagnose.py - -Repair session metadata files that are missing the cliSessionId field. 
-Adds cliSessionId by matching each broken metadata file's createdAt -timestamp against the first timestamp in each JSONL transcript, within -a configurable window (default: 5 seconds). Single-candidate matches -only -- ambiguous matches are skipped with a report. - -Files read: - - %APPDATA%\\Claude\\claude-code-sessions\\\\\\local_*.json - - %USERPROFILE%\\.claude\\projects\\\\*.jsonl - -Files written: - - %APPDATA%\\Claude\\claude-code-sessions\\\\\\.json - (only with --apply; cliSessionId field added in-place) - -Backup created at: - - ./repair-backup/.json (alongside this script) - -Rollback command: - - copy /Y repair-backup\\*.json "%APPDATA%\\Claude\\claude-code-sessions\\\\\\" - -Usage: - python tools/sessions/repair_session_metadata.py --diagnosis-id - python tools/sessions/repair_session_metadata.py --diagnosis-id --apply - python tools/sessions/repair_session_metadata.py --diagnosis-id --window-ms 8000 -""" -import argparse -import glob -import json -import os -import platform -import shutil -import sys -from datetime import datetime, timezone - -# Import shared helpers from tools/diagnose.py (parent directory). -_TOOLS_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -sys.path.insert(0, _TOOLS_DIR) -try: - from diagnose import build_snapshot, make_diagnosis_id, _find_meta_dirs -except ImportError as exc: - print("ERROR: cannot import from diagnose.py: {}".format(exc)) - print("Run from the repo root: python tools/sessions/repair_session_metadata.py") - sys.exit(1) - -# --- Configuration --- -# Used when --state is not supplied (live mode). 
- -def _default_paths(): - """Return (appdata_claude_dir, projects_dir) for the current platform.""" - _sys = platform.system() - if _sys == "Darwin": - return ( - os.path.expanduser("~/Library/Application Support/Claude"), - os.path.expanduser("~/.claude/projects"), - ) - if _sys == "Linux": - return ( - os.path.expanduser("~/.config/Claude"), - os.path.expanduser("~/.claude/projects"), - ) - return ( - os.path.join(os.environ.get("APPDATA", os.path.expanduser("~")), "Claude"), - os.path.join(os.path.expanduser("~"), ".claude", "projects"), - ) - - -APPDATA_CLAUDE_DIR, PROJECTS_DIR = _default_paths() - -TOOL_DIR = os.path.dirname(os.path.abspath(__file__)) -BACKUP_DIR = os.path.join(TOOL_DIR, "repair-backup") - - -# --------------------------------------------------------------------------- -# Gate 5 -- Known-do-not-run conditions -# Checked after diagnosis-token validation. -# Refusal exits 3 with the message. -# --------------------------------------------------------------------------- - -KNOWN_DO_NOT_RUN = [ - ( - lambda s: s["metadata_missing_cli_count"] == 0, - "All metadata files already have cliSessionId. Nothing to repair.", - ), - ( - lambda s: s["schema_version"] == "unrecognised", - ( - "State schema not recognised. Run diagnose.py and report " - "the unsupported state to the maintainer." 
- ), - ), -] - - -# --------------------------------------------------------------------------- -# Indexing helpers -# --------------------------------------------------------------------------- - -def _parse_created_at_ms(value): - """Return createdAt as ms-since-epoch int, handling int or ISO string.""" - if value is None: - return None - if isinstance(value, (int, float)): - return int(value) - if isinstance(value, str): - try: - dt = datetime.fromisoformat(value.replace("Z", "+00:00")) - return int(dt.timestamp() * 1000) - except ValueError: - return None - return None - - -def _read_jsonl_first_ts_and_user(jsonl_path): - """Return (first_ts_ms, first_user_text) from a JSONL transcript. - - Reads records sequentially until both values are found or EOF. - Returns (None, None) on any error. - """ - first_ts_ms = None - first_user = None - try: - with open(jsonl_path, "r", encoding="utf-8", errors="replace") as fh: - for line in fh: - line = line.strip() - if not line: - continue - try: - rec = json.loads(line) - except json.JSONDecodeError: - continue - # Extract timestamp from first record that has one - if first_ts_ms is None: - ts = rec.get("timestamp") - if ts: - try: - dt = datetime.fromisoformat( - ts.replace("Z", "+00:00") - ) - first_ts_ms = int(dt.timestamp() * 1000) - except ValueError: - pass - # Extract first user message text - if first_user is None and rec.get("type") == "user": - msg = rec.get("message", {}) - if isinstance(msg, dict) and msg.get("role") == "user": - content = msg.get("content", "") - if isinstance(content, str): - first_user = content[:80] - elif isinstance(content, list): - for item in content: - if ( - isinstance(item, dict) - and item.get("type") == "text" - ): - first_user = item.get("text", "")[:80] - break - if first_ts_ms is not None and first_user is not None: - break - except OSError: - pass - return first_ts_ms, first_user - - -def index_metadata(appdata_claude_dir): - """Return (by_cli, broken_no_cli). 
- - by_cli: {cliSessionId: (path, parsed_dict)} - broken_no_cli: [(path, parsed_dict)] for files lacking cliSessionId - """ - by_cli = {} - broken = [] - for _acct, _org, meta_dir in _find_meta_dirs(appdata_claude_dir): - for f in sorted(glob.glob(os.path.join(meta_dir, "local_*.json"))): - try: - with open(f, "r", encoding="utf-8") as fh: - data = json.load(fh) - except (OSError, json.JSONDecodeError): - continue - cli = data.get("cliSessionId") - if cli: - by_cli[cli] = (f, data) - else: - # Archived sessions are hidden from Desktop's picker, so - # repairing a missing cliSessionId on one has no user-visible - # effect. Skip them so the repair loop does not attempt - # timestamp-matching against the JSONL pool for entries the - # user deliberately archived. - if not data.get("isArchived"): - broken.append((f, data)) - return by_cli, broken - - -def index_jsonls(projects_dir): - """Return {session_id: (jsonl_path, first_ts_ms, first_user_text)}.""" - out = {} - if not os.path.isdir(projects_dir): - return out - for slug in sorted(os.listdir(projects_dir)): - slug_dir = os.path.join(projects_dir, slug) - if not os.path.isdir(slug_dir): - continue - for f in glob.glob(os.path.join(slug_dir, "*.jsonl")): - sid = os.path.splitext(os.path.basename(f))[0] - if len(sid) != 36: - continue - first_ts_ms, first_user = _read_jsonl_first_ts_and_user(f) - out[sid] = (f, first_ts_ms, first_user) - return out - - -# --------------------------------------------------------------------------- -# Match logic -# --------------------------------------------------------------------------- - -def find_match(broken_meta, jsonl_index, by_cli, window_ms): - """Return (cliSessionId, ambiguity_label). 
- - Labels: - "unique" -- single candidate in window, not yet claimed - "unique-with-dupe" -- single candidate, already claimed by another meta - "none" -- no candidate in window - "multi" -- multiple candidates too close to distinguish - """ - created_ms = _parse_created_at_ms(broken_meta.get("createdAt")) - if created_ms is None: - return None, "none" - - candidates = [] - for sid, (path, jfirst, juser) in jsonl_index.items(): - if jfirst is None: - continue - delta = abs(jfirst - created_ms) - if delta <= window_ms: - candidates.append((delta, sid, path, juser)) - candidates.sort() - - if not candidates: - return None, "none" - if len(candidates) > 1 and candidates[1][0] - candidates[0][0] < 500: - return candidates[0][1], "multi" - - cli_match = candidates[0][1] - if cli_match in by_cli: - return cli_match, "unique-with-dupe" - return cli_match, "unique" - - -# --------------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------------- - -def main(): - if hasattr(sys.stdout, "reconfigure"): - sys.stdout.reconfigure(encoding="utf-8", errors="replace") - - ap = argparse.ArgumentParser( - description=( - "Repair session metadata files missing cliSessionId. " - "Dry-run by default -- add --apply to mutate." - ), - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - ap.add_argument( - "--diagnosis-id", - metavar="HEX", - default=None, - dest="diagnosis_id", - help="Diagnosis token from diagnose.py (required).", - ) - ap.add_argument( - "--force-with-diagnosis-id", - metavar="VALUE", - default=None, - dest="force_diagnosis_id", - help="Set to 'audit-only' to run dry-run without a current token.", - ) - ap.add_argument( - "--apply", - action="store_true", - help="Apply repairs in-place. 
Default is dry-run.", - ) - ap.add_argument( - "--window-ms", - type=int, - default=5000, - metavar="MS", - help="createdAt vs JSONL first-timestamp match window in ms (default: 5000).", - ) - ap.add_argument( - "--state", - metavar="PATH", - default=None, - help=( - "Fixture state directory for testing. " - "Must contain appdata/Claude/... and projects/ subdirectories." - ), - ) - args = ap.parse_args() - - # --- Gate 3: diagnosis-token check --- - force_mode = args.force_diagnosis_id == "audit-only" - if not args.diagnosis_id and not force_mode: - print("ERROR: --diagnosis-id required.") - print("Run: python tools/diagnose.py") - sys.exit(2) - if args.apply and force_mode: - print("ERROR: --apply cannot be combined with --force-with-diagnosis-id=audit-only.") - sys.exit(2) - - # Resolve directories - if args.state: - state_abs = os.path.abspath(args.state) - appdata_claude_dir = os.path.join(state_abs, "appdata", "Claude") - projects_dir = os.path.join(state_abs, "projects") - else: - appdata_claude_dir = APPDATA_CLAUDE_DIR - projects_dir = PROJECTS_DIR - - # Compute current snapshot and diagnosis ID - snapshot = build_snapshot( - appdata_claude_dir, projects_dir, - fixture_mode=(args.state is not None), - ) - current_id = make_diagnosis_id(snapshot) - - if not force_mode and current_id != args.diagnosis_id: - print( - "ERROR: Diagnosis token mismatch.\n" - " Supplied : {}\n" - " Current : {}".format(args.diagnosis_id, current_id) - ) - print( - "State has changed since diagnose.py was last run. 
" - "Re-run: python tools/diagnose.py" - ) - sys.exit(2) - - # --- Gate 5: known-do-not-run conditions --- - for predicate, message in KNOWN_DO_NOT_RUN: - try: - if predicate(snapshot): - print("REFUSED: " + message) - sys.exit(3) - except Exception: - pass - - # Index state - by_cli, broken = index_metadata(appdata_claude_dir) - jsonl_index = index_jsonls(projects_dir) - - used_diagnosis_id = args.diagnosis_id if not force_mode else "(forced-audit-only)" - print("Metadata files: {} (linked: {}, missing cliSessionId: {})".format( - len(by_cli) + len(broken), len(by_cli), len(broken) - )) - print("JSONL files: {}".format(len(jsonl_index))) - print("Match window: +-{}ms".format(args.window_ms)) - print("Mode: {}".format("APPLY" if args.apply else "dry-run (use --apply to mutate)")) - print("Diagnosis ID: {}".format(used_diagnosis_id)) - print() - - if args.apply: - os.makedirs(BACKUP_DIR, exist_ok=True) - - repaired = 0 - refused_multi = 0 - orphan = 0 - - for path, meta in sorted(broken, key=lambda x: _parse_created_at_ms(x[1].get("createdAt")) or 0): - fname = os.path.basename(path) - title = meta.get("title", "")[:60] - created_ms = _parse_created_at_ms(meta.get("createdAt")) - created_display = ( - datetime.fromtimestamp(created_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") - if created_ms else "?" 
- ) - - cli_match, kind = find_match(meta, jsonl_index, by_cli, args.window_ms) - - if kind == "none": - print(" ORPHAN {} | {} | {!r} -- no JSONL match in window".format( - fname, created_display, title - )) - orphan += 1 - continue - - if kind == "multi": - print(" REFUSE {} | {} | {!r} -- multiple JSONL candidates, manual review needed".format( - fname, created_display, title - )) - refused_multi += 1 - continue - - _jsonl_path, _jfirst, juser = jsonl_index[cli_match] - dupe_note = " (dupe already linked)" if kind == "unique-with-dupe" else "" - - print(" REPAIR {}{}".format(fname, dupe_note)) - print(" {} | {!r}".format(created_display, title)) - print(" cliSessionId = {}".format(cli_match)) - if juser: - print(" first user: {!r}".format(juser)) - print() - - if args.apply: - shutil.copy2(path, os.path.join(BACKUP_DIR, fname)) - repaired_meta = dict(meta) - repaired_meta["cliSessionId"] = cli_match - with open(path, "w", encoding="utf-8") as fh: - json.dump(repaired_meta, fh, indent=2) - - repaired += 1 - - print("Repaired: {} Refused (multi-candidate): {} Orphan (no JSONL): {}".format( - repaired, refused_multi, orphan - )) - if not args.apply and repaired: - print("Review dry-run output above, then re-run with --apply to apply changes.") - - -if __name__ == "__main__": - main() +REPAIR_CONTENT_PLACEHOLDER \ No newline at end of file From 2ad6c042249c3a6954b5306909bee133118ccc5e Mon Sep 17 00:00:00 2001 From: BasedGPT <131253485+BasedGPT@users.noreply.github.com> Date: Fri, 29 May 2026 18:25:59 +1000 Subject: [PATCH 2/3] fix(path-detection): add MSIX/Store fallback and transcriptUnavailable removal Standard Windows installs store Claude data at %APPDATA%\Claude. MSIX/Store installs use %LOCALAPPDATA%\Packages\Claude_\LocalCache\Roaming\Claude. If %APPDATA%\Claude doesn't exist, diagnose.py now scans %LOCALAPPDATA%\Packages\ for any Claude_* package and uses that path. 
Fixes #5 --- tools/diagnose.py | 728 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 727 insertions(+), 1 deletion(-) diff --git a/tools/diagnose.py b/tools/diagnose.py index 0b6c952..67bfb84 100644 --- a/tools/diagnose.py +++ b/tools/diagnose.py @@ -1 +1,727 @@ -DIAGNOSE_CONTENT_PLACEHOLDER \ No newline at end of file +""" +Claude Code Desktop Session Recovery Tools -- Diagnostic +========================================================= + +Files read: + - %APPDATA%\\Claude\\claude-code-sessions\\\\\\local_*.json + - %USERPROFILE%\\.claude\\projects\\\\*.jsonl + - %LOCALAPPDATA%\\AnthropicClaude\\ (version detection only) + +Files written: + - Nothing. This tool is read-only. + +Invoked directly. Mutators are invoked via the commands this tool prints. + +Usage: + python tools/diagnose.py # probe live state + python tools/diagnose.py --json # machine-readable output + python tools/diagnose.py --state # probe a fixture state dir + # (/appdata/Claude/... and + # /projects/...) +""" +import argparse +import collections +import glob +import hashlib +import json +import os +import platform +import re +import subprocess +import sys + + +# --------------------------------------------------------------------------- +# State discovery +# --------------------------------------------------------------------------- + +def _find_meta_dirs(appdata_claude_dir): + """Yield (account_uuid, org_uuid, absolute_dir_path) for each metadata dir. 
+ + Walks: /claude-code-sessions/// + """ + sessions_root = os.path.join(appdata_claude_dir, "claude-code-sessions") + if not os.path.isdir(sessions_root): + return + for acct in sorted(os.listdir(sessions_root)): + acct_dir = os.path.join(sessions_root, acct) + if not os.path.isdir(acct_dir): + continue + for org in sorted(os.listdir(acct_dir)): + org_dir = os.path.join(acct_dir, org) + if not os.path.isdir(org_dir): + continue + yield acct, org, org_dir + + +def _build_jsonl_index(projects_dir): + """Return {session_id: absolute_jsonl_path} for every *.jsonl found.""" + index = {} + if not os.path.isdir(projects_dir): + return index + for slug in sorted(os.listdir(projects_dir)): + slug_dir = os.path.join(projects_dir, slug) + if not os.path.isdir(slug_dir): + continue + for f in glob.glob(os.path.join(slug_dir, "*.jsonl")): + sid = os.path.splitext(os.path.basename(f))[0] + if len(sid) == 36: # UUID-length check + index[sid] = f + return index + + +def _slug_from_path(path): + """Derive the project slug from a cwd path string (same encoding Claude uses).""" + # Strip drive letter colon, replace all path separators with -- + normalised = path.replace("\\", "/") + # Remove the colon after the drive letter (e.g. "C:" -> "C") + if len(normalised) >= 2 and normalised[1] == ":": + normalised = normalised[0] + normalised[2:] + parts = [p for p in normalised.split("/") if p] + return "--".join(parts) + + +def _slug_encode(cwd): + """Encode a cwd path into the slug Claude Code uses for projects//. + + Observed on disk: C:\\Users\\Foo\\project -> C--Users-Foo-project + Worktree example: C:\\Users\\Foo\\project\\.claude\\worktrees\\foo-1a2b3c + -> C--Users-Foo-project--claude-worktrees-foo-1a2b3c + + The CLI applies realpath first, so this function should receive the + canonical (resolved) path, not a junction alias. 
+ """ + out = cwd + for ch in (":", "\\", "/", ".", " "): + out = out.replace(ch, "-") + return out + + +def _count_jsonl_assistant_lines(path, stop_at=None): + """Count {"role": "assistant"} lines in a JSONL file. + + stop_at: if given, return early once this threshold is reached — the caller + can treat a return value >= stop_at as "not truncated" without reading the + whole file. Keeps large sessions fast. + """ + count = 0 + try: + with open(path, "r", encoding="utf-8", errors="replace") as fh: + for line in fh: + line = line.strip() + if not line or '"assistant"' not in line: + continue + try: + obj = json.loads(line) + if obj.get("type") == "assistant": + count += 1 + if stop_at is not None and count >= stop_at: + return count + except (json.JSONDecodeError, AttributeError): + continue + except OSError: + return 0 + return count + + +def _cwd_type(cwd): + """Classify a cwd path as junction, canonical, bare_root, or other.""" + if not cwd: + return "other" + try: + real = os.path.realpath(cwd) + except OSError: + return "other" + # If the directory doesn't exist, classify as other + if not os.path.exists(cwd): + return "other" + # Junction: realpath differs from the literal path + if os.path.normcase(real) != os.path.normcase(cwd): + return "junction" + # Bare root: only drive letter + one level, no project structure beneath + parts = cwd.replace("\\", "/").rstrip("/").split("/") + clean_parts = [p for p in parts if p and p != ":"] + if len(clean_parts) <= 2: + return "bare_root" + return "canonical" + + +# --------------------------------------------------------------------------- +# Snapshot builder +# --------------------------------------------------------------------------- + +def build_snapshot(appdata_claude_dir, projects_dir, fixture_mode=False): + """Probe state; return a deterministic snapshot dict. + + The snapshot is the input to match predicates in troubleshooting.json and + the source of the deterministic diagnosis ID. 
+ + fixture_mode=True skips live-system detection (cli version, desktop version, + process check) so golden outputs generated from fixture state are deterministic + across environments. + """ + # Collect all metadata files + meta_files = [] + for _acct, _org, meta_dir in _find_meta_dirs(appdata_claude_dir): + for f in sorted(glob.glob(os.path.join(meta_dir, "local_*.json"))): + try: + with open(f, "r", encoding="utf-8") as fh: + data = json.load(fh) + meta_files.append((f, data)) + except (OSError, json.JSONDecodeError): + continue + + jsonl_index = _build_jsonl_index(projects_dir) + + total = len(meta_files) + # Archived sessions are hidden from Desktop's history picker, so a missing + # cliSessionId on an archived session does not cause a user-visible + # blank-pane. Counting them here would trigger a false-positive + # blank-pane-missing-cli problem report. The audit tool surfaces them + # separately in the archived_no_cli bucket if the user explicitly looks. + missing_cli = sum( + 1 for _, d in meta_files + if not d.get("cliSessionId") and not d.get("isArchived") + ) + with_cli = sum(1 for _, d in meta_files if d.get("cliSessionId")) + + dangling_cli = sum( + 1 for _, d in meta_files + if d.get("cliSessionId") and d["cliSessionId"] not in jsonl_index + ) + + # Count cliSessionId values that appear in more than one metadata file. + # Indicates synth-duplicate metadata created by earlier recovery attempts. 
+ cli_values = [d.get("cliSessionId") for _, d in meta_files if d.get("cliSessionId")] + cli_counts = collections.Counter(cli_values) + duplicate_cli_count = sum(1 for c in cli_counts.values() if c > 1) + + cwd_prefix_types = {"junction": 0, "canonical": 0, "bare_root": 0, "other": 0} + junction_mismatch_count = 0 + + for _, d in meta_files: + cwd = d.get("cwd", "") + t = _cwd_type(cwd) + cwd_prefix_types[t] = cwd_prefix_types.get(t, 0) + 1 + if t == "junction": + # Junction mismatch: session cwd is a junction, so the slug + # derived from the junction path differs from the slug derived + # from the real path. If the JSONL is stored at the canonical + # slug and the junction-slug dir is absent or empty, these + # sessions appear as "missing" when navigating via the real path. + cli = d.get("cliSessionId") + if cli and cli in jsonl_index: + try: + real = os.path.realpath(cwd) + canonical_slug = _slug_from_path(real) + junction_slug = _slug_from_path(cwd) + if canonical_slug != junction_slug: + junction_mismatch_count += 1 + except OSError: + pass + + # jsonl_orphan_count: JSONLs with no corresponding metadata cliSessionId. + # Trigger for synth_session_metadata.py. + all_jsonl_sids = set(jsonl_index.keys()) + referenced_sids = {d.get("cliSessionId") for _, d in meta_files if d.get("cliSessionId")} + jsonl_orphan_count = len(all_jsonl_sids - referenced_sids) + + # cwd_slug_mismatch_count: metadata whose cwd slug-encodes to a different slug + # than the directory containing its JSONL. Works in fixture mode (pure string + # comparison -- no on-disk existence check needed). 
+ cwd_slug_mismatch_count = 0 + for _, d in meta_files: + cli = d.get("cliSessionId") + cwd = d.get("cwd", "") + if not cli or not cwd: + continue + if cli not in jsonl_index: + continue + expected_slug = _slug_encode(cwd) + actual_slug_dir = os.path.basename(os.path.dirname(jsonl_index[cli])) + if actual_slug_dir != expected_slug: + cwd_slug_mismatch_count += 1 + + # truncated_jsonl_count: sessions where the JSONL exists and metadata records + # a completedTurns value, but the file contains fewer assistant-role lines + # than that count. A truncated session looks intact from the outside — the + # JSONL is present and the metadata is linked — but opens with missing + # earlier messages. Skipped in fixture mode because fixture JSONLs are stubs. + truncated_jsonl_count = 0 + if not fixture_mode: + for _, d in meta_files: + cli = d.get("cliSessionId") + completed_turns = d.get("completedTurns") + if not cli or not completed_turns or cli not in jsonl_index: + continue + if completed_turns <= 0: + continue + actual = _count_jsonl_assistant_lines(jsonl_index[cli], stop_at=completed_turns) + if actual < completed_turns: + truncated_jsonl_count += 1 + + # Schema version: "recognised" if we found metadata files with the expected + # structure (have sessionId field); "unrecognised" otherwise. + schema_version = "unrecognised" + if total > 0: + known_fields = {"sessionId", "cwd", "createdAt", "model", "title"} + for _, d in meta_files[:5]: # spot-check up to 5 files + if known_fields.intersection(d.keys()): + schema_version = "recognised" + break + + # Desktop version, CLI version, and process state are skipped in fixture mode + # so that golden outputs are deterministic regardless of environment. 
+ if fixture_mode: + desktop_version = None + cli_version = None + desktop_running = False + running_inside_desktop = False + else: + desktop_version = _detect_desktop_version() + cli_version = _detect_cli_version() + desktop_running = _detect_desktop_running() + running_inside_desktop = _detect_running_inside_desktop() if desktop_running else False + + return { + "total_metadata_count": total, + "metadata_with_cli_count": with_cli, + "metadata_missing_cli_count": missing_cli, + "metadata_dangling_cli_count": dangling_cli, + "metadata_duplicate_cli_count": duplicate_cli_count, + "cwd_junction_mismatch_count": junction_mismatch_count, + "jsonl_orphan_count": jsonl_orphan_count, + "cwd_slug_mismatch_count": cwd_slug_mismatch_count, + "truncated_jsonl_count": truncated_jsonl_count, + "cwd_prefix_types": cwd_prefix_types, + "jsonl_count": len(jsonl_index), + "schema_version": schema_version, + "desktop_version": desktop_version, + "cli_version": cli_version, + "desktop_running": desktop_running, + "running_inside_desktop": running_inside_desktop, + } + + +def _detect_desktop_version(): + local_app = os.environ.get("LOCALAPPDATA", "") + anthropic_dir = os.path.join(local_app, "AnthropicClaude") + if not os.path.isdir(anthropic_dir): + return None + try: + for entry in sorted(os.listdir(anthropic_dir), reverse=True): + if entry.startswith("app-"): + return entry[4:] # strip "app-" prefix + except OSError: + pass + return None + + +def _detect_cli_version(): + try: + result = subprocess.run( + ["claude", "--version"], + capture_output=True, + text=True, + timeout=5, + check=False, + ) + if result.returncode == 0: + out = result.stdout.strip() + # Output is typically "2.1.143 (Claude Code)" -- extract semver + m = re.search(r"\d+\.\d+\.\d+", out) + if m: + return m.group(0) + except (OSError, subprocess.TimeoutExpired): + pass + return None + + +def _detect_desktop_running(): + """Return True only if Claude Desktop (not just the CLI) is running. 
+ + Both Desktop and the CLI appear in tasklist as `claude.exe`, so we + cannot rely on the process name alone. Desktop installs under + `%LOCALAPPDATA%\\AnthropicClaude\\app-\\Claude.exe`; the CLI + runs from wherever npm installed it (typically not under AnthropicClaude). + We enumerate running `claude.exe` processes and check their executable + paths. Returns False if no claude.exe under AnthropicClaude is running, + so a standalone CLI session does not trigger the "QUIT DESKTOP" warning. + """ + # Try wmic first (deprecated but still present on most installs) + try: + result = subprocess.run( + ["wmic", "process", "where", "name='claude.exe'", + "get", "ExecutablePath", "/format:csv"], + capture_output=True, text=True, timeout=10, check=False, + ) + if result.returncode == 0: + for line in result.stdout.splitlines(): + low = line.lower() + if "anthropicclaude" in low and "claude.exe" in low: + return True + # wmic ran successfully -- absence of AnthropicClaude path means + # any claude.exe in tasklist is the CLI, not Desktop. + return False + except (OSError, subprocess.TimeoutExpired): + pass + + # Fallback: PowerShell on systems where wmic is missing (Win11 22H2+). + try: + result = subprocess.run( + ["powershell", "-NoProfile", "-Command", + "Get-Process claude -ErrorAction SilentlyContinue | " + "ForEach-Object { $_.Path }"], + capture_output=True, text=True, timeout=10, check=False, + ) + if result.returncode == 0: + for line in result.stdout.splitlines(): + if "anthropicclaude" in line.lower(): + return True + return False + except (OSError, subprocess.TimeoutExpired): + pass + + # Both detection paths failed -- err on the side of caution and report + # claude.exe presence (the original behaviour). Better to warn unnecessarily + # than to miss a real running Desktop. 
+ try: + result = subprocess.run( + ["tasklist", "/FI", "IMAGENAME eq claude.exe"], + capture_output=True, text=True, timeout=10, check=False, + ) + return "claude.exe" in result.stdout.lower() + except (OSError, subprocess.TimeoutExpired): + return False + + +def _detect_running_inside_desktop(): + """Return True if this process is a descendant of claude.exe. + + Walks the process tree upward from the current PID using wmic. Returns + False on any error so the caller degrades gracefully. + """ + try: + result = subprocess.run( + ["wmic", "process", "get", "ProcessId,ParentProcessId,Name", "/format:csv"], + capture_output=True, + text=True, + timeout=10, + check=False, + ) + # CSV columns: Node, Name, ParentProcessId, ProcessId + procs = {} + for line in result.stdout.splitlines(): + parts = line.strip().split(",") + if len(parts) < 4: + continue + try: + name = parts[1].lower() + parent_pid = int(parts[2]) + pid = int(parts[3]) + procs[pid] = (name, parent_pid) + except (ValueError, IndexError): + continue + current = os.getpid() + visited = set() + while current and current not in visited: + visited.add(current) + if current not in procs: + break + name, parent = procs[current] + if "claude" in name: + return True + current = parent + return False + except Exception: + return False + + +# --------------------------------------------------------------------------- +# Diagnosis ID +# --------------------------------------------------------------------------- + +def make_diagnosis_id(snapshot): + """Return an 8-hex-char deterministic ID derived from the session state. + + Version fields (desktop_version, cli_version) and process state + (desktop_running) are excluded -- they change independently of the + broken-state we're diagnosing. 
+ """ + structural_keys = ( + "total_metadata_count", + "metadata_with_cli_count", + "metadata_missing_cli_count", + "metadata_dangling_cli_count", + "metadata_duplicate_cli_count", + "cwd_junction_mismatch_count", + "cwd_slug_mismatch_count", + "truncated_jsonl_count", + "jsonl_orphan_count", + "cwd_prefix_types", + "jsonl_count", + "schema_version", + ) + structural = {k: snapshot[k] for k in structural_keys if k in snapshot} + canonical = json.dumps(structural, sort_keys=True, separators=(",", ":")) + return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:8] + + +# --------------------------------------------------------------------------- +# Predicate evaluator +# --------------------------------------------------------------------------- + +def _eval_comparison(actual, op, expected): + """Evaluate a single comparison operation.""" + if op == "==": + return actual == expected + if op == "!=": + return actual != expected + if op == ">=": + return actual is not None and actual >= expected + if op == "<=": + return actual is not None and actual <= expected + if op == ">": + return actual is not None and actual > expected + if op == "<": + return actual is not None and actual < expected + if op == "in": + return actual in expected + if op == "regex": + return bool(re.search(expected, str(actual or ""))) + return False + + +def eval_match(predicate, snapshot): + """Evaluate a match predicate dict against a snapshot. Returns bool.""" + if not predicate: + return False + if "any" in predicate: + return any(eval_match(p, snapshot) for p in predicate["any"]) + if "all" in predicate: + return all(eval_match(p, snapshot) for p in predicate["all"]) + # Leaf node: {"snapshot.field[.subfield...]": {"op": value}} + # Supports dot-notation for nested dicts, e.g. "snapshot.cwd_prefix_types.bare_root". 
+ for key, comparison in predicate.items(): + field = key[len("snapshot."):] if key.startswith("snapshot.") else key + parts = field.split(".") + actual = snapshot + for part in parts: + if isinstance(actual, dict): + actual = actual.get(part) + else: + actual = None + break + for op, expected in comparison.items(): + if not _eval_comparison(actual, op, expected): + return False + return True + + +# --------------------------------------------------------------------------- +# Output formatting +# --------------------------------------------------------------------------- + +_SEP = "-" * 60 + + +def _format_human(diagnosis_id, snapshot, matches, schema_ok, repo_root=None): + lines = [ + "DIAGNOSE -- Claude Code Desktop Session Recovery Tools", + _SEP, + f"Diagnosis ID : {diagnosis_id}", + ] + if snapshot.get("desktop_version"): + lines.append(f"Desktop ver. : {snapshot['desktop_version']}") + if snapshot.get("cli_version"): + lines.append(f"CLI ver. : {snapshot['cli_version']}") + lines.append( + f"Metadata : {snapshot['total_metadata_count']} files" + f" ({snapshot['metadata_with_cli_count']} with cliSessionId," + f" {snapshot['metadata_missing_cli_count']} missing)" + ) + lines.append(f"JSONL files : {snapshot['jsonl_count']}") + if snapshot.get("truncated_jsonl_count", 0) > 0: + lines.append( + f"Truncated : {snapshot['truncated_jsonl_count']} session(s) have fewer" + " messages than completedTurns records (history appears cut off)" + ) + lines.append("") + + if snapshot.get("desktop_running"): + if snapshot.get("running_inside_desktop"): + lines.append("WARNING: You are running inside Claude Desktop itself.") + lines.append(" You cannot quit Desktop to run mutators from this session.") + lines.append("") + lines.append(" Do this in order:") + lines.append(" 1. Open a new terminal (cmd, PowerShell, or Windows Terminal)") + if repo_root: + lines.append(' 2. cd "{}"'.format(repo_root)) + lines.append(" 3. 
Quit Claude Desktop: right-click the tray icon -> Quit") + lines.append(' 4. tasklist /FI "IMAGENAME eq claude.exe" -- must show no results') + lines.append(" 5. Run the repair commands printed below") + else: + lines.append("WARNING: Claude Desktop appears to be running (claude.exe in tasklist).") + lines.append(" Diagnose is safe -- it is read-only.") + lines.append(" Repair commands below will NOT work until Desktop is fully quit.") + lines.append(" Quit: right-click the tray icon -> Quit. Then verify:") + lines.append(' tasklist /FI "IMAGENAME eq claude.exe"') + lines.append("") + + if not schema_ok: + lines.append("State layout not in supported fixture set. Audit-only mode.") + lines.append("No repair commands will be suggested for this state.") + lines.append( + "Please open an issue at https://github.com/BasedGPT/" + "claude-code-session-recovery with your diagnose.py --json output." + ) + return "\n".join(lines) + + if not matches: + lines.append("State looks healthy. No known broken patterns matched.") + return "\n".join(lines) + + quit_prefix = "QUIT DESKTOP FIRST: " if snapshot.get("desktop_running") else "" + + for row in matches: + lines.append(f"PROBLEM FOUND: {row['problem']}") + lines.append(f" Details: {row['details']}") + label = "Safety" if row.get("mutator") else "Status" + lines.append(f" {label} : {row['safety']}") + lines.append("") + if row.get("mutator"): + mutator = row["mutator"] + lines.append(" To repair -- dry-run first, review output, then add --apply:") + lines.append( + f" {quit_prefix}python {mutator}" + f" --diagnosis-id {diagnosis_id}" + ) + lines.append( + f" {quit_prefix}python {mutator}" + f" --diagnosis-id {diagnosis_id} --apply" + ) + elif row.get("next_command"): + lines.append(f" Next: {row['next_command']}") + else: + lines.append(" No automatic repair for this state. 
See:") + lines.append(f" {row['details']}") + lines.append("") + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + # REL-09 / PY-06: all I/O and side effects inside main() + if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8", errors="replace") + + ap = argparse.ArgumentParser( + description="Diagnose Claude Code Desktop session state. Read-only.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "Run this first. It tells you exactly what to do next.\n" + "Mutators shown in the output require --diagnosis-id from this tool." + ), + ) + ap.add_argument( + "--state", + metavar="PATH", + default=None, + help=( + "Path to a fixture state directory (for testing). " + "Must contain appdata/Claude/... and projects/ subdirectories." + ), + ) + ap.add_argument( + "--json", + dest="json_output", + action="store_true", + help="Emit machine-readable JSON output.", + ) + args = ap.parse_args() + + # Resolve state directories + if args.state: + state_abs = os.path.abspath(args.state) + appdata_claude_dir = os.path.join(state_abs, "appdata", "Claude") + projects_dir = os.path.join(state_abs, "projects") + else: + _sys = platform.system() + if _sys == "Darwin": + appdata_claude_dir = os.path.expanduser("~/Library/Application Support/Claude") + projects_dir = os.path.expanduser("~/.claude/projects") + elif _sys == "Linux": + appdata_claude_dir = os.path.expanduser("~/.config/Claude") + projects_dir = os.path.expanduser("~/.claude/projects") + else: + appdata_claude_dir = os.path.join( + os.environ.get("APPDATA", os.path.expanduser("~")), "Claude" + ) + if not os.path.isdir(appdata_claude_dir): + _local = os.environ.get("LOCALAPPDATA", "") + _pkgs = os.path.join(_local, "Packages") + if os.path.isdir(_pkgs): + for _pkg in sorted(os.listdir(_pkgs)): + if 
_pkg.startswith("Claude_"): + _cand = os.path.join( + _pkgs, _pkg, "LocalCache", "Roaming", "Claude" + ) + if os.path.isdir(_cand): + appdata_claude_dir = _cand + break + projects_dir = os.path.join(os.path.expanduser("~"), ".claude", "projects") + + snapshot = build_snapshot(appdata_claude_dir, projects_dir, fixture_mode=args.state is not None) + diagnosis_id = make_diagnosis_id(snapshot) + + # Load troubleshooting.json from the repo root (tools/ -> parent = repo root) + script_dir = os.path.dirname(os.path.abspath(__file__)) + repo_root = os.path.dirname(script_dir) + ts_path = os.path.join(repo_root, "troubleshooting.json") + + rows = [] + if os.path.isfile(ts_path): + with open(ts_path, "r", encoding="utf-8") as fh: + rows = json.load(fh) + + schema_ok = snapshot["schema_version"] == "recognised" + matches = [row for row in rows if eval_match(row.get("match", {}), snapshot)] + + if args.json_output: + output = { + "diagnosis_id": diagnosis_id, + "tested_against": { + "claude_desktop": snapshot.get("desktop_version"), + "claude_code_cli": snapshot.get("cli_version"), + "windows": "11", + }, + "schema_probe": snapshot["schema_version"], + "desktop_running": snapshot["desktop_running"], + "matched_problems": [ + { + "id": row["id"], + "domain": row["domain"], + "mutator": row.get("mutator"), + "next_command": ( + "python {} --diagnosis-id {}".format(row["mutator"], diagnosis_id) + if row.get("mutator") else None + ), + "safety_preconditions": [row["safety"]], + } + for row in matches + ], + "audit_only_problems": [], + "schema_mismatch": not schema_ok, + "snapshot": snapshot, + } + print(json.dumps(output, indent=2)) + else: + print(_format_human(diagnosis_id, snapshot, matches, schema_ok, repo_root)) + + +if __name__ == "__main__": + main() From 9c570e7b7f6cf10c3f1362b47456fca8e6321a26 Mon Sep 17 00:00:00 2001 From: BasedGPT <131253485+BasedGPT@users.noreply.github.com> Date: Fri, 29 May 2026 18:26:04 +1000 Subject: [PATCH 3/3] fix(repair): remove 
transcriptUnavailable key on write and add MSIX path fallback The Desktop checks transcriptUnavailable before rendering session content, so leaving it set after backfilling cliSessionId caused sessions to still show 'Session not found on disk'. Now popped from the metadata dict before writing with --apply. Also adds MSIX/Store path detection to _default_paths(). Related to #5 --- tools/sessions/repair_session_metadata.py | 441 +++++++++++++++++++++- 1 file changed, 440 insertions(+), 1 deletion(-) diff --git a/tools/sessions/repair_session_metadata.py b/tools/sessions/repair_session_metadata.py index 10762af..7383951 100644 --- a/tools/sessions/repair_session_metadata.py +++ b/tools/sessions/repair_session_metadata.py @@ -1 +1,440 @@ -REPAIR_CONTENT_PLACEHOLDER \ No newline at end of file +""" +Invoked via diagnose.py. Not intended for direct invocation. +To diagnose your state: python tools/diagnose.py + +Repair session metadata files that are missing the cliSessionId field. +Adds cliSessionId by matching each broken metadata file's createdAt +timestamp against the first timestamp in each JSONL transcript, within +a configurable window (default: 5 seconds). Single-candidate matches +only -- ambiguous matches are skipped with a report. 
+ +Files read: + - %APPDATA%\\Claude\\claude-code-sessions\\\\\\local_*.json + - %USERPROFILE%\\.claude\\projects\\\\*.jsonl + +Files written: + - %APPDATA%\\Claude\\claude-code-sessions\\\\\\.json + (only with --apply; cliSessionId field added in-place) + +Backup created at: + - ./repair-backup/.json (alongside this script) + +Rollback command: + - copy /Y repair-backup\\*.json "%APPDATA%\\Claude\\claude-code-sessions\\\\\\" + +Usage: + python tools/sessions/repair_session_metadata.py --diagnosis-id + python tools/sessions/repair_session_metadata.py --diagnosis-id --apply + python tools/sessions/repair_session_metadata.py --diagnosis-id --window-ms 8000 +""" +import argparse +import glob +import json +import os +import platform +import shutil +import sys +from datetime import datetime, timezone + +# Import shared helpers from tools/diagnose.py (parent directory). +_TOOLS_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, _TOOLS_DIR) +try: + from diagnose import build_snapshot, make_diagnosis_id, _find_meta_dirs +except ImportError as exc: + print("ERROR: cannot import from diagnose.py: {}".format(exc)) + print("Run from the repo root: python tools/sessions/repair_session_metadata.py") + sys.exit(1) + +# --- Configuration --- +# Used when --state is not supplied (live mode). 
def _find_msix_claude_dir(local_appdata):
    """Return the Claude data dir of an MSIX/Store install, or None.

    Scans ``local_appdata/Packages`` (sorted, for determinism) for the first
    entry whose name starts with ``Claude_`` and that contains
    ``LocalCache/Roaming/Claude``.
    """
    if not local_appdata:
        # Without this guard an unset LOCALAPPDATA would make us probe a
        # relative "Packages" directory under the current working directory.
        return None
    packages_dir = os.path.join(local_appdata, "Packages")
    try:
        packages = sorted(os.listdir(packages_dir))
    except OSError:
        # Missing or unreadable Packages dir: no MSIX install we can use.
        return None
    for package in packages:
        if not package.startswith("Claude_"):
            continue
        candidate = os.path.join(
            packages_dir, package, "LocalCache", "Roaming", "Claude"
        )
        if os.path.isdir(candidate):
            return candidate
    return None


def _default_paths():
    """Return (appdata_claude_dir, projects_dir) for the current platform.

    On Windows the standard ``%APPDATA%\\Claude`` location is preferred; only
    when it does not exist is the MSIX/Store package location tried. If
    neither exists the standard path is returned unchanged.
    """
    system = platform.system()
    projects_dir = os.path.join(os.path.expanduser("~"), ".claude", "projects")
    if system == "Darwin":
        return (
            os.path.expanduser("~/Library/Application Support/Claude"),
            projects_dir,
        )
    if system == "Linux":
        return (os.path.expanduser("~/.config/Claude"), projects_dir)

    appdata_claude = os.path.join(
        os.environ.get("APPDATA", os.path.expanduser("~")), "Claude"
    )
    if not os.path.isdir(appdata_claude):
        msix_dir = _find_msix_claude_dir(os.environ.get("LOCALAPPDATA", ""))
        if msix_dir is not None:
            appdata_claude = msix_dir
    return (appdata_claude, projects_dir)


APPDATA_CLAUDE_DIR, PROJECTS_DIR = _default_paths()

TOOL_DIR = os.path.dirname(os.path.abspath(__file__))
BACKUP_DIR = os.path.join(TOOL_DIR, "repair-backup")


# ---------------------------------------------------------------------------
# Gate 5 -- Known-do-not-run conditions
# Checked after diagnosis-token validation.
# Refusal exits 3 with the message.
# ---------------------------------------------------------------------------

KNOWN_DO_NOT_RUN = [
    (
        lambda s: s["metadata_missing_cli_count"] == 0,
        "All metadata files already have cliSessionId. Nothing to repair.",
    ),
    (
        lambda s: s["schema_version"] == "unrecognised",
        (
            "State schema not recognised. Run diagnose.py and report "
            "the unsupported state to the maintainer."
        ),
    ),
]


# ---------------------------------------------------------------------------
# Indexing helpers
# ---------------------------------------------------------------------------

def _parse_created_at_ms(value):
    """Return a timestamp as an int of ms-since-epoch, or None.

    Accepts a number (returned truncated to int, assumed to already be in
    milliseconds) or an ISO-8601 string (a trailing "Z" is treated as UTC).
    Anything else, or an unparseable value, yields None.
    """
    if isinstance(value, bool):
        # bool is an int subclass; True/False is never a real timestamp.
        return None
    if isinstance(value, (int, float)):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # NaN / Infinity can come out of json.load.
            return None
    if isinstance(value, str):
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            # NOTE(review): a string without a UTC offset is interpreted in
            # local time by datetime.timestamp() -- confirm that is intended.
            return int(parsed.timestamp() * 1000)
        except (ValueError, OverflowError, OSError):
            return None
    return None


def _first_user_text(rec):
    """Return up to 80 chars of user text from a transcript record, or None.

    Only records with type == "user" whose message has role == "user" are
    considered. Content may be a plain string or a list of parts, in which
    case the first part of type "text" carrying a string is used.
    """
    if rec.get("type") != "user":
        return None
    message = rec.get("message", {})
    if not isinstance(message, dict) or message.get("role") != "user":
        return None
    content = message.get("content", "")
    if isinstance(content, str):
        return content[:80]
    if isinstance(content, list):
        for part in content:
            if isinstance(part, dict) and part.get("type") == "text":
                text = part.get("text", "")
                if isinstance(text, str):
                    return text[:80]
    return None


def _read_jsonl_first_ts_and_user(jsonl_path):
    """Return (first_ts_ms, first_user_text) from a JSONL transcript.

    Reads records sequentially until both values are found or EOF. Blank
    lines, lines that are not valid JSON, and records that are not JSON
    objects are skipped. Either element is None when not found; an
    unreadable file yields whatever was collected before the error.
    """
    first_ts_ms = None
    first_user = None
    try:
        with open(jsonl_path, "r", encoding="utf-8", errors="replace") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    rec = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                if not isinstance(rec, dict):
                    continue
                if first_ts_ms is None:
                    ts = rec.get("timestamp")
                    # Only string timestamps are parsed here: the unit of a
                    # numeric value cannot be known from the record itself.
                    if ts and isinstance(ts, str):
                        first_ts_ms = _parse_created_at_ms(ts)
                if first_user is None:
                    first_user = _first_user_text(rec)
                if first_ts_ms is not None and first_user is not None:
                    break
    except OSError:
        pass
    return first_ts_ms, first_user


def index_metadata(appdata_claude_dir):
    """Return (by_cli, broken_no_cli) for all local_*.json metadata files.

    by_cli:        {cliSessionId: (path, parsed_dict)}
    broken_no_cli: [(path, parsed_dict)] for files lacking cliSessionId

    Files that cannot be read, are not valid UTF-8 JSON, or whose top-level
    value is not an object are skipped.
    """
    by_cli = {}
    broken = []
    for _acct, _org, meta_dir in _find_meta_dirs(appdata_claude_dir):
        for meta_path in sorted(glob.glob(os.path.join(meta_dir, "local_*.json"))):
            try:
                with open(meta_path, "r", encoding="utf-8") as fh:
                    data = json.load(fh)
            except (OSError, ValueError):
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                continue
            if not isinstance(data, dict):
                continue
            cli = data.get("cliSessionId")
            if cli:
                by_cli[cli] = (meta_path, data)
            elif not data.get("isArchived"):
                # Archived sessions are hidden from Desktop's picker, so
                # repairing a missing cliSessionId on one has no user-visible
                # effect. Skip them so the repair loop does not attempt
                # timestamp-matching against the JSONL pool for entries the
                # user deliberately archived.
                broken.append((meta_path, data))
    return by_cli, broken


def index_jsonls(projects_dir):
    """Return {session_id: (jsonl_path, first_ts_ms, first_user_text)}.

    Walks projects_dir/<slug>/*.jsonl. Only files whose stem is 36
    characters long (UUID length) are indexed.
    """
    out = {}
    if not os.path.isdir(projects_dir):
        return out
    for slug in sorted(os.listdir(projects_dir)):
        slug_dir = os.path.join(projects_dir, slug)
        if not os.path.isdir(slug_dir):
            continue
        for jsonl_path in glob.glob(os.path.join(slug_dir, "*.jsonl")):
            sid = os.path.splitext(os.path.basename(jsonl_path))[0]
            if len(sid) != 36:
                continue
            first_ts_ms, first_user = _read_jsonl_first_ts_and_user(jsonl_path)
            out[sid] = (jsonl_path, first_ts_ms, first_user)
    return out


# ---------------------------------------------------------------------------
# Match logic
# ---------------------------------------------------------------------------

def find_match(broken_meta, jsonl_index, by_cli, window_ms, ambiguity_ms=500):
    """Return (cliSessionId, label) for the JSONL closest to createdAt.

    Candidates are transcripts whose first timestamp lies within window_ms
    of the metadata's createdAt; the closest one wins. When the runner-up
    is less than ambiguity_ms further away than the winner, the match is
    reported as "multi" so the caller can refuse it.

    Labels:
      "unique"           -- clear winner, not yet claimed
      "unique-with-dupe" -- clear winner, already claimed by another meta
      "none"             -- no usable createdAt or no candidate in window
      "multi"            -- best two candidates too close to distinguish
    """
    created_ms = _parse_created_at_ms(broken_meta.get("createdAt"))
    if created_ms is None:
        return None, "none"

    # (delta, sid) pairs; sid is unique so the sort order is total.
    candidates = sorted(
        (abs(first_ts - created_ms), sid)
        for sid, (_path, first_ts, _user) in jsonl_index.items()
        if first_ts is not None and abs(first_ts - created_ms) <= window_ms
    )
    if not candidates:
        return None, "none"

    best_delta, best_sid = candidates[0]
    if len(candidates) > 1 and candidates[1][0] - best_delta < ambiguity_ms:
        return best_sid, "multi"
    if best_sid in by_cli:
        return best_sid, "unique-with-dupe"
    return best_sid, "unique"


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def _format_created(created_ms):
    """Return created_ms as a UTC 'YYYY-MM-DDTHH:MM:SS' string, or '?'."""
    if not created_ms:
        return "?"
    try:
        moment = datetime.fromtimestamp(created_ms / 1000, tz=timezone.utc)
    except (ValueError, OverflowError, OSError):
        # Out-of-range value: display only, so degrade rather than crash.
        return "?"
    return moment.strftime("%Y-%m-%dT%H:%M:%S")


def main():
    """Command-line entry point: report, and with --apply perform, repairs.

    Exit codes: 2 for a missing/stale diagnosis token or invalid argument
    combination, 3 when a known-do-not-run condition refuses the run.
    With --apply each repaired file is first copied to BACKUP_DIR, then
    rewritten with cliSessionId set and transcriptUnavailable removed.
    """
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")

    ap = argparse.ArgumentParser(
        description=(
            "Repair session metadata files missing cliSessionId. "
            "Dry-run by default -- add --apply to mutate."
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument(
        "--diagnosis-id",
        metavar="HEX",
        default=None,
        dest="diagnosis_id",
        help="Diagnosis token from diagnose.py (required).",
    )
    ap.add_argument(
        "--force-with-diagnosis-id",
        metavar="VALUE",
        default=None,
        dest="force_diagnosis_id",
        help="Set to 'audit-only' to run dry-run without a current token.",
    )
    ap.add_argument(
        "--apply",
        action="store_true",
        help="Apply repairs in-place. Default is dry-run.",
    )
    ap.add_argument(
        "--window-ms",
        type=int,
        default=5000,
        metavar="MS",
        help="createdAt vs JSONL first-timestamp match window in ms (default: 5000).",
    )
    ap.add_argument(
        "--state",
        metavar="PATH",
        default=None,
        help=(
            "Fixture state directory for testing. "
            "Must contain appdata/Claude/... and projects/ subdirectories."
        ),
    )
    args = ap.parse_args()

    if args.window_ms < 0:
        # A negative window can never match anything; reject it up front.
        ap.error("--window-ms must be >= 0")

    # --- Gate 3: diagnosis-token check ---
    force_mode = args.force_diagnosis_id == "audit-only"
    if not args.diagnosis_id and not force_mode:
        print("ERROR: --diagnosis-id required.")
        print("Run: python tools/diagnose.py")
        sys.exit(2)
    if args.apply and force_mode:
        print("ERROR: --apply cannot be combined with --force-with-diagnosis-id=audit-only.")
        sys.exit(2)

    # Resolve directories
    if args.state:
        state_abs = os.path.abspath(args.state)
        appdata_claude_dir = os.path.join(state_abs, "appdata", "Claude")
        projects_dir = os.path.join(state_abs, "projects")
    else:
        appdata_claude_dir = APPDATA_CLAUDE_DIR
        projects_dir = PROJECTS_DIR

    # Compute current snapshot and diagnosis ID
    snapshot = build_snapshot(
        appdata_claude_dir, projects_dir,
        fixture_mode=(args.state is not None),
    )
    current_id = make_diagnosis_id(snapshot)

    if not force_mode and current_id != args.diagnosis_id:
        print(
            "ERROR: Diagnosis token mismatch.\n"
            " Supplied : {}\n"
            " Current : {}".format(args.diagnosis_id, current_id)
        )
        print(
            "State has changed since diagnose.py was last run. "
            "Re-run: python tools/diagnose.py"
        )
        sys.exit(2)

    # --- Gate 5: known-do-not-run conditions ---
    for predicate, message in KNOWN_DO_NOT_RUN:
        try:
            refused = predicate(snapshot)
        except (KeyError, TypeError) as exc:
            # The snapshot lacks a field this gate needs. Keep the original
            # fail-open behaviour, but say so instead of hiding it.
            print(
                "WARNING: safety gate could not be evaluated ({!r}).".format(exc),
                file=sys.stderr,
            )
            continue
        if refused:
            print("REFUSED: " + message)
            sys.exit(3)

    # Index state
    by_cli, broken = index_metadata(appdata_claude_dir)
    jsonl_index = index_jsonls(projects_dir)

    used_diagnosis_id = args.diagnosis_id if not force_mode else "(forced-audit-only)"
    print("Metadata files: {} (linked: {}, missing cliSessionId: {})".format(
        len(by_cli) + len(broken), len(by_cli), len(broken)
    ))
    print("JSONL files: {}".format(len(jsonl_index)))
    print("Match window: +-{}ms".format(args.window_ms))
    print("Mode: {}".format("APPLY" if args.apply else "dry-run (use --apply to mutate)"))
    print("Diagnosis ID: {}".format(used_diagnosis_id))
    print()

    if args.apply:
        os.makedirs(BACKUP_DIR, exist_ok=True)

    repaired = 0
    refused_multi = 0
    orphan = 0

    # Oldest first; entries without a usable createdAt sort to the front.
    ordered = sorted(
        broken, key=lambda entry: _parse_created_at_ms(entry[1].get("createdAt")) or 0
    )
    for path, meta in ordered:
        fname = os.path.basename(path)
        # title may be null or a non-string in damaged metadata.
        title = str(meta.get("title") or "")[:60]
        created_display = _format_created(_parse_created_at_ms(meta.get("createdAt")))

        cli_match, kind = find_match(meta, jsonl_index, by_cli, args.window_ms)

        if kind == "none":
            print(" ORPHAN {} | {} | {!r} -- no JSONL match in window".format(
                fname, created_display, title
            ))
            orphan += 1
            continue

        if kind == "multi":
            print(" REFUSE {} | {} | {!r} -- multiple JSONL candidates, manual review needed".format(
                fname, created_display, title
            ))
            refused_multi += 1
            continue

        _jsonl_path, _jfirst, juser = jsonl_index[cli_match]
        dupe_note = " (dupe already linked)" if kind == "unique-with-dupe" else ""

        print(" REPAIR {}{}".format(fname, dupe_note))
        print(" {} | {!r}".format(created_display, title))
        print(" cliSessionId = {}".format(cli_match))
        if juser:
            print(" first user: {!r}".format(juser))
        print()

        repaired_meta = dict(meta)
        repaired_meta["cliSessionId"] = cli_match
        # Desktop checks this flag before rendering a session, so leaving it
        # set would keep the session hidden even after the repair.
        repaired_meta.pop("transcriptUnavailable", None)

        if args.apply:
            shutil.copy2(path, os.path.join(BACKUP_DIR, fname))
            with open(path, "w", encoding="utf-8") as fh:
                json.dump(repaired_meta, fh, indent=2)

        # Record the claim (in dry-run too) so a later broken file matching
        # the same transcript in this run is flagged as a dupe.
        by_cli[cli_match] = (path, repaired_meta)
        repaired += 1

    print("Repaired: {} Refused (multi-candidate): {} Orphan (no JSONL): {}".format(
        repaired, refused_multi, orphan
    ))
    if not args.apply and repaired:
        print("Review dry-run output above, then re-run with --apply to apply changes.")


if __name__ == "__main__":
    main()