diff --git a/recce/mcp_server.py b/recce/mcp_server.py index 39726b1b3..0657e100e 100644 --- a/recce/mcp_server.py +++ b/recce/mcp_server.py @@ -750,8 +750,8 @@ async def list_tools() -> List[Tool]: Use the results to identify anomalies, then follow up with profile_diff, query_diff, or other tools until you have confidence in the root cause. - Models with value_diff: null have unknown data impact — use - suggested_deep_dives or call profile_diff/query_diff to investigate. + Models with data_impact: 'potential' have unknown data impact — follow + the model's next_action field to investigate with profile_diff/query_diff. """ ).strip(), inputSchema={ @@ -769,6 +769,13 @@ async def list_tools() -> List[Tool]: "type": "boolean", "description": "Skip row-level value comparison on modified models. Default: false", }, + "skip_downstream_value_diff": { + "type": "boolean", + "description": ( + "Skip value comparison on downstream models " + "(faster for large DAGs). Default: false" + ), + }, }, }, ) @@ -1190,6 +1197,7 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An "state:modified.body+ state:modified.macros+ state:modified.contract+", ) skip_value_diff = arguments.get("skip_value_diff", False) + skip_downstream_value_diff = arguments.get("skip_downstream_value_diff", False) errors = [] # Step 1: Lineage classification @@ -1239,10 +1247,8 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An else: not_impacted_models.append(name) - # Step 2a: Row count diff (skip views and removed models) - countable_models = [ - m for m in impacted_models if m["materialized"] != "view" and m["change_status"] != "removed" - ] + # Step 2a: Row count diff (skip removed models; include views for delta detection) + countable_models = [m for m in impacted_models if m["change_status"] != "removed"] if countable_models: countable_names = [m["name"] for m in countable_models] try: @@ -1310,11 +1316,13 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An except Exception as e: errors.append({"step": "schema_diff", "message": str(e)}) - # Step 3: Value diff (PK Join on modified non-view models) + # Step 3: Value diff (PK Join on non-view impacted models) if not skip_value_diff: for model in impacted_models: - if model["materialized"] == "view" or model["change_status"] is None: - continue # skip views and downstream-only models + if model["materialized"] == "view": + continue # skip views (no PK, ambiguous semantics) + if skip_downstream_value_diff and model["change_status"] is None: + continue # opt-out for large DAGs node_id = node_id_by_name.get(model["name"]) if not node_id: continue @@ -1417,12 +1425,14 @@ def _run_value_diff_query(adapter, query): current_mean = float(raw_curr) if raw_curr is not None else None col_idx += 2 columns_result[col] = { - "rows_changed": col_changed, + "affected_row_count": col_changed, "base_mean": base_mean, "current_mean": current_mean, } + total_affected = rows_added + rows_removed + rows_changed model["value_diff"] = { + "affected_row_count": total_affected, "rows_added": rows_added, "rows_removed": rows_removed, "rows_changed": rows_changed, @@ -1431,78 +1441,94 @@ def _run_value_diff_query(adapter, query): except Exception as e: errors.append({"step": "value_diff", "model": model["name"], "message": str(e)}) - # Step 4: Suggested deep dives (deterministic rules) - suggested_deep_dives = [] - seen_models = set() # Avoid duplicate suggestions - + # Step 4: Compute per-model affected_row_count, data_impact, and next_action + max_affected = 0 for model in impacted_models: - name = model["name"] - - # R1: rows_changed high + row_count stable → profile changed columns - if ( - model["value_diff"] is not None - and model["row_count"] is not None - and model["row_count"]["delta_pct"] is not None - and abs(model["row_count"]["delta_pct"]) <= 5 - ): - vd = model["value_diff"] - # Calculate ratio of changed rows to total matched rows - total_matched = (model["row_count"]["current"] or 0) - vd["rows_added"] - if total_matched > 0 and vd["rows_changed"] / total_matched > 0.2: - top_cols = [ - col for col, stats in (vd.get("columns") or {}).items() if stats.get("rows_changed", 0) > 0 - ] - if name not in seen_models: - suggested_deep_dives.append( - { - "model": name, - "tool": "profile_diff", - "columns": top_cols if top_cols else None, - } - ) - seen_models.add(name) - continue - - # R2: row_count delta > 5% → profile whole model - if model["row_count"] and model["row_count"]["delta_pct"] is not None: - if abs(model["row_count"]["delta_pct"]) > 5: - if name not in seen_models: - suggested_deep_dives.append( - { - "model": name, - "tool": "profile_diff", - "columns": None, # whole model - } - ) - seen_models.add(name) - continue - - # R3: schema_changes non-empty → profile changed columns - if model["schema_changes"]: - changed_cols = [c["column"] for c in model["schema_changes"]] - if name not in seen_models: - suggested_deep_dives.append( - { - "model": name, - "tool": "profile_diff", - "columns": changed_cols, - } - ) - seen_models.add(name) - continue + # affected_row_count: value_diff total (priority) or abs(row_count.delta) (fallback) + if model["value_diff"] is not None: + model["affected_row_count"] = model["value_diff"]["affected_row_count"] + elif model["row_count"] is not None and model["row_count"].get("delta") is not None: + model["affected_row_count"] = abs(model["row_count"]["delta"]) + else: + model["affected_row_count"] = None - # R4: value_diff null on modified model → profile whole model - is_modified = model["change_status"] in ("modified", "added") - if model["value_diff"] is None and is_modified: - if name not in seen_models: - suggested_deep_dives.append( - { - "model": name, + # data_impact: evidence level from value_diff + if model["value_diff"] is not None: + if model["value_diff"]["affected_row_count"] > 0: + model["data_impact"] = "confirmed" + else: + model["data_impact"] = "none" + else: + model["data_impact"] = "potential" + + # When data_impact is "potential", force affected_row_count to null + # to avoid confusion from row_count fallback showing 0 + if model["data_impact"] == "potential": + model["affected_row_count"] = None + + if model["affected_row_count"] is not None and model["affected_row_count"] > max_affected: + max_affected = model["affected_row_count"] + + # next_action: only for "potential" models — confirmed/none need no follow-up + model["next_action"] = None + if model["data_impact"] == "potential": + is_modified = model["change_status"] in ("modified", "added") + is_downstream = model["change_status"] is None + + if model["schema_changes"]: + # Schema changed — profile the changed columns + changed_cols = [c["column"] for c in model["schema_changes"]] + model["next_action"] = { + "tool": "profile_diff", + "columns": changed_cols, + "reason": "schema changed, value_diff unavailable", + "priority": "high" if is_modified else "medium", + } + elif is_modified: + # Modified but no value_diff (view, no PK, or error) + model["next_action"] = { + "tool": "profile_diff", + "columns": None, + "reason": "modified model, value_diff unavailable (view or no PK)", + "priority": "high", + } + elif is_downstream and model["materialized"] == "view": + # Downstream view — low priority + model["next_action"] = { + "tool": "profile_diff", + "columns": None, + "reason": "downstream view, value_diff skipped", + "priority": "low", + } + elif is_downstream: + # Downstream table — medium priority + model["next_action"] = { + "tool": "profile_diff", + "columns": None, + "reason": "downstream model, value_diff skipped", + "priority": "medium", + } + elif model["data_impact"] == "confirmed": + # Confirmed changes — suggest profile_diff only if high change ratio + vd = model["value_diff"] + if ( + model["row_count"] is not None + and model["row_count"]["delta_pct"] is not None + and abs(model["row_count"]["delta_pct"]) <= 5 + ): + total_matched = (model["row_count"]["current"] or 0) - vd["rows_added"] + if total_matched > 0 and vd["rows_changed"] / total_matched > 0.2: + top_cols = [ + col + for col, stats in (vd.get("columns") or {}).items() + if stats.get("affected_row_count", 0) > 0 + ] + model["next_action"] = { "tool": "profile_diff", - "columns": None, + "columns": top_cols if top_cols else None, + "reason": "high change ratio with stable row count — investigate value shifts", + "priority": "medium", } - ) - seen_models.add(name) if sentry_metrics: duration = time.time() - start_time @@ -1511,20 +1537,20 @@ def _run_value_diff_query(adapter, query): result = { "_guidance": ( - "DO NOT OVERRIDE these classifications with your own analysis. " - "These lists are computed from the lineage DAG and are definitive. " - "Copy impacted_models and not_impacted_models directly into your output. " - "When value_diff.rows_changed is present for a model, " - "use that number as the affected_row_count (exact count of " - "rows whose values differ between base and current). " - "A model with 0 value changes is still impacted if it appears " - "in this list — impact means 'in the blast radius', not " - "'has changed data'." + "confirmed_impacted_models lists all models in the DAG blast radius " + "(modified + downstream). Use data_impact to triage: " + "'confirmed' = value_diff verified data changes exist — report directly. " + "'none' = value_diff verified zero data changes — report directly. " + "'potential' = no value_diff available (views, no PK, or skipped) " + "— follow the model's next_action to investigate. " + "Only models with next_action != null need further tool calls. " + "Note: incremental model value_diff may reflect " + "build window artifacts if not fully refreshed." ), "classification_source": "lineage_dag", - "impacted_models": impacted_models, - "not_impacted_models": not_impacted_models, - "suggested_deep_dives": suggested_deep_dives, + "max_affected_row_count": max_affected, + "confirmed_impacted_models": impacted_models, + "confirmed_not_impacted_models": not_impacted_models, "errors": errors, } return self._maybe_add_single_env_warning(result) diff --git a/tests/test_mcp_e2e.py b/tests/test_mcp_e2e.py index 3bbd32f17..033ee7db3 100644 --- a/tests/test_mcp_e2e.py +++ b/tests/test_mcp_e2e.py @@ -166,16 +166,15 @@ async def test_classifies_modified_and_downstream(self, mcp_e2e_impact): result = await server._tool_impact_analysis({}) # Structure check - assert "impacted_models" in result - assert "not_impacted_models" in result - assert "suggested_deep_dives" in result + assert "confirmed_impacted_models" in result + assert "confirmed_not_impacted_models" in result assert "errors" in result # customers is modified (different data → different checksum) - model_names = [m["name"] for m in result["impacted_models"]] + model_names = [m["name"] for m in result["confirmed_impacted_models"]] assert "customers" in model_names - customers = next(m for m in result["impacted_models"] if m["name"] == "customers") + customers = next(m for m in result["confirmed_impacted_models"] if m["name"] == "customers") assert customers["change_status"] == "modified" assert customers["materialized"] == "table" @@ -185,10 +184,10 @@ async def test_downstream_has_null_change_status(self, mcp_e2e_impact): server, _ = mcp_e2e_impact result = await server._tool_impact_analysis({}) - model_names = [m["name"] for m in result["impacted_models"]] + model_names = [m["name"] for m in result["confirmed_impacted_models"]] assert "orders" in model_names - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") assert orders["change_status"] is None # downstream, not directly modified @pytest.mark.asyncio @@ -206,9 +205,9 @@ async def test_no_false_positives_without_siblings(self, mcp_e2e_impact): ) result = await server._tool_impact_analysis({}) - impacted_names = [m["name"] for m in result["impacted_models"]] + impacted_names = [m["name"] for m in result["confirmed_impacted_models"]] assert "unrelated" not in impacted_names - assert "unrelated" in result["not_impacted_models"] + assert "unrelated" in result["confirmed_not_impacted_models"] @pytest.mark.asyncio async def test_row_count_populated_for_tables(self, mcp_e2e_impact): @@ -216,7 +215,7 @@ async def test_row_count_populated_for_tables(self, mcp_e2e_impact): server, _ = mcp_e2e_impact result = await server._tool_impact_analysis({}) - customers = next(m for m in result["impacted_models"] if m["name"] == "customers") + customers = next(m for m in result["confirmed_impacted_models"] if m["name"] == "customers") assert customers["row_count"] is not None assert customers["row_count"]["base"] == 2 assert customers["row_count"]["current"] == 3 @@ -226,7 +225,7 @@ async def test_row_count_populated_for_tables(self, mcp_e2e_impact): @pytest.mark.asyncio async def test_row_count_null_for_views(self, mcp_e2e_impact): - """Views should have row_count: null (expensive full-table scan).""" + """Views now included in row_count (delta detection signal).""" server, helper = mcp_e2e_impact # Add a view model @@ -242,9 +241,11 @@ async def test_row_count_null_for_views(self, mcp_e2e_impact): ) result = await server._tool_impact_analysis({}) - view_model = next((m for m in result["impacted_models"] if m["name"] == "customers_view"), None) + view_model = next((m for m in result["confirmed_impacted_models"] if m["name"] == "customers_view"), None) assert view_model is not None - assert view_model["row_count"] is None + # Views get row_count (useful metadata signal) but no value_diff + assert view_model["row_count"] is not None + assert view_model["value_diff"] is None @pytest.mark.asyncio async def test_schema_changes_detected(self, mcp_e2e): @@ -262,7 +263,7 @@ async def test_schema_changes_detected(self, mcp_e2e): ) result = await server._tool_impact_analysis({}) - users = next(m for m in result["impacted_models"] if m["name"] == "users") + users = next(m for m in result["confirmed_impacted_models"] if m["name"] == "users") assert len(users["schema_changes"]) > 0 added_cols = [c for c in users["schema_changes"] if c["change_status"] == "added"] assert any(c["column"] == "email" for c in added_cols) @@ -273,24 +274,25 @@ async def test_schema_changes_empty_when_no_change(self, mcp_e2e_impact): server, _ = mcp_e2e_impact result = await server._tool_impact_analysis({}) - customers = next(m for m in result["impacted_models"] if m["name"] == "customers") + customers = next(m for m in result["confirmed_impacted_models"] if m["name"] == "customers") assert customers["schema_changes"] == [] @pytest.mark.asyncio - async def test_suggested_deep_dive_r2_row_count_delta(self, mcp_e2e_impact): - """R2: row_count delta > 5% → suggest profile_diff on whole model.""" + async def test_next_action_row_count_delta(self, mcp_e2e_impact): + """Row count delta > 5% on potential model → next_action profile_diff.""" server, _ = mcp_e2e_impact result = await server._tool_impact_analysis({}) - # customers: base=2, curr=3, delta_pct=50% → R2 triggers - dives = result["suggested_deep_dives"] - customer_dive = next((d for d in dives if d["model"] == "customers"), None) - assert customer_dive is not None - assert customer_dive["tool"] == "profile_diff" + # customers: base=2, curr=3, delta_pct=50%, modified → data_impact depends on value_diff + customers = next(m for m in result["confirmed_impacted_models"] if m["name"] == "customers") + # If data_impact is potential (no PK), next_action should suggest profile_diff + if customers["data_impact"] == "potential": + assert customers["next_action"] is not None + assert customers["next_action"]["tool"] == "profile_diff" @pytest.mark.asyncio - async def test_suggested_deep_dive_r3_schema_change(self, mcp_e2e): - """R3: schema_changes non-empty → suggest profile_diff on changed columns.""" + async def test_next_action_schema_change(self, mcp_e2e): + """Schema changes on potential model → next_action profile_diff on changed columns.""" server, helper = mcp_e2e helper.create_model( @@ -303,18 +305,50 @@ async def test_suggested_deep_dive_r3_schema_change(self, mcp_e2e): ) result = await server._tool_impact_analysis({}) - dives = result["suggested_deep_dives"] - users_dive = next((d for d in dives if d["model"] == "users"), None) - assert users_dive is not None - assert "email" in (users_dive.get("columns") or []) + users = next(m for m in result["confirmed_impacted_models"] if m["name"] == "users") + assert users["next_action"] is not None + assert users["next_action"]["tool"] == "profile_diff" + assert users["next_action"]["priority"] == "high" + assert "email" in (users["next_action"].get("columns") or []) @pytest.mark.asyncio - async def test_suggested_deep_dive_r4_null_value_diff(self, mcp_e2e): - """R4: value_diff null on modified model → suggest profile_diff.""" + async def test_next_action_downstream_view_low_priority(self, mcp_e2e): + """Downstream view → next_action with priority=low.""" + server, helper = mcp_e2e + + helper.create_model( + "orders", + base_csv="id,amount\n1,100", + curr_csv="id,amount\n1,200", + unique_id="model.recce_test.orders", + base_columns={"id": "INTEGER", "amount": "INTEGER"}, + curr_columns={"id": "INTEGER", "amount": "INTEGER"}, + ) + # downstream view of orders + helper.create_model( + "orders_view", + base_csv="id,amount\n1,100", + curr_csv="id,amount\n1,100", + unique_id="model.recce_test.orders_view", + depends_on=["model.recce_test.orders"], + base_columns={"id": "INTEGER", "amount": "INTEGER"}, + curr_columns={"id": "INTEGER", "amount": "INTEGER"}, + patch_func=lambda d: d["config"].update({"materialized": "view"}), + ) + result = await server._tool_impact_analysis({"skip_downstream_value_diff": True}) + + view = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders_view") + assert view["data_impact"] == "potential" + assert view["next_action"] is not None + assert view["next_action"]["priority"] == "low" + + @pytest.mark.asyncio + async def test_next_action_null_value_diff_on_view(self, mcp_e2e): + """Modified view (value_diff=null) → next_action profile_diff.""" server, helper = mcp_e2e # Create a modified view: different data → different checksum → "modified" - # Views get value_diff=null (skipped), so R4 should trigger + # Views get value_diff=null (skipped), so next_action should trigger helper.create_model( "stg_orders", base_csv="id,amount\n1,100", @@ -326,16 +360,16 @@ async def test_suggested_deep_dive_r4_null_value_diff(self, mcp_e2e): ) result = await server._tool_impact_analysis({}) - # stg_orders is modified (different checksum) + view (value_diff=null) → R4 - stg = next(m for m in result["impacted_models"] if m["name"] == "stg_orders") + # stg_orders is modified (different checksum) + view (value_diff=null) + stg = next(m for m in result["confirmed_impacted_models"] if m["name"] == "stg_orders") assert stg["change_status"] == "modified" assert stg["value_diff"] is None + assert stg["data_impact"] == "potential" - dives = result["suggested_deep_dives"] - view_dive = next((d for d in dives if d["model"] == "stg_orders"), None) - assert view_dive is not None - assert view_dive["tool"] == "profile_diff" - assert view_dive["columns"] is None # whole model + assert stg["next_action"] is not None + assert stg["next_action"]["tool"] == "profile_diff" + assert stg["next_action"]["columns"] is None # whole model + assert stg["next_action"]["priority"] == "high" class TestImpactAnalysisFullScenario: @@ -378,14 +412,14 @@ async def test_full_scenario_modified_with_downstream(self, mcp_e2e): result = await server._tool_impact_analysis({}) # Verify classification - impacted_names = {m["name"] for m in result["impacted_models"]} + impacted_names = {m["name"] for m in result["confirmed_impacted_models"]} assert "stg_orders" in impacted_names assert "orders" in impacted_names assert "customers" not in impacted_names - assert "customers" in result["not_impacted_models"] + assert "customers" in result["confirmed_not_impacted_models"] # Verify row counts - stg = next(m for m in result["impacted_models"] if m["name"] == "stg_orders") + stg = next(m for m in result["confirmed_impacted_models"] if m["name"] == "stg_orders") assert stg["row_count"]["base"] == 2 assert stg["row_count"]["current"] == 2 assert stg["row_count"]["delta"] == 0 @@ -429,9 +463,45 @@ async def test_row_count_error_captured_not_fatal(self, mcp_e2e): result = await server._tool_impact_analysis({}) # Should not crash — structure is intact - assert "impacted_models" in result + assert "confirmed_impacted_models" in result assert "errors" in result + @pytest.mark.asyncio + async def test_value_diff_error_captured_not_fatal(self, mcp_e2e): + """If value_diff query fails, error is captured and model still appears.""" + server, helper = mcp_e2e + + helper.create_model( + "orders", + base_csv="id,amount\n1,100", + curr_csv="id,amount\n1,150", + unique_id="model.recce_test.orders", + base_columns={"id": "INTEGER", "amount": "INTEGER"}, + curr_columns={"id": "INTEGER", "amount": "INTEGER"}, + ) + helper.add_unique_test("model.recce_test.orders", "orders", "id") + + # Patch adapter.execute to raise during value_diff SQL execution + original_execute = server.context.adapter.execute + + def failing_execute(sql, fetch=False): + if "FULL OUTER JOIN" in sql: + raise RuntimeError("simulated value_diff failure") + return original_execute(sql, fetch=fetch) + + with patch.object(server.context.adapter, "execute", side_effect=failing_execute): + result = await server._tool_impact_analysis({}) + + # Model still appears + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") + assert orders["value_diff"] is None + assert orders["data_impact"] == "potential" + + # Error captured + vd_errors = [e for e in result["errors"] if e["step"] == "value_diff"] + assert len(vd_errors) == 1 + assert "simulated value_diff failure" in vd_errors[0]["message"] + @pytest.mark.asyncio async def test_schema_diff_error_does_not_block_value_diff(self, mcp_e2e): """Regression: schema-diff failure must not prevent value-diff from running. @@ -474,12 +544,12 @@ def patched_get_lineage_diff(): assert any(e["step"] == "schema_diff" for e in result["errors"]) # Function returned successfully (no UnboundLocalError) - assert "impacted_models" in result - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + assert "confirmed_impacted_models" in result + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") # Value-diff still ran (the whole point of the fix) assert orders["value_diff"] is not None - assert orders["value_diff"]["rows_changed"] >= 0 + assert orders["value_diff"]["affected_row_count"] >= 0 class TestImpactAnalysisValueDiff: @@ -500,7 +570,7 @@ async def test_value_diff_with_pk(self, mcp_e2e): helper.add_unique_test("model.recce_test.orders", "orders", "id") result = await server._tool_impact_analysis({}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") vd = orders["value_diff"] assert vd is not None assert vd["rows_changed"] == 1 # id=1: amount 100→150 @@ -508,7 +578,7 @@ async def test_value_diff_with_pk(self, mcp_e2e): assert vd["rows_removed"] == 0 assert "columns" in vd assert "amount" in vd["columns"] - assert vd["columns"]["amount"]["rows_changed"] == 1 + assert vd["columns"]["amount"]["affected_row_count"] == 1 @pytest.mark.asyncio async def test_no_pk_returns_null(self, mcp_e2e): @@ -523,7 +593,7 @@ async def test_no_pk_returns_null(self, mcp_e2e): curr_columns={"id": "INTEGER", "amount": "INTEGER"}, ) result = await server._tool_impact_analysis({}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") assert orders["value_diff"] is None @pytest.mark.asyncio @@ -540,7 +610,7 @@ async def test_skip_value_diff_flag(self, mcp_e2e): ) helper.add_unique_test("model.recce_test.orders", "orders", "id") result = await server._tool_impact_analysis({"skip_value_diff": True}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") assert orders["value_diff"] is None @pytest.mark.asyncio @@ -560,17 +630,16 @@ async def test_r1_high_rows_changed_stable_count(self, mcp_e2e): result = await server._tool_impact_analysis({}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") # row_count: base=2, curr=2, delta=0 (stable) # value_diff: rows_changed=2 (100% of matched rows) assert orders["row_count"]["delta"] == 0 - assert orders["value_diff"]["rows_changed"] == 2 + assert orders["value_diff"]["affected_row_count"] == 2 - dives = result["suggested_deep_dives"] - orders_dive = next((d for d in dives if d["model"] == "orders"), None) - assert orders_dive is not None - assert orders_dive["tool"] == "profile_diff" - assert "amount" in (orders_dive["columns"] or []) + # High change ratio → next_action suggests profile_diff on changed columns + assert orders["next_action"] is not None + assert orders["next_action"]["tool"] == "profile_diff" + assert "amount" in (orders["next_action"]["columns"] or []) @pytest.mark.asyncio async def test_value_diff_per_column_means(self, mcp_e2e): @@ -586,7 +655,7 @@ async def test_value_diff_per_column_means(self, mcp_e2e): ) helper.add_unique_test("model.recce_test.orders", "orders", "id") result = await server._tool_impact_analysis({}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") vd = orders["value_diff"] assert vd["columns"]["amount"]["base_mean"] is not None assert vd["columns"]["amount"]["current_mean"] is not None diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 251e79fcb..b9ad0d65e 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -2132,3 +2132,421 @@ async def test_impact_analysis_schema_has_select(self, mcp_server): tool = next(t for t in result.root.tools if t.name == "impact_analysis") assert "select" in tool.inputSchema["properties"] assert "skip_value_diff" in tool.inputSchema["properties"] + assert "skip_downstream_value_diff" in tool.inputSchema["properties"] + + +class TestImpactAnalysisBehavior: + """Test impact_analysis behavioral logic: data_impact, downstream value_diff.""" + + # --------------------------------------------------------------------------- + # Mock setup helpers + # --------------------------------------------------------------------------- + + LINEAGE_DIFF_DATA = { + "base": { + "nodes": { + "model.project.modified_model": { + "name": "modified_model", + "config": {"materialized": "table"}, + "columns": { + "id": {"type": "INTEGER"}, + "amount": {"type": "DECIMAL"}, + }, + }, + "model.project.downstream_model": { + "name": "downstream_model", + "config": {"materialized": "table"}, + "columns": { + "id": {"type": "INTEGER"}, + "total": {"type": "DECIMAL"}, + }, + }, + "model.project.view_model": { + "name": "view_model", + "config": {"materialized": "view"}, + "columns": {}, + }, + }, + "parent_map": {}, + }, + "current": { + "nodes": { + "model.project.modified_model": { + "name": "modified_model", + "config": {"materialized": "table"}, + "columns": { + "id": {"type": "INTEGER"}, + "amount": {"type": "DECIMAL"}, + }, + }, + "model.project.downstream_model": { + "name": "downstream_model", + "config": {"materialized": "table"}, + "columns": { + "id": {"type": "INTEGER"}, + "total": {"type": "DECIMAL"}, + }, + }, + "model.project.view_model": { + "name": "view_model", + "config": {"materialized": "view"}, + "columns": {}, + }, + }, + "parent_map": {}, + }, + "diff": { + "model.project.modified_model": {"change_status": "modified"}, + }, + } + + @staticmethod + def _make_mock_adapter(): + """Return a mock adapter with all value_diff and row_count hooks wired up.""" + adapter = MagicMock() + + def mock_select_nodes(select=""): + if any( + s in select + for s in [ + "state:modified.body+", + "state:modified.macros+", + "state:modified.contract+", + ] + ): + return { + "model.project.modified_model", + "model.project.downstream_model", + "model.project.view_model", + } + elif select == "state:modified": + return {"model.project.modified_model"} + return set() + + adapter.select_nodes.side_effect = mock_select_nodes + + def mock_get_model(node_id): + models = { + "model.project.modified_model": { + "primary_key": "id", + "columns": { + "id": {"type": "INTEGER"}, + "amount": {"type": "DECIMAL"}, + }, + }, + "model.project.downstream_model": { + "primary_key": "id", + "columns": { + "id": {"type": "INTEGER"}, + "total": {"type": "DECIMAL"}, + }, + }, + } + return models.get(node_id, {}) + + adapter.get_model.side_effect = mock_get_model + adapter.create_relation.return_value = "some_relation" + # connection_named is used as a context manager — MagicMock supports this natively + return adapter + + @staticmethod + def _make_execute_side_effect(modified_has_changes=True): + """ + Return a side_effect callable for adapter.execute(query, fetch=True). + + Dispatches on query content (model name in SQL) rather than call order, + because impacted_models iteration order depends on dict/set ordering. + + Row layout per model: + - modified_model (non-pk col: amount): [rows_added, rows_removed, rows_changed, amount__changed, amount__base_mean, amount__curr_mean] + - downstream_model (non-pk col: total): [rows_added, rows_removed, rows_changed, total__changed, total__base_mean, total__curr_mean] + """ + + def side_effect(query, fetch=False): + # Dispatch on column names in the SQL (create_relation returns a + # generic mock, so model name won't appear in query). + # modified_model has column "amount"; downstream_model has "total". + q = str(query) + if '"amount"' in q: + # modified_model + if modified_has_changes: + row = [0, 0, 5, 5, 10.0, 15.0] # 5 rows changed + else: + row = [0, 0, 0, 0, 10.0, 10.0] + else: + # downstream_model (column "total") — zero changes + row = [0, 0, 0, 0, 5.0, 5.0] + table = MagicMock() + table.__len__ = MagicMock(return_value=1) + table.__getitem__ = MagicMock(side_effect=lambda i: row if i == 0 else None) + return (None, table) + + return side_effect + + @pytest.fixture + def setup_impact_mocks(self, mcp_server): + """Fixture that yields (server, mock_context) with all impact_analysis mocks wired.""" + server, mock_context = mcp_server + + mock_context.get_lineage_diff.return_value = MagicMock( + model_dump=MagicMock(return_value=self.LINEAGE_DIFF_DATA) + ) + + adapter = self._make_mock_adapter() + adapter.execute.side_effect = self._make_execute_side_effect(modified_has_changes=True) + mock_context.adapter = adapter + + return server, mock_context + + @staticmethod + async def _call_impact_analysis(server, **extra_args): + """Invoke impact_analysis via the MCP call_tool handler.""" + handler = server.server.request_handlers[CallToolRequest] + req = CallToolRequest( + method="tools/call", + params=CallToolRequestParams(name="impact_analysis", arguments=extra_args), + ) + result = await handler(req) + import json + + return json.loads(result.root.content[0].text) + + # --------------------------------------------------------------------------- + # Tests + # --------------------------------------------------------------------------- + + @pytest.mark.asyncio + async def test_all_models_have_data_impact(self, setup_impact_mocks): + """Every model in confirmed_impacted_models must have a data_impact field.""" + server, mock_context = setup_impact_mocks + valid_values = {"confirmed", "none", "potential"} + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + assert "confirmed_impacted_models" in result + assert len(result["confirmed_impacted_models"]) > 0 + for model in result["confirmed_impacted_models"]: + assert "data_impact" in model, f"model {model['name']} missing data_impact" + assert ( + model["data_impact"] in valid_values + ), f"model {model['name']} has invalid data_impact: {model['data_impact']}" + + @pytest.mark.asyncio + async def test_data_impact_confirmed_when_value_changes_exist(self, setup_impact_mocks): + """Modified model with rows_changed > 0 → data_impact='confirmed'.""" + server, mock_context = setup_impact_mocks + # adapter.execute already set to return 5 rows_changed for modified_model + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + assert "modified_model" in models_by_name + modified = models_by_name["modified_model"] + assert modified["data_impact"] == "confirmed" + assert modified["value_diff"] is not None + assert modified["value_diff"]["affected_row_count"] == 5 + # confirmed models with low change ratio → next_action is None + # (next_action only set for potential, or confirmed with high change ratio) + + @pytest.mark.asyncio + async def test_data_impact_none_when_zero_changes(self, setup_impact_mocks): + """Downstream model with all-zero value_diff → data_impact='none'.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + assert "downstream_model" in models_by_name + downstream = models_by_name["downstream_model"] + assert downstream["data_impact"] == "none" + assert downstream["value_diff"] is not None + assert downstream["value_diff"]["affected_row_count"] == 0 + + @pytest.mark.asyncio + async def test_potential_has_null_affected_row_count(self, setup_impact_mocks): + """Views (no value_diff) → data_impact='potential', affected_row_count=None. + + Even if row_count.delta exists, affected_row_count should remain None for + potential models to avoid misleading callers. + """ + server, mock_context = setup_impact_mocks + # Give view_model a non-zero row_count delta to confirm the override to null + row_count_data = { + "view_model": {"base": 100, "curr": 110}, + "modified_model": {"base": 50, "curr": 50}, + "downstream_model": {"base": 50, "curr": 50}, + } + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value=row_count_data), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + assert "view_model" in models_by_name + view = models_by_name["view_model"] + assert view["data_impact"] == "potential" + assert view["affected_row_count"] is None + # Potential models always get next_action + assert view["next_action"] is not None + assert view["next_action"]["tool"] == "profile_diff" + + @pytest.mark.asyncio + async def test_guidance_is_descriptive(self, setup_impact_mocks): + """_guidance must describe data_impact without containing forbidden control text.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + assert "_guidance" in result + guidance = result["_guidance"] + assert "data_impact" in guidance + assert "next_action" in guidance + assert "DO NOT OVERRIDE" not in guidance + + @pytest.mark.asyncio + async def test_skip_downstream_value_diff(self, mcp_server): + """skip_downstream_value_diff=True: downstream tables get value_diff=None, data_impact='potential'.""" + server, mock_context = mcp_server + + mock_context.get_lineage_diff.return_value = MagicMock( + model_dump=MagicMock(return_value=self.LINEAGE_DIFF_DATA) + ) + + adapter = self._make_mock_adapter() + # Only ONE call expected — for modified_model only + execute_side_effect = self._make_execute_side_effect(modified_has_changes=True) + adapter.execute.side_effect = execute_side_effect + mock_context.adapter = adapter + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server, skip_downstream_value_diff=True) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + + # Modified model still gets value_diff + assert models_by_name["modified_model"]["value_diff"] is not None + assert models_by_name["modified_model"]["data_impact"] == "confirmed" + + # Downstream table: skipped → potential with next_action + assert models_by_name["downstream_model"]["value_diff"] is None + assert models_by_name["downstream_model"]["data_impact"] == "potential" + assert models_by_name["downstream_model"]["next_action"] is not None + assert models_by_name["downstream_model"]["next_action"]["priority"] == "medium" + + @pytest.mark.asyncio + async def test_skip_value_diff_takes_precedence(self, mcp_server): + """skip_value_diff=True: ALL models get value_diff=None, data_impact='potential'.""" + server, mock_context = mcp_server + + mock_context.get_lineage_diff.return_value = MagicMock( + model_dump=MagicMock(return_value=self.LINEAGE_DIFF_DATA) + ) + + adapter = self._make_mock_adapter() + mock_context.adapter = adapter + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server, skip_value_diff=True, skip_downstream_value_diff=False) + + # adapter.execute should NOT have been called at all + adapter.execute.assert_not_called() + + for model in result["confirmed_impacted_models"]: + assert ( + model["value_diff"] is None + ), f"model {model['name']} should have value_diff=None when skip_value_diff=True" + assert ( + model["data_impact"] == "potential" + ), f"model {model['name']} should have data_impact='potential' when skip_value_diff=True" + + @pytest.mark.asyncio + async def test_confirmed_low_change_ratio_has_null_next_action(self, setup_impact_mocks): + """Confirmed model with low change ratio → next_action=None.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + modified = models_by_name["modified_model"] + assert modified["data_impact"] == "confirmed" + # Mock has 5 rows_changed out of many — not high ratio → null next_action + assert modified["next_action"] is None + + @pytest.mark.asyncio + async def test_none_data_impact_has_null_next_action(self, setup_impact_mocks): + """data_impact='none' (zero changes) → next_action=None.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + downstream = models_by_name["downstream_model"] + assert downstream["data_impact"] == "none" + assert downstream["next_action"] is None + + @pytest.mark.asyncio + async def test_next_action_has_all_required_fields(self, setup_impact_mocks): + """next_action when present must have tool, columns, reason, priority.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + for model in result["confirmed_impacted_models"]: + if model["next_action"] is not None: + na = model["next_action"] + assert "tool" in na, f"{model['name']}: missing tool" + assert "reason" in na, f"{model['name']}: missing reason" + assert "priority" in na, f"{model['name']}: missing priority" + assert na["priority"] in ("high", "medium", "low"), f"{model['name']}: invalid priority" + assert "columns" in na, f"{model['name']}: missing columns key" + + @pytest.mark.asyncio + async def test_response_uses_new_field_names(self, setup_impact_mocks): + """Response must use max_affected_row_count, not total_affected or suggested_deep_dives.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + assert "max_affected_row_count" in result + assert "total_affected_row_count" not in result + assert "suggested_deep_dives" not in result