diff --git a/ghost-ai-scanner/agent/install/scan_authorize_fetch.py.frag b/ghost-ai-scanner/agent/install/scan_authorize_fetch.py.frag new file mode 100644 index 0000000..13f127b --- /dev/null +++ b/ghost-ai-scanner/agent/install/scan_authorize_fetch.py.frag @@ -0,0 +1,48 @@ +# ============================================================= +# FRAGMENT: scan_authorize_fetch.py.frag +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: At scan start, fetch this user's S3 authorized-provider +# list (written by the dashboard's [Authorize] button) and +# merge it into AUTH_LIST. Providers in the list are filtered +# out by every scan_*() emitter via _is_authorized() — so +# authorised tools never reach the dashboard, ending the +# noise loop at source. +# Storage layout: +# s3:///config/authorized/{email_safe}.json +# Fetched via the presigned GET URL configured in +# ~/.patronai/config.json under "authorized_list_url". +# (Server's url_refresh_loop mints this alongside the +# existing upload URL — extend when wiring this in.) +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. +# ============================================================= + +def _fetch_remote_authorized() -> list: + """Best-effort: pull the per-user authorized list from S3. + Returns a list of provider strings; empty on any failure so the + scan still runs with whatever local AUTH_LIST already had.""" + url = _cfg.get("authorized_list_url", "").strip() + if not url: + return [] + try: + import urllib.request + # 5s timeout — scans must not stall on a slow / dead S3 endpoint. + req = urllib.request.Request(url, headers={"User-Agent": "patronai-agent"}) + with urllib.request.urlopen(req, timeout=5) as resp: + doc = json.loads(resp.read().decode()) + providers = doc.get("providers", []) + if isinstance(providers, list): + return [str(p).strip().lower() for p in providers if p] + except Exception: + # Silent — agent must never block a scan on a remote-config failure. 
+ return [] + return [] + + +# Merge remote list into AUTH_LIST. Local file remains the ground truth +# for offline operation; remote entries are additive. +_remote_auth = _fetch_remote_authorized() +if _remote_auth: + AUTH_LIST = sorted(set(AUTH_LIST) | set(_remote_auth)) diff --git a/ghost-ai-scanner/agent/install/scan_footer.py.frag b/ghost-ai-scanner/agent/install/scan_footer.py.frag index 087fca6..b3bd539 100644 --- a/ghost-ai-scanner/agent/install/scan_footer.py.frag +++ b/ghost-ai-scanner/agent/install/scan_footer.py.frag @@ -9,11 +9,14 @@ # and prints the JSON to stdout. The bash wrapper PUTs it to S3. # AUDIT LOG: # v1.0.0 2026-04-25 Initial. Group 2 — fragment refactor. -# v2.0.0 2026-04-26 Phase 1A. Calls 4 new emitters (mcp_configs, -# agents_workflows, tools_code, vector_dbs). -# Adds scan_kind tag (`baseline` first run, then -# `recurring`). Clears first_run flag once the -# payload is built. Adds repo discovery summary. +# v2.0.0 2026-04-26 Phase 1A. Four new emitters + scan_kind tag. +# v2.1.0 2026-05-11 Add snapshot_hash — SHA-256 over the canonical +# findings list. Server uses it for cheap "same +# state as last cycle" detection (short-circuits +# redundant explode + write). Companion to the +# server-side findings_compact job. Enables future +# v3 agent delta-emission (send hash only, omit +# findings array if hash matches the previous send). # ============================================================= _findings: list = [] @@ -38,6 +41,24 @@ def _count(kind: str) -> int: _scan_kind = "baseline" if IS_FIRST_RUN else "recurring" + +def _snapshot_hash(findings_list): + """SHA-256 over the canonical sort of (type, key) tuples per finding. + Server short-circuits when this matches the previous scan's hash — + no explode, no findings_store write, no false-noise alerts.""" + import hashlib + keys = [] + for _f in findings_list: + _t = _f.get("type", "") + # Pick the most stable distinguishing field per category. 
+ _k = (_f.get("domain") or _f.get("name") or _f.get("plugin_id") + or _f.get("image") or _f.get("server_name") + or _f.get("filename") or _f.get("signal") or "") + keys.append(f"{_t}|{_k}") + keys.sort() + return hashlib.sha256("\n".join(keys).encode()).hexdigest()[:16] + + _payload = { "event_type": "ENDPOINT_SCAN", "source": "patronai_scan_agent", @@ -51,6 +72,7 @@ _payload = { "os_name": OS_NAME, "timestamp": NOW, "scan_kind": _scan_kind, + "snapshot_hash": _snapshot_hash(_findings), "authorized": AUTH_LIST, "repos_discovered": [{"name": r.get("name"), "remote_host": r.get("remote_host"), diff --git a/ghost-ai-scanner/dashboard/ui/ai_posture_card.py b/ghost-ai-scanner/dashboard/ui/ai_posture_card.py new file mode 100644 index 0000000..d783e74 --- /dev/null +++ b/ghost-ai-scanner/dashboard/ui/ai_posture_card.py @@ -0,0 +1,108 @@ +# ============================================================= +# FILE: dashboard/ui/ai_posture_card.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Single aggregated card replacing the numeric KPI row at +# the top of the Inventory / Exec views. +# One risk score, one band colour, one "what needs action" +# breakdown. Drives the shift from "events log" UX to +# "decision surface" UX. +# DEPENDS: streamlit, scoring.risk_score +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. 
+# ============================================================= + +import os +import sys + +import streamlit as st + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) + +from scoring.risk_score import risk_score, risk_band, posture_breakdown # noqa: E402 + + +_CATEGORY_LABEL = { + "process": "AI processes running", + "package": "AI packages installed", + "ide_plugin": "IDE plugins detected", + "browser": "AI service browser hits", + "container_image": "Container images", + "container_log_signal": "Container traffic / key signals", + "shell_history": "Past shell commands", + "mcp_server": "MCP servers configured", + "agent_workflow": "Agent workflows (n8n / Flowise / langflow)", + "agent_scheduled": "Scheduled agents (cron / launchd)", + "tool_registration": "@tool decorators in code", + "vector_db": "Local vector DBs", +} + + +def _band_colour(band: str) -> str: + return { + "CRITICAL": "#cf222e", + "HIGH": "#bc4c00", + "MEDIUM": "#9a6700", + "LOW": "#1f6feb", + "CLEAN": "#1a7f37", + }.get(band, "#57606A") + + +def render_ai_posture(rows: list, device_label: str = "this fleet") -> None: + """Render the aggregated AI Posture card. + `rows` must be the COMPACTED rows (findings_current view) — one + per signature, with severity/category/occurrences/last_seen. + Falls back gracefully if older raw-finding rows are passed.""" + score = risk_score(rows) + band = risk_band(score) + bdown = posture_breakdown(rows) + open_categories = sum(1 for v in bdown.values() if v["count"] > 0) + + st.markdown( + f"
" + f"
" + f"
" + f"AI POSTURE — {device_label}
" + f"
" + f"RISK SCORE: {score} / 100  ·  {band}
" + f"
", + unsafe_allow_html=True, + ) + + if open_categories == 0: + st.markdown( + "
✓ No open AI findings. Posture is clean.
", + unsafe_allow_html=True, + ) + return + + # Render one row per non-empty category, sorted by severity then count. + sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} + items = sorted( + bdown.items(), + key=lambda kv: (-sev_rank.get(kv[1]["max_severity"], 0), + -kv[1]["count"]), + ) + rows_html = [] + for cat, info in items: + if info["count"] == 0: + continue + label = _CATEGORY_LABEL.get(cat, cat.replace("_", " ").title()) + sev = info["max_severity"] + sev_clr = _band_colour(sev) + rows_html.append( + f"
" + f"
" + f"{info['count']} {label}
" + f"
max sev: {sev}
" + f"
" + ) + st.markdown("".join(rows_html) + "", unsafe_allow_html=True) diff --git a/ghost-ai-scanner/dashboard/ui/category_grouped_risks.py b/ghost-ai-scanner/dashboard/ui/category_grouped_risks.py new file mode 100644 index 0000000..dcc4952 --- /dev/null +++ b/ghost-ai-scanner/dashboard/ui/category_grouped_risks.py @@ -0,0 +1,102 @@ +# ============================================================= +# FILE: dashboard/ui/category_grouped_risks.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Collapsible category-grouped view of open findings. +# Replaces the row-soup. Each category is a parent row with +# count + max-severity + last-seen. Expand to see per-signature +# children. Bulk actions per category: Authorize, Suppress, +# Show cleanup hint. +# DEPENDS: streamlit, services.authorize, cleanup_hints +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. +# ============================================================= + +import os +import sys +from collections import defaultdict + +import streamlit as st + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) + +from cleanup_hints import cleanup_hint # noqa: E402 +from services.authorize import authorize # noqa: E402 + + +_SEV_RANK = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} + + +def _max_sev(rows: list) -> str: + out = "LOW" + for r in rows: + s = (r.get("severity") or "LOW").upper() + if _SEV_RANK.get(s, 0) > _SEV_RANK.get(out, 0): + out = s + return out + + +def _group_by_category(rows: list) -> dict: + out: dict = defaultdict(list) + for r in rows: + if r.get("status") == "resolved": + continue + out[r.get("category") or "unknown"].append(r) + return out + + +def render_grouped_risks(rows: list, store=None, owner_email: str = "") -> None: + """One section per category. Click to expand → per-signature rows. 
+ `store` is required for the Authorize button to write to S3; + if None, button is hidden (read-only mode).""" + groups = _group_by_category(rows) + if not groups: + st.info("No open findings. Clean posture.") + return + + st.markdown( + '
OPEN FINDINGS — GROUPED
', + unsafe_allow_html=True, + ) + # Render sorted by severity then count. + items = sorted( + groups.items(), + key=lambda kv: (-_SEV_RANK.get(_max_sev(kv[1]), 0), -len(kv[1])), + ) + for cat, cat_rows in items: + max_sev = _max_sev(cat_rows) + last_seen = max(r.get("last_seen") or "" for r in cat_rows) + header = (f"{cat.replace('_', ' ').title()} — " + f"{len(cat_rows)} signature(s) · max sev {max_sev} · " + f"last seen {last_seen[:19] or '—'}") + with st.expander(header, expanded=False): + providers = sorted({r.get("provider", "") for r in cat_rows}) + for r in cat_rows[:50]: + pname = r.get("provider", "") + occ = r.get("occurrences", 1) + fseen = (r.get("first_seen") or "")[:19] + lseen = (r.get("last_seen") or "")[:19] + hint = cleanup_hint(cat, r.get("os_name", "")) + st.markdown( + f"
" + f"{pname} · {occ} occurrence(s) · " + f"{fseen} → {lseen}
" + f"💡 {hint}" + f"
", + unsafe_allow_html=True, + ) + # Bulk Authorize for this category (only if store + email available) + if store and owner_email: + btn_key = f"auth_cat_{cat}_{owner_email}" + if st.button( + f"✓ Authorize all {len(providers)} {cat.replace('_',' ')} provider(s) for {owner_email}", + key=btn_key, + ): + total = authorize(store, owner_email, providers) + st.success( + f"Authorized {len(providers)} provider(s). " + f"User's allow-list now has {total} entries. " + "Agent picks up on next scan." + ) diff --git a/ghost-ai-scanner/dashboard/ui/clickable_metric.py b/ghost-ai-scanner/dashboard/ui/clickable_metric.py index ead86fd..d2d0d6f 100644 --- a/ghost-ai-scanner/dashboard/ui/clickable_metric.py +++ b/ghost-ai-scanner/dashboard/ui/clickable_metric.py @@ -1,17 +1,21 @@ # ============================================================= # FILE: dashboard/ui/clickable_metric.py # PROJECT: PatronAI — Mega-PR (drill-down everywhere) -# VERSION: 1.0.0 -# UPDATED: 2026-04-27 +# VERSION: 1.1.0 +# UPDATED: 2026-05-11 # OWNER: Giggso Inc (Ravi Venugopal) # PURPOSE: Drop-in replacement for `st.metric()` that adds a thin # "↳ filter" button below the value. Clicking the button # opens a drill-down panel via drill_panel.set_drill(). -# Visually preserves Streamlit's native metric tile look so -# the dashboard rhythm stays unchanged. # DEPENDS: streamlit, drill_panel # AUDIT LOG: # v1.0.0 2026-04-27 Initial. Mega-PR. +# v1.1.0 2026-05-11 Add optional sub_label — small grey volume +# indicator beneath the value (e.g. "1020 scan +# events" under a Devices=1 card). Lets the +# inventory tab show distinct-device counts as +# the headline number while preserving the raw +# row count as a secondary signal. 
# ============================================================= from typing import Optional @@ -25,27 +29,33 @@ def clickable_metric(container, label: str, value, panel_key: str, drill_field: str, drill_value, drill_label: Optional[str] = None, delta: Optional[str] = None, - help_text: Optional[str] = None) -> None: + help_text: Optional[str] = None, + sub_label: Optional[str] = None) -> None: """Render an st.metric tile + a drill button beneath it. Args: container: Streamlit container/column to render into - label: Metric label ("Unauthorized events") + label: Metric label ("Devices") value: Metric value (int / str) - panel_key: Drill panel id (one per page region — KPIs of one tab - typically share a panel_key so clicking another KPI - replaces the previous drill instead of stacking) + panel_key: Drill panel id (one per page region) drill_field: Event dict key to filter on ("severity", "outcome", …) - drill_value: Value to match (e.g. "CRITICAL", "BLOCK") - drill_label: Chip text in the drill panel (defaults to f"{label}: {drill_value}") + drill_value: Value to match + drill_label: Chip text in the drill panel delta: Same as st.metric delta help_text: Same as st.metric help - - On click, sets the drill state — the calling page is expected to - invoke render_drill_panel(panel_key, events) somewhere below the - KPI row to surface the filtered table. + sub_label: Optional small grey volume indicator under the value + — e.g. "1020 scan events". Lets the headline number + represent distinct entities while preserving the raw + row count as a secondary signal. """ container.metric(label, value, delta=delta, help=help_text) + if sub_label: + container.markdown( + f"
" + f"{sub_label}
", + unsafe_allow_html=True, + ) btn_key = f"clk_{panel_key}_{label.replace(' ', '_').lower()}" if container.button(f"↳ filter", key=btn_key): set_drill( diff --git a/ghost-ai-scanner/dashboard/ui/manager_tab_actions.py b/ghost-ai-scanner/dashboard/ui/manager_tab_actions.py index 69ee11e..f6b2b60 100644 --- a/ghost-ai-scanner/dashboard/ui/manager_tab_actions.py +++ b/ghost-ai-scanner/dashboard/ui/manager_tab_actions.py @@ -1,22 +1,22 @@ # ============================================================= # FILE: dashboard/ui/manager_tab_actions.py -# VERSION: 2.0.0 -# UPDATED: 2026-05-02 +# VERSION: 2.1.0 +# UPDATED: 2026-05-11 # OWNER: Giggso Inc # PURPOSE: Action helpers for the Manager Risks tab. -# mark_resolved — writes RESOLVED outcome back to S3 findings store. -# escalate — POSTs events to Trinity via dispatcher. -# send_alert_email — thin shim: delegates to notify.email.send_alert. +# mark_resolved — writes RESOLVED outcome back to S3. +# escalate — POSTs to Trinity via dispatcher. +# send_alert_email — thin shim → notify.email.send_alert. +# authorize_for_user — appends to per-user authorized list +# on S3; agent picks up on next scan. # All functions are pure — no Streamlit calls inside. -# DEPENDS: requests, alerter.dispatcher, notify.email +# DEPENDS: requests, alerter.dispatcher, notify.email, services.authorize # AUDIT LOG: # v1.0.0 2026-04-19 Initial — split from manager_tab_risks.py -# v2.0.0 2026-05-02 send_alert_email body collapsed to a one-line -# call into notify.email.send_alert. The duplicated -# boto3 SES path here used PATRONAI_FROM_EMAIL + -# skipped recipient verification, both inconsistent -# with the welcome / OTP paths. notify.email is -# now the single SES call site for the codebase. +# v2.0.0 2026-05-02 send_alert_email → notify.email single call site. +# v2.1.0 2026-05-11 Add authorize_for_user — closes the noise loop +# by teaching the agent which tools the operator +# has approved for a given user. 
# ============================================================= import logging @@ -89,3 +89,15 @@ def send_alert_email(events: list, recipients: str) -> bool: recipient list. Thin shim — actual SES work lives in notify.email.""" from notify.email import send_alert return send_alert(recipients=recipients, events=events) + + +def authorize_for_user(store, email: str, events: list) -> int: + """Append every distinct provider in `events` to the user's + authorized list on S3. Agent fetches the list at next scan and + filters those providers from emission — closing the noise loop + at source. Idempotent; returns the new total entry count. + Server-side `findings_compact` then auto-resolves the open + findings within the stale-window cycle.""" + from services.authorize import authorize # local — avoid hard dep on streamlit entry + providers = sorted({e.get("provider", "") for e in events if e.get("provider")}) + return authorize(store, email, providers) diff --git a/ghost-ai-scanner/dashboard/ui/manager_tab_inventory.py b/ghost-ai-scanner/dashboard/ui/manager_tab_inventory.py index 2d534c2..3dc24b4 100644 --- a/ghost-ai-scanner/dashboard/ui/manager_tab_inventory.py +++ b/ghost-ai-scanner/dashboard/ui/manager_tab_inventory.py @@ -1,17 +1,22 @@ # ============================================================= # FILE: dashboard/ui/manager_tab_inventory.py -# VERSION: 2.1.0 -# UPDATED: 2026-04-28 +# VERSION: 2.2.0 +# UPDATED: 2026-05-11 # OWNER: Giggso Inc (Ravi Venugopal) # PURPOSE: Inventory tab — asset metrics, CrowdStrike banner, asset table. -# v2: correct asset key (device_id > src_hostname > src_ip) and -# owner attribution (email > owner) so each asset shows its real -# user instead of collapsing to the last network event's owner. # AUDIT LOG: # v1.0.0 2026-04-19 Initial # v2.0.0 2026-04-27 Fix asset key + owner attribution for mixed # network/endpoint event streams. # v2.1.0 2026-04-28 Add ?view=user_detail hyperlink on OWNER column. 
+# v2.2.0 2026-05-11 KPI bug fix — "Endpoints" / "Cloud Instances" +# were counting EVENT ROWS (1 laptop with 1020 +# scan events showed Endpoints=1020). Now counts +# DISTINCT DEVICES via _asset_key. Labels +# renamed Endpoints→Devices, Cloud Instances→ +# Cloud Hosts. Adds a small "Scan Events" line +# under each card to preserve the volume signal +# without conflating it with device count. # ============================================================= import os @@ -23,6 +28,7 @@ from .filtered_table import search_box, apply_search_dicts from .clickable_metric import clickable_metric, static_metric from .drill_panel import render_drill_panel +from .ai_posture_card import render_ai_posture _PANEL = "mgr_inventory" @@ -41,7 +47,7 @@ def _owner_of(e: dict) -> str: def render_inventory(events: list) -> None: - """Asset summary KPIs, endpoint-protection banner, and asset table.""" + """AI Posture card (headline) → KPI cards → asset table.""" q = search_box("inventory", placeholder="search owner / IP / MAC …") if q: events = apply_search_dicts(events, q) @@ -49,25 +55,45 @@ def render_inventory(events: list) -> None: keys = [_asset_key(e) for e in events] unique_keys = list(dict.fromkeys(keys)) - laptops = sum(1 for e in events if e.get("asset_type") == "laptop") - ec2s = sum(1 for e in events if e.get("asset_type") == "ec2") - # Count assets that have at least one ENDPOINT_FINDING (actual outcome - # for exploded scan findings — not ALERT which is the un-exploded summary). - with_ai = len({_asset_key(e) for e in events - if e.get("outcome") == "ENDPOINT_FINDING"}) + # Headline — aggregated AI Posture card. Single risk score + + # per-category breakdown replaces the count-of-everything KPI noise. + # When compacted findings_current rows are available they're used; + # otherwise we degrade gracefully to raw events. 
+ device_label = unique_keys[0] if len(unique_keys) == 1 else f"{len(unique_keys)} devices" + render_ai_posture(events, device_label=device_label) + + # KPIs count DISTINCT DEVICES, not event rows. A laptop emitting a + # scan every 30 min must show as 1 device + N scan events, never + # as N "endpoints". The pre-2.2.0 sum() over events.asset_type was + # the source of the inflated 1020 figure customers were seeing. + laptop_devices = len({_asset_key(e) for e in events + if e.get("asset_type") == "laptop"}) + cloud_devices = len({_asset_key(e) for e in events + if e.get("asset_type") == "ec2"}) + with_ai = len({_asset_key(e) for e in events + if e.get("outcome") == "ENDPOINT_FINDING"}) + + # Volume signals — preserved but shown beneath the device counts so + # the operator can still see "1 device, 1020 scan events" at a glance. + laptop_events = sum(1 for e in events if e.get("asset_type") == "laptop") + cloud_events = sum(1 for e in events if e.get("asset_type") == "ec2") + ai_events = sum(1 for e in events if e.get("outcome") == "ENDPOINT_FINDING") c1, c2, c3, c4 = st.columns(4) static_metric(c1, "Total Assets", len(unique_keys)) - clickable_metric(c2, "Endpoints", laptops, + clickable_metric(c2, "Devices", laptop_devices, panel_key=_PANEL, drill_field="asset_type", - drill_value="laptop", drill_label="Asset = laptop") - clickable_metric(c3, "Cloud Instances", ec2s, + drill_value="laptop", drill_label="Asset = laptop", + sub_label=f"{laptop_events} scan events") + clickable_metric(c3, "Cloud Hosts", cloud_devices, panel_key=_PANEL, drill_field="asset_type", - drill_value="ec2", drill_label="Asset = ec2") + drill_value="ec2", drill_label="Asset = ec2", + sub_label=f"{cloud_events} scan events") clickable_metric(c4, "With AI Events", with_ai, panel_key=_PANEL, drill_field="outcome", drill_value="ENDPOINT_FINDING", - drill_label="Endpoint findings") + drill_label="Endpoint findings", + sub_label=f"{ai_events} findings") render_drill_panel(_PANEL, events, limit=100) 
if not os.environ.get("CROWDSTRIKE_ENABLED", "false").lower() == "true": diff --git a/ghost-ai-scanner/dashboard/ui/manager_tab_risks.py b/ghost-ai-scanner/dashboard/ui/manager_tab_risks.py index 67f2bbc..2c9b40d 100644 --- a/ghost-ai-scanner/dashboard/ui/manager_tab_risks.py +++ b/ghost-ai-scanner/dashboard/ui/manager_tab_risks.py @@ -19,10 +19,11 @@ import pandas as pd import streamlit as st -from .helpers import sev_badge -from .manager_tab_actions import mark_resolved, escalate, send_alert_email -from .time_fmt import fmt as fmt_time -from .filtered_table import filtered_table +from .helpers import sev_badge +from .manager_tab_actions import mark_resolved, escalate, send_alert_email +from .time_fmt import fmt as fmt_time +from .filtered_table import filtered_table +from .category_grouped_risks import render_grouped_risks sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) @@ -32,9 +33,21 @@ _SEV_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2} -def render_risks(events: list) -> None: - """Selectable alert table + Mark Resolved / Escalate / Email actions.""" - alerts = sorted( # .get() guards missing severity on endpoint events +def render_risks(events: list, store=None, owner_email: str = "") -> None: + """Two views in one tab: + 1) Grouped view (default) — collapsible category cards with + bulk-authorize on each category. Reads compacted rows. + 2) Flat alert list — legacy table with Mark/Escalate/Email. + Operator can switch between them via a single toggle.""" + grouped = st.toggle("Grouped view (recommended)", value=True, + key="risks_grouped_toggle") + if grouped: + render_grouped_risks(events, store=store, owner_email=owner_email) + st.markdown("
", unsafe_allow_html=True) + st.caption("Switch off Grouped view above to see the flat alert table.") + return + + alerts = sorted( [e for e in events if e.get("severity") in _SEV_ORDER], key=lambda x: (_SEV_ORDER.get(x.get("severity", ""), 9), x.get("timestamp", "")), diff --git a/ghost-ai-scanner/main.py b/ghost-ai-scanner/main.py index 7c8614c..a96a918 100644 --- a/ghost-ai-scanner/main.py +++ b/ghost-ai-scanner/main.py @@ -14,13 +14,14 @@ # v1.2.0 2026-04-29 Hourly S3 rollup scheduler (per-user + per-tenant # trees) + chat-history S3 lifecycle policy at startup. # v1.3.0 2026-05-03 Fix llama-server 500 "Context size has been -# exceeded": added --parallel 1 (was defaulting to -# 4 slots, each getting only ctx_size/4 = 2048 -# tokens — chat prompts of 3000-4000 tokens -# exceeded that budget). Bumped --ctx-size 8192 → -# 16384 for comfortable Thinking-model headroom. -# Both knobs env-tunable (LLM_PARALLEL_SLOTS, -# LLM_CTX_SIZE). +# exceeded": added --parallel 1 + ctx 16384. +# v1.4.0 2026-05-11 Wire findings_compact daemon. Collapses raw +# findings/ (which append every 30 min) into a +# deduped findings_current/ view keyed on +# finding_signature. Also auto-resolves +# signatures unseen for STALE_CYCLES (default 24 +# cycles). Fixes 1-laptop-shows-1020-endpoints +# and 50-alerts-from-1-snapshot dashboard noise. 
# ============================================================= import glob @@ -43,8 +44,9 @@ from bootstrap import validate_env, build_store, load_settings, build_resolver, maybe_backfill, seed_config_files from rule_health import self_check_rules from threads import scanner_loop, alerter_backlog, url_refresh_loop, streamlit_proc -from jobs.hourly_rollup import scheduler_loop as rollup_scheduler_loop -from jobs.docs_refresh import docs_refresh_loop +from jobs.hourly_rollup import scheduler_loop as rollup_scheduler_loop +from jobs.docs_refresh import docs_refresh_loop +from jobs.findings_compact import scheduler_loop as compact_scheduler_loop _HF_REPO = os.environ.get("LLM_MODEL_REPO", "LiquidAI/LFM2.5-1.2B-Thinking-GGUF") _LLM_PORT = int(os.environ.get("LLM_SERVER_PORT", "8080")) @@ -141,6 +143,7 @@ def main(): threading.Thread(target=alerter_backlog, args=(store, resolver, settings, stop), name="alerter", daemon=True), threading.Thread(target=url_refresh_loop, args=(store, stop), name="url_refresh", daemon=True), threading.Thread(target=rollup_scheduler_loop, args=(stop, _ROLLUP_OFFSET_MIN), name="rollup_scheduler", daemon=True), + threading.Thread(target=compact_scheduler_loop, args=(store, stop), name="findings_compact", daemon=True), threading.Thread(target=docs_refresh_loop, args=(stop,), name="docs_refresh", daemon=True), threading.Thread(target=streamlit_proc, args=(stop,), name="streamlit", daemon=True), ] diff --git a/ghost-ai-scanner/src/cleanup_hints.py b/ghost-ai-scanner/src/cleanup_hints.py new file mode 100644 index 0000000..cc77948 --- /dev/null +++ b/ghost-ai-scanner/src/cleanup_hints.py @@ -0,0 +1,97 @@ +# ============================================================= +# FILE: src/cleanup_hints.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Per-category cleanup-suggestion mapping. +# Returns a HUMAN-READABLE hint operators can copy-paste to +# remove an AI tool from a device. 
The agent NEVER executes +# these — that's a deliberate security boundary. The hint is +# rendered next to the finding so the operator can decide. +# DEPENDS: (stdlib only) +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. +# ============================================================= + +# Hint resolved from (category, os_name) → string. None means "no +# automatic suggestion — operator's call." OS is best-effort; falls +# back to a cross-platform hint when device OS is unknown. +_HINTS = { + "process": { + "darwin": "Quit the app + remove from /Applications/. " + "macOS Login Items: System Settings → General → Login Items.", + "linux": "killall ; check ~/.config/autostart/ and " + "systemd --user list-unit-files for restart triggers.", + "windows": "Task Manager → End task; remove from Startup tab.", + "*": "Stop the running process and remove from auto-start.", + }, + "package": { + "*": "Uninstall from the relevant package manager: " + "pip uninstall | npm uninstall -g | " + "brew uninstall ", + }, + "ide_plugin": { + "*": "VS Code/Cursor: code --uninstall-extension . " + "JetBrains: Preferences → Plugins → Uninstall.", + }, + "mcp_server": { + "darwin": "Edit ~/Library/Application Support/Claude/" + "claude_desktop_config.json — remove the entry under " + "`mcpServers`. Restart Claude Desktop.", + "windows": "Edit %APPDATA%\\Claude\\claude_desktop_config.json — " + "remove the entry. Restart Claude Desktop.", + "linux": "Edit ~/.config/Claude/claude_desktop_config.json — " + "remove the entry. Restart Claude Desktop.", + "*": "Remove the entry from your MCP host config and " + "restart the host (Claude Desktop / Cursor / Continue).", + }, + "agent_workflow": { + "*": "Delete or move the workflow file out of the watched path " + "(or stop the orchestrator: e.g. 
`pm2 stop flowise`).", + }, + "agent_scheduled": { + "darwin": "launchctl unload ~/Library/LaunchAgents/; " + "rm ~/Library/LaunchAgents/", + "linux": "crontab -e and remove the line, or " + "systemctl --user disable .", + "windows": "schtasks /Delete /TN ", + "*": "Disable the cron/launchd/scheduled-task that " + "triggers this workflow.", + }, + "vector_db": { + "*": "Locate the file/folder via the `path_safe` field and " + "`rm -rf `. Sensitive: vector DBs can contain " + "embeddings derived from your code/docs.", + }, + "browser": { + "*": "No on-device action — close the tab or block the domain " + "via your proxy / browser extension allowlist.", + }, + "container_image": { + "*": "docker rmi (and stop any container running it).", + }, + "container_log_signal": { + "*": "Inspect the container; rotate any API key that leaked " + "into logs.", + }, + "shell_history": { + "*": "Past command — usually informational. If it leaked a " + "secret, rotate it.", + }, + "tool_registration": { + "*": "Code-level @tool decorator — review the repo and remove " + "if not authorised.", + }, +} + + +def cleanup_hint(category: str, os_name: str = "") -> str: + """Return a human-readable cleanup suggestion for a finding. + `os_name` is optional — falls back to the cross-platform hint. 
+ Returns empty string when no hint is known.""" + cat = (category or "").lower() + os_ = (os_name or "").lower() + bucket = _HINTS.get(cat) + if not bucket: + return "" + return bucket.get(os_) or bucket.get("*") or "" diff --git a/ghost-ai-scanner/src/jobs/findings_compact.py b/ghost-ai-scanner/src/jobs/findings_compact.py new file mode 100644 index 0000000..25e6647 --- /dev/null +++ b/ghost-ai-scanner/src/jobs/findings_compact.py @@ -0,0 +1,174 @@ +# ============================================================= +# FILE: src/jobs/findings_compact.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Background job — collapse raw findings/ into a deduped +# findings_current/ view keyed on finding_signature. +# One real-world condition (e.g. "Cursor running on this +# MacBook") becomes one row with first_seen / last_seen / +# occurrences, not N rows for N scan cycles. +# Also auto-resolves stale signatures whose last_seen is +# older than STALE_CYCLES * SCAN_INTERVAL_SECS. +# WHY: Agent emits full state every 30 min. Without compaction the +# dashboard shows 1020 "endpoints" for a 1-laptop fleet and +# 50 alerts from a single snapshot. Raw findings/ is kept +# untouched for audit fidelity (Bruce's mandate); the +# compacted view is what dashboard + chat read at query time. +# USAGE: scheduler_loop(store, stop) — daemon thread, runs every 5 min. +# One-shot: compact_window(store, start_iso, end_iso). +# DEPENDS: store.findings_store, store.base_store +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. Ships with the +# fix/dashboard-noise-drama-mode branch. +# ============================================================= + +import json +import logging +import os +import time +import threading +from collections import defaultdict +from datetime import date, datetime, timedelta, timezone +from typing import Optional + +log = logging.getLogger("marauder-scan.jobs.findings_compact") + +# How often the compaction job runs. 
5 min is sub-cycle (scanner runs +# at SCAN_INTERVAL_SECS=300 default), so the compacted view trails the +# raw findings by at most one cycle. +COMPACT_INTERVAL_S = int(os.environ.get("COMPACT_INTERVAL_S", "300")) + +# A signature unseen for STALE_CYCLES consecutive scan cycles is +# auto-resolved. Default = 24 cycles × SCAN_INTERVAL_SECS (300 s default) = 2 h — not the 12 h a 30-min agent cycle would give; NOTE(review): confirm the intended window. Configurable so +# operators can tighten or loosen the auto-close window. +STALE_CYCLES = int(os.environ.get("AUTO_RESOLVE_STALE_CYCLES", "24")) +SCAN_INTERVAL_S = int(os.environ.get("SCAN_INTERVAL_SECS", "300")) + + +def _today_iso() -> str: + return date.today().isoformat() + + +def _current_key(day_iso: str) -> str: + """findings_current/YYYY/MM/DD/by_signature.jsonl""" + return f"findings_current/{day_iso.replace('-', '/')}/by_signature.jsonl" + + +def _parse_ts(value: str) -> Optional[datetime]: + """Lenient ISO parse; returns None on garbage.""" + if not value: + return None + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except (ValueError, TypeError): + return None + + +def compact_day(store, day_iso: str) -> dict: + """Read every raw finding for one UTC day, group by signature, + write the deduped view to findings_current/. + + Returns counts so the scheduler can log a one-line summary. + """ + if store is None: + return {"raw_rows": 0, "signatures": 0, "auto_resolved": 0} + + raw_rows = 0 + by_sig: dict = defaultdict(lambda: { + "first_seen": None, "last_seen": None, "occurrences": 0, + "sample": None, + }) + + # Read all 4 severity files for the day; merge into one signature map. + for sev in store.findings.SEVERITY_FILES: + df = store.findings.read(target_date=day_iso, severity=sev, limit=10_000) + if df.is_empty(): + continue + for row in df.iter_rows(named=True): + sig = row.get("finding_signature") + if not sig: + # Legacy rows pre-v2.2 didn't have a signature; skip them + # gracefully — they'll roll off via the 24h compaction + # window naturally. 
+ continue + raw_rows += 1 + ts = _parse_ts(row.get("timestamp")) + slot = by_sig[sig] + slot["occurrences"] += 1 + if slot["first_seen"] is None or (ts and ts < slot["first_seen"]): + slot["first_seen"] = ts + if slot["last_seen"] is None or (ts and ts > slot["last_seen"]): + slot["last_seen"] = ts + if slot["sample"] is None: + slot["sample"] = row + + # Auto-resolve any signature whose last_seen is older than the + # stale threshold. Resolution is RECORDED on the compacted row + # (resolved_by + resolved_reason) — raw findings/ never touched. + now = datetime.now(timezone.utc) + cutoff = now - timedelta(seconds=STALE_CYCLES * SCAN_INTERVAL_S) + auto_resolved = 0 + out_lines = [] + for sig, slot in by_sig.items(): + sample = dict(slot["sample"] or {}) + sample["finding_signature"] = sig + sample["first_seen"] = slot["first_seen"].isoformat() if slot["first_seen"] else None + sample["last_seen"] = slot["last_seen"].isoformat() if slot["last_seen"] else None + sample["occurrences"] = slot["occurrences"] + if slot["last_seen"] and slot["last_seen"] < cutoff: + sample["status"] = "resolved" + sample["resolved_by"] = "auto" + sample["resolved_reason"] = f"not_seen_{STALE_CYCLES}_cycles" + auto_resolved += 1 + else: + sample["status"] = "open" + out_lines.append(json.dumps(sample)) + + # Replace the day's compacted view in one atomic put — idempotent + # by design, so re-running for the same day produces identical output. 
+ key = _current_key(day_iso) + body = ("\n".join(out_lines) + "\n").encode() if out_lines else b"" + try: + store.findings._put(key, body, "application/x-ndjson") + except Exception as exc: + log.error("compact_day put failed for %s: %s", key, exc) + + summary = {"raw_rows": raw_rows, "signatures": len(by_sig), + "auto_resolved": auto_resolved} + log.info("compact %s: %s raw rows → %s signatures (%s auto-resolved)", + day_iso, raw_rows, len(by_sig), auto_resolved) + return summary + + +def compact_window(store, start_iso: str, end_iso: str) -> dict: + """One-shot: compact every UTC day in [start_iso, end_iso] inclusive. + Used by the catch-up path on startup, and by the operator CLI.""" + start = date.fromisoformat(start_iso) + end = date.fromisoformat(end_iso) + if end < start: + return {"days": 0, "raw_rows": 0, "signatures": 0, "auto_resolved": 0} + totals = {"days": 0, "raw_rows": 0, "signatures": 0, "auto_resolved": 0} + day = start + while day <= end: + result = compact_day(store, day.isoformat()) + totals["days"] += 1 + totals["raw_rows"] += result["raw_rows"] + totals["signatures"] += result["signatures"] + totals["auto_resolved"] += result["auto_resolved"] + day += timedelta(days=1) + return totals + + +def scheduler_loop(store, stop: threading.Event) -> None: + """Daemon thread target. 
Runs compact_day(today) every COMPACT_INTERVAL_S.""" + log.info("findings_compact scheduler started — interval=%ss stale_cycles=%s", + COMPACT_INTERVAL_S, STALE_CYCLES) + while not stop.is_set(): + t0 = time.time() + try: + compact_day(store, _today_iso()) + except Exception as exc: + log.error("findings_compact cycle error: %s", exc, exc_info=True) + stop.wait(timeout=max(0, COMPACT_INTERVAL_S - (time.time() - t0))) + log.info("findings_compact scheduler stopped") diff --git a/ghost-ai-scanner/src/normalizer/agent_explode.py b/ghost-ai-scanner/src/normalizer/agent_explode.py index 480f772..e10b8f1 100644 --- a/ghost-ai-scanner/src/normalizer/agent_explode.py +++ b/ghost-ai-scanner/src/normalizer/agent_explode.py @@ -18,6 +18,7 @@ # and _provider_for(). Identity bundle untouched. # ============================================================= +import hashlib import json import logging @@ -25,6 +26,28 @@ log = logging.getLogger("marauder-scan.normalizer.agent_explode") + +def _finding_signature(event: dict) -> str: + """Stable hash for entity-level dedup across re-emissions. + + Two findings with the same signature represent the SAME real-world + fact — e.g. "Cursor is running on this MacBook" — even when the + agent re-emits the scan every 30 min. The findings_compact background + job (src/jobs/findings_compact.py) merges them into a single row with + first_seen / last_seen / occurrences, so the dashboard shows 1 finding + not 21. + + Key dimensions: device + provider + category + the distinctive name + field (process_name OR dst_domain depending on category). + """ + key = "|".join([ + event.get("device_uuid") or event.get("device_id") or "", + event.get("provider", ""), + event.get("category", ""), + event.get("process_name") or event.get("dst_domain") or "", + ]) + return hashlib.sha256(key.encode()).hexdigest()[:16] + # Severity tier per finding type. Drives alerter routing — HIGH+ goes to # SNS / Trinity / SES; MEDIUM/LOW land on dashboard only. 
_FINDING_SEVERITY = { @@ -124,6 +147,10 @@ def explode_endpoint_findings(raw: dict, company: str, bind_identity) -> list: _copy_phase_1a_fields(event, f) # Pass through scan_kind so dashboard can split baseline vs recurring. event["scan_kind"] = raw.get("scan_kind", "recurring") + # Stable signature for entity-level dedup. Same Cursor process + # on the same device produces the same signature every cycle — + # findings_compact uses it to collapse 21 hourly rows into 1. + event["finding_signature"] = _finding_signature(event) event["notes"] = json.dumps({ "scan_id": sid, "finding": f, "token": raw.get("token", ""), }) diff --git a/ghost-ai-scanner/src/scoring/__init__.py b/ghost-ai-scanner/src/scoring/__init__.py new file mode 100644 index 0000000..72008b5 --- /dev/null +++ b/ghost-ai-scanner/src/scoring/__init__.py @@ -0,0 +1 @@ +# Scoring package — risk scoring + aggregated posture metrics. diff --git a/ghost-ai-scanner/src/scoring/risk_score.py b/ghost-ai-scanner/src/scoring/risk_score.py new file mode 100644 index 0000000..4c10942 --- /dev/null +++ b/ghost-ai-scanner/src/scoring/risk_score.py @@ -0,0 +1,103 @@ +# ============================================================= +# FILE: src/scoring/risk_score.py +# VERSION: 1.0.0 +# UPDATED: 2026-05-11 +# OWNER: Giggso Inc (Ravi Venugopal) +# PURPOSE: Weighted risk-score calculation over a set of findings. +# Drives the aggregated AI Posture card (one number instead +# of a row-soup of severities). Pure functions — no I/O — +# so unit-testable without S3 / Streamlit / Polars. +# DEPENDS: (stdlib only) +# AUDIT LOG: +# v1.0.0 2026-05-11 Initial. Ships with feat/dashboard-posture. +# ============================================================= + +from collections import defaultdict +from typing import Iterable + +# Per-severity weight contributed by ONE distinct signature. 
# Tuned so a single CRITICAL finding (in a high-priority category like
# "process") crosses the CRITICAL band threshold of 75 on its own:
#   50 * 1.5 (process multiplier) = 75 → red. Locked in
# test_one_critical_drives_red.
_SEV_WEIGHT = {
    "CRITICAL": 50,
    "HIGH": 12,
    "MEDIUM": 4,
    "LOW": 1,
}

# Category multipliers. A running process is more urgent than a stale
# shell history line even at the same severity. Applied after the
# severity weight.
_CATEGORY_MULT = {
    "process": 1.5,
    "mcp_server": 1.4,
    "agent_workflow": 1.3,
    "agent_scheduled": 1.3,
    "browser": 1.1,
    "container_log_signal": 1.2,
    "vector_db": 1.0,
    "package": 0.9,
    "ide_plugin": 0.9,
    "container_image": 0.8,
    "tool_registration": 0.7,
    "shell_history": 0.5,
}

# Cap — anything above this clamps to RED (100). Empirically a device
# with ~6 unauthorised running tools sits around 90; cap protects the
# UI from showing 4-digit "risk".
_SCORE_CAP = 100


def _occurrences(row: dict) -> int:
    """Occurrence count for a row, never below 1; junk values count as 1."""
    try:
        return max(1, int(row.get("occurrences") or 1))
    except (TypeError, ValueError):
        return 1


def _row_weight(row: dict) -> float:
    """Score contribution from one compacted finding row.

    Resolved rows contribute 0. Otherwise the weight is
    severity weight × category multiplier × occurrence factor, where
    unknown severities weigh 1 and unknown categories multiply by 1.0.
    """
    if row.get("status") == "resolved":
        return 0.0
    sev = str(row.get("severity") or "LOW").upper()
    cat = str(row.get("category") or "").lower()
    base = _SEV_WEIGHT.get(sev, 1)
    mult = _CATEGORY_MULT.get(cat, 1.0)
    # Occurrences are dampened linearly and capped: +5 % per repeat up to
    # +50 % (reached at 11 occurrences). A thing that re-appears 100 times
    # is not 100× as bad as seen once, but is worse than seen once.
    # The count is floored at 1 so a negative or corrupt value can never
    # turn the factor — and with it the whole score — negative.
    occ_factor = 1 + min(0.5, 0.05 * (_occurrences(row) - 1))
    return base * mult * occ_factor


def risk_score(rows: Iterable[dict]) -> int:
    """Aggregate risk score 0-100 for a set of compacted finding rows.
    Rows expected to come from findings_current (one per signature)."""
    total = sum(_row_weight(r) for r in rows)
    return int(min(_SCORE_CAP, round(total)))


def risk_band(score: int) -> str:
    """Human label for a 0-100 score — drives card colour.

    Thresholds are inclusive lower bounds: 75 CRITICAL, 40 HIGH,
    15 MEDIUM, anything above 0 LOW, otherwise CLEAN.
    """
    if score >= 75:
        return "CRITICAL"
    if score >= 40:
        return "HIGH"
    if score >= 15:
        return "MEDIUM"
    if score > 0:
        return "LOW"
    return "CLEAN"


def posture_breakdown(rows: Iterable[dict]) -> dict:
    """Group OPEN signatures by category for the posture card.
    Returns {category: {count, max_severity, last_seen}}.

    Rows with status "resolved" are skipped; a missing category is
    grouped under "unknown". last_seen values are compared as strings.
    """
    out: dict = defaultdict(lambda: {"count": 0, "max_severity": "LOW",
                                     "last_seen": ""})
    sev_rank = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
    for r in rows:
        if r.get("status") == "resolved":
            continue
        cat = r.get("category") or "unknown"
        slot = out[cat]
        slot["count"] += 1
        sev = (r.get("severity") or "LOW").upper()
        if sev_rank.get(sev, 0) > sev_rank.get(slot["max_severity"], 0):
            slot["max_severity"] = sev
        ls = r.get("last_seen") or ""
        if ls > slot["last_seen"]:
            slot["last_seen"] = ls
    return dict(out)


# diff --git a/ghost-ai-scanner/src/services/__init__.py b/ghost-ai-scanner/src/services/__init__.py
# Services package — orchestration helpers that span store/notify/etc.
# diff --git a/ghost-ai-scanner/src/services/authorize.py b/ghost-ai-scanner/src/services/authorize.py
# =============================================================
# FILE: src/services/authorize.py
# VERSION: 1.0.0
# UPDATED: 2026-05-11
# OWNER: Giggso Inc (Ravi Venugopal)
# PURPOSE: Write per-user authorized-provider lists to S3 so the hook
#          agent can fetch them on next scan and stop emitting noise
#          for tools the operator has explicitly approved.
#          Storage layout:
#            s3:///config/authorized/{email_safe}.json
#          Body:
#            {"version": 1, "updated_at": "...", "providers": [...]}
#          Companion scan_authorize_fetch.py.frag pulls this at scan
#          time. Findings whose provider is in the list never reach
#          the dashboard.
# DEPENDS: store.base_store (any blob store)
# AUDIT LOG:
#   v1.0.0  2026-05-11  Initial.
# =============================================================

import json
import logging
import re
from datetime import datetime, timezone

log = logging.getLogger("marauder-scan.services.authorize")

# Same character set as we use elsewhere for filesystem-safe email.
_EMAIL_SAFE_RE = re.compile(r"[^a-zA-Z0-9._-]")


def _safe_email(email: str) -> str:
    """Filesystem-safe email — replaces @ + special chars with _."""
    return _EMAIL_SAFE_RE.sub("_", (email or "").strip().lower())


def _key_for(email: str) -> str:
    """S3 key for one user's authorized list."""
    return f"config/authorized/{_safe_email(email)}.json"


def _empty_doc() -> dict:
    """Fresh canonical document for a user with nothing authorized."""
    return {"version": 1, "providers": [], "updated_at": ""}


def _normalize_providers(providers) -> set:
    """Canonical provider names: stringified, stripped, lower-cased.

    The agent fragment lower-cases and strips the list when it fetches
    it, so storing any other spelling would let authorize/revoke disagree
    with what the agent actually matches on. Blank entries are dropped;
    a bare string is treated as a single provider.
    """
    if isinstance(providers, str):
        providers = [providers]
    if not isinstance(providers, (list, tuple, set, frozenset)):
        return set()
    cleaned = {str(p).strip().lower() for p in providers if p}
    cleaned.discard("")
    return cleaned


def load_authorized(store, email: str) -> dict:
    """Read the current authorized list for a user.

    Returns {"version": 1, "updated_at": "...", "providers": [...]}
    or a fresh empty doc if none exists yet. Any read or parse failure
    is logged and also yields the empty doc — this never raises.
    """
    if not email:
        return _empty_doc()
    try:
        raw = store.findings._get(_key_for(email))
        if not raw:
            return _empty_doc()
        doc = json.loads(raw.decode())
        # Tolerate older shapes — coerce to canonical.
        return {
            "version": int(doc.get("version", 1)),
            "providers": sorted(_normalize_providers(doc.get("providers", []))),
            "updated_at": doc.get("updated_at", ""),
        }
    except Exception as exc:
        log.error("load_authorized failed for %s: %s", email, exc)
        return _empty_doc()


def _write_providers(store, email: str, providers: list) -> None:
    """Persist `providers` as the user's list; raises whatever _put raises."""
    new_doc = {
        "version": 1,
        "providers": providers,
        "updated_at": datetime.now(timezone.utc).isoformat(),
    }
    body = json.dumps(new_doc, indent=2).encode()
    store.findings._put(_key_for(email), body, "application/json")


def authorize(store, email: str, providers: list) -> int:
    """Add `providers` to the user's authorized list. Idempotent —
    duplicates collapse. Returns the new total count after merge.

    Returns 0 for an empty email or provider list. If the write fails
    the error is logged and the previous (still stored) count is
    returned.
    """
    if not email or not providers:
        return 0
    doc = load_authorized(store, email)
    merged = sorted(set(doc["providers"]) | _normalize_providers(providers))
    try:
        _write_providers(store, email, merged)
        log.info("authorize: %s now has %d entries (added %s)",
                 email, len(merged), providers)
    except Exception as exc:
        log.error("authorize put failed for %s: %s", email, exc)
        return len(doc["providers"])
    return len(merged)


def revoke(store, email: str, providers: list) -> int:
    """Remove `providers` from the user's authorized list.
    Returns the new total count after removal.

    Returns 0 for an empty email or provider list. If the write fails
    the error is logged and the previous (still stored) count is
    returned, matching authorize() — the caller must not be told an
    entry was removed when it is still in the stored list.
    """
    if not email or not providers:
        return 0
    doc = load_authorized(store, email)
    remaining = sorted(set(doc["providers"]) - _normalize_providers(providers))
    try:
        _write_providers(store, email, remaining)
        log.info("revoke: %s now has %d entries (removed %s)",
                 email, len(remaining), providers)
    except Exception as exc:
        log.error("revoke put failed for %s: %s", email, exc)
        return len(doc["providers"])
    return len(remaining)


# diff --git a/ghost-ai-scanner/tests/unit/test_authorize_service.py b/ghost-ai-scanner/tests/unit/test_authorize_service.py
# =============================================================
# FILE: tests/unit/test_authorize_service.py
# VERSION: 1.0.0
# UPDATED: 2026-05-11
# OWNER: Giggso Inc
# PURPOSE: Lock the authorize service — once a user authorises a
#          tool, the agent must see it forever (unless revoked).
# =============================================================

import json
import sys
from pathlib import Path

_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_ROOT / "src"))

