diff --git a/libs/openant-core/context/application_context.py b/libs/openant-core/context/application_context.py
index f7fa55d..11940db 100644
--- a/libs/openant-core/context/application_context.py
+++ b/libs/openant-core/context/application_context.py
@@ -31,6 +31,7 @@
 from anthropic import Anthropic
 from dotenv import load_dotenv
+from utilities.file_io import open_utf8, read_json, write_json
 
 # Load environment variables
 load_dotenv()
@@ -208,7 +209,8 @@ def gather_context_sources(repo_path: Path) -> dict[str, str]:
         filepath = repo_path / filename
         if filepath.exists():
             try:
-                content = filepath.read_text(errors="ignore")
+                with open_utf8(filepath, errors="ignore") as _f:
+                    content = _f.read()
                 # Limit size to avoid token overflow
                 if len(content) > 10000:
                     content = content[:10000] + "\n\n[... truncated ...]"
@@ -289,7 +291,8 @@ def detect_entry_points(repo_path: Path) -> str:
             continue
 
         try:
-            content = py_file.read_text(errors="ignore")
+            with open_utf8(py_file, errors="ignore") as _f:
+                content = _f.read()
             rel_path = py_file.relative_to(repo_path)
 
             for category, patterns in ENTRY_POINT_PATTERNS.items():
@@ -308,7 +311,8 @@ def detect_entry_points(repo_path: Path) -> str:
             continue
 
        try:
-            content = js_file.read_text(errors="ignore")
+            with open_utf8(js_file, errors="ignore") as _f:
+                content = _f.read()
             rel_path = js_file.relative_to(repo_path)
 
             if re.search(r"express\(\)|require\(['\"]express['\"]\)", content):
@@ -340,15 +344,17 @@ def check_manual_override(repo_path: Path) -> ApplicationContext | None:
             continue
 
         try:
-            content = filepath.read_text()
-
             if filename.endswith('.json'):
                 # Direct JSON format
-                data = json.loads(content)
+                data = read_json(filepath)
                 data['source'] = 'manual'
                 return ApplicationContext(**data)
-            elif filename.endswith('.md'):
+
+            # .md files need raw text so regex can extract the embedded JSON block.
+            with open_utf8(filepath) as _f:
+                content = _f.read()
+
+            if filename.endswith('.md'):
                 # Markdown format - check for JSON code block
                 json_match = re.search(r'```json\s*(.*?)\s*```', content, re.DOTALL)
                 if json_match:
@@ -545,8 +551,7 @@ def save_context(context: ApplicationContext, output_path: Path) -> None:
     output_path = Path(output_path)
     output_path.parent.mkdir(parents=True, exist_ok=True)
 
-    with open(output_path, 'w') as f:
-        json.dump(asdict(context), f, indent=2)
+    write_json(output_path, asdict(context))
 
     print(f"Context saved to {output_path}", file=sys.stderr)
@@ -560,9 +565,7 @@ def load_context(input_path: Path) -> ApplicationContext:
     Returns:
         ApplicationContext loaded from file.
     """
-    with open(input_path) as f:
-        data = json.load(f)
-
+    data = read_json(input_path)
     # Mark as manual to skip validation (already validated when saved)
     original_source = data.get('source', 'llm')
     data['source'] = 'manual'  # Temporarily bypass validation
diff --git a/libs/openant-core/core/analyzer.py b/libs/openant-core/core/analyzer.py
index 7fb5966..cf9cafd 100644
--- a/libs/openant-core/core/analyzer.py
+++ b/libs/openant-core/core/analyzer.py
@@ -27,6 +27,7 @@
 
 # Import existing analysis machinery
 from utilities.llm_client import AnthropicClient, get_global_tracker
+from utilities.file_io import read_json, write_json
 from utilities.json_corrector import JSONCorrector
 from utilities.rate_limiter import get_rate_limiter, is_rate_limit_error, is_retryable_error
@@ -330,9 +331,7 @@ def run_analysis(
     # Load dataset
     print(f"[Analyze] Loading dataset: {dataset_path}", file=sys.stderr)
-    with open(dataset_path) as f:
-        dataset = json.load(f)
-
+    dataset = read_json(dataset_path)
     units = dataset.get("units", [])
 
     # Diff filter: if upstream parse stamped diff_selected on units (PR-diff
@@ -513,9 +512,7 @@ def _summary_callback(finding, usage=None):
         "code_by_route": code_by_route,
     }
 
-    with open(results_path, "w") as f:
-        json.dump(experiment_result, f, indent=2)
-
+    write_json(results_path, experiment_result)
     print(f"\n[Analyze] Results written to {results_path}", file=sys.stderr)
 
     # Checkpoints are preserved as a permanent artifact alongside results.
diff --git a/libs/openant-core/core/checkpoint.py b/libs/openant-core/core/checkpoint.py
index 7c42f52..3b2015a 100644
--- a/libs/openant-core/core/checkpoint.py
+++ b/libs/openant-core/core/checkpoint.py
@@ -27,6 +27,7 @@
 from datetime import datetime, timezone
 
 from utilities.safe_filename import safe_filename
+from utilities.file_io import read_json, write_json
 from pathlib import Path
 
@@ -79,8 +80,7 @@ def load(self) -> dict[str, dict]:
                 continue
             filepath = os.path.join(self.dir, filename)
             try:
-                with open(filepath, "r") as f:
-                    data = json.load(f)
+                data = read_json(filepath)
                 unit_id = data.get("id")
                 if unit_id:
                     results[unit_id] = data
@@ -130,9 +130,7 @@ def save(self, unit_id: str, data: dict):
         filename = self._safe_filename(unit_id) + ".json"
         filepath = os.path.join(self.dir, filename)
         data["id"] = unit_id  # ensure id is always present
-        with open(filepath, "w") as f:
-            json.dump(data, f, indent=2)
-
+        write_json(filepath, data)
 
     def write_summary(
         self,
         total_units: int,
@@ -168,9 +166,7 @@ def write_summary(
         }
         if usage is not None:
             data["usage"] = usage
-        with open(filepath, "w") as f:
-            json.dump(data, f, indent=2)
-
+        write_json(filepath, data)
 
     @staticmethod
     def read_summary(checkpoint_dir: str) -> dict | None:
         """Read _summary.json from a checkpoint directory.
@@ -182,8 +178,7 @@ def read_summary(checkpoint_dir: str) -> dict | None:
         if not os.path.isfile(filepath):
             return None
         try:
-            with open(filepath, "r") as f:
-                return json.load(f)
+            return read_json(filepath)
         except (json.JSONDecodeError, OSError):
             return None
@@ -241,8 +236,7 @@ def status(checkpoint_dir: str) -> dict:
             continue
         filepath = os.path.join(checkpoint_dir, filename)
         try:
-            with open(filepath, "r") as f:
-                data = json.load(f)
+            data = read_json(filepath)
         except (json.JSONDecodeError, OSError):
             errors += 1
             error_breakdown["unreadable"] = error_breakdown.get("unreadable", 0) + 1
diff --git a/libs/openant-core/core/diff_filter.py b/libs/openant-core/core/diff_filter.py
index bd93917..07b832c 100644
--- a/libs/openant-core/core/diff_filter.py
+++ b/libs/openant-core/core/diff_filter.py
@@ -30,10 +30,11 @@
 
 from __future__ import annotations
 
-import json
 import sys
 from dataclasses import dataclass, asdict
 
+from utilities.file_io import read_json
+
 # Scope constants (must match internal/git/manifest.go).
 SCOPE_CHANGED_FILES = "changed_files"
@@ -65,8 +66,7 @@ def to_dict(self) -> dict:
 
 def load_manifest(path: str) -> dict:
     """Read and minimally validate a diff manifest file."""
-    with open(path, "r", encoding="utf-8") as f:
-        m = json.load(f)
+    m = read_json(path)
     scope = m.get("scope")
     if scope not in _VALID_SCOPES:
         raise ValueError(
diff --git a/libs/openant-core/core/dynamic_tester.py b/libs/openant-core/core/dynamic_tester.py
index 9f9c10d..41b1a10 100644
--- a/libs/openant-core/core/dynamic_tester.py
+++ b/libs/openant-core/core/dynamic_tester.py
@@ -12,6 +12,7 @@
 
 from core.schemas import DynamicTestStepResult, UsageInfo
 from core import tracking
+from utilities.file_io import read_json, write_json
 
 
 def run_tests(
@@ -51,9 +52,7 @@ def run_tests(
     os.makedirs(output_dir, exist_ok=True)
 
     # Check how many findings to test
-    with open(pipeline_output_path) as f:
-        pipeline_data = json.load(f)
-
+    pipeline_data = read_json(pipeline_output_path)
     findings = pipeline_data.get("findings", [])
     testable = [
         f for f in findings
@@ -65,8 +64,7 @@ def run_tests(
 
     if not testable:
         results_path = os.path.join(output_dir, "dynamic_test_results.json")
-        with open(results_path, "w") as f:
-            json.dump({"findings_tested": 0, "results": []}, f, indent=2)
+        write_json(results_path, {"findings_tested": 0, "results": []})
 
         return DynamicTestStepResult(
             results_json_path=results_path,
diff --git a/libs/openant-core/core/enhancer.py b/libs/openant-core/core/enhancer.py
index fef1453..70879b8 100644
--- a/libs/openant-core/core/enhancer.py
+++ b/libs/openant-core/core/enhancer.py
@@ -17,6 +17,7 @@
 from core import tracking
 from core.progress import ProgressReporter
 from utilities.rate_limiter import configure_rate_limiter
+from utilities.file_io import read_json, write_json
 
 
 def enhance_dataset(
@@ -69,9 +70,7 @@ def enhance_dataset(
     # Load dataset
     print(f"[Enhance] Loading dataset: {dataset_path}", file=sys.stderr)
-    with open(dataset_path) as f:
-        dataset = json.load(f)
-
+    dataset = read_json(dataset_path)
     units = dataset.get("units", [])
     print(f"[Enhance] Units to enhance: {len(units)}", file=sys.stderr)
@@ -138,9 +137,7 @@ def _on_restored(count: int):
 
     # Write enhanced dataset
     os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
-    with open(output_path, "w") as f:
-        json.dump(enhanced, f, indent=2)
-
+    write_json(output_path, enhanced)
     print(f"[Enhance] Enhanced dataset: {output_path}", file=sys.stderr)
     print(f"[Enhance] Classifications: {classifications}", file=sys.stderr)
     if error_count:
diff --git a/libs/openant-core/core/parser_adapter.py b/libs/openant-core/core/parser_adapter.py
index 314d470..46fc08c 100644
--- a/libs/openant-core/core/parser_adapter.py
+++ b/libs/openant-core/core/parser_adapter.py
@@ -16,6 +16,7 @@
 from pathlib import Path
 
 from core.schemas import ParseResult
+from utilities.file_io import read_json, write_json
 
 # Root of openant-core (where parsers/ lives)
 _CORE_ROOT = Path(__file__).parent.parent
@@ -161,9 +162,7 @@ def _maybe_apply_diff_filter(
         )
         return
 
-    with open(result.dataset_path, "r") as f:
-        dataset = json.load(f)
-
+    dataset = read_json(result.dataset_path)
     # Dataset may be a dict with "units" or a raw list.
     if isinstance(dataset, dict):
         units = dataset.get("units", [])
@@ -172,14 +171,11 @@ def _maybe_apply_diff_filter(
 
     stats = apply_diff_filter(units, manifest)
 
-    with open(result.dataset_path, "w") as f:
-        json.dump(dataset, f, indent=2)
-
+    write_json(result.dataset_path, dataset)
     # Expose stats on the ParseResult via a side-channel file; the parse
     # step_context reads this when assembling parse.report.json.
     diff_report_path = os.path.join(output_dir, "diff_filter.report.json")
-    with open(diff_report_path, "w") as f:
-        json.dump(stats.to_dict(), f, indent=2)
+    write_json(diff_report_path, stats.to_dict())
 
     print(
         f" Diff filter ({stats.scope}): {stats.selected}/{stats.total} units selected"
@@ -245,9 +241,7 @@ def _load_module(name, filename):
 
     print(f"\n[Reachability Filter] Filtering to {processing_level} units...", file=sys.stderr)
 
-    with open(call_graph_path, "r") as f:
-        call_graph_data = json.load(f)
-
+    call_graph_data = read_json(call_graph_path)
     functions = call_graph_data.get("functions", {})
     call_graph = call_graph_data.get("call_graph", {})
     reverse_call_graph = call_graph_data.get("reverse_call_graph", {})
@@ -352,12 +346,8 @@ def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_t
         dataset = _apply_reachability_filter(dataset, output_dir, processing_level)
 
     # Write outputs
-    with open(dataset_path, "w") as f:
-        json.dump(dataset, f, indent=2)
-
-    with open(analyzer_output_path, "w") as f:
-        json.dump(analyzer_output, f, indent=2)
-
+    write_json(dataset_path, dataset)
+    write_json(analyzer_output_path, analyzer_output)
     units_count = len(dataset.get("units", []))
     print(f" Python parser complete: {units_count} units", file=sys.stderr)
@@ -413,8 +403,7 @@ def _parse_javascript(repo_path: str, output_dir: str, processing_level: str, sk
     # Count units
     units_count = 0
     if os.path.exists(dataset_path):
-        with open(dataset_path) as f:
-            data = json.load(f)
+        data = read_json(dataset_path)
         units_count = len(data.get("units", []))
 
     print(f" JavaScript parser complete: {units_count} units", file=sys.stderr)
@@ -470,8 +459,7 @@ def _parse_go(repo_path: str, output_dir: str, processing_level: str, skip_tests
     # Count units
     units_count = 0
     if os.path.exists(dataset_path):
-        with open(dataset_path) as f:
-            data = json.load(f)
+        data = read_json(dataset_path)
         units_count = len(data.get("units", []))
 
     print(f" Go parser complete: {units_count} units", file=sys.stderr)
@@ -530,8 +518,7 @@ def _parse_c(repo_path: str, output_dir: str, processing_level: str, skip_tests:
     # Count units
     units_count = 0
     if os.path.exists(dataset_path):
-        with open(dataset_path) as f:
-            data = json.load(f)
+        data = read_json(dataset_path)
         units_count = len(data.get("units", []))
 
     print(f" C/C++ parser complete: {units_count} units", file=sys.stderr)
@@ -590,8 +577,7 @@ def _parse_ruby(repo_path: str, output_dir: str, processing_level: str, skip_tes
     # Count units
     units_count = 0
     if os.path.exists(dataset_path):
-        with open(dataset_path) as f:
-            data = json.load(f)
+        data = read_json(dataset_path)
         units_count = len(data.get("units", []))
 
     print(f" Ruby parser complete: {units_count} units", file=sys.stderr)
@@ -650,8 +636,7 @@ def _parse_php(repo_path: str, output_dir: str, processing_level: str, skip_test
     # Count units
     units_count = 0
     if os.path.exists(dataset_path):
-        with open(dataset_path) as f:
-            data = json.load(f)
+        data = read_json(dataset_path)
         units_count = len(data.get("units", []))
 
     print(f" PHP parser complete: {units_count} units", file=sys.stderr)
@@ -710,8 +695,7 @@ def _parse_zig(repo_path: str, output_dir: str, processing_level: str, skip_test
     # Count units
     units_count = 0
     if os.path.exists(dataset_path):
-        with open(dataset_path) as f:
-            data = json.load(f)
+        data = read_json(dataset_path)
         units_count = len(data.get("units", []))
 
     print(f" Zig parser complete: {units_count} units", file=sys.stderr)
diff --git a/libs/openant-core/core/reporter.py b/libs/openant-core/core/reporter.py
index 7153dab..9536c4d 100644
--- a/libs/openant-core/core/reporter.py
+++ b/libs/openant-core/core/reporter.py
@@ -19,6 +19,7 @@
 from pathlib import Path
 
 from core.schemas import ReportResult
+from utilities.file_io import open_utf8, read_json, write_json
 
 # Root of openant-core
 _CORE_ROOT = Path(__file__).parent.parent
@@ -34,8 +35,7 @@ def _load_diff_metadata(scan_dir: str) -> dict | None:
     if not os.path.exists(manifest_path):
         return None
     try:
-        with open(manifest_path) as f:
-            manifest = json.load(f)
+        manifest = read_json(manifest_path)
     except (json.JSONDecodeError, OSError):
         return None
     out = {
@@ -50,8 +50,7 @@ def _load_diff_metadata(scan_dir: str) -> dict | None:
     filter_report = os.path.join(scan_dir, "diff_filter.report.json")
     if os.path.exists(filter_report):
         try:
-            with open(filter_report) as f:
-                stats = json.load(f)
+            stats = read_json(filter_report)
             out["units_in_diff"] = stats.get("selected")
             out["units_total_parsed"] = stats.get("total")
             out["callers_added"] = stats.get("callers_added") or 0
@@ -129,8 +128,7 @@ def _dedup_caller_callee(
         return confirmed
 
     try:
-        with open(call_graph_path) as f:
-            cg_data = json.load(f)
+        cg_data = read_json(call_graph_path)
     except (json.JSONDecodeError, OSError):
         return confirmed
@@ -212,9 +210,7 @@ def build_pipeline_output(
     """
     print(f"[Report] Building pipeline_output.json...", file=sys.stderr)
 
-    with open(results_path) as f:
-        experiment = json.load(f)
-
+    experiment = read_json(results_path)
     all_results = experiment.get("results", [])
     code_by_route = experiment.get("code_by_route", {})
     metrics = experiment.get("metrics", {})
@@ -371,9 +367,7 @@ def build_pipeline_output(
     print(_banner, file=sys.stderr)
 
     os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
-    with open(output_path, "w") as f:
-        json.dump(pipeline_output, f, indent=2, ensure_ascii=False)
-
+    write_json(output_path, pipeline_output, ensure_ascii=False)
     print(f" pipeline_output.json: {len(findings_data)} findings", file=sys.stderr)
     print(f" Written to {output_path}", file=sys.stderr)
@@ -469,9 +463,7 @@ def generate_summary_report(
 
     print("[Report] Generating summary report (LLM)...", file=sys.stderr)
 
-    with open(results_path) as f:
-        pipeline_data = json.load(f)
-
+    pipeline_data = read_json(results_path)
     # Merge dynamic test results if available
     pipeline_data = merge_dynamic_results(pipeline_data, results_path)
@@ -483,7 +475,7 @@ def generate_summary_report(
     report_text, usage = _generate_summary(pipeline_data)
 
     os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
-    with open(output_path, "w") as f:
+    with open_utf8(output_path, "w") as f:
         f.write(report_text)
 
     print(f" Summary report: {output_path}", file=sys.stderr)
@@ -517,9 +509,7 @@ def generate_disclosure_docs(
 
     print("[Report] Generating disclosure documents (LLM)...", file=sys.stderr)
 
-    with open(results_path) as f:
-        pipeline_data = json.load(f)
-
+    pipeline_data = read_json(results_path)
     # Merge dynamic test results if available
     pipeline_data = merge_dynamic_results(pipeline_data, results_path)
@@ -552,7 +542,7 @@ def _one(args):
         safe_name = finding["short_name"].replace(" ", "_").upper()
         filename = f"DISCLOSURE_{i:02d}_{safe_name}.md"
         filepath = os.path.join(output_dir, filename)
-        with open(filepath, "w") as f:
+        with open_utf8(filepath, "w") as f:
             f.write(disclosure_text)
 
         return finding["short_name"], filepath, usage
diff --git a/libs/openant-core/core/scanner.py b/libs/openant-core/core/scanner.py
index f081352..2eba6ee 100644
--- a/libs/openant-core/core/scanner.py
+++ b/libs/openant-core/core/scanner.py
@@ -27,6 +27,7 @@
 )
 from core.step_report import step_context
 from core import tracking
+from utilities.file_io import read_json
 
 # Import app context generator (optional)
 try:
@@ -149,8 +150,7 @@ def _step_label(name: str) -> str:
             _diff_report = os.path.join(output_dir, "diff_filter.report.json")
             if os.path.exists(_diff_report):
                 try:
-                    with open(_diff_report) as _f:
-                        ctx.summary["diff_stats"] = json.load(_f)
+                    ctx.summary["diff_stats"] = read_json(_diff_report)
                 except (json.JSONDecodeError, OSError):
                     pass
             ctx.outputs = {
@@ -542,8 +542,7 @@ def _load_step_report(output_dir: str, step: str) -> dict:
     """Load a step report JSON from disk. Returns empty dict on failure."""
     path = os.path.join(output_dir, f"{step}.report.json")
     try:
-        with open(path) as f:
-            return json.load(f)
+        return read_json(path)
     except Exception:
         return {"step": step, "status": "unknown"}
@@ -551,8 +550,7 @@ def _read_app_type(app_context_path: str) -> str | None:
     """Read application_type from an app context JSON file."""
     try:
-        with open(app_context_path) as f:
-            data = json.load(f)
+        data = read_json(app_context_path)
         return data.get("application_type")
     except Exception:
         return None
diff --git a/libs/openant-core/core/schemas.py b/libs/openant-core/core/schemas.py
index 88d30d4..43886eb 100644
--- a/libs/openant-core/core/schemas.py
+++ b/libs/openant-core/core/schemas.py
@@ -10,12 +10,13 @@
 standardized metadata (timing, cost, inputs, outputs).
 """
 
-import json
 import os
 from dataclasses import dataclass, field, asdict
 from datetime import datetime, timezone
 from typing import Any
 
+from utilities.file_io import write_json
+
 
 # ---------------------------------------------------------------------------
 # JSON Envelope
@@ -268,6 +269,5 @@ def write(self, output_dir: str) -> str:
         """Write ``{step}.report.json`` to *output_dir*. Returns the path."""
         os.makedirs(output_dir, exist_ok=True)
         path = os.path.join(output_dir, f"{self.step}.report.json")
-        with open(path, "w") as f:
-            json.dump(self.to_dict(), f, indent=2)
+        write_json(path, self.to_dict())
         return path
diff --git a/libs/openant-core/core/verifier.py b/libs/openant-core/core/verifier.py
index fa7a43f..705ca4a 100644
--- a/libs/openant-core/core/verifier.py
+++ b/libs/openant-core/core/verifier.py
@@ -20,6 +20,7 @@
 from core.progress import ProgressReporter
 
 from utilities.llm_client import TokenTracker, get_global_tracker
+from utilities.file_io import read_json, write_json
 from utilities.finding_verifier import FindingVerifier
 from utilities.agentic_enhancer.repository_index import load_index_from_file
@@ -80,9 +81,7 @@ def run_verification(
     # Load Stage 1 results
     print(f"[Verify] Loading results: {results_path}", file=sys.stderr)
-    with open(results_path) as f:
-        experiment = json.load(f)
-
+    experiment = read_json(results_path)
     all_results = experiment.get("results", [])
     code_by_route = experiment.get("code_by_route", {})
@@ -268,10 +267,7 @@ def _write_verified_results(
 
     output["metrics"] = {"total": len(merged_results), **counts}
 
-    with open(path, "w") as f:
-        json.dump(output, f, indent=2, ensure_ascii=False)
-
-
+    write_json(path, output, ensure_ascii=False)
 def _build_code_by_route(results: list) -> dict:
     """Build code_by_route from result entries (fallback)."""
     code_by_route = {}
diff --git a/libs/openant-core/experiment.py b/libs/openant-core/experiment.py
index 359d41f..7eb8dda 100644
--- a/libs/openant-core/experiment.py
+++ b/libs/openant-core/experiment.py
@@ -35,6 +35,7 @@
 from pathlib import Path
 
 from utilities.llm_client import AnthropicClient, get_global_tracker
+from utilities.file_io import read_json, write_json
 from prompts.prompt_selector import get_analysis_prompt
 from prompts.vulnerability_analysis import get_system_prompt as get_stage1_system_prompt
 from utilities.context_corrector import ContextCorrector
@@ -211,8 +212,7 @@ def load_dataset(name: str, enhanced: bool = False) -> dict:
     if not path or not os.path.exists(path):
         raise ValueError(f"Dataset not found: {name} (enhanced={enhanced})")
 
-    with open(path, "r") as f:
-        return json.load(f)
+    return read_json(path)
 
 
 def load_ground_truth(name: str) -> dict:
@@ -221,8 +221,7 @@ def load_ground_truth(name: str) -> dict:
     if not path or not os.path.exists(path):
         return {}
 
-    with open(path, "r") as f:
-        return json.load(f)
+    return read_json(path)
 
 
 def get_ground_truth_verdict(ground_truth: dict, route_key: str) -> str:
@@ -1034,9 +1033,7 @@ def main():
     suffix = "" if args.no_enhanced else "_enhanced"
     output_path = f"experiment_{args.dataset}_{args.model}{suffix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
 
-    with open(output_path, "w") as f:
-        json.dump(experiment, f, indent=2)
-
+    write_json(output_path, experiment)
     print()
     print(f"Results saved to: {output_path}")
diff --git a/libs/openant-core/export_csv.py b/libs/openant-core/export_csv.py
index 8b69300..b330a45 100644
--- a/libs/openant-core/export_csv.py
+++ b/libs/openant-core/export_csv.py
@@ -29,6 +29,7 @@
 import json
 import os
 import sys
+from utilities.file_io import read_json
 
 
 def _load_diff_block(experiment_path: str) -> dict | None:
@@ -41,8 +42,7 @@ def _load_diff_block(experiment_path: str) -> dict | None:
     if not os.path.exists(candidate):
         return None
     try:
-        with open(candidate) as f:
-            data = json.load(f)
+        data = read_json(candidate)
     except (json.JSONDecodeError, OSError):
         return None
     diff = data.get("diff")
@@ -67,8 +67,7 @@ def _format_diff_banner(diff: dict) -> str:
 
 def load_json(path: str) -> dict:
     """Load JSON file."""
-    with open(path, 'r') as f:
-        return json.load(f)
+    return read_json(path)
 
 
 def extract_file(unit_id: str) -> str:
diff --git a/libs/openant-core/generate_report.py b/libs/openant-core/generate_report.py
index 633cd9b..5af97f9 100644
--- a/libs/openant-core/generate_report.py
+++ b/libs/openant-core/generate_report.py
@@ -31,6 +31,7 @@
 
 import anthropic
 from dotenv import load_dotenv
+from utilities.file_io import read_json
 
 # Load environment variables from .env file
 load_dotenv()
@@ -42,8 +43,7 @@
 def load_json(path: str) -> dict:
     """Load JSON file."""
-    with open(path, 'r') as f:
-        return json.load(f)
+    return read_json(path)
 
 
 def extract_file(unit_id: str) -> str:
@@ -83,8 +83,7 @@ def _load_pipeline_metadata(experiment_path: str) -> tuple[dict | None, dict | N
     if not os.path.exists(candidate):
         return None, None
     try:
-        with open(candidate, 'r') as f:
-            data = json.load(f)
+        data = read_json(candidate)
     except (json.JSONDecodeError, OSError):
         return None, None
     return data.get("repository"), data.get("diff")
diff --git a/libs/openant-core/openant/cli.py b/libs/openant-core/openant/cli.py
index b0ce345..e521b22 100644
--- a/libs/openant-core/openant/cli.py
+++ b/libs/openant-core/openant/cli.py
@@ -22,6 +22,8 @@
 import sys
 import tempfile
 
+from utilities.file_io import read_json
+
 
 def _output_json(data: dict):
     """Write JSON to stdout."""
@@ -39,8 +41,7 @@ def _load_step_reports(directory: str) -> list[dict]:
     reports = []
     for path in glob.glob(os.path.join(directory, "*.report.json")):
         try:
-            with open(path) as f:
-                reports.append(json.load(f))
+            reports.append(read_json(path))
         except (json.JSONDecodeError, OSError):
             continue
     return reports
@@ -82,8 +83,7 @@ def cmd_scan(args):
     # is the same one written into pipeline_output.json by reporter.py.
     if result.pipeline_output_path and os.path.exists(result.pipeline_output_path):
         try:
-            with open(result.pipeline_output_path) as f:
-                po = json.load(f)
+            po = read_json(result.pipeline_output_path)
             diff_block = po.get("diff")
             if isinstance(diff_block, dict) and diff_block.get("mode") == "incremental":
                 scan_payload["diff"] = diff_block
@@ -135,8 +135,7 @@ def cmd_parse(args):
     diff_report = os.path.join(output_dir, "diff_filter.report.json")
     if os.path.exists(diff_report):
         try:
-            with open(diff_report) as f:
-                ctx.summary["diff_stats"] = json.load(f)
+            ctx.summary["diff_stats"] = read_json(diff_report)
         except (json.JSONDecodeError, OSError):
             pass
     ctx.outputs = {
@@ -607,10 +606,8 @@ def cmd_report_data(args):
         "dataset_path": os.path.abspath(dataset_path),
     }) as ctx:
         # Load data
-        with open(results_path) as f:
-            experiment = json.load(f)
-        with open(dataset_path) as f:
-            dataset = json.load(f)
+        experiment = read_json(results_path)
+        dataset = read_json(dataset_path)
 
         # --- Load dynamic test results if available ---
         # Dynamic tests use VULN-XXX IDs from pipeline_output.json,
@@ -620,10 +617,8 @@ def cmd_report_data(args):
         dt_path = os.path.join(results_dir, "dynamic_test_results.json")
         po_path = os.path.join(results_dir, "pipeline_output.json")
         if os.path.exists(dt_path) and os.path.exists(po_path):
-            with open(dt_path) as f:
-                dt_data = json.load(f)
-            with open(po_path) as f:
-                po_data = json.load(f)
+            dt_data = read_json(dt_path)
+            po_data = read_json(po_path)
 
             # Map VULN-ID → route_key from pipeline_output
             vuln_id_to_route = {}
@@ -876,8 +871,7 @@ def _linkify_finding(m):
     diff_block = None
     if os.path.exists(po_path):
         try:
-            with open(po_path) as f:
-                po = json.load(f)
+            po = read_json(po_path)
             repo_info = po.get("repository", {})
             repo_name = repo_info.get("name", "")
             commit_sha = repo_info.get("commit_sha", "")
diff --git a/libs/openant-core/parsers/c/call_graph_builder.py b/libs/openant-core/parsers/c/call_graph_builder.py
index 84e5988..f5940ba 100644
--- a/libs/openant-core/parsers/c/call_graph_builder.py
+++ b/libs/openant-core/parsers/c/call_graph_builder.py
@@ -40,6 +40,7 @@
 import tree_sitter_c as tsc
 import tree_sitter_cpp as tscpp
 from tree_sitter import Language, Parser
+from utilities.file_io import read_json, write_json, open_utf8
 
 C_LANGUAGE = Language(tsc.language())
@@ -423,9 +424,7 @@ def main():
     args = parser.parse_args()
 
     try:
-        with open(args.input_file) as f:
-            extractor_output = json.load(f)
-
+        extractor_output = read_json(args.input_file)
         print(f"Processing {len(extractor_output.get('functions', {}))} functions...", file=sys.stderr)
 
         builder = CallGraphBuilder(extractor_output, {'max_depth': args.depth})
@@ -444,7 +443,7 @@ def main():
         output = json.dumps(result, indent=2)
 
         if args.output:
-            with open(args.output, 'w') as f:
+            with open_utf8(args.output, 'w') as f:
                 f.write(output)
             print(f"Output written to: {args.output}", file=sys.stderr)
         else:
diff --git a/libs/openant-core/parsers/c/function_extractor.py b/libs/openant-core/parsers/c/function_extractor.py
index 10b5f70..8e5b1cf 100644
--- a/libs/openant-core/parsers/c/function_extractor.py
+++ b/libs/openant-core/parsers/c/function_extractor.py
@@ -42,6 +42,7 @@
 import tree_sitter_c as tsc
 import tree_sitter_cpp as tscpp
 from tree_sitter import Language, Parser
+from utilities.file_io import read_json, write_json, open_utf8
 
 C_LANGUAGE = Language(tsc.language())
@@ -575,8 +576,7 @@ def main():
     extractor = FunctionExtractor(args.repo_path)
 
     if args.scan_file:
-        with open(args.scan_file) as f:
-            scan_result = json.load(f)
+        scan_result = read_json(args.scan_file)
         result = extractor.extract_from_scan(scan_result)
     else:
         result = extractor.extract_all()
@@ -584,7 +584,7 @@ def main():
     output = json.dumps(result, indent=2)
 
     if args.output:
-        with open(args.output, 'w') as f:
+        with open_utf8(args.output, 'w') as f:
             f.write(output)
         print(f"Extraction complete. Results written to: {args.output}", file=sys.stderr)
         print(f"Total functions: {result['statistics']['total_functions']}", file=sys.stderr)
diff --git a/libs/openant-core/parsers/c/repository_scanner.py b/libs/openant-core/parsers/c/repository_scanner.py
index 6706f92..a6ec241 100644
--- a/libs/openant-core/parsers/c/repository_scanner.py
+++ b/libs/openant-core/parsers/c/repository_scanner.py
@@ -30,6 +30,7 @@
 from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Optional, Set
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 class RepositoryScanner:
@@ -225,7 +226,7 @@ def main():
     output = json.dumps(result, indent=2)
 
     if args.output:
-        with open(args.output, 'w') as f:
+        with open_utf8(args.output, 'w') as f:
             f.write(output)
         print(f"Scan complete. Results written to: {args.output}", file=sys.stderr)
         print(f"Total files found: {result['statistics']['total_files']}", file=sys.stderr)
diff --git a/libs/openant-core/parsers/c/test_pipeline.py b/libs/openant-core/parsers/c/test_pipeline.py
index 3f18635..5072d68 100644
--- a/libs/openant-core/parsers/c/test_pipeline.py
+++ b/libs/openant-core/parsers/c/test_pipeline.py
@@ -42,6 +42,7 @@
 from enum import Enum
 from pathlib import Path
 from typing import Set
+from utilities.file_io import open_utf8, read_json, run_utf8, write_json
 
 # Add parent directory to path for utilities import
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
@@ -139,8 +140,7 @@ def run_parser_pipeline(self) -> bool:
 
             # Save scan results
             self.scan_results_file = os.path.join(self.output_dir, 'scan_results.json')
-            with open(self.scan_results_file, 'w') as f:
-                json.dump(scan_result, f, indent=2)
+            write_json(self.scan_results_file, scan_result)
 
             # Stage 2: Extract functions
             print(" [2/4] Extracting functions via tree-sitter...")
@@ -178,13 +178,11 @@ def run_parser_pipeline(self) -> bool:
             print(f" Avg upstream deps: {dataset['statistics']['avg_upstream']}")
 
             # Write dataset
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             # Write analyzer output
             analyzer_output = generator.generate_analyzer_output()
-            with open(self.analyzer_output_file, 'w') as f:
-                json.dump(analyzer_output, f, indent=2)
+            write_json(self.analyzer_output_file, analyzer_output)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -242,8 +240,7 @@ def apply_reachability_filter(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.analyzer_output_file, 'r') as f:
-                analyzer = json.load(f)
+            analyzer = read_json(self.analyzer_output_file)
 
             functions = analyzer.get("functions", {})
@@ -262,8 +259,7 @@ def apply_reachability_filter(self) -> bool:
             }
 
             # Build call graph from dataset unit metadata
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             call_graph = {}
             reverse_call_graph = {}
@@ -313,8 +309,7 @@ def apply_reachability_filter(self) -> bool:
                 "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0
             }
 
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -379,7 +374,7 @@ def run_codeql_analysis(self) -> bool:
                 '--overwrite'
             ]
 
-            result = subprocess.run(
+            result = run_utf8(
                 create_db_cmd,
                 capture_output=True,
                 text=True,
@@ -410,7 +405,7 @@ def run_codeql_analysis(self) -> bool:
                 f'codeql/{language}-queries:codeql-suites/{language}-security-extended.qls'
             ]
 
-            result = subprocess.run(
+            result = run_utf8(
                 analyze_cmd,
                 capture_output=True,
                 text=True,
@@ -443,8 +438,7 @@ def run_codeql_analysis(self) -> bool:
                 }
                 return False
 
-            with open(sarif_output, 'r') as f:
-                sarif_data = json.load(f)
+            sarif_data = read_json(sarif_output)
 
             self.codeql_findings = []
@@ -555,8 +549,7 @@ def apply_codeql_filter(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             # Build mapping of file -> [(start_line, end_line, func_id)]
             file_functions = {}
@@ -605,8 +598,7 @@ def apply_codeql_filter(self) -> bool:
                 "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0
             }
 
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -662,8 +654,7 @@ def run_context_enhancer(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             enhancer = ContextEnhancer()
@@ -695,8 +686,7 @@ def run_context_enhancer(self) -> bool:
                 'data_flows_extracted': enhancer.stats['data_flows_extracted']
             }
 
-            with open(self.dataset_file, 'w') as f:
-                json.dump(enhanced, f, indent=2)
+            write_json(self.dataset_file, enhanced)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -740,8 +730,7 @@ def apply_exploitable_filter(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             units = dataset.get("units", [])
             original_count = len(units)
@@ -767,8 +756,7 @@ def apply_exploitable_filter(self) -> bool:
                 "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0
             }
 
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -908,7 +896,7 @@ def run_full_pipeline(self):
         # Save results summary
         results_file = os.path.join(self.output_dir, 'pipeline_results.json')
-        with open(results_file, 'w') as f:
+        with open_utf8(results_file, 'w') as f:
             clean_results = {
                 'repository': self.results['repository'],
                 'test_time': self.results['test_time'],
diff --git a/libs/openant-core/parsers/c/unit_generator.py b/libs/openant-core/parsers/c/unit_generator.py
index a0391d7..fcca506 100644
--- a/libs/openant-core/parsers/c/unit_generator.py
+++ b/libs/openant-core/parsers/c/unit_generator.py
@@ -28,6 +28,7 @@
 from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Optional, Set
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 # File boundary marker for enhanced code (C-style comment, matching Go parser)
@@ -343,9 +344,7 @@ def main():
     args = parser.parse_args()
 
     try:
-        with open(args.input_file) as f:
-            call_graph_data = json.load(f)
-
+        call_graph_data = read_json(args.input_file)
         options = {
             'max_depth': args.depth,
         }
@@ -373,7 +372,7 @@ def main():
         output = json.dumps(result, indent=2)
 
         if args.output:
-            with open(args.output, 'w') as f:
+            with open_utf8(args.output, 'w') as f:
                 f.write(output)
             print(f"\nOutput written to: {args.output}", file=sys.stderr)
         else:
@@ -382,8 +381,7 @@ def main():
         # Write analyzer output if requested
         if args.analyzer_output:
             analyzer = generator.generate_analyzer_output()
-            with open(args.analyzer_output, 'w') as f:
-                json.dump(analyzer, f, indent=2)
+            write_json(args.analyzer_output, analyzer)
             print(f"Analyzer output written to: {args.analyzer_output}", file=sys.stderr)
 
     except Exception as e:
diff --git a/libs/openant-core/parsers/go/test_pipeline.py b/libs/openant-core/parsers/go/test_pipeline.py
index 8fe05b8..d37787c 100644
--- a/libs/openant-core/parsers/go/test_pipeline.py
+++ b/libs/openant-core/parsers/go/test_pipeline.py
@@ -42,6 +42,7 @@
 from enum import Enum
 from pathlib import Path
 from typing import Set
+from utilities.file_io import open_utf8, read_json, run_utf8, write_json
 
 # Add parent directory to path for utilities import
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
@@ -115,11 +116,11 @@ def setup(self):
         if not os.path.exists(self.go_parser):
             print("Building Go parser...")
             go_parser_dir = os.path.join(self.parser_dir, 'go_parser')
-            result = subprocess.run(
+            result = run_utf8(
                 ['go', 'build', '-o', 'go_parser', '.'],
                 cwd=go_parser_dir,
                 capture_output=True,
-                text=True
+                text=True,
             )
             if result.returncode != 0:
                 print(f"Error building Go parser: {result.stderr}")
@@ -140,7 +141,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict:
         start_time = datetime.now()
 
         try:
-            result = subprocess.run(
+            result = run_utf8(
                 command,
                 capture_output=True,
                 text=True,
@@ -168,8 +169,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict:
 
             # Load and summarize output
             if os.path.exists(output_file):
-                with open(output_file, 'r') as f:
-                    data = json.load(f)
+                data = read_json(output_file)
                 stage_result['summary'] = self._summarize_output(name, data)
         else:
             print(f"✗ Failed (exit code {result.returncode})")
@@ -244,11 +244,9 @@ def run_go_parser_all(self) -> bool:
         # Post-process: apply dataset name if specified (Go binary doesn't support --name)
         if result.get('success', False) and self.dataset_name and os.path.exists(self.dataset_file):
             try:
-                with open(self.dataset_file, 'r') as f:
-                    dataset = json.load(f)
+                dataset = read_json(self.dataset_file)
                 dataset['name'] = self.dataset_name
-                with open(self.dataset_file, 'w') as f:
-                    json.dump(dataset, f, indent=2)
+                write_json(self.dataset_file, dataset)
             except Exception as e:
                 print(f"Warning: Could not apply dataset name: {e}")
@@ -282,8 +280,7 @@ def apply_reachability_filter(self) -> bool:
 
         try:
             # Load analyzer output for call graph
-            with open(self.analyzer_output_file, 'r') as f:
-                analyzer = json.load(f)
+            analyzer = read_json(self.analyzer_output_file)
 
             functions = analyzer.get("functions", {})
@@ -304,8 +301,7 @@ def apply_reachability_filter(self) -> bool:
             }
 
             # Load call graph from dataset (go_parser puts it in statistics)
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             # Build call graph from unit metadata
             call_graph = {}
@@ -359,8 +355,7 @@ def apply_reachability_filter(self) -> bool:
             }
 
             # Write filtered dataset
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -434,7 +429,7 @@ def run_codeql_analysis(self) -> bool:
                 '--overwrite'
             ]
 
-            result = subprocess.run(
+            result = run_utf8(
                 create_db_cmd,
                 capture_output=True,
                 text=True,
@@ -465,7 +460,7 @@ def run_codeql_analysis(self) -> bool:
                 f'codeql/{language}-queries:codeql-suites/{language}-security-extended.qls'
             ]
 
-            result = subprocess.run(
+            result = run_utf8(
                 analyze_cmd,
                 capture_output=True,
                 text=True,
@@ -498,8 +493,7 @@ def run_codeql_analysis(self) -> bool:
                 }
                 return False
 
-            with open(sarif_output, 'r') as f:
-                sarif_data = json.load(f)
+            sarif_data = read_json(sarif_output)
 
             # Extract findings and map to file:line
             self.codeql_findings = []
@@ -620,8 +614,7 @@ def apply_codeql_filter(self) -> bool:
 
         try:
             # Load dataset to get function line ranges
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             # Build mapping of file -> [(start_line, end_line, func_id)]
             file_functions = {}
@@ -675,8 +668,7 @@ def apply_codeql_filter(self) -> bool:
             }
 
             # Write filtered dataset
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -733,8 +725,7 @@ def run_context_enhancer(self) -> bool:
 
         try:
             # Load dataset
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             # Enhance with LLM
             enhancer = ContextEnhancer()
@@ -771,8 +762,7 @@ def run_context_enhancer(self) -> bool:
             }
 
             # Write back
-            with open(self.dataset_file, 'w') as f:
-                json.dump(enhanced, f, indent=2)
+            write_json(self.dataset_file, enhanced)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -824,8 +814,7 @@ def apply_exploitable_filter(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             units = dataset.get("units", [])
             original_count = len(units)
@@ -854,8 +843,7 @@ def apply_exploitable_filter(self) -> bool:
             }
 
             # Write filtered dataset
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -1002,7 +990,7 @@ def run_full_pipeline(self):
         # Save results summary
         results_file = os.path.join(self.output_dir, 'pipeline_results.json')
-        with open(results_file, 'w') as f:
+        with open_utf8(results_file, 'w') as f:
             # Remove stdout/stderr from saved results (too verbose)
             clean_results = {
                 'repository': self.results['repository'],
diff --git a/libs/openant-core/parsers/javascript/test_pipeline.py b/libs/openant-core/parsers/javascript/test_pipeline.py
index 77ab9c4..c069a79 100644
--- a/libs/openant-core/parsers/javascript/test_pipeline.py
+++ b/libs/openant-core/parsers/javascript/test_pipeline.py
@@ -41,6 +41,7 @@
 from enum import Enum
 from pathlib import Path
 from typing import Set, Tuple
+from utilities.file_io import open_utf8, read_json, run_utf8, write_json
 
 # Add parent directory to path for utilities import
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
@@ -126,7 +127,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict:
         start_time = datetime.now()
 
         try:
-            result = subprocess.run(
+            result = run_utf8(
                 command,
                 capture_output=True,
                 text=True,
@@ -154,8 +155,7 @@ def run_stage(self, name: str, command: list, output_file: str) -> dict:
 
             # Load and summarize output
            if os.path.exists(output_file):
-                with open(output_file, 'r') as f:
-                    data = json.load(f)
+                data = read_json(output_file)
                 stage_result['summary'] = self._summarize_output(name, data)
         else:
             print(f"✗ Failed (exit code {result.returncode})")
@@ -242,8 +242,7 @@ def run_typescript_analyzer(self, files: list = None) -> bool:
 
         # If no specific files, use ALL files from scan results
         if not files and self.scan_results_file and os.path.exists(self.scan_results_file):
-            with open(self.scan_results_file, 'r') as f:
-                scan_data = json.load(f)
+            scan_data = read_json(self.scan_results_file)
             files = [f['path'] for f in scan_data.get('files', [])]
 
         if not files:
@@ -252,7 +251,7 @@ def run_typescript_analyzer(self, files: list = None) -> bool:
 
         # Write file list to a temporary file to avoid command-line length limits
         file_list_path = os.path.join(self.output_dir, 'file_list.txt')
-        with open(file_list_path, 'w') as f:
+        with open_utf8(file_list_path, 'w') as f:
             for file_path in files:
                 # Convert relative path to absolute
                 if not os.path.isabs(file_path):
@@ -289,7 +288,7 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s
         start_time = datetime.now()
 
         try:
-            result = subprocess.run(
+            result = run_utf8(
                 command,
                 capture_output=True,
                 text=True,
@@ -300,7 +299,7 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s
 
             if result.returncode == 0:
                 # Write stdout to output file
-                with open(output_file, 'w') as f:
+                with open_utf8(output_file, 'w') as f:
                     f.write(result.stdout)
 
                 print(f"✓ Success ({elapsed:.2f}s)")
@@ -313,8 +312,7 @@ def run_stage_with_stdout_capture(self, name: str, command: list, output_file: s
 
                 # Load and summarize output
                 if os.path.exists(output_file):
-                    with open(output_file, 'r') as f:
-                        data = json.load(f)
+                    data = read_json(output_file)
                     summary = self._summarize_output(name, data)
                 else:
                     summary = {}
@@ -391,8 +389,7 @@ def run_context_enhancer(self) -> bool:
 
         try:
             # Load dataset
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             # Enhance with LLM
             enhancer = ContextEnhancer()
@@ -432,8 +429,7 @@ def run_context_enhancer(self) -> bool:
             }
 
             # Write back
-            with open(self.dataset_file, 'w') as f:
-                json.dump(enhanced, f, indent=2)
+            write_json(self.dataset_file, enhanced)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -490,8 +486,7 @@ def apply_reachability_filter(self) -> bool:
 
         try:
             # Load analyzer output for call graph
-            with open(self.analyzer_output_file, 'r') as f:
-                analyzer = json.load(f)
+            analyzer = read_json(self.analyzer_output_file)
 
             functions = analyzer.get("functions", {})
             call_graph = analyzer.get("call_graph", analyzer.get("callGraph", {}))
@@ -510,8 +505,7 @@ def apply_reachability_filter(self) -> bool:
             self.reachable_units = reachability.get_all_reachable()
 
             # Load and filter dataset
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             units = dataset.get("units", [])
             original_count = len(units)
@@ -539,8 +533,7 @@ def apply_reachability_filter(self) -> bool:
             }
 
             # Write filtered dataset
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -590,8 +583,7 @@ def _detect_codeql_language(self) -> str:
             return "javascript"  # Default
 
         try:
-            with open(self.scan_results_file, 'r') as f:
-                scan_data = json.load(f)
+            scan_data = read_json(self.scan_results_file)
 
             stats = scan_data.get('statistics', {})
             by_extension = stats.get('byExtension', {})
@@ -642,7 +634,7 @@ def run_codeql_analysis(self) -> bool:
                 '--overwrite'
             ]
 
-            result = subprocess.run(
+            result = run_utf8(
                 create_db_cmd,
                 capture_output=True,
                 text=True,
@@ -673,7 +665,7 @@ def run_codeql_analysis(self) -> bool:
                 f'codeql/{language}-queries:codeql-suites/{language}-security-extended.qls'
             ]
 
-            result = subprocess.run(
+            result = run_utf8(
                 analyze_cmd,
                 capture_output=True,
                 text=True,
@@ -706,8 +698,7 @@ def run_codeql_analysis(self) -> bool:
                 }
                 return False
 
-            with open(sarif_output, 'r') as f:
-                sarif_data = json.load(f)
+            sarif_data = read_json(sarif_output)
 
             # Extract findings and map to file:line
             self.codeql_findings = []
@@ -830,8 +821,7 @@ def apply_codeql_filter(self) -> bool:
 
         try:
             # Load analyzer output to get function line ranges
-            with open(self.analyzer_output_file, 'r') as f:
-                analyzer = json.load(f)
+            analyzer = read_json(self.analyzer_output_file)
 
             functions = analyzer.get("functions", {})
@@ -869,8 +859,7 @@ def apply_codeql_filter(self) -> bool:
                     self.codeql_flagged_units.add(func_id)
 
             # Load and filter dataset
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             units = dataset.get("units", [])
             original_count = len(units)
@@ -891,8 +880,7 @@ def apply_codeql_filter(self) -> bool:
             }
 
             # Write filtered dataset
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -955,8 +943,7 @@ def apply_exploitable_filter(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             units = dataset.get("units", [])
             original_count = len(units)
@@ -985,8 +972,7 @@ def apply_exploitable_filter(self) -> bool:
             }
 
             # Write filtered dataset
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -1143,7 +1129,7 @@ def run_full_pipeline(self):
         # Save results summary
         results_file = os.path.join(self.output_dir, 'pipeline_results.json')
-        with open(results_file, 'w') as f:
+        with open_utf8(results_file, 'w') as f:
             # Remove stdout/stderr from saved results (too verbose)
             clean_results = {
                 'repository': self.results['repository'],
diff --git a/libs/openant-core/parsers/php/call_graph_builder.py b/libs/openant-core/parsers/php/call_graph_builder.py
index dfa441e..42e37bb 100644
--- a/libs/openant-core/parsers/php/call_graph_builder.py
+++ b/libs/openant-core/parsers/php/call_graph_builder.py
@@ -39,6 +39,7 @@
 
 import tree_sitter_php as ts_php
 from tree_sitter import Language, Parser
+from utilities.file_io import read_json, write_json, open_utf8
 
 PHP_LANGUAGE = Language(ts_php.language_php())
@@ -482,9 +483,7 @@ def main():
     args = parser.parse_args()
 
     try:
-        with open(args.input_file) as f:
-            extractor_output = json.load(f)
-
+        extractor_output = read_json(args.input_file)
         print(f"Processing {len(extractor_output.get('functions', {}))} functions...", file=sys.stderr)
 
         builder = CallGraphBuilder(extractor_output, {'max_depth': args.depth})
@@ -503,7 +502,7 @@ def main():
         output = json.dumps(result, indent=2)
 
         if args.output:
-            with open(args.output, 'w') as f:
+            with open_utf8(args.output, 'w') as f:
                 f.write(output)
             print(f"Output written to: {args.output}", file=sys.stderr)
         else:
diff --git a/libs/openant-core/parsers/php/function_extractor.py b/libs/openant-core/parsers/php/function_extractor.py
index bdedecf..2c9039a 100644
--- a/libs/openant-core/parsers/php/function_extractor.py
+++ b/libs/openant-core/parsers/php/function_extractor.py
@@ -42,6 +42,7 @@
 
 import tree_sitter_php as ts_php
 from tree_sitter import Language, Parser
+from utilities.file_io import read_json, write_json, open_utf8
 
 PHP_LANGUAGE = Language(ts_php.language_php())
@@ -547,8 +548,7 @@ def main():
     extractor = FunctionExtractor(args.repo_path)
 
     if args.scan_file:
-        with open(args.scan_file) as f:
-            scan_result = json.load(f)
+        scan_result = read_json(args.scan_file)
         result = extractor.extract_from_scan(scan_result)
     else:
         result = extractor.extract_all()
@@ -556,7 +556,7 @@ def main():
     output = json.dumps(result, indent=2)
 
     if args.output:
-        with open(args.output, 'w') as f:
+        with open_utf8(args.output, 'w') as f:
             f.write(output)
         print(f"Extraction complete. Results written to: {args.output}", file=sys.stderr)
         print(f"Total functions: {result['statistics']['total_functions']}", file=sys.stderr)
diff --git a/libs/openant-core/parsers/php/repository_scanner.py b/libs/openant-core/parsers/php/repository_scanner.py
index bd8a2d9..89781ff 100644
--- a/libs/openant-core/parsers/php/repository_scanner.py
+++ b/libs/openant-core/parsers/php/repository_scanner.py
@@ -30,6 +30,7 @@
 from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Optional, Set
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 class RepositoryScanner:
@@ -236,7 +237,7 @@ def main():
     output = json.dumps(result, indent=2)
 
     if args.output:
-        with open(args.output, 'w') as f:
+        with open_utf8(args.output, 'w') as f:
             f.write(output)
         print(f"Scan complete. Results written to: {args.output}", file=sys.stderr)
         print(f"Total files found: {result['statistics']['total_files']}", file=sys.stderr)
diff --git a/libs/openant-core/parsers/php/test_pipeline.py b/libs/openant-core/parsers/php/test_pipeline.py
index fd10477..7529ea9 100644
--- a/libs/openant-core/parsers/php/test_pipeline.py
+++ b/libs/openant-core/parsers/php/test_pipeline.py
@@ -42,6 +42,7 @@
 from enum import Enum
 from pathlib import Path
 from typing import Set
+from utilities.file_io import open_utf8, read_json, run_utf8, write_json
 
 # Add parent directory to path for utilities import
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
@@ -139,8 +140,7 @@ def run_parser_pipeline(self) -> bool:
 
             # Save scan results
             self.scan_results_file = os.path.join(self.output_dir, 'scan_results.json')
-            with open(self.scan_results_file, 'w') as f:
-                json.dump(scan_result, f, indent=2)
+            write_json(self.scan_results_file, scan_result)
 
             # Stage 2: Extract functions
             print(" [2/4] Extracting functions via tree-sitter...")
@@ -178,13 +178,11 @@ def run_parser_pipeline(self) -> bool:
             print(f" Avg upstream deps: {dataset['statistics']['avg_upstream']}")
 
             # Write dataset
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             # Write analyzer output
             analyzer_output = generator.generate_analyzer_output()
-            with open(self.analyzer_output_file, 'w') as f:
-                json.dump(analyzer_output, f, indent=2)
+            write_json(self.analyzer_output_file, analyzer_output)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -242,8 +240,7 @@ def apply_reachability_filter(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.analyzer_output_file, 'r') as f:
-                analyzer = json.load(f)
+            analyzer = read_json(self.analyzer_output_file)
 
             functions = analyzer.get("functions", {})
@@ -262,8 +259,7 @@ def apply_reachability_filter(self) -> bool:
             }
 
             # Build call graph from dataset unit metadata
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             call_graph = {}
             reverse_call_graph = {}
@@ -313,8 +309,7 @@ def apply_reachability_filter(self) -> bool:
                 "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0
             }
 
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -379,7 +374,7 @@ def run_codeql_analysis(self) -> bool:
                 '--overwrite'
             ]
 
-            result = subprocess.run(
+            result = run_utf8(
                 create_db_cmd,
                 capture_output=True,
                 text=True,
@@ -410,7 +405,7 @@ def run_codeql_analysis(self) -> bool:
                 f'codeql/{language}-queries:codeql-suites/{language}-security-extended.qls'
             ]
 
-            result = subprocess.run(
+            result = run_utf8(
                 analyze_cmd,
                 capture_output=True,
                 text=True,
@@ -443,8 +438,7 @@ def run_codeql_analysis(self) -> bool:
                 }
                 return False
 
-            with open(sarif_output, 'r') as f:
-                sarif_data = json.load(f)
+            sarif_data = read_json(sarif_output)
 
             self.codeql_findings = []
@@ -555,8 +549,7 @@ def apply_codeql_filter(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             # Build mapping of file -> [(start_line, end_line, func_id)]
             file_functions = {}
@@ -605,8 +598,7 @@ def apply_codeql_filter(self) -> bool:
                 "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0
             }
 
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -662,8 +654,7 @@ def run_context_enhancer(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             enhancer = ContextEnhancer()
@@ -695,8 +686,7 @@ def run_context_enhancer(self) -> bool:
                 'data_flows_extracted': enhancer.stats['data_flows_extracted']
             }
 
-            with open(self.dataset_file, 'w') as f:
-                json.dump(enhanced, f, indent=2)
+            write_json(self.dataset_file, enhanced)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -740,8 +730,7 @@ def apply_exploitable_filter(self) -> bool:
         start_time = datetime.now()
 
         try:
-            with open(self.dataset_file, 'r') as f:
-                dataset = json.load(f)
+            dataset = read_json(self.dataset_file)
 
             units = dataset.get("units", [])
             original_count = len(units)
@@ -767,8 +756,7 @@ def apply_exploitable_filter(self) -> bool:
                 "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0
             }
 
-            with open(self.dataset_file, 'w') as f:
-                json.dump(dataset, f, indent=2)
+            write_json(self.dataset_file, dataset)
 
             elapsed = (datetime.now() - start_time).total_seconds()
@@ -908,7 +896,7 @@ def run_full_pipeline(self):
         # Save results summary
         results_file = os.path.join(self.output_dir, 'pipeline_results.json')
-        with open(results_file, 'w') as f:
+        with open_utf8(results_file, 'w') as f:
             clean_results = {
                 'repository': self.results['repository'],
                 'test_time': self.results['test_time'],
diff --git a/libs/openant-core/parsers/php/unit_generator.py b/libs/openant-core/parsers/php/unit_generator.py
index 9b36684..63f9fff 100644
--- a/libs/openant-core/parsers/php/unit_generator.py
+++ b/libs/openant-core/parsers/php/unit_generator.py
@@ -28,6 +28,7 @@
 from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Optional, Set
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 # File boundary marker for enhanced code (PHP uses // comments)
@@ -344,9 +345,7 @@ def main():
     args = parser.parse_args()
 
     try:
-        with open(args.input_file) as f:
-            call_graph_data = json.load(f)
-
+        call_graph_data = read_json(args.input_file)
         options = {
             'max_depth': args.depth,
         }
@@ -374,7 +373,7 @@ def main():
         output = json.dumps(result, indent=2)
 
         if args.output:
-            with open(args.output, 'w') as f:
+            with open_utf8(args.output, 'w') as f:
                 f.write(output)
             print(f"\nOutput written to: {args.output}", file=sys.stderr)
         else:
@@ -383,8 +382,7 @@ def main():
         # Write analyzer output if requested
         if args.analyzer_output:
             analyzer = generator.generate_analyzer_output()
-            with open(args.analyzer_output, 'w') as f:
-                json.dump(analyzer, f, indent=2)
+            write_json(args.analyzer_output, analyzer)
             print(f"Analyzer output written to: {args.analyzer_output}", file=sys.stderr)
 
     except Exception as e:
diff --git a/libs/openant-core/parsers/python/ast_parser.py b/libs/openant-core/parsers/python/ast_parser.py
index e4cdc21..63b4895 100644
--- a/libs/openant-core/parsers/python/ast_parser.py
+++ b/libs/openant-core/parsers/python/ast_parser.py
@@ -13,10 +13,11 @@
 import ast
 import json
 import os
 import re
 import sys
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 class PythonRouteParser:
@@ -35,7 +36,8 @@ def detect_framework(self) -> str:
 
         for f in files:
             try:
-                content = f.read_text()
+                with open_utf8(f, errors="replace") as _f:
+                    content = _f.read()
                 if "from django" in content or "django.urls" in content:
                     return "django"
                 if "from flask" in content or "Flask(" in content:
@@ -76,7 +78,8 @@ def _read_file(self, file_path: Path) -> str:
         path_str = str(file_path)
         if path_str not in self.file_cache:
             try:
-                self.file_cache[path_str] = file_path.read_text()
+                with open_utf8(file_path, errors="replace") as _f:
+                    self.file_cache[path_str] = _f.read()
             except Exception as e:
                 print(f"Error reading {file_path}: {e}")
                 self.file_cache[path_str] = ""
@@ -461,8 +464,7 @@ def main():
     result = parser.parse()
 
     if output_file:
-        with open(output_file, 'w') as f:
-            json.dump(result, f, indent=2)
+        write_json(output_file, result)
         print(f"Output written to {output_file}")
     else:
         print(json.dumps(result, indent=2))
diff --git a/libs/openant-core/parsers/python/call_graph_builder.py b/libs/openant-core/parsers/python/call_graph_builder.py
index 3d92b25..a6741cc 100644
--- a/libs/openant-core/parsers/python/call_graph_builder.py
+++ b/libs/openant-core/parsers/python/call_graph_builder.py
@@ -38,6 +38,7 @@
 import textwrap
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Set, Tuple
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 class CallGraphBuilder:
@@ -492,9 +493,7 @@ def main():
     args = parser.parse_args()
 
     try:
-        with open(args.input_file) as f:
-            extractor_output = json.load(f)
-
+        extractor_output = read_json(args.input_file)
         print(f"Processing {len(extractor_output.get('functions', {}))} functions...", file=sys.stderr)
 
         builder = CallGraphBuilder(extractor_output, {'max_depth': args.depth})
@@ -513,7 +512,7 @@ def main():
         output = json.dumps(result, indent=2)
 
         if args.output:
-            with open(args.output, 'w') as f:
+            with open_utf8(args.output, 'w') as f:
                 f.write(output)
             print(f"Output written to: {args.output}", file=sys.stderr)
         else:
diff --git a/libs/openant-core/parsers/python/dataset_enhancer.py b/libs/openant-core/parsers/python/dataset_enhancer.py
index d41f8a8..73efe06 100644
--- a/libs/openant-core/parsers/python/dataset_enhancer.py
+++ b/libs/openant-core/parsers/python/dataset_enhancer.py
@@ -13,6 +13,7 @@
 import sys
 from pathlib import Path
 from typing import Dict, List, Optional, Set, Tuple
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 class PythonDependencyResolver:
@@ -29,7 +30,8 @@ def _read_file(self, file_path: Path) -> str:
         path_str = str(file_path)
         if path_str not in self.file_cache:
             try:
-                self.file_cache[path_str] = file_path.read_text()
+                with open_utf8(file_path, errors="replace") as _f:
+                    self.file_cache[path_str] = _f.read()
             except Exception as e:
                 self.file_cache[path_str] = ""
         return self.file_cache[path_str]
@@ -226,9 +228,7 @@ def resolve_recursive(current_file: Path, current_code: str, depth: int):
 
 def enhance_dataset(dataset_path: str, repo_path: str, output_path: str = None):
     """Enhance a dataset with resolved dependencies."""
-    with open(dataset_path, 'r') as f:
-        dataset = json.load(f)
-
+    dataset = read_json(dataset_path)
     resolver = PythonDependencyResolver(repo_path)
     enhanced_units = []
@@ -263,8 +263,7 @@ def enhance_dataset(dataset_path: str, repo_path: str, output_path: str = None):
     dataset['enhanced'] = True
 
     if output_path:
-        with open(output_path, 'w') as f:
-            json.dump(dataset, f, indent=2)
+        write_json(output_path, dataset)
         print(f"Enhanced dataset written to {output_path}")
     else:
         print(json.dumps(dataset, indent=2))
diff --git a/libs/openant-core/parsers/python/function_extractor.py b/libs/openant-core/parsers/python/function_extractor.py
index 574ba08..8714e9d 100644
--- a/libs/openant-core/parsers/python/function_extractor.py
+++ b/libs/openant-core/parsers/python/function_extractor.py
@@ -64,6 +64,7 @@
 from datetime import datetime
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Set, Tuple
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 class FunctionExtractor:
@@ -596,8 +597,7 @@ def main():
     extractor = FunctionExtractor(args.repo_path)
 
     if args.scan_file:
-        with open(args.scan_file) as f:
-            scan_result = json.load(f)
+        scan_result = read_json(args.scan_file)
         result = extractor.extract_from_scan(scan_result)
     else:
         result = extractor.extract_all()
@@ -605,7 +605,7 @@ def main():
     output = json.dumps(result, indent=2)
 
     if args.output:
-        with open(args.output, 'w') as f:
+        with open_utf8(args.output, 'w') as f:
             f.write(output)
         print(f"Extraction complete. Results written to: {args.output}", file=sys.stderr)
         print(f"Total functions: {result['statistics']['total_functions']}", file=sys.stderr)
diff --git a/libs/openant-core/parsers/python/parse_repository.py b/libs/openant-core/parsers/python/parse_repository.py
index 45af852..18a61b7 100644
--- a/libs/openant-core/parsers/python/parse_repository.py
+++ b/libs/openant-core/parsers/python/parse_repository.py
@@ -52,6 +52,7 @@
 from function_extractor import FunctionExtractor
 from call_graph_builder import CallGraphBuilder
 from unit_generator import UnitGenerator
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 def generate_analyzer_output(extractor_result: dict) -> dict:
@@ -138,8 +139,7 @@ def parse_repository(repo_path: str, options: dict = None) -> tuple:
 
     if output_dir:
         scan_file = Path(output_dir) / 'scan_result.json'
-        with open(scan_file, 'w') as f:
-            json.dump(scan_result, f, indent=2)
+        write_json(scan_file, scan_result)
         print(f" Saved: {scan_file}", file=sys.stderr)
 
     # Phase 2: Extract functions
@@ -154,8 +154,7 @@ def parse_repository(repo_path: str, options: dict = None) -> tuple:
 
     if output_dir:
         extract_file = Path(output_dir) / 'functions.json'
-        with open(extract_file, 'w') as f:
-            json.dump(extractor_result, f, indent=2)
+        write_json(extract_file, extractor_result)
         print(f" Saved: {extract_file}", file=sys.stderr)
 
     # Phase 3: Build call graph
@@ -171,8 +170,7 @@ def parse_repository(repo_path: str, options: dict = None) -> tuple:
 
     if output_dir:
         graph_file = Path(output_dir) / 'call_graph.json'
-        with open(graph_file, 'w') as f:
-            json.dump(call_graph_result, f, indent=2)
+        write_json(graph_file, call_graph_result)
         print(f" Saved: {graph_file}", file=sys.stderr)
 
     # Phase 4: Generate units
@@ -199,8 +197,7 @@ def parse_repository(repo_path: str, options: dict = None) -> tuple:
 
     if output_dir:
         analyzer_file = Path(output_dir) / 'analyzer_output.json'
-        with open(analyzer_file, 'w') as f:
-            json.dump(analyzer_output, f, indent=2)
+        write_json(analyzer_file, analyzer_output)
         print(f" Saved: {analyzer_file}", file=sys.stderr)
 
     print(f"\n" + "=" * 60, file=sys.stderr)
@@ -253,7 +250,7 @@ def main():
     # Save dataset
     dataset_json = json.dumps(dataset, indent=2)
     if args.output:
-        with open(args.output, 'w') as f:
+        with open_utf8(args.output, 'w') as f:
             f.write(dataset_json)
         print(f"\nDataset written to: {args.output}", file=sys.stderr)
     else:
@@ -261,8 +258,7 @@ def main():
     # Save analyzer output if requested
     if args.analyzer_output:
-        with open(args.analyzer_output, 'w') as f:
-            json.dump(analyzer_output, f, indent=2)
+        write_json(args.analyzer_output, analyzer_output)
         print(f"Analyzer output written to: {args.analyzer_output}", file=sys.stderr)
 
     except Exception as e:
diff --git a/libs/openant-core/parsers/python/repository_scanner.py b/libs/openant-core/parsers/python/repository_scanner.py
index e2ab1f0..108eac5 100644
--- a/libs/openant-core/parsers/python/repository_scanner.py
+++ b/libs/openant-core/parsers/python/repository_scanner.py
@@ -30,6 +30,7 @@
 from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Optional, Set
+from utilities.file_io import read_json, write_json, open_utf8
 
 
 class RepositoryScanner:
@@ -289,7 +290,7 @@ def main():
     output = json.dumps(result, indent=2)
 
     if args.output:
-        with open(args.output, 'w') as f:
+        with open_utf8(args.output, 'w') as f:
             f.write(output)
         print(f"Scan complete.
Results written to: {args.output}", file=sys.stderr) print(f"Total files found: {result['statistics']['total_files']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/python/unit_generator.py b/libs/openant-core/parsers/python/unit_generator.py index a7d2680..19af301 100644 --- a/libs/openant-core/parsers/python/unit_generator.py +++ b/libs/openant-core/parsers/python/unit_generator.py @@ -53,6 +53,7 @@ from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Set +from utilities.file_io import read_json, write_json, open_utf8 # File boundary marker for enhanced code @@ -400,9 +401,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: - call_graph_data = json.load(f) - + call_graph_data = read_json(args.input_file) options = { 'max_depth': args.depth, } @@ -430,7 +429,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open_utf8(args.output, 'w') as f: f.write(output) print(f"\nOutput written to: {args.output}", file=sys.stderr) else: diff --git a/libs/openant-core/parsers/ruby/call_graph_builder.py b/libs/openant-core/parsers/ruby/call_graph_builder.py index 3c4b3ea..7e5d533 100644 --- a/libs/openant-core/parsers/ruby/call_graph_builder.py +++ b/libs/openant-core/parsers/ruby/call_graph_builder.py @@ -39,6 +39,7 @@ import tree_sitter_ruby as ts_ruby from tree_sitter import Language, Parser +from utilities.file_io import read_json, write_json, open_utf8 RUBY_LANGUAGE = Language(ts_ruby.language()) @@ -441,9 +442,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: - extractor_output = json.load(f) - + extractor_output = read_json(args.input_file) print(f"Processing {len(extractor_output.get('functions', {}))} functions...", file=sys.stderr) builder = CallGraphBuilder(extractor_output, {'max_depth': args.depth}) @@ -462,7 +461,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open_utf8(args.output, 'w') as f: f.write(output) print(f"Output written to: {args.output}", file=sys.stderr) else: diff --git a/libs/openant-core/parsers/ruby/function_extractor.py b/libs/openant-core/parsers/ruby/function_extractor.py index f2f1dc3..798945b 100644 --- a/libs/openant-core/parsers/ruby/function_extractor.py +++ b/libs/openant-core/parsers/ruby/function_extractor.py @@ -42,6 +42,7 @@ import tree_sitter_ruby as ts_ruby from tree_sitter import Language, Parser +from utilities.file_io import read_json, write_json, open_utf8 RUBY_LANGUAGE = Language(ts_ruby.language()) @@ -444,8 +445,7 @@ def main(): extractor = FunctionExtractor(args.repo_path) if args.scan_file: - with open(args.scan_file) as f: - scan_result = json.load(f) + scan_result = read_json(args.scan_file) result = extractor.extract_from_scan(scan_result) else: result = extractor.extract_all() @@ -453,7 +453,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open_utf8(args.output, 'w') as f: f.write(output) print(f"Extraction complete. 
Results written to: {args.output}", file=sys.stderr) print(f"Total functions: {result['statistics']['total_functions']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/ruby/repository_scanner.py b/libs/openant-core/parsers/ruby/repository_scanner.py index 65b9a14..d561e5b 100644 --- a/libs/openant-core/parsers/ruby/repository_scanner.py +++ b/libs/openant-core/parsers/ruby/repository_scanner.py @@ -30,6 +30,7 @@ from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Set +from utilities.file_io import read_json, write_json, open_utf8 class RepositoryScanner: @@ -240,7 +241,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open_utf8(args.output, 'w') as f: f.write(output) print(f"Scan complete. Results written to: {args.output}", file=sys.stderr) print(f"Total files found: {result['statistics']['total_files']}", file=sys.stderr) diff --git a/libs/openant-core/parsers/ruby/test_pipeline.py b/libs/openant-core/parsers/ruby/test_pipeline.py index cffe880..947d495 100644 --- a/libs/openant-core/parsers/ruby/test_pipeline.py +++ b/libs/openant-core/parsers/ruby/test_pipeline.py @@ -42,6 +42,7 @@ from enum import Enum from pathlib import Path from typing import Set # Add parent directory to path for utilities import sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +from utilities.file_io import open_utf8, read_json, run_utf8, write_json @@ -139,8 +140,7 @@ def run_parser_pipeline(self) -> bool: # Save scan results self.scan_results_file = os.path.join(self.output_dir, 'scan_results.json') - with open(self.scan_results_file, 'w') as f: - json.dump(scan_result, f, indent=2) + write_json(self.scan_results_file, scan_result) # Stage 2: Extract functions print(" [2/4] Extracting functions via tree-sitter...") @@ -178,13 +178,11 @@ print(f" Avg upstream deps: {dataset['statistics']['avg_upstream']}") # Write dataset - with open(self.dataset_file, 'w') as f: - json.dump(dataset, f, indent=2) + write_json(self.dataset_file, dataset) # Write analyzer output analyzer_output = generator.generate_analyzer_output() - with open(self.analyzer_output_file, 'w') as f: - json.dump(analyzer_output, f, indent=2) + write_json(self.analyzer_output_file, analyzer_output) elapsed = (datetime.now() - start_time).total_seconds() @@ -242,8 +240,7 @@ def apply_reachability_filter(self) -> bool: start_time = datetime.now() try: - with open(self.analyzer_output_file, 'r') as f: - analyzer = json.load(f) + analyzer = read_json(self.analyzer_output_file) functions = analyzer.get("functions", {}) @@ -262,8 +259,7 @@ def apply_reachability_filter(self) -> bool: } # Build call graph from dataset unit metadata - with open(self.dataset_file, 'r') as f: - dataset = json.load(f) + dataset = read_json(self.dataset_file) call_graph = {} reverse_call_graph = {} @@ -313,8 +309,7 @@ def apply_reachability_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: - json.dump(dataset, f, indent=2) + write_json(self.dataset_file, dataset) elapsed = (datetime.now() - start_time).total_seconds() @@ -379,7 +374,7 @@ def run_codeql_analysis(self) -> bool: '--overwrite' ] - result = subprocess.run( + result = run_utf8( create_db_cmd, capture_output=True, text=True, @@ -410,7 +405,7 @@ def run_codeql_analysis(self) -> bool: 
f'codeql/{language}-queries:codeql-suites/{language}-security-extended.qls' ] - result = subprocess.run( + result = run_utf8( analyze_cmd, capture_output=True, text=True, @@ -443,8 +438,7 @@ def run_codeql_analysis(self) -> bool: } return False - with open(sarif_output, 'r') as f: - sarif_data = json.load(f) + sarif_data = read_json(sarif_output) self.codeql_findings = [] @@ -555,8 +549,7 @@ def apply_codeql_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: - dataset = json.load(f) + dataset = read_json(self.dataset_file) # Build mapping of file -> [(start_line, end_line, func_id)] file_functions = {} @@ -605,8 +598,7 @@ def apply_codeql_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: - json.dump(dataset, f, indent=2) + write_json(self.dataset_file, dataset) elapsed = (datetime.now() - start_time).total_seconds() @@ -662,8 +654,7 @@ def run_context_enhancer(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: - dataset = json.load(f) + dataset = read_json(self.dataset_file) enhancer = ContextEnhancer() @@ -695,8 +686,7 @@ def run_context_enhancer(self) -> bool: 'data_flows_extracted': enhancer.stats['data_flows_extracted'] } - with open(self.dataset_file, 'w') as f: - json.dump(enhanced, f, indent=2) + write_json(self.dataset_file, enhanced) elapsed = (datetime.now() - start_time).total_seconds() @@ -740,8 +730,7 @@ def apply_exploitable_filter(self) -> bool: start_time = datetime.now() try: - with open(self.dataset_file, 'r') as f: - dataset = json.load(f) + dataset = read_json(self.dataset_file) units = dataset.get("units", []) original_count = len(units) @@ -767,8 +756,7 @@ def apply_exploitable_filter(self) -> bool: "reduction_percentage": round((1 - len(filtered_units) / original_count) * 100, 1) if original_count > 0 else 0 } - with open(self.dataset_file, 'w') as f: - json.dump(dataset, f, indent=2) + write_json(self.dataset_file, dataset) elapsed = (datetime.now() - start_time).total_seconds() @@ -908,7 +896,7 @@ def run_full_pipeline(self): # Save results summary results_file = os.path.join(self.output_dir, 'pipeline_results.json') - with open(results_file, 'w') as f: + with open_utf8(results_file, 'w') as f: clean_results = { 'repository': self.results['repository'], 'test_time': self.results['test_time'], diff --git a/libs/openant-core/parsers/ruby/unit_generator.py b/libs/openant-core/parsers/ruby/unit_generator.py index 184a221..424d215 100644 --- a/libs/openant-core/parsers/ruby/unit_generator.py +++ b/libs/openant-core/parsers/ruby/unit_generator.py @@ -28,6 +28,7 @@ from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Set +from utilities.file_io import read_json, write_json, open_utf8 # File boundary marker for enhanced code (Ruby uses # comments) @@ -344,9 +345,7 @@ def main(): args = parser.parse_args() try: - with open(args.input_file) as f: - call_graph_data = json.load(f) - + call_graph_data = read_json(args.input_file) options = { 'max_depth': args.depth, } @@ -374,7 +373,7 @@ def main(): output = json.dumps(result, indent=2) if args.output: - with open(args.output, 'w') as f: + with open_utf8(args.output, 'w') as f: f.write(output) print(f"\nOutput written to: {args.output}", file=sys.stderr) else: @@ -383,8 +382,7 @@ def main(): # Write analyzer output if requested if args.analyzer_output: analyzer = 
generator.generate_analyzer_output() - with open(args.analyzer_output, 'w') as f: - json.dump(analyzer, f, indent=2) + write_json(args.analyzer_output, analyzer) print(f"Analyzer output written to: {args.analyzer_output}", file=sys.stderr) except Exception as e: diff --git a/libs/openant-core/parsers/zig/call_graph_builder.py b/libs/openant-core/parsers/zig/call_graph_builder.py index 52f661d..fbd6fd5 100644 --- a/libs/openant-core/parsers/zig/call_graph_builder.py +++ b/libs/openant-core/parsers/zig/call_graph_builder.py @@ -4,11 +4,12 @@ Builds bidirectional call graphs showing function dependencies. """ -import json import re from collections import defaultdict from typing import Dict, Any, List, Set +from utilities.file_io import write_json + import tree_sitter_zig as ts_zig from tree_sitter import Language, Parser, Node @@ -321,5 +322,4 @@ def _resolve_call( def save_results(self, output_path: str, results: Dict[str, Any]) -> None: """Save call graph to a JSON file.""" - with open(output_path, "w") as f: - json.dump(results, f, indent=2) + write_json(output_path, results) diff --git a/libs/openant-core/parsers/zig/function_extractor.py b/libs/openant-core/parsers/zig/function_extractor.py index f3348a0..647f0cd 100644 --- a/libs/openant-core/parsers/zig/function_extractor.py +++ b/libs/openant-core/parsers/zig/function_extractor.py @@ -4,11 +4,12 @@ Extracts functions, methods, and structs from Zig source files using tree-sitter. """ -import json from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional, List +from utilities.file_io import write_json + import tree_sitter_zig as ts_zig from tree_sitter import Language, Parser, Node @@ -276,5 +277,4 @@ def _classify_function(self, name: str, file_path: str) -> str: def save_results(self, output_path: str, results: Dict[str, Any]) -> None: """Save extraction results to a JSON file.""" - with open(output_path, "w") as f: - json.dump(results, f, indent=2) + write_json(output_path, results) diff --git a/libs/openant-core/parsers/zig/repository_scanner.py b/libs/openant-core/parsers/zig/repository_scanner.py index ae09564..bb98819 100644 --- a/libs/openant-core/parsers/zig/repository_scanner.py +++ b/libs/openant-core/parsers/zig/repository_scanner.py @@ -5,11 +5,12 @@ """ import os -import json from datetime import datetime from pathlib import Path from typing import List, Dict, Any, Optional +from utilities.file_io import write_json + class RepositoryScanner: """Scans a repository for Zig source files.""" @@ -131,5 +132,4 @@ def _is_test_file(self, filepath: str) -> bool: def save_results(self, output_path: str, results: Dict[str, Any]) -> None: """Save scan results to a JSON file.""" - with open(output_path, "w") as f: - json.dump(results, f, indent=2) + write_json(output_path, results) diff --git a/libs/openant-core/parsers/zig/test_pipeline.py b/libs/openant-core/parsers/zig/test_pipeline.py index b4a9832..d9e0621 100644 --- a/libs/openant-core/parsers/zig/test_pipeline.py +++ b/libs/openant-core/parsers/zig/test_pipeline.py @@ -20,6 +20,7 @@ import json import sys from pathlib import Path # Add parent directories to path for imports sys.path.insert(0, str(Path(__file__).parent.parent.parent)) +from utilities.file_io import write_json @@ -96,10 +97,8 @@ def main(): "statistics": {"total_units": 0, "by_type": {}}, "metadata": {"generator": "zig_unit_generator.py"}, } - with open(output_dir / "dataset.json", "w") as f: - json.dump(empty_dataset, f, indent=2) - with open(output_dir / 
"analyzer_output.json", "w") as f: - json.dump({"repository": str(repo_path), "functions": {}}, f, indent=2) + write_json(output_dir / "dataset.json", empty_dataset) + write_json(output_dir / "analyzer_output.json", {"repository": str(repo_path), "functions": {}}) return 0 # Stage 2: Function Extractor diff --git a/libs/openant-core/parsers/zig/unit_generator.py b/libs/openant-core/parsers/zig/unit_generator.py index de1ce1c..71a306e 100644 --- a/libs/openant-core/parsers/zig/unit_generator.py +++ b/libs/openant-core/parsers/zig/unit_generator.py @@ -4,11 +4,12 @@ Creates self-contained analysis units with dependency context. """ -import json from datetime import datetime from pathlib import Path from typing import Dict, Any, List, Optional, Set +from utilities.file_io import write_json + class UnitGenerator: """Generates analysis units from call graph data.""" @@ -246,8 +247,6 @@ def save_results( output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) - with open(output_path / "dataset.json", "w") as f: - json.dump(dataset, f, indent=2) + write_json(output_path / "dataset.json", dataset) - with open(output_path / "analyzer_output.json", "w") as f: - json.dump(analyzer_output, f, indent=2) + write_json(output_path / "analyzer_output.json", analyzer_output) diff --git a/libs/openant-core/report/__main__.py b/libs/openant-core/report/__main__.py index fbe6515..1ed32ce 100644 --- a/libs/openant-core/report/__main__.py +++ b/libs/openant-core/report/__main__.py @@ -9,17 +9,17 @@ """ import argparse -import json import sys from pathlib import Path from .generator import generate_summary_report, generate_disclosure, generate_all from .schema import validate_pipeline_output, ValidationError +from utilities.file_io import open_utf8, read_json def cmd_summary(args): """Generate summary report.""" - pipeline_data = json.loads(Path(args.input).read_text()) + pipeline_data = read_json(args.input) try: validate_pipeline_output(pipeline_data) @@ -32,14 +32,15 @@ def cmd_summary(args): output_path = Path(args.output) if args.output else Path("SUMMARY_REPORT.md") output_path.parent.mkdir(parents=True, exist_ok=True) - output_path.write_text(report) + with open_utf8(output_path, "w") as f: + f.write(report) print(f" -> {output_path}") print(f" Cost: ${usage['cost_usd']:.4f} ({usage['total_tokens']:,} tokens)") def cmd_disclosures(args): """Generate disclosure documents.""" - pipeline_data = json.loads(Path(args.input).read_text()) + pipeline_data = read_json(args.input) try: validate_pipeline_output(pipeline_data) @@ -62,7 +63,8 @@ def cmd_disclosures(args): safe_name = finding["short_name"].replace(" ", "_").upper() filename = f"DISCLOSURE_{i:02d}_{safe_name}.md" - (output_dir / filename).write_text(disclosure) + with open_utf8(output_dir / filename, "w") as f: + f.write(disclosure) print(f" -> {output_dir / filename}") count += 1 diff --git a/libs/openant-core/report/generator.py b/libs/openant-core/report/generator.py index c996250..87c31a0 100644 --- a/libs/openant-core/report/generator.py +++ b/libs/openant-core/report/generator.py @@ -13,6 +13,7 @@ from dotenv import load_dotenv from .schema import validate_pipeline_output, ValidationError +from utilities.file_io import read_json load_dotenv() @@ -63,7 +64,8 @@ def _check_api_key(): def load_prompt(name: str) -> str: """Load a prompt template from the prompts directory.""" - return (PROMPTS_DIR / f"{name}.txt").read_text() + with open_utf8(PROMPTS_DIR / f"{name}.txt") as f: + return f.read() def 
merge_dynamic_results(pipeline_data: dict, pipeline_path: str) -> dict: @@ -76,7 +78,7 @@ def merge_dynamic_results(pipeline_data: dict, pipeline_path: str) -> dict: if not dynamic_path.exists(): return pipeline_data - dynamic_data = json.loads(dynamic_path.read_text()) + dynamic_data = read_json(dynamic_path) results_by_id = {} for result in dynamic_data.get("results", []): fid = result.get("finding_id") @@ -233,7 +235,7 @@ def generate_disclosure(vulnerability_data: dict, product_name: str) -> tuple[st def generate_all(pipeline_path: str, output_dir: str) -> None: """Generate all reports from a pipeline output file.""" - pipeline_data = json.loads(Path(pipeline_path).read_text()) + pipeline_data = read_json(pipeline_path) try: validate_pipeline_output(pipeline_data) @@ -247,7 +249,8 @@ def generate_all(pipeline_path: str, output_dir: str) -> None: # Generate summary report print("Generating summary report...") summary, _usage = generate_summary_report(pipeline_data) - (output_path / "SUMMARY_REPORT.md").write_text(summary) + with open_utf8(output_path / "SUMMARY_REPORT.md", "w") as f: + f.write(summary) print(f" -> {output_path / 'SUMMARY_REPORT.md'}") # Generate disclosure for each confirmed vulnerability @@ -265,7 +268,8 @@ def generate_all(pipeline_path: str, output_dir: str) -> None: safe_name = finding["short_name"].replace(" ", "_").upper() filename = f"DISCLOSURE_{i:02d}_{safe_name}.md" - (disclosures_dir / filename).write_text(disclosure) + with open_utf8(disclosures_dir / filename, "w") as f: + f.write(disclosure) print(f" -> {disclosures_dir / filename}") diff --git a/libs/openant-core/tests/test_file_io.py b/libs/openant-core/tests/test_file_io.py new file mode 100644 index 0000000..3aeb532 --- /dev/null +++ b/libs/openant-core/tests/test_file_io.py @@ -0,0 +1,388 @@ +"""Tests for utilities.file_io UTF-8 helpers and a regression scan.""" + +from __future__ import annotations + +import json +import re +import subprocess +import sys +from pathlib import Path + +CORE_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(CORE_ROOT)) + +from utilities.file_io import open_utf8, read_json, run_utf8, write_json # noqa: E402 + + +NON_ASCII = "héllo 日本語 — café" + + +# --------------------------------------------------------------------------- +# Helper unit tests +# --------------------------------------------------------------------------- + +def test_open_utf8_round_trip(tmp_path: Path): + p = tmp_path / "x.txt" + with open_utf8(p, "w") as f: + f.write(NON_ASCII) + with open_utf8(p) as f: + assert f.read() == NON_ASCII + + +def test_open_utf8_passes_through_binary_mode(tmp_path: Path): + """Binary mode should not get encoding= injected.""" + p = tmp_path / "raw.bin" + payload = NON_ASCII.encode("utf-8") + with open_utf8(p, "wb") as f: + f.write(payload) + with open_utf8(p, "rb") as f: + assert f.read() == payload + + +def test_open_utf8_caller_encoding_wins(tmp_path: Path): + """If caller explicitly passes encoding=, helper must not override it.""" + p = tmp_path / "y.txt" + p.write_bytes("café".encode("latin-1")) + with open_utf8(p, encoding="latin-1") as f: + assert f.read() == "café" + + +def test_read_json_round_trip(tmp_path: Path): + p = tmp_path / "data.json" + obj = {"greeting": NON_ASCII, "list": ["a", NON_ASCII, "b"]} + write_json(p, obj) + assert read_json(p) == obj + + +def test_write_json_uses_utf8(tmp_path: Path): + """write_json must encode non-ASCII as UTF-8 bytes (not cp1252).""" + p = tmp_path / "data.json" + write_json(p, {"k": NON_ASCII}) + raw = p.read_bytes() + # The non-ASCII characters should appear as their UTF-8 encoding (or as + # JSON-escaped \uXXXX sequences — both are valid; the key is that the + # file does not contain a cp1252-encoded ?-replacement). + decoded = raw.decode("utf-8") + parsed = json.loads(decoded) + assert parsed["k"] == NON_ASCII + + +def test_write_json_default_indent(tmp_path: Path): + """write_json should pretty-print by default for human readability.""" + p = tmp_path / "data.json" + write_json(p, {"a": 1, "b": 2}) + text = p.read_text(encoding="utf-8") + # Indented output spans multiple lines. + assert "\n" in text
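 + + +def test_write_json_forwards_kwargs(tmp_path: Path): + """Extra kwargs pass straight through to json.dump (indent=2 is only a + setdefault). A minimal sanity check of that pass-through contract; the + expected compact output assumes json.dump's default separators.""" + p = tmp_path / "compact.json" + write_json(p, {"b": 1, "a": 2}, indent=None, sort_keys=True) + assert p.read_text(encoding="utf-8") == '{"a": 2, "b": 1}'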
{"k": NON_ASCII}) + raw = p.read_bytes() + # The non-ASCII characters should appear as their UTF-8 encoding (or as + # JSON-escaped \uXXXX sequences — both are valid; the key is that the + # file does not contain a cp1252-encoded ?-replacement). + decoded = raw.decode("utf-8") + parsed = json.loads(decoded) + assert parsed["k"] == NON_ASCII + + +def test_write_json_default_indent(tmp_path: Path): + """write_json should pretty-print by default for human readability.""" + p = tmp_path / "data.json" + write_json(p, {"a": 1, "b": 2}) + text = p.read_text(encoding="utf-8") + # Indented output spans multiple lines. + assert "\n" in text + + +# --------------------------------------------------------------------------- +# run_utf8 subprocess test +# --------------------------------------------------------------------------- + +def test_run_utf8_captures_non_ascii_text(): + """run_utf8 with text=True must decode UTF-8 stdout without raising on cp1252.""" + code = ( + "import sys; " + "sys.stdout.buffer.write('" + + NON_ASCII + + "'.encode('utf-8'))" + ) + result = run_utf8( + [sys.executable, "-c", code], + capture_output=True, + text=True, + timeout=30, + ) + assert result.returncode == 0 + assert result.stdout == NON_ASCII + + +def test_run_utf8_universal_newlines_alias(tmp_path: Path): + """universal_newlines=True is an alias for text=True; must also get UTF-8.""" + code = ( + "import sys; " + "sys.stdout.buffer.write('" + + NON_ASCII + + "'.encode('utf-8'))" + ) + result = run_utf8( + [sys.executable, "-c", code], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + timeout=30, + ) + assert result.returncode == 0 + assert result.stdout == NON_ASCII + + +def test_run_utf8_invalid_bytes_replaced_not_raised(): + """errors='replace' default means invalid bytes don't raise.""" + code = ( + "import sys; " + "sys.stdout.buffer.write(b'good\\x9d_bad')" + ) + result = run_utf8( + [sys.executable, "-c", code], + capture_output=True, + text=True, + timeout=30, + ) + assert result.returncode == 0 + # Invalid byte 0x9d is replaced by U+FFFD rather than raising. + assert "good" in result.stdout + assert "bad" in result.stdout + + +def test_run_utf8_caller_can_override_errors_default_strict(): + """Without text=True, run_utf8 should not inject errors='replace'. + + Confirms that the encoding/errors injection only fires for text-mode + captures, leaving binary subprocess invocations untouched. 
+ """ + result = run_utf8( + [sys.executable, "-c", "import sys; sys.stdout.buffer.write(b'\\x9d')"], + capture_output=True, + timeout=30, + ) + assert result.returncode == 0 + assert result.stdout == b"\x9d" + + +def test_run_utf8_does_not_override_explicit_encoding(): + """If caller passes encoding= explicitly, run_utf8 must not overwrite it.""" + result = run_utf8( + [sys.executable, "-c", "print('caf\\xe9')"], + capture_output=True, + text=True, + encoding="latin-1", + timeout=30, + ) + assert result.returncode == 0 + assert "café" in result.stdout + + +# --------------------------------------------------------------------------- +# Regression scan: no bare open() calls reappear in non-test code +# --------------------------------------------------------------------------- + +def _iter_python_sources(root: Path): + for p in root.rglob("*.py"): + rel = p.relative_to(root).as_posix() + if rel.startswith("tests/"): + continue + if rel == "utilities/file_io.py": + continue + # Skip vendored/build artifacts + if any(part in {".venv", "venv", "build", "dist", "__pycache__"} for part in p.parts): + continue + yield p + + +_OPEN_CALL_RE = re.compile(r"(? str: + """Replace string literals and comments with spaces so identifier matches inside + docstrings/comments don't trigger the regression check.""" + out = [] + i = 0 + n = len(text) + in_str = None + triple = False + while i < n: + c = text[i] + if in_str: + if c == "\\" and not triple: + out.append(" ") + i += 2 + continue + if triple and text[i:i + 3] == in_str: + out.append(" ") + in_str = None + triple = False + i += 3 + continue + if not triple and c == in_str: + in_str = None + out.append(" ") + i += 1 + continue + if not triple and c == "\n": + in_str = None + out.append("\n") + i += 1 + continue + out.append("\n" if c == "\n" else " ") + i += 1 + continue + if c == "#": + nl = text.find("\n", i) + if nl == -1: + out.append(" " * (n - i)) + break + out.append(" " * (nl - i)) + i = nl + continue + if text[i:i + 3] in ('"""', "'''"): + in_str = text[i:i + 3] + triple = True + out.append(" ") + i += 3 + continue + if c in ("'", '"'): + in_str = c + out.append(" ") + i += 1 + continue + out.append(c) + i += 1 + return "".join(out) + + +def _has_encoding(call_args: str) -> bool: + return re.search(r"\bencoding\s*=", call_args) is not None + + +def _has_binary_mode(call_args: str) -> bool: + return re.search(r"""(['"])([rwax+]*b[rwax+]*)\1""", call_args) is not None + + +def _scan_calls(scrubbed: str, original: str, call_re: re.Pattern): + """Yield (line_number, args_text, original_line) for each call match.""" + for m in call_re.finditer(scrubbed): + i = m.end() + depth = 1 + while i < len(scrubbed) and depth: + ch = scrubbed[i] + if ch == "(": + depth += 1 + elif ch == ")": + depth -= 1 + i += 1 + if depth != 0: + continue + args = original[m.end():i - 1] + line = original[:m.start()].count("\n") + 1 + yield line, args, original.splitlines()[line - 1].strip() + + +def test_no_bare_open_in_non_test_code(): + """Regression: every text-mode `open(` call in non-test code must specify + encoding=, otherwise Windows defaults to cp1252 and crashes on non-ASCII + source code. 
+ """ + offenders: list[str] = [] + for path in _iter_python_sources(CORE_ROOT): + text = path.read_text(encoding="utf-8") + scrubbed = _strip_strings_and_comments(text) + for line, args, src in _scan_calls(scrubbed, text, _OPEN_CALL_RE): + if _has_binary_mode(args) or _has_encoding(args): + continue + rel = path.relative_to(CORE_ROOT).as_posix() + offenders.append(f"{rel}:{line}: {src}") + + assert not offenders, ( + "Found bare open() calls without encoding= in non-test code. " + "Use utilities.file_io.open_utf8 / read_json / write_json or pass " + "encoding='utf-8' explicitly:\n " + "\n ".join(offenders) + ) + + +# Match `.read_text(` / `.write_text(` method calls (any object, including +# Path objects). Don't match `text=` kwargs or other identifiers ending in +# read_text/write_text. +_PATH_TEXT_RE = re.compile(r"\.(?:read_text|write_text)\s*\(") + + +# Match `path.open(`-style method calls. The bare ``open(`` case is handled +# above, so here we look explicitly for ``.open(`` (Path or file-like object +# method form) which has the same Windows cp1252 default behaviour as +# ``open()`` and is not caught by the bare-open regex. +_DOT_OPEN_RE = re.compile(r"\.open\s*\(") + + +def test_no_bare_pathlib_text_io_in_non_test_code(): + """Regression: ``Path.read_text()`` / ``write_text()`` default to the + system locale encoding on Python <3.10 and to ``locale.getpreferredencoding(False)`` + in 3.10+ unless ``-X utf8`` mode is on. On Windows that is cp1252, which + crashes on non-ASCII content. Every call in non-test code must pass + ``encoding=`` explicitly. + """ + offenders: list[str] = [] + for path in _iter_python_sources(CORE_ROOT): + text = path.read_text(encoding="utf-8") + scrubbed = _strip_strings_and_comments(text) + for line, args, src in _scan_calls(scrubbed, text, _PATH_TEXT_RE): + if _has_encoding(args): + continue + rel = path.relative_to(CORE_ROOT).as_posix() + offenders.append(f"{rel}:{line}: {src}") + + assert not offenders, ( + "Found Path.read_text()/write_text() calls without encoding= in " + "non-test code. Pass encoding='utf-8' explicitly:\n " + + "\n ".join(offenders) + ) + + +def test_no_bare_dot_open_in_non_test_code(): + """Regression: ``path.open()`` (the Path / file-like method form) defaults + to system locale encoding the same way ``open()`` does, and is not caught + by the bare-open regex above. Every text-mode call must pass ``encoding=``. + """ + offenders: list[str] = [] + for path in _iter_python_sources(CORE_ROOT): + text = path.read_text(encoding="utf-8") + scrubbed = _strip_strings_and_comments(text) + for line, args, src in _scan_calls(scrubbed, text, _DOT_OPEN_RE): + if _has_binary_mode(args) or _has_encoding(args): + continue + rel = path.relative_to(CORE_ROOT).as_posix() + offenders.append(f"{rel}:{line}: {src}") + + assert not offenders, ( + "Found .open() calls without encoding= in non-test code. " + "Pass encoding='utf-8' explicitly:\n " + "\n ".join(offenders) + ) + + +# Match `subprocess.run(` (covers `subprocess.run` and `sp.run` etc. via the +# right-hand identifier — restrict to the explicit form to avoid noise). +_SUBPROCESS_RUN_RE = re.compile(r"(? bool: + return ( + re.search(r"\btext\s*=\s*True", call_args) is not None + or re.search(r"\buniversal_newlines\s*=\s*True", call_args) is not None + ) + + +def test_no_bare_text_mode_subprocess_in_non_test_code(): + """Regression: ``subprocess.run(..., text=True)`` decodes stdout/stderr + with the system locale on Windows (cp1252), which crashes on non-ASCII + output from parsers, codeql, etc. 
Every text-mode subprocess call must + pass ``encoding=`` explicitly (or use ``utilities.file_io.run_utf8``). + """ + offenders: list[str] = [] + for path in _iter_python_sources(CORE_ROOT): + text = path.read_text(encoding="utf-8") + scrubbed = _strip_strings_and_comments(text) + for line, args, src in _scan_calls(scrubbed, text, _SUBPROCESS_RUN_RE): + if not _has_text_mode(args): + continue + if _has_encoding(args): + continue + rel = path.relative_to(CORE_ROOT).as_posix() + offenders.append(f"{rel}:{line}: {src}") + + assert not offenders, ( + "Found subprocess.run(..., text=True) calls without encoding= in " + "non-test code. Pass encoding='utf-8', errors='replace' explicitly " + "(or use utilities.file_io.run_utf8):\n " + "\n ".join(offenders) + ) diff --git a/libs/openant-core/tests/test_parser_adapter.py b/libs/openant-core/tests/test_parser_adapter.py index af209cb..0acc7f8 100644 --- a/libs/openant-core/tests/test_parser_adapter.py +++ b/libs/openant-core/tests/test_parser_adapter.py @@ -1,11 +1,11 @@ """Tests for core/parser_adapter.py — language detection and Python parsing.""" -import json import os from pathlib import Path import pytest from core.parser_adapter import detect_language, parse_repository +from utilities.file_io import read_json class TestDetectLanguage: @@ -65,8 +65,7 @@ def test_dataset_json_valid(self, sample_python_repo, tmp_output_dir): language="python", processing_level="all", ) - with open(result.dataset_path) as f: - dataset = json.load(f) + dataset = read_json(result.dataset_path) assert "units" in dataset assert len(dataset["units"]) > 0 @@ -77,8 +76,7 @@ def test_units_have_required_fields(self, sample_python_repo, tmp_output_dir): language="python", processing_level="all", ) - with open(result.dataset_path) as f: - dataset = json.load(f) + dataset = read_json(result.dataset_path) for unit in dataset["units"]: assert "id" in unit assert "code" in unit @@ -101,6 +99,5 @@ def test_analyzer_output_generated(self, sample_python_repo, tmp_output_dir): ) assert result.analyzer_output_path is not None assert Path(result.analyzer_output_path).exists() - with open(result.analyzer_output_path) as f: - data = json.load(f) + data = read_json(result.analyzer_output_path) assert "functions" in data diff --git a/libs/openant-core/utilities/agentic_enhancer/repository_index.py b/libs/openant-core/utilities/agentic_enhancer/repository_index.py index 06ef199..5af649c 100644 --- a/libs/openant-core/utilities/agentic_enhancer/repository_index.py +++ b/libs/openant-core/utilities/agentic_enhancer/repository_index.py @@ -14,11 +14,12 @@ load_index_from_file: Load index from analyzer_output.json file """ -import json import re from pathlib import Path from typing import Optional +from utilities.file_io import read_json + class RepositoryIndex: """ @@ -283,7 +284,6 @@ def load_index_from_file(analyzer_output_path: str, repo_path: str = None) -> Re Returns: RepositoryIndex instance """ - with open(analyzer_output_path, 'r') as f: - analyzer_output = json.load(f) + analyzer_output = read_json(analyzer_output_path) return RepositoryIndex(analyzer_output, repo_path) diff --git a/libs/openant-core/utilities/context_enhancer.py b/libs/openant-core/utilities/context_enhancer.py index 2ffbfe6..2f7dea2 100644 --- a/libs/openant-core/utilities/context_enhancer.py +++ b/libs/openant-core/utilities/context_enhancer.py @@ -28,6 +28,7 @@ from .llm_client import AnthropicClient, TokenTracker, get_global_tracker, reset_global_tracker from .agentic_enhancer import RepositoryIndex, 
enhance_unit_with_agent, load_index_from_file from .rate_limiter import get_rate_limiter, is_rate_limit_error, is_retryable_error +from .file_io import read_json, write_json # Avoid circular import — import checkpoint at usage site _StepCheckpoint = None @@ -504,8 +505,7 @@ def enhance_dataset_agentic( if unit_id in processed_ids: cp_file = os.path.join(checkpoint_dir, f"{self._safe_filename(unit_id)}.json") if os.path.exists(cp_file): - with open(cp_file, 'r') as f: - cp_data = json.load(f) + cp_data = read_json(cp_file) unit["agent_context"] = cp_data.get("agent_context", {}) if "code" in cp_data: unit["code"] = cp_data["code"] @@ -538,8 +538,7 @@ def enhance_dataset_agentic( if not os.path.exists(cp_file): continue try: - with open(cp_file, 'r') as f: - cp_data = json.load(f) + cp_data = read_json(cp_file) # Sum usage from all existing checkpoints (completed + errored) cp_usage = cp_data.get("usage", {}) _summary_input_tokens += cp_usage.get("input_tokens", 0) @@ -792,8 +791,7 @@ def _save_unit_checkpoint(self, unit: dict, checkpoint_dir: str): "output_tokens": meta.get("output_tokens", 0), "cost_usd": meta.get("cost_usd", 0.0), } - with open(filepath, 'w') as f: - json.dump(cp_data, f, indent=2) + write_json(filepath, cp_data) def _load_completed_units(self, checkpoint_dir: str) -> set: """Load the set of completed unit IDs from per-unit checkpoint files.""" @@ -805,8 +803,7 @@ def _load_completed_units(self, checkpoint_dir: str) -> set: continue filepath = os.path.join(checkpoint_dir, filename) try: - with open(filepath, 'r') as f: - cp_data = json.load(f) + cp_data = read_json(filepath) unit_id = cp_data.get("id") agent_ctx = cp_data.get("agent_context", {}) if unit_id and agent_ctx and not agent_ctx.get("error"): @@ -818,8 +815,7 @@ def _load_completed_units(self, checkpoint_dir: str) -> set: def _migrate_legacy_checkpoint(self, checkpoint_path: str, checkpoint_dir: str, units: list): """Migrate a legacy single-file checkpoint to per-unit checkpoint files.""" try: - with open(checkpoint_path, 'r') as f: - checkpoint_data = json.load(f) + checkpoint_data = read_json(checkpoint_path) for cp_unit in checkpoint_data.get("units", []): if cp_unit.get("agent_context") and not cp_unit["agent_context"].get("error"): self._save_unit_checkpoint(cp_unit, checkpoint_dir) @@ -998,8 +994,7 @@ def main(): logging.error(f"Error: Input file not found: {input_path}") return 1 - with open(input_path, 'r') as f: - dataset = json.load(f) + dataset = read_json(input_path) # Enhance enhancer = ContextEnhancer() @@ -1029,8 +1024,7 @@ def main(): # Write output output_path = Path(args.output) if args.output else input_path - with open(output_path, 'w') as f: - json.dump(enhanced, f, indent=2) + write_json(output_path, enhanced) logging.info(f"Enhanced dataset written to: {output_path}") return 0 diff --git a/libs/openant-core/utilities/dynamic_tester/__init__.py b/libs/openant-core/utilities/dynamic_tester/__init__.py index e533f6c..03922ad 100644 --- a/libs/openant-core/utilities/dynamic_tester/__init__.py +++ b/libs/openant-core/utilities/dynamic_tester/__init__.py @@ -20,6 +20,7 @@ from utilities.dynamic_tester.result_collector import collect_result from utilities.dynamic_tester.reporter import generate_report from utilities.llm_client import get_global_tracker +from utilities.file_io import read_json, write_json, open_utf8 def run_dynamic_tests( @@ -45,9 +46,7 @@ def run_dynamic_tests( List of DynamicTestResult objects """ # Load pipeline output - with open(pipeline_output_path, "r") as f: - pipeline = 
json.load(f) - + pipeline = read_json(pipeline_output_path) findings = pipeline.get("findings", []) repo_info = { "name": pipeline.get("repository", {}).get("name", "unknown"), @@ -253,13 +252,13 @@ def run_dynamic_tests( report_md = generate_report(results, repo_info["name"], total_cost) report_path = os.path.join(output_dir, "DYNAMIC_TEST_RESULTS.md") - with open(report_path, "w") as f: + with open_utf8(report_path, "w") as f: f.write(report_md) print(f"\nReport written to {report_path}", file=sys.stderr) # Save structured results JSON results_path = os.path.join(output_dir, "dynamic_test_results.json") - with open(results_path, "w") as f: + with open_utf8(results_path, "w") as f: json.dump({ "repository": repo_info["name"], "total_findings": len(findings), diff --git a/libs/openant-core/utilities/dynamic_tester/docker_executor.py b/libs/openant-core/utilities/dynamic_tester/docker_executor.py index 04a45d3..87dec73 100644 --- a/libs/openant-core/utilities/dynamic_tester/docker_executor.py +++ b/libs/openant-core/utilities/dynamic_tester/docker_executor.py @@ -12,6 +12,7 @@ import tempfile import time import uuid +from utilities.file_io import open_utf8, run_utf8 # Timeouts DEFAULT_CONTAINER_TIMEOUT = 120 # seconds per container @@ -74,14 +75,14 @@ def _write_test_files(work_dir: str, generation: dict, source_file: str | None = shutil.copy2(source_file, os.path.join(work_dir, os.path.basename(source_file))) # Write Dockerfile - with open(os.path.join(work_dir, "Dockerfile"), "w") as f: + with open_utf8(os.path.join(work_dir, "Dockerfile"), "w") as f: f.write(generation["dockerfile"]) # Write test script test_filename = generation.get("test_filename", "test_exploit.py") test_path = os.path.join(work_dir, test_filename) os.makedirs(os.path.dirname(test_path), exist_ok=True) - with open(test_path, "w") as f: + with open_utf8(test_path, "w") as f: f.write(generation["test_script"]) # Write requirements/dependencies file @@ -89,7 +90,7 @@ def _write_test_files(work_dir: str, generation: dict, source_file: str | None = req_filename = generation.get("requirements_filename", "requirements.txt") req_path = os.path.join(work_dir, req_filename) os.makedirs(os.path.dirname(req_path), exist_ok=True) - with open(req_path, "w") as f: + with open_utf8(req_path, "w") as f: f.write(generation["requirements"]) # Copy attacker server if needed (before docker-compose so it's available) @@ -98,21 +99,21 @@ def _write_test_files(work_dir: str, generation: dict, source_file: str | None = os.makedirs(attacker_dir, exist_ok=True) shutil.copy2(ATTACKER_SERVER_PATH, os.path.join(attacker_dir, "server.py")) # Write attacker Dockerfile - with open(os.path.join(attacker_dir, "Dockerfile"), "w") as f: + with open_utf8(os.path.join(attacker_dir, "Dockerfile"), "w") as f: f.write("FROM python:3.11-slim\nWORKDIR /app\nCOPY server.py .\n" "EXPOSE 9999\nCMD [\"python\", \"server.py\"]\n") # Write docker-compose if multi-service, with sanitization if generation.get("docker_compose"): compose_content = _sanitize_compose(generation["docker_compose"]) - with open(os.path.join(work_dir, "docker-compose.yml"), "w") as f: + with open_utf8(os.path.join(work_dir, "docker-compose.yml"), "w") as f: f.write(compose_content) def _run_command(cmd: list[str], timeout: int, cwd: str = None) -> tuple[str, str, int, bool]: """Run a command with timeout. 
Returns (stdout, stderr, exit_code, timed_out).""" try: - result = subprocess.run( + result = run_utf8( cmd, capture_output=True, text=True, diff --git a/libs/openant-core/utilities/file_io.py b/libs/openant-core/utilities/file_io.py new file mode 100644 index 0000000..bc8d22f --- /dev/null +++ b/libs/openant-core/utilities/file_io.py @@ -0,0 +1,60 @@ +"""Centralized file I/O and subprocess helpers for Windows UTF-8 compatibility. + +On Windows, Python's default encoding is often ``cp1252`` (charmap), which +cannot decode common UTF-8 sequences found in source code. These thin +wrappers ensure that every file open and subprocess call uses UTF-8 +explicitly, preventing ``'charmap' codec can't decode byte ...`` errors. +""" + +import json +import os +import subprocess +from typing import Any, Union + +# Accept str, Path, or any os.PathLike +PathLike = Union[str, os.PathLike] + + +def open_utf8(path: PathLike, mode: str = "r", **kwargs): + """Open a file with UTF-8 encoding by default. + + Drop-in replacement for ``open()`` that sets ``encoding='utf-8'`` unless + the caller explicitly provides a different encoding or opens in binary + mode. + """ + if "b" not in mode and "encoding" not in kwargs: + kwargs["encoding"] = "utf-8" + return open(path, mode, **kwargs) + + +def read_json(path: PathLike) -> Any: + """Read and parse a JSON file using UTF-8 encoding.""" + with open_utf8(path, "r") as f: + return json.load(f) + + +def write_json(path: PathLike, data: Any, **kwargs) -> None: + """Write data as JSON to a file using UTF-8 encoding.""" + kwargs.setdefault("indent", 2) + with open_utf8(path, "w") as f: + json.dump(data, f, **kwargs) + + +def run_utf8(*args, **kwargs) -> subprocess.CompletedProcess: + """Run a subprocess with UTF-8 encoding for text mode. + + Wrapper around ``subprocess.run`` that sets ``encoding='utf-8'`` and + ``errors='replace'`` when ``text=True`` (or its alias + ``universal_newlines=True``) is passed, preventing charmap decode errors + on Windows. + + Note: ``errors='replace'`` substitutes U+FFFD for invalid bytes in + stdout/stderr rather than raising. This is intentional - subprocess + output is used for status display and diagnostics, not for security + analysis (parser results are read from JSON files separately). + Callers can override with ``errors='strict'`` if needed. + """ + if kwargs.get("text") or kwargs.get("universal_newlines"): + kwargs.setdefault("encoding", "utf-8") + kwargs.setdefault("errors", "replace") + return subprocess.run(*args, **kwargs) diff --git a/libs/openant-core/validate_dataset_schema.py b/libs/openant-core/validate_dataset_schema.py index 1312bce..7f65a7c 100755 --- a/libs/openant-core/validate_dataset_schema.py +++ b/libs/openant-core/validate_dataset_schema.py @@ -8,6 +8,7 @@ import json import sys +from utilities.file_io import read_json def validate_unit(unit, index): @@ -61,9 +62,7 @@ def validate_unit(unit, index): def validate_dataset(path): - with open(path) as f: - data = json.load(f) - + data = read_json(path) all_errors = [] units = data.get("units", [])