From 3dc1946d48a6f4c1cc32c24d923cb2f482ec6d30 Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 2 May 2026 23:19:26 -0700 Subject: [PATCH 1/8] feat(blast-radius): add public dataclasses and EdgeConfidence enum Phase 1/8 of blast-radius PR 1a. Pure data types -- no FalkorDB, chunker, or workflow imports. Every later module in this feature depends on this one; this depends on nothing. - EdgeConfidence enum with EXTRACTED/INFERRED/AMBIGUOUS tiers and default_score() mapping. - Symbol, ChangedSymbol, CallEdge, ImpactedUnchangedFile, BlastRadius dataclasses. --- app/services/blast_radius_types.py | 110 ++++++++++++++++++++++++++ tests/unit/test_blast_radius_types.py | 73 +++++++++++++++++ 2 files changed, 183 insertions(+) create mode 100644 app/services/blast_radius_types.py create mode 100644 tests/unit/test_blast_radius_types.py diff --git a/app/services/blast_radius_types.py b/app/services/blast_radius_types.py new file mode 100644 index 0000000..f58c194 --- /dev/null +++ b/app/services/blast_radius_types.py @@ -0,0 +1,110 @@ +""" +Public data types for blast-radius analysis. + +These are deliberately framework-agnostic dataclasses -- no FalkorDB, +no chunker, no workflow imports. Everything else in the blast-radius +feature depends on this module; this module depends on nothing. +""" + +from dataclasses import dataclass +from enum import Enum +from typing import Dict, List + + +class EdgeConfidence(str, Enum): + """Confidence tier for a graph edge. + + EXTRACTED -- both ends syntactically certain (e.g. `from x import y; y()`). + INFERRED -- resolves through one import hop, no aliasing/re-export ambiguity. + AMBIGUOUS -- multiple candidate targets, dynamic dispatch, decorator-wrapped. + + Always emit ALL candidates for an AMBIGUOUS site (recall = 1.0 design). + """ + + EXTRACTED = "extracted" + INFERRED = "inferred" + AMBIGUOUS = "ambiguous" + + def default_score(self) -> float: + return { + EdgeConfidence.EXTRACTED: 1.0, + EdgeConfidence.INFERRED: 0.7, + EdgeConfidence.AMBIGUOUS: 0.3, + }[self] + + +@dataclass +class Symbol: + """A function, method, or class in the code graph. + + `qualified_name` is the identity (e.g. `app.services.analyzer.Analyzer.review`) + and is stable across edits in a way line numbers are not. + """ + + repo_id: str + path: str + qualified_name: str + kind: str # "function" | "method" | "class" + start_line: int + end_line: int + signature_hash: str + is_test: bool = False + is_entry_point: bool = False + is_hub: bool = False + + +@dataclass +class ChangedSymbol: + """A symbol that was added/modified/removed by the PR diff.""" + + path: str + qualified_name: str + change_kind: str # "added" | "modified" | "removed" + + +@dataclass +class CallEdge: + """A CALLS edge between two symbols. + + `score` defaults to `confidence.default_score()` when not supplied. + """ + + from_qname: str + to_qname: str + confidence: EdgeConfidence + resolution_method: str + score: float + source_sha: str + + +@dataclass +class ImpactedUnchangedFile: + """A file outside the PR diff that is reachable from a changed symbol. + + Carried as context only -- never used as a review-iteration target, + because it has no patch hunks or inline-comment positions. + """ + + path: str + reached_via_symbol: str + hops: int + confidence: EdgeConfidence + + +@dataclass +class BlastRadius: + """Output of `compute_blast_radius()`. + + See spec D4 for field semantics. + """ + + changed_symbols: List[ChangedSymbol] + impacted_symbols: List[str] # qualified_names + pr_files: List[str] + impacted_unchanged_files: List[ImpactedUnchangedFile] + test_set: List[str] # file paths reachable via TESTED_BY + dropped_due_to_cap: Dict[str, int] # keyed by reason, e.g. {"hop2plus_cap": 47} + edge_confidence_summary: Dict[str, int] # {"extracted": 12, ...} + risk_score: float + why_risky: List[str] + graph_available: bool diff --git a/tests/unit/test_blast_radius_types.py b/tests/unit/test_blast_radius_types.py new file mode 100644 index 0000000..90035ce --- /dev/null +++ b/tests/unit/test_blast_radius_types.py @@ -0,0 +1,73 @@ +"""Sanity tests for blast-radius dataclasses and confidence enum.""" + +from app.services.blast_radius_types import ( + BlastRadius, + CallEdge, + ChangedSymbol, + EdgeConfidence, + Symbol, +) + + +def test_edge_confidence_values(): + assert EdgeConfidence.EXTRACTED.value == "extracted" + assert EdgeConfidence.INFERRED.value == "inferred" + assert EdgeConfidence.AMBIGUOUS.value == "ambiguous" + + +def test_edge_confidence_score_mapping(): + assert EdgeConfidence.EXTRACTED.default_score() == 1.0 + assert EdgeConfidence.INFERRED.default_score() == 0.7 + assert EdgeConfidence.AMBIGUOUS.default_score() == 0.3 + + +def test_symbol_defaults(): + sym = Symbol( + repo_id="owner/repo", + path="app/foo.py", + qualified_name="app.foo.bar", + kind="function", + start_line=10, + end_line=20, + signature_hash="abc123", + ) + assert sym.is_test is False + assert sym.is_entry_point is False + assert sym.is_hub is False + + +def test_changed_symbol_kinds(): + cs = ChangedSymbol( + path="app/foo.py", qualified_name="app.foo.bar", change_kind="modified" + ) + assert cs.change_kind in ("added", "modified", "removed") + + +def test_call_edge_carries_confidence_and_method(): + edge = CallEdge( + from_qname="app.foo.caller", + to_qname="app.bar.callee", + confidence=EdgeConfidence.INFERRED, + resolution_method="import_traced", + score=0.7, + source_sha="deadbeef", + ) + assert edge.confidence is EdgeConfidence.INFERRED + assert edge.resolution_method == "import_traced" + + +def test_blast_radius_defaults(): + br = BlastRadius( + changed_symbols=[], + impacted_symbols=[], + pr_files=[], + impacted_unchanged_files=[], + test_set=[], + dropped_due_to_cap={}, + edge_confidence_summary={}, + risk_score=0.0, + why_risky=[], + graph_available=False, + ) + assert br.dropped_due_to_cap == {} + assert br.graph_available is False From 6461260af932811907808e6b3b80859eb52637ef Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 2 May 2026 23:21:16 -0700 Subject: [PATCH 2/8] feat(graph): symbol-level node and edge writes --- app/services/dependency_analyzer.py | 115 ++++++++++++++++++ .../test_dependency_analyzer_blast_radius.py | 109 +++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 tests/unit/test_dependency_analyzer_blast_radius.py diff --git a/app/services/dependency_analyzer.py b/app/services/dependency_analyzer.py index 1dec136..d021364 100644 --- a/app/services/dependency_analyzer.py +++ b/app/services/dependency_analyzer.py @@ -16,6 +16,7 @@ from falkordb import FalkorDB +from app.services.blast_radius_types import CallEdge, EdgeConfidence, Symbol from app.utils.graph_utils import get_graph_name, parse_file_path logger = logging.getLogger(__name__) @@ -281,6 +282,120 @@ async def add_import_edge( MERGE (from)-[:IMPORTS]->(to) """, {'from_path': from_file, 'to_path': to_file}) + # --------------------------------------------------------------- + # Symbol-level graph writes (blast-radius feature, spec D2). + # --------------------------------------------------------------- + + async def add_symbol_node(self, owner: str, repo: str, sym: Symbol) -> None: + """Upsert a Symbol node keyed by qualified_name.""" + graph = self._get_graph(owner, repo) + params = { + "qualified_name": sym.qualified_name, + "repo_id": sym.repo_id, + "path": sym.path, + "kind": sym.kind, + "start_line": sym.start_line, + "end_line": sym.end_line, + "signature_hash": sym.signature_hash, + "is_test": sym.is_test, + "is_entry_point": sym.is_entry_point, + "is_hub": sym.is_hub, + } + prop_str = ", ".join(f"{k}: ${k}" for k in params.keys()) + graph.query( + f"MERGE (s:Symbol {{qualified_name: $qualified_name}}) " + f"SET s += {{{prop_str}}}", + params, + ) + + async def add_defines_edge( + self, owner: str, repo: str, file_path: str, symbol_qname: str + ) -> None: + graph = self._get_graph(owner, repo) + graph.query( + "MATCH (f:File {path: $path}) " + "MATCH (s:Symbol {qualified_name: $qname}) " + "MERGE (f)-[:DEFINES]->(s)", + {"path": file_path, "qname": symbol_qname}, + ) + + async def add_call_edge( + self, owner: str, repo: str, edge: CallEdge + ) -> None: + graph = self._get_graph(owner, repo) + params = { + "from_qname": edge.from_qname, + "to_qname": edge.to_qname, + "confidence": edge.confidence.value, + "resolution_method": edge.resolution_method, + "score": edge.score, + "source_sha": edge.source_sha, + } + graph.query( + "MATCH (a:Symbol {qualified_name: $from_qname}) " + "MATCH (b:Symbol {qualified_name: $to_qname}) " + "MERGE (a)-[r:CALLS {resolution_method: $resolution_method}]->(b) " + "SET r.confidence = $confidence, r.score = $score, " + " r.source_sha = $source_sha", + params, + ) + + async def add_inherits_edge( + self, + owner: str, + repo: str, + child_qname: str, + parent_qname: str, + confidence: EdgeConfidence, + resolution_method: str, + source_sha: str, + ) -> None: + graph = self._get_graph(owner, repo) + graph.query( + "MATCH (c:Symbol {qualified_name: $child}) " + "MATCH (p:Symbol {qualified_name: $parent}) " + "MERGE (c)-[r:INHERITS_FROM]->(p) " + "SET r.confidence = $confidence, r.score = $score, " + " r.resolution_method = $resolution_method, " + " r.source_sha = $source_sha", + { + "child": child_qname, + "parent": parent_qname, + "confidence": confidence.value, + "score": confidence.default_score(), + "resolution_method": resolution_method, + "source_sha": source_sha, + }, + ) + + async def add_tested_by_edge( + self, + owner: str, + repo: str, + symbol_qname: str, + test_qname: str, + confidence: EdgeConfidence, + resolution_method: str, + source_sha: str, + ) -> None: + graph = self._get_graph(owner, repo) + graph.query( + "MATCH (s:Symbol {qualified_name: $sym_qname}) " + "MATCH (t:Symbol {qualified_name: $test_qname}) " + "MERGE (s)-[r:TESTED_BY]->(t) " + "SET r.confidence = $confidence, r.score = $score, " + " r.resolution_method = $resolution_method, " + " r.source_sha = $source_sha", + { + "sym_qname": symbol_qname, + "test_qname": test_qname, + "confidence": confidence.value, + "score": confidence.default_score(), + "resolution_method": resolution_method, + "source_sha": source_sha, + }, + ) + async def get_dependencies( self, owner: str, diff --git a/tests/unit/test_dependency_analyzer_blast_radius.py b/tests/unit/test_dependency_analyzer_blast_radius.py new file mode 100644 index 0000000..72f86e9 --- /dev/null +++ b/tests/unit/test_dependency_analyzer_blast_radius.py @@ -0,0 +1,109 @@ +"""Mocked-Falkor unit tests for new symbol/edge write methods.""" + +from unittest.mock import MagicMock + +import pytest + +from app.services.blast_radius_types import ( + CallEdge, + EdgeConfidence, + Symbol, +) +from app.services.dependency_analyzer import DependencyGraphAnalyzer + + +@pytest.fixture +def analyzer_with_mock_graph(): + analyzer = DependencyGraphAnalyzer() + analyzer.db = MagicMock() + fake_graph = MagicMock() + analyzer.db.select_graph = MagicMock(return_value=fake_graph) + return analyzer, fake_graph + + +@pytest.mark.asyncio +async def test_add_symbol_node_emits_merge_with_qualified_name_key( + analyzer_with_mock_graph, +): + analyzer, graph = analyzer_with_mock_graph + sym = Symbol( + repo_id="o/r", + path="app/foo.py", + qualified_name="app.foo.bar", + kind="function", + start_line=1, + end_line=10, + signature_hash="h", + ) + + await analyzer.add_symbol_node("o", "r", sym) + + assert graph.query.called + cypher, params = graph.query.call_args[0] + assert "MERGE (s:Symbol {qualified_name: $qualified_name})" in cypher + assert params["qualified_name"] == "app.foo.bar" + assert params["kind"] == "function" + assert params["is_test"] is False + + +@pytest.mark.asyncio +async def test_add_call_edge_writes_confidence_props( + analyzer_with_mock_graph, +): + analyzer, graph = analyzer_with_mock_graph + edge = CallEdge( + from_qname="app.foo.caller", + to_qname="app.bar.callee", + confidence=EdgeConfidence.INFERRED, + resolution_method="name_match", + score=0.7, + source_sha="sha1", + ) + + await analyzer.add_call_edge("o", "r", edge) + + cypher, params = graph.query.call_args[0] + assert "MERGE (a)-[r:CALLS" in cypher + assert params["confidence"] == "inferred" + assert params["resolution_method"] == "name_match" + assert params["score"] == 0.7 + assert params["source_sha"] == "sha1" + + +@pytest.mark.asyncio +async def test_add_inherits_edge_writes_confidence_props( + analyzer_with_mock_graph, +): + analyzer, graph = analyzer_with_mock_graph + + await analyzer.add_inherits_edge( + "o", "r", + child_qname="app.x.Sub", + parent_qname="app.x.Base", + confidence=EdgeConfidence.EXTRACTED, + resolution_method="import_traced", + source_sha="sha1", + ) + + cypher, params = graph.query.call_args[0] + assert "MERGE (c)-[r:INHERITS_FROM" in cypher + assert params["confidence"] == "extracted" + assert params["score"] == 1.0 + + +@pytest.mark.asyncio +async def test_add_tested_by_edge(analyzer_with_mock_graph): + analyzer, graph = analyzer_with_mock_graph + + await analyzer.add_tested_by_edge( + "o", "r", + symbol_qname="app.foo.bar", + test_qname="tests.unit.test_foo.test_bar", + confidence=EdgeConfidence.INFERRED, + resolution_method="name_match", + source_sha="sha1", + ) + + cypher, params = graph.query.call_args[0] + assert "MERGE (s)-[r:TESTED_BY" in cypher + assert params["test_qname"] == "tests.unit.test_foo.test_bar" From 89f52a8d9c9206dc6cdc06f99eef2100ba6d4755 Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 2 May 2026 23:21:42 -0700 Subject: [PATCH 3/8] feat(chunker): extract_symbols + extract_changed_symbols (Python) --- app/services/code_chunker.py | 165 ++++++++++++++++++++++++ tests/unit/test_code_chunker_symbols.py | 148 +++++++++++++++++++++ 2 files changed, 313 insertions(+) create mode 100644 tests/unit/test_code_chunker_symbols.py diff --git a/app/services/code_chunker.py b/app/services/code_chunker.py index 96d6146..cf4953d 100644 --- a/app/services/code_chunker.py +++ b/app/services/code_chunker.py @@ -19,6 +19,7 @@ import tree_sitter_typescript as tsts from tree_sitter import Language, Parser +from app.services.blast_radius_types import ChangedSymbol, Symbol from app.services.vector_store import CodeChunk logger = logging.getLogger(__name__) @@ -361,6 +362,170 @@ def _infer_chunk_type(self, node_type: str) -> str: else: return ChunkType.UNKNOWN + # --------------------------------------------------------------- + # Symbol extraction (used by blast-radius graph indexing). + # --------------------------------------------------------------- + + _PYTHON_TEST_FILE_PATTERNS = ("test_", "_test") + _PYTHON_TEST_FUNC_PREFIX = "test_" + + def extract_symbols( + self, + source_code: str, + language: str, + file_path: str, + repo_id: str, + ) -> list: + """ + Extract `Symbol` records from a source file. + + PR 1 implements Python only. Other languages return [] and will + be added in PR 2 alongside their CallResolver implementations. + """ + if language != "python": + return [] + return self._extract_python_symbols(source_code, file_path, repo_id) + + def _extract_python_symbols( + self, source_code: str, file_path: str, repo_id: str + ) -> list: + parser = self._get_parser("python") + if parser is None: + return [] + + tree = parser.parse(bytes(source_code, "utf8")) + module_qname = self._python_module_qname(file_path) + is_test_file = self._is_python_test_file(file_path) + + symbols = [] + + def visit(node, class_chain): + if node.type == "function_definition": + name_node = node.child_by_field_name("name") + if name_node is None: + return + func_name = name_node.text.decode() + qname_parts = [module_qname, *class_chain, func_name] + qname = ".".join(p for p in qname_parts if p) + kind = "method" if class_chain else "function" + symbols.append( + Symbol( + repo_id=repo_id, + path=file_path, + qualified_name=qname, + kind=kind, + start_line=node.start_point[0] + 1, + end_line=node.end_point[0] + 1, + signature_hash=self._python_signature_hash(node), + is_test=( + is_test_file + and not class_chain + and func_name.startswith(self._PYTHON_TEST_FUNC_PREFIX) + ), + ) + ) + # Walk inner functions/classes too. + body = node.child_by_field_name("body") + if body is not None: + for child in body.children: + visit(child, class_chain) + return + + if node.type == "class_definition": + name_node = node.child_by_field_name("name") + if name_node is None: + return + cls_name = name_node.text.decode() + qname = ".".join([module_qname, *class_chain, cls_name]) + symbols.append( + Symbol( + repo_id=repo_id, + path=file_path, + qualified_name=qname, + kind="class", + start_line=node.start_point[0] + 1, + end_line=node.end_point[0] + 1, + signature_hash=self._python_signature_hash(node), + ) + ) + body = node.child_by_field_name("body") + if body is not None: + for child in body.children: + visit(child, [*class_chain, cls_name]) + return + + for child in node.children: + visit(child, class_chain) + + visit(tree.root_node, []) + return symbols + + def extract_changed_symbols( + self, + file_path: str, + source_before: str, + source_after: str, + language: str, + ) -> list: + """ + Compare before/after sources of a file and return ChangedSymbol entries. + + - "added" -- in `after` only. + - "removed" -- in `before` only. + - "modified" -- in both, but signature_hash differs. + Unchanged symbols are NOT returned. + """ + before_syms = { + s.qualified_name: s + for s in self.extract_symbols(source_before, language, file_path, repo_id="") + } + after_syms = { + s.qualified_name: s + for s in self.extract_symbols(source_after, language, file_path, repo_id="") + } + + changes = [] + for qname, sym in after_syms.items(): + if qname not in before_syms: + changes.append( + ChangedSymbol(path=file_path, qualified_name=qname, change_kind="added") + ) + elif before_syms[qname].signature_hash != sym.signature_hash: + changes.append( + ChangedSymbol(path=file_path, qualified_name=qname, change_kind="modified") + ) + for qname in before_syms.keys() - after_syms.keys(): + changes.append( + ChangedSymbol(path=file_path, qualified_name=qname, change_kind="removed") + ) + return changes + + def _python_module_qname(self, file_path: str) -> str: + """Convert `app/services/foo.py` -> `app.services.foo`.""" + if file_path.endswith(".py"): + file_path = file_path[:-3] + # Drop `__init__` suffixes so the package itself is the qualifier. + if file_path.endswith("/__init__"): + file_path = file_path[: -len("/__init__")] + return file_path.replace("/", ".") + + def _is_python_test_file(self, file_path: str) -> bool: + name = file_path.rsplit("/", 1)[-1] + return ( + name.startswith(self._PYTHON_TEST_FILE_PATTERNS[0]) + or name.removesuffix(".py").endswith(self._PYTHON_TEST_FILE_PATTERNS[1]) + or "/tests/" in f"/{file_path}" + ) + + def _python_signature_hash(self, node) -> str: + """Hash the symbol's full text - body included. + + Body changes invalidate downstream cached call resolutions because the + callee set inside the function may have changed. + """ + text = node.text.decode("utf-8") if node.text else "" + return hashlib.md5(text.encode("utf-8")).hexdigest() + def chunk_file( self, file_path: str, diff --git a/tests/unit/test_code_chunker_symbols.py b/tests/unit/test_code_chunker_symbols.py new file mode 100644 index 0000000..b57cb8e --- /dev/null +++ b/tests/unit/test_code_chunker_symbols.py @@ -0,0 +1,148 @@ +"""Symbol extraction tests for CodeChunker.""" + +from app.services.code_chunker import CodeChunker + +PYTHON_SOURCE = ''' +def top_level_func(a, b): + return a + b + + +class MyClass: + def method_a(self): + return 1 + + def method_b(self): + return 2 + + +def _private_helper(): + pass +''' + + +def test_extract_symbols_python_top_level_function(): + chunker = CodeChunker() + symbols = chunker.extract_symbols( + PYTHON_SOURCE, language="python", file_path="app/foo.py", repo_id="o/r" + ) + + qnames = {s.qualified_name for s in symbols} + assert "app.foo.top_level_func" in qnames + assert "app.foo._private_helper" in qnames + + +def test_extract_symbols_python_class_and_methods(): + chunker = CodeChunker() + symbols = chunker.extract_symbols( + PYTHON_SOURCE, language="python", file_path="app/foo.py", repo_id="o/r" + ) + + qnames = {s.qualified_name for s in symbols} + assert "app.foo.MyClass" in qnames + assert "app.foo.MyClass.method_a" in qnames + assert "app.foo.MyClass.method_b" in qnames + + +def test_extract_symbols_python_kinds_are_correct(): + chunker = CodeChunker() + symbols = chunker.extract_symbols( + PYTHON_SOURCE, language="python", file_path="app/foo.py", repo_id="o/r" + ) + + by_qname = {s.qualified_name: s for s in symbols} + assert by_qname["app.foo.top_level_func"].kind == "function" + assert by_qname["app.foo.MyClass"].kind == "class" + assert by_qname["app.foo.MyClass.method_a"].kind == "method" + + +def test_extract_symbols_signature_hash_changes_with_signature(): + chunker = CodeChunker() + src_v1 = "def f(a):\n return a\n" + src_v2 = "def f(a, b):\n return a + b\n" + + s1 = chunker.extract_symbols(src_v1, "python", "app/foo.py", "o/r") + s2 = chunker.extract_symbols(src_v2, "python", "app/foo.py", "o/r") + + assert s1[0].signature_hash != s2[0].signature_hash + + +def test_extract_symbols_marks_python_test_functions(): + chunker = CodeChunker() + test_src = ''' +def test_something(): + assert True + +def helper_not_a_test(): + pass +''' + symbols = chunker.extract_symbols( + test_src, "python", "tests/unit/test_foo.py", "o/r" + ) + by_qname = {s.qualified_name: s for s in symbols} + + assert by_qname["tests.unit.test_foo.test_something"].is_test is True + assert by_qname["tests.unit.test_foo.helper_not_a_test"].is_test is False + + +def test_extract_symbols_non_python_returns_empty_in_pr1(): + """Non-Python resolvers land in PR 2; PR 1 returns empty for them.""" + chunker = CodeChunker() + js_source = "function foo() { return 1; }" + + symbols = chunker.extract_symbols( + js_source, language="javascript", file_path="src/foo.js", repo_id="o/r" + ) + assert symbols == [] + + +def test_extract_changed_symbols_added_modified_removed(): + chunker = CodeChunker() + before = ''' +def kept_unchanged(): + return 1 + +def will_be_modified(a): + return a +''' + after = ''' +def kept_unchanged(): + return 1 + +def will_be_modified(a, b): + return a + b + +def newly_added(): + return 99 +''' + changes = chunker.extract_changed_symbols( + file_path="app/foo.py", + source_before=before, + source_after=after, + language="python", + ) + by_qname = {c.qualified_name: c for c in changes} + + assert "app.foo.will_be_modified" in by_qname + assert by_qname["app.foo.will_be_modified"].change_kind == "modified" + + assert "app.foo.newly_added" in by_qname + assert by_qname["app.foo.newly_added"].change_kind == "added" + + assert "app.foo.kept_unchanged" not in by_qname # unchanged, not reported + + +def test_extract_changed_symbols_removed(): + chunker = CodeChunker() + before = "def gone():\n return 1\n\ndef stays():\n return 2\n" + after = "def stays():\n return 2\n" + + changes = chunker.extract_changed_symbols( + file_path="app/foo.py", + source_before=before, + source_after=after, + language="python", + ) + by_qname = {c.qualified_name: c for c in changes} + + assert "app.foo.gone" in by_qname + assert by_qname["app.foo.gone"].change_kind == "removed" From f84d1cd2339585f95824fc9b94eff15efdb62803 Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 2 May 2026 23:24:55 -0700 Subject: [PATCH 4/8] feat(blast-radius): PythonCallResolver with confidence tiers --- app/services/python_call_resolver.py | 351 ++++++++++++++++++++++++ tests/unit/test_python_call_resolver.py | 191 +++++++++++++ 2 files changed, 542 insertions(+) create mode 100644 app/services/python_call_resolver.py create mode 100644 tests/unit/test_python_call_resolver.py diff --git a/app/services/python_call_resolver.py b/app/services/python_call_resolver.py new file mode 100644 index 0000000..3c38aae --- /dev/null +++ b/app/services/python_call_resolver.py @@ -0,0 +1,351 @@ +"""Resolve Python `CALLS` edges with confidence tiers. + +Walks a parsed Python file, collects every call site, and resolves each +to one or more `Symbol`s in the project-wide table. Emits CallEdge +records per spec D2. + +This module is stateless and pure-Python; the FalkorDB write happens +in `DependencyGraphAnalyzer`, the indexing-time orchestration in PR 2. +""" + +from __future__ import annotations + +import logging +from collections import defaultdict +from typing import Dict, List, Optional + +import tree_sitter_python as tspython +from tree_sitter import Language, Parser + +from app.services.blast_radius_types import CallEdge, EdgeConfidence, Symbol + +logger = logging.getLogger(__name__) + + +class PythonCallResolver: + """Cross-file Python call resolution (Tier 2 from spec D1).""" + + def __init__(self): + self._parser = Parser(Language(tspython.language())) + + def resolve_calls_in_file( + self, + source_code: str, + file_path: str, + symbol_table: Dict[str, Symbol], + source_sha: str, + ) -> List[CallEdge]: + tree = self._parser.parse(bytes(source_code, "utf8")) + root = tree.root_node + + imports = self._collect_imports(root) + local_index = self._index_by_local_name(symbol_table) + + edges: List[CallEdge] = [] + # Walk every function/method in the file; resolve calls inside its body. + for caller_qname, body_node in self._iter_callers( + root, file_path, symbol_table + ): + for call_node in self._iter_call_sites(body_node): + edges.extend( + self._resolve_call( + call_node=call_node, + caller_qname=caller_qname, + imports=imports, + symbol_table=symbol_table, + local_index=local_index, + source_sha=source_sha, + ) + ) + return edges + + # -- AST traversal helpers ------------------------------------------- + + def _iter_callers(self, root_node, file_path, symbol_table): + """Yield (caller_qname, body_node) for every function/method in file.""" + from app.services.code_chunker import CodeChunker + + # We only need the qualified-name mapping for symbols defined in this file. + chunker = CodeChunker() + module_qname = chunker._python_module_qname(file_path) + + def visit(node, class_chain): + if node.type == "function_definition": + name_node = node.child_by_field_name("name") + body = node.child_by_field_name("body") + if name_node is not None and body is not None: + qname = ".".join( + [module_qname, *class_chain, name_node.text.decode()] + ) + yield qname, body + # Walk nested defs. + if body is not None: + for child in body.children: + yield from visit(child, class_chain) + return + if node.type == "class_definition": + name_node = node.child_by_field_name("name") + body = node.child_by_field_name("body") + if name_node is not None and body is not None: + inner_chain = [*class_chain, name_node.text.decode()] + for child in body.children: + yield from visit(child, inner_chain) + return + for child in node.children: + yield from visit(child, class_chain) + + yield from visit(root_node, []) + + def _iter_call_sites(self, body_node): + def walk(node): + if node.type == "call": + yield node + for child in node.children: + yield from walk(child) + + yield from walk(body_node) + + # -- Import collection ----------------------------------------------- + + def _collect_imports(self, root_node) -> Dict[str, str]: + """Return {local_name_in_file: fully_qualified_target}. + + Examples: + `from app.bar import callee` -> {"callee": "app.bar.callee"} + `from app.bar import callee as c` -> {"c": "app.bar.callee"} + `from app import bar` -> {"bar": "app.bar"} + `import app.bar` -> {"app.bar": "app.bar", "app": "app"} + `import app.bar as ab` -> {"ab": "app.bar"} + """ + imports: Dict[str, str] = {} + + def visit(node): + if node.type == "import_from_statement": + module = None + for child in node.children: + if child.type == "dotted_name": + module = child.text.decode() + break + if module is None: + return + # Imported names appear as dotted_name / aliased_import children + # AFTER the "import" keyword. + seen_import_kw = False + for child in node.children: + if child.type == "import": + seen_import_kw = True + continue + if not seen_import_kw: + continue + if child.type == "dotted_name": + local = child.text.decode() + imports[local] = f"{module}.{local}" + elif child.type == "aliased_import": + name_node = child.child_by_field_name("name") + alias_node = child.child_by_field_name("alias") + if name_node and alias_node: + imports[alias_node.text.decode()] = ( + f"{module}.{name_node.text.decode()}" + ) + return + + if node.type == "import_statement": + for child in node.children: + if child.type == "dotted_name": + full = child.text.decode() + imports[full] = full + # Also bind the leftmost segment so `app.bar.foo()` works. + head = full.split(".", 1)[0] + imports.setdefault(head, head) + elif child.type == "aliased_import": + name_node = child.child_by_field_name("name") + alias_node = child.child_by_field_name("alias") + if name_node and alias_node: + imports[alias_node.text.decode()] = ( + name_node.text.decode() + ) + return + + for child in node.children: + visit(child) + + visit(root_node) + return imports + + # -- Symbol-table indexing ------------------------------------------- + + def _index_by_local_name( + self, symbol_table: Dict[str, Symbol] + ) -> Dict[str, List[str]]: + """Build {local_name: [matching qualified_name, ...]} for bare-name lookup.""" + index: Dict[str, List[str]] = defaultdict(list) + for qname in symbol_table: + local = qname.rsplit(".", 1)[-1] + index[local].append(qname) + return index + + # -- Resolution ------------------------------------------------------ + + def _resolve_call( + self, + call_node, + caller_qname: str, + imports: Dict[str, str], + symbol_table: Dict[str, Symbol], + local_index: Dict[str, List[str]], + source_sha: str, + ) -> List[CallEdge]: + func_node = call_node.child_by_field_name("function") + if func_node is None: + return [] + + # Case 1: `name()` -- bare identifier + if func_node.type == "identifier": + local_name = func_node.text.decode() + return self._resolve_bare_or_imported( + local_name, caller_qname, imports, symbol_table, local_index, source_sha + ) + + # Case 2: `a.b.c()` -- attribute access; reduce to dotted path + if func_node.type == "attribute": + dotted = self._dotted_path(func_node) + if dotted is None: + return [] + return self._resolve_dotted( + dotted, caller_qname, imports, symbol_table, local_index, source_sha + ) + + # Case 3: `getattr(x, "literal")()` -- the function being called is + # itself a call expression. Emit AMBIGUOUS edges for every symbol + # whose local name matches the literal (recall = 1.0 per spec D2). + # Non-literal getattr is logged once and skipped (known gap). + if func_node.type == "call": + return self._resolve_getattr_call( + func_node, caller_qname, local_index, source_sha + ) + + return [] + + def _resolve_getattr_call( + self, inner_call_node, caller_qname, local_index, source_sha + ) -> List[CallEdge]: + inner_func = inner_call_node.child_by_field_name("function") + if inner_func is None or inner_func.type != "identifier": + return [] + if inner_func.text.decode() != "getattr": + return [] + args_node = inner_call_node.child_by_field_name("arguments") + if args_node is None: + return [] + arg_nodes = [ + c for c in args_node.children + if c.type not in ("(", ")", ",", "comment") + ] + if len(arg_nodes) < 2: + return [] + name_arg = arg_nodes[1] + if name_arg.type != "string": + logger.debug( + "getattr called with non-literal name in caller=%s; " + "skipping (known gap, spec D2)", + caller_qname, + ) + return [] + literal = name_arg.text.decode().strip("\"'") + candidates = local_index.get(literal, []) + return [ + self._make_edge( + caller_qname, c, EdgeConfidence.AMBIGUOUS, "getattr_literal", source_sha + ) + for c in candidates + ] + + def _resolve_bare_or_imported( + self, local_name, caller_qname, imports, symbol_table, local_index, source_sha + ) -> List[CallEdge]: + # Imported name? -> EXTRACTED. + if local_name in imports: + target = imports[local_name] + if target in symbol_table: + return [self._make_edge( + caller_qname, target, EdgeConfidence.EXTRACTED, "import_traced", source_sha + )] + return [] + # Bare name fallback -> name match in symbol_table. + return self._resolve_name_match( + local_name, caller_qname, local_index, symbol_table, source_sha + ) + + def _resolve_dotted( + self, dotted, caller_qname, imports, symbol_table, local_index, source_sha + ) -> List[CallEdge]: + """Resolve `head.tail` calls, possibly through a module import or local var.""" + head, _, tail = dotted.partition(".") + + # `head` is an imported module/package. + if head in imports: + candidate = f"{imports[head]}.{tail}" + if candidate in symbol_table: + return [self._make_edge( + caller_qname, candidate, EdgeConfidence.EXTRACTED, "import_traced", source_sha + )] + # Imported but target unknown — give up (unresolvable, not ambiguous). + return [] + + # `head` is a local variable. We don't track types in PR 1, but the + # leaf method name `tail` may match exactly one Symbol -> INFERRED. + leaf = tail.rsplit(".", 1)[-1] + return self._resolve_name_match( + leaf, caller_qname, local_index, symbol_table, source_sha + ) + + def _resolve_name_match( + self, local_name, caller_qname, local_index, symbol_table, source_sha + ) -> List[CallEdge]: + candidates = local_index.get(local_name, []) + if not candidates: + return [] + if len(candidates) == 1: + return [self._make_edge( + caller_qname, candidates[0], EdgeConfidence.INFERRED, "name_match", source_sha + )] + return [ + self._make_edge( + caller_qname, c, EdgeConfidence.AMBIGUOUS, "ambiguous_candidate", source_sha + ) + for c in candidates + ] + + def _dotted_path(self, attr_node) -> Optional[str]: + """Reduce nested `attribute` nodes into a `a.b.c` string.""" + parts: List[str] = [] + cur = attr_node + while cur.type == "attribute": + attr = cur.child_by_field_name("attribute") + if attr is None: + return None + parts.append(attr.text.decode()) + cur = cur.child_by_field_name("object") + if cur is None: + return None + if cur.type != "identifier": + return None + parts.append(cur.text.decode()) + return ".".join(reversed(parts)) + + def _make_edge( + self, + from_qname, + to_qname, + confidence: EdgeConfidence, + method: str, + source_sha: str, + ) -> CallEdge: + return CallEdge( + from_qname=from_qname, + to_qname=to_qname, + confidence=confidence, + resolution_method=method, + score=confidence.default_score(), + source_sha=source_sha, + ) diff --git a/tests/unit/test_python_call_resolver.py b/tests/unit/test_python_call_resolver.py new file mode 100644 index 0000000..5128201 --- /dev/null +++ b/tests/unit/test_python_call_resolver.py @@ -0,0 +1,191 @@ +"""Tests for PythonCallResolver confidence tiers.""" + +from app.services.blast_radius_types import EdgeConfidence, Symbol +from app.services.python_call_resolver import PythonCallResolver + + +def _sym(qname, path, kind="function"): + return Symbol( + repo_id="o/r", + path=path, + qualified_name=qname, + kind=kind, + start_line=1, + end_line=2, + signature_hash="h", + ) + + +def test_resolves_explicit_from_import_as_extracted(): + source = ''' +from app.bar import callee + +def caller(): + callee() +''' + symbol_table = { + "app.bar.callee": _sym("app.bar.callee", "app/bar.py"), + "app.foo.caller": _sym("app.foo.caller", "app/foo.py"), + } + resolver = PythonCallResolver() + edges = resolver.resolve_calls_in_file( + source_code=source, + file_path="app/foo.py", + symbol_table=symbol_table, + source_sha="sha1", + ) + by_pair = {(e.from_qname, e.to_qname): e for e in edges} + edge = by_pair[("app.foo.caller", "app.bar.callee")] + assert edge.confidence is EdgeConfidence.EXTRACTED + assert edge.resolution_method == "import_traced" + assert edge.score == 1.0 + assert edge.source_sha == "sha1" + + +def test_resolves_attribute_call_through_module_import_as_extracted(): + source = ''' +from app import bar + +def caller(): + bar.callee() +''' + symbol_table = { + "app.bar.callee": _sym("app.bar.callee", "app/bar.py"), + "app.foo.caller": _sym("app.foo.caller", "app/foo.py"), + } + resolver = PythonCallResolver() + edges = resolver.resolve_calls_in_file( + source, "app/foo.py", symbol_table, "sha1" + ) + pair = ("app.foo.caller", "app.bar.callee") + assert any( + e.from_qname == pair[0] and e.to_qname == pair[1] + and e.confidence is EdgeConfidence.EXTRACTED + for e in edges + ) + + +def test_bare_name_with_unique_match_is_inferred(): + source = ''' +def caller(): + helper() +''' + symbol_table = { + "app.foo.caller": _sym("app.foo.caller", "app/foo.py"), + "app.utils.helper": _sym("app.utils.helper", "app/utils.py"), + } + resolver = PythonCallResolver() + edges = resolver.resolve_calls_in_file( + source, "app/foo.py", symbol_table, "sha1" + ) + edge = next( + e for e in edges + if e.from_qname == "app.foo.caller" and e.to_qname == "app.utils.helper" + ) + assert edge.confidence is EdgeConfidence.INFERRED + assert edge.resolution_method == "name_match" + + +def test_bare_name_with_multiple_matches_emits_all_as_ambiguous(): + source = ''' +def caller(): + process() +''' + symbol_table = { + "app.foo.caller": _sym("app.foo.caller", "app/foo.py"), + "app.a.process": _sym("app.a.process", "app/a.py"), + "app.b.process": _sym("app.b.process", "app/b.py"), + } + resolver = PythonCallResolver() + edges = resolver.resolve_calls_in_file( + source, "app/foo.py", symbol_table, "sha1" + ) + targets = { + e.to_qname for e in edges if e.from_qname == "app.foo.caller" + } + assert targets == {"app.a.process", "app.b.process"} + for e in edges: + if e.from_qname == "app.foo.caller": + assert e.confidence is EdgeConfidence.AMBIGUOUS + assert e.resolution_method == "ambiguous_candidate" + + +def test_unresolvable_call_emits_no_edge(): + source = ''' +def caller(): + nowhere_to_be_found() +''' + symbol_table = { + "app.foo.caller": _sym("app.foo.caller", "app/foo.py"), + } + resolver = PythonCallResolver() + edges = resolver.resolve_calls_in_file( + source, "app/foo.py", symbol_table, "sha1" + ) + assert edges == [] + + +def test_method_call_resolves_to_class_method_when_unambiguous(): + source = ''' +from app.bar import Service + +def caller(): + s = Service() + s.run() +''' + symbol_table = { + "app.bar.Service": _sym("app.bar.Service", "app/bar.py", kind="class"), + "app.bar.Service.run": _sym( + "app.bar.Service.run", "app/bar.py", kind="method" + ), + "app.foo.caller": _sym("app.foo.caller", "app/foo.py"), + } + resolver = PythonCallResolver() + edges = resolver.resolve_calls_in_file( + source, "app/foo.py", symbol_table, "sha1" + ) + # Calls to .run() on a Service instance should resolve to Service.run. + targets = { + e.to_qname for e in edges + if e.from_qname == "app.foo.caller" + } + assert "app.bar.Service.run" in targets + + +def test_getattr_with_string_literal_emits_ambiguous_for_all_candidates(): + source = ''' +def caller(obj): + getattr(obj, "process")() +''' + symbol_table = { + "app.foo.caller": _sym("app.foo.caller", "app/foo.py"), + "app.a.process": _sym("app.a.process", "app/a.py"), + "app.b.process": _sym("app.b.process", "app/b.py"), + } + resolver = PythonCallResolver() + edges = resolver.resolve_calls_in_file( + source, "app/foo.py", symbol_table, "sha1" + ) + targets = {e.to_qname for e in edges if e.from_qname == "app.foo.caller"} + assert targets == {"app.a.process", "app.b.process"} + for e in edges: + if e.from_qname == "app.foo.caller": + assert e.confidence is EdgeConfidence.AMBIGUOUS + assert e.resolution_method == "getattr_literal" + + +def test_getattr_with_dynamic_arg_emits_no_edge(): + source = ''' +def caller(obj, name): + getattr(obj, name)() +''' + symbol_table = { + "app.foo.caller": _sym("app.foo.caller", "app/foo.py"), + "app.a.process": _sym("app.a.process", "app/a.py"), + } + resolver = PythonCallResolver() + edges = resolver.resolve_calls_in_file( + source, "app/foo.py", symbol_table, "sha1" + ) + # Non-literal getattr is a known gap -- skip rather than fabricate edges. + assert [e for e in edges if e.from_qname == "app.foo.caller"] == [] From ed67173a8601f279d308b0128a7bd4ccc1ab5ce9 Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 2 May 2026 23:30:49 -0700 Subject: [PATCH 5/8] feat(graph): TESTED_BY derivation via call-graph BFS --- app/services/dependency_analyzer.py | 84 +++++++++++++++++++ .../test_dependency_analyzer_blast_radius.py | 30 +++++++ 2 files changed, 114 insertions(+) diff --git a/app/services/dependency_analyzer.py b/app/services/dependency_analyzer.py index d021364..bd628c7 100644 --- a/app/services/dependency_analyzer.py +++ b/app/services/dependency_analyzer.py @@ -396,6 +396,90 @@ async def add_tested_by_edge( }, ) + async def derive_tested_by_edges( + self, + owner: str, + repo: str, + source_sha: str, + max_depth: int = 4, + max_reachable: int = 50, + ) -> int: + """Derive `TESTED_BY` edges from existing CALLS edges. + + Algorithm (spec D5, primary path B): + 1. For every Symbol with is_test=true, BFS forward along CALLS up to + `max_depth`, collect up to `max_reachable` non-test symbols. + 2. For each (test_symbol, app_symbol) pair, emit a TESTED_BY edge + whose confidence is the MIN confidence along the path. + + Returns the number of edges written. + """ + graph = self._get_graph(owner, repo) + + # Single Cypher query: for each test symbol, find reachable non-test + # symbols and the minimum-confidence edge on the path. FalkorDB + # supports variable-length patterns; we collect distinct pairs. + bfs_cypher = ( + f"MATCH (t:Symbol {{is_test: true}}) " + f"MATCH path = (t)-[edges:CALLS*1..{max_depth}]->(s:Symbol) " + f"WHERE s.is_test = false " + f"WITH t, s, " + f" [e IN edges | e.confidence] AS confs " + f"RETURN DISTINCT t.qualified_name, s.qualified_name, confs" + ) + result = graph.query(bfs_cypher) + + from collections import defaultdict + + # Cypher's DISTINCT keys on (t, s, confs), so the same (test, app) + # pair can appear multiple times if multiple paths exist with + # different confidence lists. Aggregate Python-side: keep the BEST + # path per pair (highest min-confidence) so the written edge is + # deterministic and reflects the strongest reachability evidence. + order = {"ambiguous": 0, "inferred": 1, "extracted": 2} + best_per_pair = {} + for row in result.result_set: + test_qname, app_qname, confs = row[0], row[1], row[2] + path_min = self._min_confidence(confs) + key = (test_qname, app_qname) + current = best_per_pair.get(key) + if current is None or order[path_min.value] > order[current.value]: + best_per_pair[key] = path_min + + per_test_written = defaultdict(int) + total_written = 0 + # Sort for deterministic write order across runs. + for (test_qname, app_qname), min_confidence in sorted(best_per_pair.items()): + # Per-test cap (spec D5): a single high-fanout test must not + # starve other tests of TESTED_BY edges. + if per_test_written[test_qname] >= max_reachable: + continue + await self.add_tested_by_edge( + owner, + repo, + symbol_qname=app_qname, + test_qname=test_qname, + confidence=min_confidence, + resolution_method="call_graph_bfs", + source_sha=source_sha, + ) + per_test_written[test_qname] += 1 + total_written += 1 + return total_written + + @staticmethod + def _min_confidence(conf_strings) -> EdgeConfidence: + """Return the weakest confidence in a list of confidence-string values.""" + order = { + "ambiguous": 0, + "inferred": 1, + "extracted": 2, + } + if not conf_strings: + return EdgeConfidence.AMBIGUOUS + weakest = min(conf_strings, key=lambda c: order.get(c, 0)) + return EdgeConfidence(weakest) + async def get_dependencies( self, owner: str, diff --git a/tests/unit/test_dependency_analyzer_blast_radius.py b/tests/unit/test_dependency_analyzer_blast_radius.py index 72f86e9..a8e0d5b 100644 --- a/tests/unit/test_dependency_analyzer_blast_radius.py +++ b/tests/unit/test_dependency_analyzer_blast_radius.py @@ -107,3 +107,33 @@ async def test_add_tested_by_edge(analyzer_with_mock_graph): cypher, params = graph.query.call_args[0] assert "MERGE (s)-[r:TESTED_BY" in cypher assert params["test_qname"] == "tests.unit.test_foo.test_bar" + + +@pytest.mark.asyncio +async def test_derive_tested_by_edges_runs_bfs_cypher( + analyzer_with_mock_graph, +): + analyzer, graph = analyzer_with_mock_graph + # Pretend the BFS query returns one path: test_a -> helper -> bar + # represented by FalkorDB as (test_qname, app_qname, min_confidence). + fake_result = MagicMock() + # Each row's third column is the LIST of edge confidences along the + # CALLS path -- the BFS Cypher returns `[e IN edges | e.confidence]`, + # not a pre-reduced single value. _min_confidence reduces it. + fake_result.result_set = [ + ["tests.unit.test_foo.test_bar", "app.foo.bar", ["extracted"]], + ["tests.unit.test_foo.test_bar", "app.foo.helper", ["extracted", "inferred"]], + ] + graph.query = MagicMock(return_value=fake_result) + + edges_written = await analyzer.derive_tested_by_edges( + "o", "r", source_sha="sha1", max_depth=4, max_reachable=50 + ) + + # The first .query call is the BFS read; subsequent calls write TESTED_BY. + assert graph.query.call_count >= 1 + bfs_cypher = graph.query.call_args_list[0][0][0] + assert "MATCH (t:Symbol {is_test: true})" in bfs_cypher + assert "[edges:CALLS*1..4]" in bfs_cypher + # Returned a count of edges written. + assert edges_written == 2 From 8282b644d7d81e140081e25b3ba877a2ee87741a Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 2 May 2026 23:32:34 -0700 Subject: [PATCH 6/8] feat(graph): is_hub computation via P95 of incoming CALLS --- app/services/dependency_analyzer.py | 43 +++++++++++++++++++ .../test_dependency_analyzer_blast_radius.py | 23 ++++++++++ 2 files changed, 66 insertions(+) diff --git a/app/services/dependency_analyzer.py b/app/services/dependency_analyzer.py index bd628c7..503b8eb 100644 --- a/app/services/dependency_analyzer.py +++ b/app/services/dependency_analyzer.py @@ -671,6 +671,49 @@ async def delete_graph(self, owner: str, repo: str) -> None: logger.error(f"Failed to delete graph {graph_name}: {e}") raise + async def compute_is_hub(self, owner: str, repo: str) -> int: + """Mark Symbols with incoming-call count >= P95 as is_hub=true. + + Returns the number of hubs marked. + """ + graph = self._get_graph(owner, repo) + + result = graph.query( + "MATCH (s:Symbol) " + "OPTIONAL MATCH (s)<-[r:CALLS]-() " + "RETURN s.qualified_name, count(r) AS incoming " + "ORDER BY incoming DESC" + ) + rows = result.result_set + if not rows: + return 0 + + counts = [int(row[1]) for row in rows] + threshold = self._percentile(counts, 95) + + n_marked = 0 + for qname, incoming in rows: + if int(incoming) >= threshold and int(incoming) > 0: + graph.query( + "MATCH (s:Symbol {qualified_name: $qname}) SET s.is_hub = true", + {"qname": qname}, + ) + n_marked += 1 + return n_marked + + @staticmethod + def _percentile(values, p) -> float: + """Simple inclusive percentile; values must be sorted descending.""" + if not values: + return 0.0 + # values is descending; reverse for ascending percentile math. + s = sorted(values) + k = (len(s) - 1) * (p / 100) + lo, hi = int(k), min(int(k) + 1, len(s) - 1) + if lo == hi: + return float(s[lo]) + return s[lo] + (s[hi] - s[lo]) * (k - lo) + async def get_graph_stats(self, owner: str, repo: str) -> Dict: """ Get statistics about the graph. diff --git a/tests/unit/test_dependency_analyzer_blast_radius.py b/tests/unit/test_dependency_analyzer_blast_radius.py index a8e0d5b..be2011c 100644 --- a/tests/unit/test_dependency_analyzer_blast_radius.py +++ b/tests/unit/test_dependency_analyzer_blast_radius.py @@ -137,3 +137,26 @@ async def test_derive_tested_by_edges_runs_bfs_cypher( assert "[edges:CALLS*1..4]" in bfs_cypher # Returned a count of edges written. assert edges_written == 2 + + +@pytest.mark.asyncio +async def test_compute_is_hub_marks_top_symbols(analyzer_with_mock_graph): + analyzer, graph = analyzer_with_mock_graph + + # First query returns (qname, incoming_count) pairs. + counts_result = MagicMock() + counts_result.result_set = [ + ["app.popular.func", 50], + ["app.medium.func", 5], + ["app.unused.func", 0], + ] + # Subsequent queries are SET writes and return empty result sets. + write_result = MagicMock() + write_result.result_set = [] + + graph.query = MagicMock(side_effect=[counts_result, write_result, write_result, write_result]) + + n_hubs = await analyzer.compute_is_hub("o", "r") + + # With three symbols and P95 = 50, only `app.popular.func` is at-or-above P95. + assert n_hubs == 1 From 007d5841936c80f3af27fa616f1e4935a7df7b9c Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 2 May 2026 23:35:27 -0700 Subject: [PATCH 7/8] feat(graph): compute_blast_radius with risk_score and why_risky --- app/services/dependency_analyzer.py | 325 ++++++++++++++++++ .../test_dependency_analyzer_blast_radius.py | 102 ++++++ 2 files changed, 427 insertions(+) diff --git a/app/services/dependency_analyzer.py b/app/services/dependency_analyzer.py index 503b8eb..ee95648 100644 --- a/app/services/dependency_analyzer.py +++ b/app/services/dependency_analyzer.py @@ -746,6 +746,331 @@ async def get_graph_stats(self, owner: str, repo: str) -> Dict: "total_edges": imports + calls, } + # Threshold above which a changed symbol gets an aggregate why_risky + # entry regardless of capping (spec D4 guardrail 2). + _AGGREGATE_CALLER_THRESHOLD = 20 + + async def compute_blast_radius( + self, + owner: str, + repo: str, + changed_symbols, + pr_files, + max_depth: int = 3, + max_files_hop2plus: int = 100, + ): + """Compute the BlastRadius for a PR. Spec D4. + + Returns a BlastRadius with `graph_available=False` if any FalkorDB + query raises -- the workflow falls back to today's behaviour. + """ + from app.services.blast_radius_types import ( + BlastRadius, + ImpactedUnchangedFile, + ) + + try: + graph = self._get_graph(owner, repo) + + changed_qnames = [c.qualified_name for c in changed_symbols] + + # 1. Transitive impact set: callers reachable from any changed symbol. + impacted = await self._query_impact(graph, changed_qnames, max_depth) + + # Per spec D4, `impacted_symbols` = changed_symbols ∪ transitive callers. + # Preserve insertion order with a dict so changed symbols appear first. + impacted_qnames_ordered = list( + dict.fromkeys([*changed_qnames, *(r["qname"] for r in impacted)]) + ) + + # 2. Test set: tests reachable from any IMPACTED symbol (not just + # changed) -- otherwise we miss tests that exercise affected callers. + test_set = await self._query_test_set(graph, impacted_qnames_ordered) + + # 3. Risk inputs per changed symbol. + n_no_tests = 0 + n_hub = 0 + for cs in changed_symbols: + if not await self._symbol_has_tests(graph, cs.qualified_name): + n_no_tests += 1 + if await self._symbol_is_hub(graph, cs.qualified_name): + n_hub += 1 + + # 4. Build impacted_unchanged_files. + # + # The impact query returns one row per (caller_symbol, changed_symbol, + # path), so a single file with multiple impacted symbols -- or the + # same caller reaching multiple changed symbols -- appears more than + # once. Cap and ranking are FILE-level, so collapse to per-path + # best-evidence first: lowest hops wins; ties broken by highest + # confidence; further ties broken by the first row encountered. + # + # Hop-1 is NEVER capped (spec D4 guardrail 2) -- direct callers are + # the highest-value signal and silently dropping them is the failure + # mode worth eliminating. Hops 2+ are capped at max_files_hop2plus, + # ranked by (hops, -confidence_score, has_tests) per spec D4 -- files + # WITHOUT test coverage sort first so they're kept when the cap binds + # (they're the riskier ones to leave the model blind to). + pr_set = set(pr_files) + by_path: Dict[str, ImpactedUnchangedFile] = {} + for row in impacted: + if row["path"] in pr_set: + continue + candidate = ImpactedUnchangedFile( + path=row["path"], + reached_via_symbol=row["qname"], + hops=row["hops"], + confidence=EdgeConfidence(row["confidence"]), + ) + existing = by_path.get(row["path"]) + if existing is None: + by_path[row["path"]] = candidate + continue + # Replace if candidate is "better evidence" for this path. + better = ( + candidate.hops < existing.hops + or ( + candidate.hops == existing.hops + and candidate.confidence.default_score() + > existing.confidence.default_score() + ) + ) + if better: + by_path[row["path"]] = candidate + all_unchanged = list(by_path.values()) + + hop1 = [f for f in all_unchanged if f.hops == 1] + hop2plus = [f for f in all_unchanged if f.hops >= 2] + + # Test-aware ranking for hop-2+: batched query so we don't issue + # one query per file. + hop2plus_paths = [f.path for f in hop2plus] + has_tests_by_path = await self._files_have_tests( + graph, hop2plus_paths + ) + hop2plus.sort( + key=lambda f: ( + f.hops, + -f.confidence.default_score(), + # False (no tests) sorts before True -> kept under the cap. + has_tests_by_path.get(f.path, False), + ) + ) + + dropped_due_to_cap: Dict[str, int] = {} + if len(hop2plus) > max_files_hop2plus: + dropped_due_to_cap["hop2plus_cap"] = ( + len(hop2plus) - max_files_hop2plus + ) + hop2plus = hop2plus[:max_files_hop2plus] + unchanged_capped = hop1 + hop2plus + + # 5. Confidence summary. + summary: Dict[str, int] = { + "extracted": 0, "inferred": 0, "ambiguous": 0, + } + for row in impacted: + summary[row["confidence"]] = summary.get(row["confidence"], 0) + 1 + + # 6. Risk score (spec D8). + # `num_impacted_symbols` per spec D8 is |changed ∪ callers|, so a + # leaf-symbol PR (no callers) still scores against its own change. + n_impacted_symbols = len(impacted_qnames_ordered) + # The ambiguous-fraction term divides by the count of caller PATHS + # we observed, not by symbol count -- summary is keyed on path + # confidence, one entry per caller row. + n_caller_paths = sum(summary.values()) + risk_score = ( + 0.4 * min(n_impacted_symbols / 25.0, 1.0) + + 0.2 * (summary["ambiguous"] / max(n_caller_paths, 1)) + + 0.25 * (1.0 if n_no_tests > 0 else 0.0) + + 0.15 * (1.0 if n_hub > 0 else 0.0) + ) + + why_risky: List[str] = [] + + # Aggregate stats per high-fanout changed symbol -- emitted + # regardless of any cap so the model always sees structural + # scale (spec D4 guardrail 2). One small Cypher per symbol. + for cs in changed_symbols: + stats = await self._aggregate_caller_stats( + graph, cs.qualified_name, max_depth + ) + if stats["n_callers"] > self._AGGREGATE_CALLER_THRESHOLD: + why_risky.append( + f"{cs.qualified_name} has {stats['n_callers']} callers " + f"across {stats['n_packages']} packages; " + f"{stats['n_no_tests']} lack test coverage; " + f"{stats['n_ambiguous']} are AMBIGUOUS" + ) + + if n_no_tests: + why_risky.append( + f"{n_no_tests} changed symbol(s) have no test coverage" + ) + if n_hub: + why_risky.append( + f"{n_hub} changed symbol(s) are hub functions (high incoming-call count)" + ) + if summary["ambiguous"]: + why_risky.append( + f"{summary['ambiguous']} caller path(s) resolved as AMBIGUOUS — " + f"manual verification recommended" + ) + if dropped_due_to_cap.get("hop2plus_cap"): + why_risky.append( + f"{dropped_due_to_cap['hop2plus_cap']} hop-2+ impacted file(s) " + f"omitted from per-file context (cap = {max_files_hop2plus}); " + f"see aggregate per-symbol stats above" + ) + + return BlastRadius( + changed_symbols=list(changed_symbols), + impacted_symbols=impacted_qnames_ordered, + pr_files=list(pr_files), + impacted_unchanged_files=unchanged_capped, + test_set=test_set, + dropped_due_to_cap=dropped_due_to_cap, + edge_confidence_summary=summary, + risk_score=round(risk_score, 3), + why_risky=why_risky, + graph_available=True, + ) + + except Exception as e: + logger.warning( + f"compute_blast_radius failed for {owner}/{repo}: {e} " + f"-- falling back to graph_available=False" + ) + return BlastRadius( + changed_symbols=list(changed_symbols), + impacted_symbols=[], + pr_files=list(pr_files), + impacted_unchanged_files=[], + test_set=[], + dropped_due_to_cap={}, + edge_confidence_summary={}, + risk_score=0.0, + why_risky=[], + graph_available=False, + ) + + async def _query_impact(self, graph, changed_qnames, max_depth): + if not changed_qnames: + return [] + cypher = ( + f"MATCH (changed:Symbol) " + f"WHERE changed.qualified_name IN $qnames " + f"MATCH path = (caller:Symbol)-[edges:CALLS*1..{max_depth}]->(changed) " + f"WITH caller, length(path) AS hops, " + f" [e IN edges | e.confidence] AS confs " + f"RETURN DISTINCT caller.qualified_name, caller.path, hops, " + f" reduce(acc='extracted', c IN confs | " + f" CASE " + f" WHEN c='ambiguous' OR acc='ambiguous' THEN 'ambiguous' " + f" WHEN c='inferred' OR acc='inferred' THEN 'inferred' " + f" ELSE 'extracted' END) AS min_conf" + ) + result = graph.query(cypher, {"qnames": changed_qnames}) + return [ + {"qname": row[0], "path": row[1], "hops": int(row[2]), "confidence": row[3]} + for row in result.result_set + ] + + async def _query_test_set(self, graph, changed_qnames): + if not changed_qnames: + return [] + result = graph.query( + "MATCH (s:Symbol)-[:TESTED_BY]->(t:Symbol) " + "WHERE s.qualified_name IN $qnames " + "RETURN DISTINCT t.path", + {"qnames": changed_qnames}, + ) + return [row[0] for row in result.result_set] + + async def _symbol_has_tests(self, graph, qname) -> bool: + result = graph.query( + "MATCH (s:Symbol {qualified_name: $qname})-[:TESTED_BY]->() " + "RETURN count(*)", + {"qname": qname}, + ) + return bool(result.result_set and int(result.result_set[0][0]) > 0) + + async def _symbol_is_hub(self, graph, qname) -> bool: + result = graph.query( + "MATCH (s:Symbol {qualified_name: $qname}) RETURN s.is_hub", + {"qname": qname}, + ) + if not result.result_set: + return False + return bool(result.result_set[0][0]) + + async def _files_have_tests( + self, graph, paths + ) -> Dict[str, bool]: + """Batched check: which of these file paths contain a symbol with + at least one TESTED_BY edge? Returns {path: bool}; missing keys + default to False at lookup time.""" + if not paths: + return {} + # Bind the test target so count() returns 0 when the OPTIONAL MATCH + # finds nothing -- count(*) would count the preserved File row and + # mark every file as tested, breaking the test-aware ranking. + result = graph.query( + "MATCH (f:File) WHERE f.path IN $paths " + "OPTIONAL MATCH (f)-[:DEFINES]->(:Symbol)-[:TESTED_BY]->(t:Symbol) " + "WITH f.path AS path, count(t) AS n_tested " + "RETURN path, n_tested > 0", + {"paths": list(paths)}, + ) + return {row[0]: bool(row[1]) for row in result.result_set} + + async def _aggregate_caller_stats( + self, graph, qname: str, max_depth: int + ) -> Dict[str, int]: + """Per-symbol structural stats, emitted regardless of capping. + + Returns: + n_callers -- distinct caller symbols transitively reaching qname. + n_packages -- distinct package prefixes among callers + (top-level dotted segment). + n_no_tests -- callers with zero TESTED_BY edges. + n_ambiguous -- callers reached via at least one AMBIGUOUS edge. + """ + # One Cypher pass; FalkorDB does not yet support all aggregates we'd + # ideally chain, so we collect rows and compute in Python. + cypher = ( + f"MATCH (changed:Symbol {{qualified_name: $qname}}) " + f"MATCH (caller:Symbol)-[edges:CALLS*1..{max_depth}]->(changed) " + f"OPTIONAL MATCH (caller)-[:TESTED_BY]->(t:Symbol) " + f"WITH DISTINCT caller, " + f" [e IN edges | e.confidence] AS confs, " + f" count(t) AS n_tests " + f"RETURN caller.qualified_name, confs, n_tests" + ) + result = graph.query(cypher, {"qname": qname}) + + callers = set() + packages = set() + n_no_tests = 0 + n_ambig = 0 + for row in result.result_set: + caller_qname, confs, n_tests = row[0], row[1], int(row[2]) + if caller_qname in callers: + continue + callers.add(caller_qname) + packages.add(caller_qname.split(".", 1)[0]) + if n_tests == 0: + n_no_tests += 1 + if confs and "ambiguous" in confs: + n_ambig += 1 + return { + "n_callers": len(callers), + "n_packages": len(packages), + "n_no_tests": n_no_tests, + "n_ambiguous": n_ambig, + } + async def close(self) -> None: """Close FalkorDB connection.""" self.db = None diff --git a/tests/unit/test_dependency_analyzer_blast_radius.py b/tests/unit/test_dependency_analyzer_blast_radius.py index be2011c..51eef81 100644 --- a/tests/unit/test_dependency_analyzer_blast_radius.py +++ b/tests/unit/test_dependency_analyzer_blast_radius.py @@ -160,3 +160,105 @@ async def test_compute_is_hub_marks_top_symbols(analyzer_with_mock_graph): # With three symbols and P95 = 50, only `app.popular.func` is at-or-above P95. assert n_hubs == 1 + + +@pytest.mark.asyncio +async def test_compute_blast_radius_assembles_full_payload( + analyzer_with_mock_graph, +): + analyzer, graph = analyzer_with_mock_graph + + # The implementation issues several queries; we sequence the mocked returns: + # 1. impact query: (caller_qname, caller_path, hops, min_confidence) + impact_result = MagicMock() + impact_result.result_set = [ + ["app.consumer.use_bar", "app/consumer.py", 1, "extracted"], + ["app.other.also_use", "app/other.py", 2, "ambiguous"], + ] + # 2. test-set query: (test_path) + tests_result = MagicMock() + tests_result.result_set = [["tests/unit/test_bar.py"]] + # 3. has-tests check per changed symbol: count int + no_tests_result = MagicMock() + no_tests_result.result_set = [[0]] + # 4. is_hub flag per changed symbol: bool + not_hub_result = MagicMock() + not_hub_result.result_set = [[False]] + # 5. _files_have_tests batched query for hop-2+ paths: (path, has_tests). + # Only "app/other.py" is hop-2+ in this fixture. + files_have_tests_result = MagicMock() + files_have_tests_result.result_set = [ + ["app/other.py", False], + ] + # 6. aggregate-caller-stats per changed symbol: (caller_qname, confs, n_tests). + # Two callers, well below the threshold of 20 so no aggregate why_risky entry. + aggregate_result = MagicMock() + aggregate_result.result_set = [ + ["app.consumer.use_bar", ["extracted"], 1], + ["app.other.also_use", ["ambiguous", "inferred"], 0], + ] + + graph.query = MagicMock( + side_effect=[ + impact_result, + tests_result, + no_tests_result, + not_hub_result, + files_have_tests_result, + aggregate_result, + ] + ) + + from app.services.blast_radius_types import ChangedSymbol + + br = await analyzer.compute_blast_radius( + owner="o", + repo="r", + changed_symbols=[ + ChangedSymbol( + path="app/bar.py", + qualified_name="app.bar.bar", + change_kind="modified", + ) + ], + pr_files=["app/bar.py"], + max_depth=3, + max_files_hop2plus=100, + ) + + assert br.graph_available is True + assert "app.consumer.use_bar" in br.impacted_symbols + assert any( + f.path == "app/consumer.py" for f in br.impacted_unchanged_files + ) + assert "tests/unit/test_bar.py" in br.test_set + # One ambiguous + one extracted in the path-confidence summary. + assert br.edge_confidence_summary.get("extracted", 0) >= 1 + assert br.edge_confidence_summary.get("ambiguous", 0) >= 1 + # No-tests for changed symbol triggers a why_risky entry. + assert any("no test coverage" in r.lower() for r in br.why_risky) + assert 0.0 <= br.risk_score <= 1.0 + # dropped_due_to_cap is a dict; nothing dropped at this scale. + assert isinstance(br.dropped_due_to_cap, dict) + assert br.dropped_due_to_cap.get("hop2plus_cap", 0) == 0 + + +@pytest.mark.asyncio +async def test_compute_blast_radius_returns_unavailable_on_query_failure( + analyzer_with_mock_graph, +): + analyzer, graph = analyzer_with_mock_graph + graph.query = MagicMock(side_effect=RuntimeError("falkor down")) + + from app.services.blast_radius_types import ChangedSymbol + + br = await analyzer.compute_blast_radius( + owner="o", + repo="r", + changed_symbols=[ + ChangedSymbol(path="app/bar.py", qualified_name="app.bar.bar", change_kind="modified") + ], + pr_files=["app/bar.py"], + ) + assert br.graph_available is False + assert br.impacted_symbols == [] From 5bd7d7d4aa5b733bf2f4cfd6e0a9eaba76bb18a8 Mon Sep 17 00:00:00 2001 From: Nishant Date: Sat, 2 May 2026 23:43:52 -0700 Subject: [PATCH 8/8] fix(graph): _aggregate_caller_stats dedupes per caller across paths When a caller reached a changed symbol via multiple CALLS paths (e.g. one EXTRACTED + one AMBIGUOUS), the previous first-row-wins loop kept only the first row's confs and skipped the rest. Since FalkorDB row order is not guaranteed, the AMBIGUOUS count in the high-fanout why_risky summary was silently order-dependent and could understate ambiguity. Aggregate per caller across all rows: OR the is_ambiguous flag and has_tests flag, then count distinct callers. Added regression test that fails under the old logic when the EXTRACTED row sorts first. --- app/services/dependency_analyzer.py | 31 ++++++++++++------- ...02-blast-radius-pr1-schema-and-resolver.md | 30 +++++++++++------- .../test_dependency_analyzer_blast_radius.py | 31 +++++++++++++++++++ 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/app/services/dependency_analyzer.py b/app/services/dependency_analyzer.py index ee95648..2f28916 100644 --- a/app/services/dependency_analyzer.py +++ b/app/services/dependency_analyzer.py @@ -1050,22 +1050,29 @@ async def _aggregate_caller_stats( ) result = graph.query(cypher, {"qname": qname}) - callers = set() - packages = set() - n_no_tests = 0 - n_ambig = 0 + # A single caller can produce multiple rows when reached via several + # CALLS paths with different confidence lists. Aggregate ALL rows per + # caller before counting -- otherwise the first-row-wins shortcut is + # order-dependent (a caller with one extracted path and one ambiguous + # path would only be counted as ambiguous if the ambiguous row arrived + # first), which would silently understate the AMBIGUOUS count in the + # high-fanout why_risky summary. + per_caller: Dict[str, Dict[str, bool]] = {} for row in result.result_set: caller_qname, confs, n_tests = row[0], row[1], int(row[2]) - if caller_qname in callers: - continue - callers.add(caller_qname) - packages.add(caller_qname.split(".", 1)[0]) - if n_tests == 0: - n_no_tests += 1 + entry = per_caller.setdefault( + caller_qname, {"is_ambiguous": False, "has_tests": False} + ) if confs and "ambiguous" in confs: - n_ambig += 1 + entry["is_ambiguous"] = True + if n_tests > 0: + entry["has_tests"] = True + + packages = {q.split(".", 1)[0] for q in per_caller} + n_no_tests = sum(1 for e in per_caller.values() if not e["has_tests"]) + n_ambig = sum(1 for e in per_caller.values() if e["is_ambiguous"]) return { - "n_callers": len(callers), + "n_callers": len(per_caller), "n_packages": len(packages), "n_no_tests": n_no_tests, "n_ambiguous": n_ambig, diff --git a/docs/superpowers/plans/2026-05-02-blast-radius-pr1-schema-and-resolver.md b/docs/superpowers/plans/2026-05-02-blast-radius-pr1-schema-and-resolver.md index 89ac556..3ad3433 100644 --- a/docs/superpowers/plans/2026-05-02-blast-radius-pr1-schema-and-resolver.md +++ b/docs/superpowers/plans/2026-05-02-blast-radius-pr1-schema-and-resolver.md @@ -2245,22 +2245,28 @@ Append to `DependencyGraphAnalyzer`: ) result = graph.query(cypher, {"qname": qname}) - callers = set() - packages = set() - n_no_tests = 0 - n_ambig = 0 + # A single caller can produce multiple rows when reached via several + # CALLS paths with different confidence lists. Aggregate ALL rows per + # caller before counting -- otherwise the first-row-wins shortcut is + # order-dependent (a caller with one extracted path and one ambiguous + # path would only be counted as ambiguous if the ambiguous row arrived + # first), which would silently understate the AMBIGUOUS count. + per_caller: Dict[str, Dict[str, bool]] = {} for row in result.result_set: caller_qname, confs, n_tests = row[0], row[1], int(row[2]) - if caller_qname in callers: - continue - callers.add(caller_qname) - packages.add(caller_qname.split(".", 1)[0]) - if n_tests == 0: - n_no_tests += 1 + entry = per_caller.setdefault( + caller_qname, {"is_ambiguous": False, "has_tests": False} + ) if confs and "ambiguous" in confs: - n_ambig += 1 + entry["is_ambiguous"] = True + if n_tests > 0: + entry["has_tests"] = True + + packages = {q.split(".", 1)[0] for q in per_caller} + n_no_tests = sum(1 for e in per_caller.values() if not e["has_tests"]) + n_ambig = sum(1 for e in per_caller.values() if e["is_ambiguous"]) return { - "n_callers": len(callers), + "n_callers": len(per_caller), "n_packages": len(packages), "n_no_tests": n_no_tests, "n_ambiguous": n_ambig, diff --git a/tests/unit/test_dependency_analyzer_blast_radius.py b/tests/unit/test_dependency_analyzer_blast_radius.py index 51eef81..9cfa27e 100644 --- a/tests/unit/test_dependency_analyzer_blast_radius.py +++ b/tests/unit/test_dependency_analyzer_blast_radius.py @@ -262,3 +262,34 @@ async def test_compute_blast_radius_returns_unavailable_on_query_failure( ) assert br.graph_available is False assert br.impacted_symbols == [] + + +@pytest.mark.asyncio +async def test_aggregate_caller_stats_dedupes_per_caller_across_paths( + analyzer_with_mock_graph, +): + """Regression: a caller with both an EXTRACTED and an AMBIGUOUS path + must be counted once, AND counted as ambiguous regardless of row order. + + The previous implementation kept only the first row per caller, so a + caller whose EXTRACTED row arrived before its AMBIGUOUS row was + silently miscounted as non-ambiguous. + """ + analyzer, graph = analyzer_with_mock_graph + + multi_path_result = MagicMock() + # Same caller appears twice, EXTRACTED row first -- order that previously + # caused the bug. The other caller appears only with AMBIGUOUS rows. + multi_path_result.result_set = [ + ["app.consumer.use_bar", ["extracted"], 1], + ["app.consumer.use_bar", ["ambiguous", "inferred"], 1], + ["app.other.also_use", ["ambiguous"], 0], + ] + graph.query = MagicMock(return_value=multi_path_result) + + stats = await analyzer._aggregate_caller_stats(graph, "app.bar.bar", max_depth=3) + + assert stats["n_callers"] == 2 # callers deduped across paths + assert stats["n_ambiguous"] == 2 # both callers have at least one ambiguous path + assert stats["n_no_tests"] == 1 # only "app.other.also_use" has 0 tests + assert stats["n_packages"] == 1 # both callers in package "app"