from services.authorize import (
    authorize, revoke, load_authorized, _safe_email, _key_for,
)


class _StubFindings:
    """Minimal stub for the FindingsStore — only _get and _put used."""
    def __init__(self):
        self._blob = {}

    def _get(self, key):
        return self._blob.get(key)

    def _put(self, key, body, ctype):
        self._blob[key] = body
        return True


class _StubStore:
    """Store stand-in exposing only the `.findings` attribute."""
    def __init__(self):
        self.findings = _StubFindings()


def test_safe_email_strips_at_and_special_chars():
    assert _safe_email("Ravi@Giggso.COM") == "ravi_giggso.com"
    assert _safe_email("a+b@c.io") == "a_b_c.io"


def test_key_per_user_isolated():
    a = _key_for("a@x.com")
    b = _key_for("b@x.com")
    assert a != b
    assert a.startswith("config/authorized/")


def test_authorize_creates_new_doc():
    s = _StubStore()
    n = authorize(s, "ravi@giggso.com", ["cursor", "flowise"])
    assert n == 2
    doc = load_authorized(s, "ravi@giggso.com")
    assert sorted(doc["providers"]) == ["cursor", "flowise"]
    assert doc["updated_at"]


def test_authorize_is_idempotent():
    s = _StubStore()
    authorize(s, "ravi@giggso.com", ["cursor"])
    authorize(s, "ravi@giggso.com", ["cursor"])
    authorize(s, "ravi@giggso.com", ["cursor"])
    assert len(load_authorized(s, "ravi@giggso.com")["providers"]) == 1


