Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions graphify/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@
}


_LANG_EXT_SUFFIX_RE = re.compile(
r"_(?:py|pyi|pyw|js|jsx|mjs|cjs|ts|tsx|vue|svelte|astro|go|rs|java|kt|kts|scala|groovy|c|h|cc|cpp|cxx|hh|hpp|hxx|rb|php|cs|swift|lua|r|sh|bash|zsh|fish|ps1|sql|html|css|scss|sass|less|md|mdx|json|yaml|yml|toml|xml|proto|graphql|gql|dart|ex|exs|erl|hrl|clj|cljs|fs|fsx|vb|pl|pm|pas|pp|dpr|dpk|inc|sol|move|sv|svh|v)$"
)


def _strip_ext_suffix(s: str) -> str:
"""Drop legacy extension suffixes from path-derived IDs (#1033)."""
return _LANG_EXT_SUFFIX_RE.sub("", s)


def _normalize_id(s: str) -> str:
r"""Normalize an ID string the same way extract._make_id does.

Expand All @@ -65,6 +75,43 @@ def _normalize_id(s: str) -> str:
return cleaned.strip("_").casefold()


def _canonical_id(s: str) -> str:
    """Return the canonical form of an ID.

    The value is coerced to ``str`` first and then passed through
    ``_normalize_id``.
    """
    as_text = str(s)
    return _normalize_id(as_text)


def _file_stem_from_path(path_text: str) -> str:
path = Path(path_text.replace("\\", "/"))
parent = path.parent.name
if parent and parent not in (".", ""):
return f"{parent}.{path.stem}"
return path.stem


def _looks_like_file_node(node: dict) -> bool:
label = str(node.get("label") or "")
source_file = str(node.get("source_file") or "")
if source_file and label == Path(source_file.replace("\\", "/")).name:
return True
return bool(label and Path(label).suffix)


def _legacy_file_node_id(node: dict) -> str | None:
    """Return the replacement ID for a legacy file node, or ``None``.

    ``None`` is returned when *node* does not look like a file node, or when
    its canonical ID matches none of the legacy candidates. Otherwise the
    result is the normalized ``<parent>.<stem>`` name derived from
    ``source_file`` (or from ``label`` when no source file is set).
    """
    if not _looks_like_file_node(node):
        return None

    source_file = str(node.get("source_file") or "")
    label = str(node.get("label") or "")
    node_id = _canonical_id(str(node.get("id") or ""))
    stripped_id = _strip_ext_suffix(node_id)

    candidates: set[str] = set()
    if source_file:
        candidates.add(_normalize_id(source_file))
    if label:
        # NOTE(review): this candidate is derived from the node's own ID, so
        # the membership test below always succeeds when a label is present.
        # Possibly the normalized label was intended; confirm before changing.
        candidates.add(stripped_id)

    if candidates.isdisjoint({node_id, stripped_id}):
        return None
    return _normalize_id(_file_stem_from_path(source_file or label))


def _norm_source_file(p: str | None, root: str | None = None) -> str | None:
"""Normalize path separators and relativize absolute paths.

Expand Down Expand Up @@ -118,9 +165,19 @@ def build_from_json(extraction: dict, *, directed: bool = False, root: str | Pat
extraction = dict(extraction, edges=extraction["links"])

# Canonicalize legacy node/edge schema before validation.
id_remap: dict[str, str] = {}
for node in extraction.get("nodes", []):
if not isinstance(node, dict):
continue
original_id = str(node.get("id") or "")
if node.get("id") not in (None, ""):
node["id"] = _canonical_id(str(node["id"]))
file_node_id = _legacy_file_node_id(node)
if file_node_id and file_node_id != node.get("id"):
id_remap[node["id"]] = file_node_id
if original_id:
id_remap[_canonical_id(original_id)] = file_node_id
node["id"] = file_node_id
if "source" in node and "source_file" not in node:
# Count edges that reference this node so the warning is actionable (#479)
node_id = node.get("id", "?")
Expand All @@ -144,6 +201,19 @@ def build_from_json(extraction: dict, *, directed: bool = False, root: str | Pat
ft = node.get("file_type", "")
if ft and ft not in {"code", "document", "paper", "image", "rationale", "concept"}:
node["file_type"] = _FILE_TYPE_SYNONYMS.get(ft, "concept")
for edge in extraction.get("edges", []):
if not isinstance(edge, dict):
continue
if "source" not in edge and "from" in edge:
edge["source"] = edge["from"]
if "target" not in edge and "to" in edge:
edge["target"] = edge["to"]
if edge.get("source") not in (None, ""):
edge["source"] = _canonical_id(str(edge["source"]))
edge["source"] = id_remap.get(edge["source"], edge["source"])
if edge.get("target") not in (None, ""):
edge["target"] = _canonical_id(str(edge["target"]))
edge["target"] = id_remap.get(edge["target"], edge["target"])

errors = validate_extraction(extraction)
# Dangling edges (stdlib/external imports) are expected - only warn about real schema errors.
Expand All @@ -159,7 +229,7 @@ def build_from_json(extraction: dict, *, directed: bool = False, root: str | Pat
# Normalized ID map: lets edges survive when the LLM generates IDs with
# slightly different casing or punctuation than the AST extractor.
# e.g. "Session_ValidateToken" maps to "session_validatetoken".
norm_to_id: dict[str, str] = {_normalize_id(nid): nid for nid in node_set}
norm_to_id: dict[str, str] = {_canonical_id(nid): nid for nid in node_set}
# Iterate edges in a deterministic order. The graph is undirected and stores
# direction in _src/_tgt; when two edges collapse onto the same node pair the
# last write wins, so an unstable iteration order flips _src/_tgt run-to-run
Expand All @@ -181,9 +251,9 @@ def build_from_json(extraction: dict, *, directed: bool = False, root: str | Pat
src, tgt = edge["source"], edge["target"]
# Remap mismatched IDs via normalization before dropping the edge.
if src not in node_set:
src = norm_to_id.get(_normalize_id(src), src)
src = norm_to_id.get(_canonical_id(src), src)
if tgt not in node_set:
tgt = norm_to_id.get(_normalize_id(tgt), tgt)
tgt = norm_to_id.get(_canonical_id(tgt), tgt)
if src not in node_set or tgt not in node_set:
continue # skip edges to external/stdlib nodes - expected, not an error
attrs = {k: v for k, v in edge.items() if k not in ("source", "target")}
Expand Down
Loading