Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 116 additions & 90 deletions recce/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,8 +740,8 @@ async def list_tools() -> List[Tool]:
Use the results to identify anomalies, then follow up with profile_diff,
query_diff, or other tools until you have confidence in the root cause.

Models with value_diff: null have unknown data impact — use
suggested_deep_dives or call profile_diff/query_diff to investigate.
Models with data_impact: 'potential' have unknown data impact — follow
the model's next_action field to investigate with profile_diff/query_diff.
"""
).strip(),
inputSchema={
Expand All @@ -759,6 +759,13 @@ async def list_tools() -> List[Tool]:
"type": "boolean",
"description": "Skip row-level value comparison on modified models. Default: false",
},
"skip_downstream_value_diff": {
"type": "boolean",
"description": (
"Skip value comparison on downstream models "
"(faster for large DAGs). Default: false"
),
},
},
},
)
Expand Down Expand Up @@ -1180,6 +1187,7 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An
"state:modified.body+ state:modified.macros+ state:modified.contract+",
)
skip_value_diff = arguments.get("skip_value_diff", False)
skip_downstream_value_diff = arguments.get("skip_downstream_value_diff", False)
errors = []

# Step 1: Lineage classification
Expand Down Expand Up @@ -1229,10 +1237,8 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An
else:
not_impacted_models.append(name)

# Step 2a: Row count diff (skip views and removed models)
countable_models = [
m for m in impacted_models if m["materialized"] != "view" and m["change_status"] != "removed"
]
# Step 2a: Row count diff (skip removed models; include views for delta detection)
countable_models = [m for m in impacted_models if m["change_status"] != "removed"]
if countable_models:
countable_names = [m["name"] for m in countable_models]
try:
Expand Down Expand Up @@ -1300,11 +1306,13 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An
except Exception as e:
errors.append({"step": "schema_diff", "message": str(e)})

# Step 3: Value diff (PK Join on modified non-view models)
# Step 3: Value diff (PK Join on non-view impacted models)
if not skip_value_diff:
for model in impacted_models:
if model["materialized"] == "view" or model["change_status"] is None:
continue # skip views and downstream-only models
if model["materialized"] == "view":
continue # skip views (no PK, ambiguous semantics)
if skip_downstream_value_diff and model["change_status"] is None:
continue # opt-out for large DAGs
node_id = node_id_by_name.get(model["name"])
if not node_id:
continue
Expand Down Expand Up @@ -1407,12 +1415,14 @@ def _run_value_diff_query(adapter, query):
current_mean = float(raw_curr) if raw_curr is not None else None
col_idx += 2
columns_result[col] = {
"rows_changed": col_changed,
"affected_row_count": col_changed,
"base_mean": base_mean,
"current_mean": current_mean,
}

total_affected = rows_added + rows_removed + rows_changed
model["value_diff"] = {
"affected_row_count": total_affected,
"rows_added": rows_added,
"rows_removed": rows_removed,
"rows_changed": rows_changed,
Expand All @@ -1421,78 +1431,94 @@ def _run_value_diff_query(adapter, query):
except Exception as e:
errors.append({"step": "value_diff", "model": model["name"], "message": str(e)})

# Step 4: Suggested deep dives (deterministic rules)
suggested_deep_dives = []
seen_models = set() # Avoid duplicate suggestions

# Step 4: Compute per-model affected_row_count, data_impact, and next_action
max_affected = 0
for model in impacted_models:
name = model["name"]

# R1: rows_changed high + row_count stable → profile changed columns
if (
model["value_diff"] is not None
and model["row_count"] is not None
and model["row_count"]["delta_pct"] is not None
and abs(model["row_count"]["delta_pct"]) <= 5
):
vd = model["value_diff"]
# Calculate ratio of changed rows to total matched rows
total_matched = (model["row_count"]["current"] or 0) - vd["rows_added"]
if total_matched > 0 and vd["rows_changed"] / total_matched > 0.2:
top_cols = [
col for col, stats in (vd.get("columns") or {}).items() if stats.get("rows_changed", 0) > 0
]
if name not in seen_models:
suggested_deep_dives.append(
{
"model": name,
"tool": "profile_diff",
"columns": top_cols if top_cols else None,
}
)
seen_models.add(name)
continue

# R2: row_count delta > 5% → profile whole model
if model["row_count"] and model["row_count"]["delta_pct"] is not None:
if abs(model["row_count"]["delta_pct"]) > 5:
if name not in seen_models:
suggested_deep_dives.append(
{
"model": name,
"tool": "profile_diff",
"columns": None, # whole model
}
)
seen_models.add(name)
continue

# R3: schema_changes non-empty → profile changed columns
if model["schema_changes"]:
changed_cols = [c["column"] for c in model["schema_changes"]]
if name not in seen_models:
suggested_deep_dives.append(
{
"model": name,
"tool": "profile_diff",
"columns": changed_cols,
}
)
seen_models.add(name)
continue
# affected_row_count: value_diff total (priority) or abs(row_count.delta) (fallback)
if model["value_diff"] is not None:
model["affected_row_count"] = model["value_diff"]["affected_row_count"]
elif model["row_count"] is not None and model["row_count"].get("delta") is not None:
model["affected_row_count"] = abs(model["row_count"]["delta"])
else:
model["affected_row_count"] = None

# R4: value_diff null on modified model → profile whole model
is_modified = model["change_status"] in ("modified", "added")
if model["value_diff"] is None and is_modified:
if name not in seen_models:
suggested_deep_dives.append(
{
"model": name,
# data_impact: evidence level from value_diff
if model["value_diff"] is not None:
if model["value_diff"]["affected_row_count"] > 0:
model["data_impact"] = "confirmed"
else:
model["data_impact"] = "none"
else:
model["data_impact"] = "potential"

# When data_impact is "potential", force affected_row_count to null
# to avoid confusion from row_count fallback showing 0
if model["data_impact"] == "potential":
model["affected_row_count"] = None

if model["affected_row_count"] is not None and model["affected_row_count"] > max_affected:
max_affected = model["affected_row_count"]

# next_action: only for "potential" models — confirmed/none need no follow-up
model["next_action"] = None
if model["data_impact"] == "potential":
is_modified = model["change_status"] in ("modified", "added")
is_downstream = model["change_status"] is None

if model["schema_changes"]:
# Schema changed — profile the changed columns
changed_cols = [c["column"] for c in model["schema_changes"]]
model["next_action"] = {
"tool": "profile_diff",
"columns": changed_cols,
"reason": "schema changed, value_diff unavailable",
"priority": "high" if is_modified else "medium",
}
elif is_modified:
# Modified but no value_diff (view, no PK, or error)
model["next_action"] = {
"tool": "profile_diff",
"columns": None,
"reason": "modified model, value_diff unavailable (view or no PK)",
"priority": "high",
}
elif is_downstream and model["materialized"] == "view":
# Downstream view — low priority
model["next_action"] = {
"tool": "profile_diff",
"columns": None,
"reason": "downstream view, value_diff skipped",
"priority": "low",
}
elif is_downstream:
# Downstream table — medium priority
model["next_action"] = {
"tool": "profile_diff",
"columns": None,
"reason": "downstream model, value_diff skipped",
"priority": "medium",
}
elif model["data_impact"] == "confirmed":
# Confirmed changes — suggest profile_diff only if high change ratio
vd = model["value_diff"]
if (
model["row_count"] is not None
and model["row_count"]["delta_pct"] is not None
and abs(model["row_count"]["delta_pct"]) <= 5
):
total_matched = (model["row_count"]["current"] or 0) - vd["rows_added"]
if total_matched > 0 and vd["rows_changed"] / total_matched > 0.2:
top_cols = [
col
for col, stats in (vd.get("columns") or {}).items()
if stats.get("affected_row_count", 0) > 0
]
model["next_action"] = {
"tool": "profile_diff",
"columns": None,
"columns": top_cols if top_cols else None,
"reason": "high change ratio with stable row count — investigate value shifts",
"priority": "medium",
}
)
seen_models.add(name)

if sentry_metrics:
duration = time.time() - start_time
Expand All @@ -1501,20 +1527,20 @@ def _run_value_diff_query(adapter, query):

result = {
"_guidance": (
"DO NOT OVERRIDE these classifications with your own analysis. "
"These lists are computed from the lineage DAG and are definitive. "
"Copy impacted_models and not_impacted_models directly into your output. "
"When value_diff.rows_changed is present for a model, "
"use that number as the affected_row_count (exact count of "
"rows whose values differ between base and current). "
"A model with 0 value changes is still impacted if it appears "
"in this list — impact means 'in the blast radius', not "
"'has changed data'."
"confirmed_impacted_models lists all models in the DAG blast radius "
"(modified + downstream). Use data_impact to triage: "
"'confirmed' = value_diff verified data changes exist — report directly. "
"'none' = value_diff verified zero data changes — report directly. "
"'potential' = no value_diff available (views, no PK, or skipped) "
"— follow the model's next_action to investigate. "
"Only models with next_action != null need further tool calls. "
"Note: incremental model value_diff may reflect "
"build window artifacts if not fully refreshed."
),
"classification_source": "lineage_dag",
"impacted_models": impacted_models,
"not_impacted_models": not_impacted_models,
"suggested_deep_dives": suggested_deep_dives,
"max_affected_row_count": max_affected,
"confirmed_impacted_models": impacted_models,
"confirmed_not_impacted_models": not_impacted_models,
"errors": errors,
}
return self._maybe_add_single_env_warning(result)
Expand Down
Loading
Loading