def test_authorize_merges_lists():
    s = _StubStore()
    authorize(s, "ravi@giggso.com", ["cursor"])
    authorize(s, "ravi@giggso.com", ["flowise", "ollama"])
    providers = load_authorized(s, "ravi@giggso.com")["providers"]
    assert set(providers) == {"cursor", "flowise", "ollama"}


def test_revoke_removes_entries():
    s = _StubStore()
    authorize(s, "ravi@giggso.com", ["cursor", "flowise"])
    revoke(s, "ravi@giggso.com", ["cursor"])
    providers = load_authorized(s, "ravi@giggso.com")["providers"]
    assert providers == ["flowise"]


def test_empty_inputs_are_noop():
    """Empty email / provider list returns 0 AND writes nothing — the
    return value alone would not catch a stray empty doc being stored."""
    s = _StubStore()
    assert authorize(s, "", ["cursor"]) == 0
    assert authorize(s, "ravi@giggso.com", []) == 0
    assert revoke(s, "", ["cursor"]) == 0
    assert revoke(s, "ravi@giggso.com", []) == 0
    assert s.findings._blob == {}


def test_per_user_isolation():
    """Authorising for user A must not touch user B's list."""
    s = _StubStore()
    authorize(s, "a@giggso.com", ["cursor"])
    authorize(s, "b@giggso.com", ["flowise"])
    a = load_authorized(s, "a@giggso.com")["providers"]
    b = load_authorized(s, "b@giggso.com")["providers"]
    assert "flowise" not in a
    assert "cursor" not in b


def test_load_authorized_handles_garbage_gracefully():
    s = _StubStore()
    s.findings._blob[_key_for("x@y.com")] = b"not-json-at-all"
    doc = load_authorized(s, "x@y.com")
    assert doc["providers"] == []   # graceful fallback


def test_load_authorized_canonicalises_legacy_shapes():
    s = _StubStore()
    s.findings._blob[_key_for("x@y.com")] = json.dumps(
        {"providers": ["b", "a", "b"]}
    ).encode()
    doc = load_authorized(s, "x@y.com")
    assert doc["providers"] == ["a", "b"]   # sorted + unique


# diff --git a/ghost-ai-scanner/tests/unit/test_cleanup_hints.py b/ghost-ai-scanner/tests/unit/test_cleanup_hints.py
# =============================================================
# FILE: tests/unit/test_cleanup_hints.py
# VERSION: 1.0.0
# UPDATED: 2026-05-11
# OWNER: Giggso Inc
# PURPOSE: Lock the cleanup-hint contract — every supported category
#          must produce a non-empty suggestion. Operators read these
#          on every dashboard row.
# =============================================================

import sys
from pathlib import Path

import pytest

_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_ROOT / "src"))

from cleanup_hints import cleanup_hint


# Every category referenced by agent_explode._FINDING_SEVERITY must
# have a non-empty default ("*") hint. Adding a new category to the
# agent without a matching hint regresses this test → forces an update.
_SUPPORTED_CATEGORIES = [
    "browser", "process", "container_log_signal", "package",
    "ide_plugin", "container_image", "shell_history",
    "mcp_server", "agent_workflow", "agent_scheduled",
    "tool_registration", "vector_db",
]


@pytest.mark.parametrize("cat", _SUPPORTED_CATEGORIES)
def test_every_category_has_default_hint(cat):
    h = cleanup_hint(cat)
    assert h, f"category {cat!r} has no cleanup hint — add one"


def test_os_specific_hint_used_when_available():
    mac = cleanup_hint("mcp_server", os_name="darwin")
    lin = cleanup_hint("mcp_server", os_name="linux")
    win = cleanup_hint("mcp_server", os_name="windows")
    assert "Library/Application Support" in mac
    assert ".config/Claude" in lin
    assert "%APPDATA%" in win
    # All three must be pairwise distinct. A chained `mac != lin != win`
    # only checks mac/lin and lin/win — it never compares mac with win.
    assert len({mac, lin, win}) == 3


def test_unknown_category_returns_empty_string():
    assert cleanup_hint("not_a_category") == ""
    assert cleanup_hint("") == ""


def test_unknown_os_falls_back_to_wildcard():
    h = cleanup_hint("process", os_name="bsd")
    assert h   # falls back to "*"


def test_case_insensitive_inputs():
    assert cleanup_hint("PROCESS", os_name="DARWIN")


# diff --git a/ghost-ai-scanner/tests/unit/test_findings_compact.py b/ghost-ai-scanner/tests/unit/test_findings_compact.py
# =============================================================
# FILE: tests/unit/test_findings_compact.py
# VERSION: 1.0.0
# UPDATED: 2026-05-11
# OWNER: Giggso Inc
# PURPOSE: Lock the dashboard-noise fix.
#          The replay-21x test was Zaid's mandate from the Drama-mode
#          panel — feed the explode function the same scan blob 21
#          times and assert distinct findings == N_providers, not
#          21 * N_providers. Without this test, the v2.2 fix could
#          silently regress on any future refactor.
# DEPENDS: pytest, normalizer.agent_explode, jobs.findings_compact
# AUDIT LOG:
#   v1.0.0  2026-05-11  Initial.
# =============================================================

import json
import sys
from pathlib import Path

import pytest

_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_ROOT / "src"))

from normalizer.agent_explode import explode_endpoint_findings, _finding_signature  # noqa: E402
from jobs.findings_compact import compact_day  # noqa: E402


def _bind_identity(event: dict, raw: dict) -> None:
    """Test stub for the identity binder normally injected by the
    pipeline. Just copy the device-identifying fields onto the event."""
    event["device_id"] = raw.get("device_id", "")
    event["device_uuid"] = raw.get("device_uuid", "")
    event["email"] = raw.get("email", "")
    event["src_ip"] = raw.get("src_ip", "")


def _make_snapshot(provider_count: int = 5) -> dict:
    """Build one ENDPOINT_SCAN blob with `provider_count` synthetic
    findings — mimics the agent's 30-min payload."""
    findings = []
    for i in range(provider_count):
        findings.append({
            "type": "process",
            "name": f"ai-tool-{i}",
            "domain": "",
        })
    return {
        "token": "tok-test",
        "company": "acme",
        "device_id": "MacBook-Pro-154.local",
        "device_uuid": "AAAA-BBBB-CCCC-DDDD",
        "email": "ravi@giggso.com",
        "timestamp": "2026-05-11T10:00:00+00:00",
        "scan_kind": "recurring",
        "findings": findings,
    }


# ── Part A — explode emits a signature ───────────────────────────────

def test_explode_emits_finding_signature():
    """Every emitted event must carry a stable finding_signature."""
    snap = _make_snapshot(provider_count=3)
    events = explode_endpoint_findings(snap, "acme", _bind_identity)
    assert len(events) == 3
    for e in events:
        assert e.get("finding_signature"), "missing finding_signature on emitted event"
        assert len(e["finding_signature"]) == 16, "signature should be 16-char SHA-256 prefix"


def test_signature_stable_across_re_emissions():
    """Same snapshot replayed N times must produce identical signatures."""
    snap = _make_snapshot(provider_count=5)
    run_a = explode_endpoint_findings(snap, "acme", _bind_identity)
    run_b = explode_endpoint_findings(snap, "acme", _bind_identity)
    sigs_a = sorted(e["finding_signature"] for e in run_a)
    sigs_b = sorted(e["finding_signature"] for e in run_b)
    assert sigs_a == sigs_b, "signatures drifted between identical replays"


def test_signature_changes_when_provider_changes():
    """Different provider on same device → different signature."""
    snap_a = _make_snapshot(provider_count=1)
    snap_b = _make_snapshot(provider_count=1)
    snap_b["findings"][0]["name"] = "different-tool"
    sig_a = explode_endpoint_findings(snap_a, "acme", _bind_identity)[0]["finding_signature"]
    sig_b = explode_endpoint_findings(snap_b, "acme", _bind_identity)[0]["finding_signature"]
    assert sig_a != sig_b


# ── Part B — Zaid's replay-21x mandate ──────────────────────────────

def test_replay_21_times_collapses_to_n_providers():
    """THE test. Replay the same snapshot 21 times (mimics 21 hourly
    scan cycles). After compaction, distinct signatures == N_providers,
    not 21*N. This is the contract that proves the dashboard will
    stop showing 1020 endpoints for a 1-laptop fleet."""
    snap = _make_snapshot(provider_count=5)
    all_events = []
    for _ in range(21):
        all_events.extend(explode_endpoint_findings(snap, "acme", _bind_identity))
    assert len(all_events) == 21 * 5, "explode itself should still emit per-cycle"

    distinct_sigs = {e["finding_signature"] for e in all_events}
    assert len(distinct_sigs) == 5, (
        f"expected 5 distinct signatures after 21 cycles × 5 providers, "
        f"got {len(distinct_sigs)} — dedup will break"
    )


# ── Part C — compact_day groups by signature ────────────────────────

class _StubDataFrame:
    """Minimal polars-DataFrame-shaped stub so compact_day can iterate."""
    def __init__(self, rows):
        self._rows = rows

    def is_empty(self):
        return not self._rows

    def iter_rows(self, named=True):
        return iter(self._rows)


class _StubFindingsStore:
    """Serves canned rows per severity and records every _put call."""
    SEVERITY_FILES = ["critical", "high", "medium", "unknown"]

    def __init__(self, rows_by_sev):
        self._rows = rows_by_sev
        self.writes = []

    def read(self, target_date, severity, limit=10_000):
        return _StubDataFrame(self._rows.get(severity, []))

    def _put(self, key, body, ctype):
        self.writes.append((key, body))
        return True


class _StubStore:
    """Store stand-in exposing only the `.findings` attribute."""
    def __init__(self, findings_store):
        self.findings = findings_store


def test_compact_day_groups_by_signature():
    """21 raw rows for the same signature → 1 compacted row with
    occurrences=21 and last_seen >= first_seen."""
    snap = _make_snapshot(provider_count=1)
    events = []
    for hour in range(21):
        for ev in explode_endpoint_findings(snap, "acme", _bind_identity):
            ev["timestamp"] = f"2026-05-11T{hour:02d}:00:00+00:00"
            events.append(ev)
    fs = _StubFindingsStore({"high": events})
    store = _StubStore(fs)

    summary = compact_day(store, "2026-05-11")
    assert summary["raw_rows"] == 21
    assert summary["signatures"] == 1
    assert len(fs.writes) == 1
    body = fs.writes[0][1].decode().strip().splitlines()
    row = json.loads(body[0])
    assert row["occurrences"] == 21
    assert row["first_seen"] is not None
    assert row["last_seen"] is not None
    assert row["first_seen"] <= row["last_seen"]


def test_compact_day_auto_resolves_stale_signatures(monkeypatch):
    """A signature whose last_seen is older than STALE_CYCLES *
    SCAN_INTERVAL gets status=resolved, resolved_by=auto."""
    import jobs.findings_compact as fc
    # Force a tight stale window so the test is deterministic. Patch the
    # module constants rather than the env + importlib.reload(): monkeypatch
    # undoes setattr at teardown, whereas a reload would leave the tightened
    # values in the module for every test that runs afterwards.
    monkeypatch.setattr(fc, "STALE_CYCLES", 1)
    monkeypatch.setattr(fc, "SCAN_INTERVAL_S", 60)

    snap = _make_snapshot(provider_count=1)
    ev = explode_endpoint_findings(snap, "acme", _bind_identity)[0]
    ev["timestamp"] = "2020-01-01T00:00:00+00:00"   # ancient

    fs = _StubFindingsStore({"high": [ev]})
    store = _StubStore(fs)
    summary = fc.compact_day(store, "2020-01-01")

    assert summary["auto_resolved"] == 1
    body = fs.writes[0][1].decode().strip().splitlines()
    row = json.loads(body[0])
    assert row["status"] == "resolved"
    assert row["resolved_by"] == "auto"


# diff --git a/ghost-ai-scanner/tests/unit/test_inventory_kpi_distinct.py b/ghost-ai-scanner/tests/unit/test_inventory_kpi_distinct.py
# =============================================================
# FILE: tests/unit/test_inventory_kpi_distinct.py
# VERSION: 1.0.0
# UPDATED: 2026-05-11
# OWNER: Giggso Inc
# PURPOSE: Lock the v2.2 KPI bug fix in manager_tab_inventory —
#          "Devices" must reflect distinct device count, never the
#          raw event-row count. Pre-fix bug: 1 laptop with 1020 scan
#          events rendered as Devices=1020. Regression here ships
#          embarrassment direct to the customer.
# AUDIT LOG:
#   v1.0.0  2026-05-11  Initial.
# =============================================================

import sys
from pathlib import Path

import pytest

_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_ROOT))

# We don't need streamlit here — only the pure helper.
from dashboard.ui.manager_tab_inventory import _asset_key  # noqa: E402


def _laptop_event(device: str, ts: str) -> dict:
    """One synthetic ENDPOINT_SCAN row, shaped as the dashboard sees it."""
    row = {"device_id": device, "asset_type": "laptop", "timestamp": ts}
    row["outcome"] = "ENDPOINT_FINDING"
    row["email"] = "ravi@giggso.com"
    return row


def _is_laptop(event: dict) -> bool:
    """True for rows whose asset_type is exactly "laptop"."""
    return event.get("asset_type") == "laptop"


def _distinct_laptops(events) -> int:
    """Count distinct asset keys among the laptop rows."""
    keys = set()
    for event in events:
        if _is_laptop(event):
            keys.add(_asset_key(event))
    return len(keys)


def test_asset_key_prefers_device_id():
    event = {"device_id": "Mac-A", "src_hostname": "x", "src_ip": "1.1.1.1"}
    assert _asset_key(event) == "Mac-A"


def test_asset_key_falls_back_to_hostname_then_ip():
    assert _asset_key({"src_hostname": "host-b", "src_ip": "2.2.2.2"}) == "host-b"
    assert _asset_key({"src_ip": "3.3.3.3"}) == "3.3.3.3"
    assert _asset_key({}) == "unknown"


def test_distinct_device_count_one_laptop_many_scans():
    """The KPI contract: however many scan rows one laptop produces
    (here 24 hours × 43 rows), it counts as exactly one device."""
    events = []
    for h in range(24):
        stamp = f"2026-05-11T{h:02d}:00:00Z"
        events.extend(_laptop_event("Mac-A", stamp) for _ in range(43))
    assert len(events) >= 1000

    distinct = _distinct_laptops(events)
    assert distinct == 1, (
        f"distinct device count drifted — got {distinct}, expected 1. "
        f"This is the bug that shipped Devices=1020 to the customer."
    )


def test_two_laptops_count_as_two():
    events = []
    for device in ("Mac-A", "Mac-B"):
        events += [_laptop_event(device, "2026-05-11T00:00:00Z")] * 50
    assert _distinct_laptops(events) == 2


def test_scan_event_count_preserved_as_raw_sum():
    """The raw row count stays available alongside the distinct count,
    so operators can still see per-device scan volume."""
    events = [_laptop_event("Mac-A", "2026-05-11T00:00:00Z") for _ in range(1020)]
    raw_volume = len([e for e in events if _is_laptop(e)])
    assert raw_volume == 1020


# diff --git a/ghost-ai-scanner/tests/unit/test_risk_score.py b/ghost-ai-scanner/tests/unit/test_risk_score.py
# =============================================================
# FILE: tests/unit/test_risk_score.py
# VERSION: 1.0.0
# UPDATED: 2026-05-11
# OWNER: Giggso Inc
# PURPOSE: Lock the risk scoring contract — drives the AI Posture
#          card. If these numbers drift, the headline UX drifts.
# =============================================================

import sys
from pathlib import Path

_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(_ROOT / "src"))

from scoring.risk_score import (
    risk_score, risk_band, posture_breakdown,
)


def _row(sev="HIGH", cat="process", occ=1, status="open"):
    """Build one compacted finding row with only the scored fields."""
    return {"severity": sev, "category": cat, "occurrences": occ,
            "status": status}


def test_empty_input_is_clean():
    assert risk_score([]) == 0
    assert risk_band(0) == "CLEAN"


def test_resolved_rows_do_not_score():
    rows = [_row(sev="CRITICAL", status="resolved")]
    assert risk_score(rows) == 0


def test_single_high_process_contributes():
    s = risk_score([_row(sev="HIGH", cat="process")])
    assert s > 0
    assert s < 75   # one HIGH shouldn't pin red


def test_one_critical_drives_red():
    """A single CRITICAL finding alone must push the device to CRITICAL band."""
    s = risk_score([_row(sev="CRITICAL", cat="process")])
    assert risk_band(s) == "CRITICAL", f"score={s} expected CRITICAL band"


def test_score_caps_at_100():
    rows = [_row(sev="CRITICAL", cat="process") for _ in range(50)]
    assert risk_score(rows) == 100


def test_category_multiplier_applied():
    """A running process scores higher than a stale shell-history line."""
    proc = risk_score([_row(sev="HIGH", cat="process")])
    hist = risk_score([_row(sev="HIGH", cat="shell_history")])
    assert proc > hist


def test_occurrences_dampened():
    """Seen 100 times is worse than seen once but not 100×."""
    once = risk_score([_row(sev="HIGH", cat="process", occ=1)])
    many = risk_score([_row(sev="HIGH", cat="process", occ=100)])
    assert many > once
    assert many < once * 10


def test_band_thresholds():
    assert risk_band(0) == "CLEAN"
    assert risk_band(5) == "LOW"
    assert risk_band(20) == "MEDIUM"
    assert risk_band(50) == "HIGH"
    assert risk_band(80) == "CRITICAL"
    assert risk_band(100) == "CRITICAL"


def test_band_boundaries_are_inclusive():
    """Pin each threshold and the value just below it — interior samples
    alone would not notice a `>=` turning into `>`."""
    assert risk_band(1) == "LOW"
    assert risk_band(14) == "LOW"
    assert risk_band(15) == "MEDIUM"
    assert risk_band(39) == "MEDIUM"
    assert risk_band(40) == "HIGH"
    assert risk_band(74) == "HIGH"
    assert risk_band(75) == "CRITICAL"


# ── posture_breakdown ──────────────────────────────────────────

def test_posture_breakdown_groups_by_category():
    rows = [
        _row(cat="process", sev="HIGH"),
        _row(cat="process", sev="CRITICAL"),
        _row(cat="vector_db", sev="MEDIUM"),
    ]
    b = posture_breakdown(rows)
    assert b["process"]["count"] == 2
    assert b["process"]["max_severity"] == "CRITICAL"
    assert b["vector_db"]["count"] == 1


def test_posture_breakdown_skips_resolved():
    rows = [
        _row(cat="process", status="open"),
        _row(cat="process", status="resolved"),
    ]
    b = posture_breakdown(rows)
    assert b["process"]["count"] == 1


def test_posture_breakdown_picks_latest_last_seen():
    rows = [
        {"category": "process", "severity": "HIGH",
         "occurrences": 1, "last_seen": "2026-05-10T00:00:00"},
        {"category": "process", "severity": "HIGH",
         "occurrences": 1, "last_seen": "2026-05-11T12:00:00"},
    ]
    b = posture_breakdown(rows)
    assert b["process"]["last_seen"] == "2026-05-11T12:00:00"