# Composite action: runs the cookbook batch analyzer against a git diff and
# exposes the report paths plus the deterministic risk summary as outputs.
#
# NOTE(review): `python/pyproject.toml`, `python/[langchain]` and
# `cookbook/mcp-impact-analysis/batch_analyzer.py` are resolved relative to the
# caller's working directory, not to this action's own checkout. Confirm this
# before referencing the action from another repository.
name: OpenMetadata Impact Analysis
description: Analyze dbt model changes with OpenMetadata lineage and publish report artifacts.

inputs:
  diff-path:
    description: Path to a unified git diff file. Generated from base-ref when missing.
    required: false
    default: changes.diff
  metadata-host:
    description: OpenMetadata or Collate host URL.
    required: false
  metadata-token:
    description: Bot JWT token with read access to metadata.
    required: false
  openai-api-key:
    description: OpenAI API key for the LangChain analysis agent.
    required: false
  base-ref:
    description: Base ref used to generate the diff when diff-path does not exist.
    required: false
    default: origin/main
  paths:
    description: Comma-separated path globs to analyze.
    required: false
    default: "**/models/**/*.sql,**/models/**/*.yml,**/models/**/*.yaml"
  report-path:
    description: Markdown report path.
    required: false
    default: impact_report.md
  html-report-path:
    description: Static HTML report artifact path.
    required: false
    default: impact_report.html
  metadata-output-path:
    description: JSON metadata output path.
    required: false
    default: impact_metadata.json
  python-version:
    description: Python version used to run the analyzer.
    required: false
    default: "3.11"

outputs:
  report-path:
    description: Markdown report path.
    value: ${{ steps.run.outputs.report-path }}
  html-report-path:
    description: Static HTML report artifact path.
    value: ${{ steps.run.outputs.html-report-path }}
  risk-level:
    description: Deterministic risk level for the analyzed PR.
    value: ${{ steps.run.outputs.risk-level }}
  affected-count:
    description: Count of affected OpenMetadata assets discovered in the report.
    value: ${{ steps.run.outputs.affected-count }}

runs:
  using: composite
  steps:
    - name: Setup Python
      uses: actions/setup-python@v5
      with:
        python-version: ${{ inputs.python-version }}
        cache: pip
        cache-dependency-path: python/pyproject.toml

    # Inputs are handed to the shell through `env` instead of being expanded
    # with `${{ }}` inside the script: expression expansion happens before
    # bash parses the script, so a crafted value could inject shell commands.
    - name: Generate diff when needed
      shell: bash
      env:
        DIFF_PATH: ${{ inputs.diff-path }}
        BASE_REF: ${{ inputs.base-ref }}
      run: |
        if [ ! -f "$DIFF_PATH" ]; then
          git diff "${BASE_REF}...HEAD" > "$DIFF_PATH"
        fi

    - name: Install Python dependencies
      shell: bash
      run: |
        python -m pip install --upgrade pip
        python -m pip install -e "python/[langchain]" langchain langchain-openai

    - name: Run impact analysis
      id: run
      shell: bash
      env:
        AI_SDK_HOST: ${{ inputs.metadata-host }}
        AI_SDK_TOKEN: ${{ inputs.metadata-token }}
        OPENAI_API_KEY: ${{ inputs.openai-api-key }}
        DIFF_PATH: ${{ inputs.diff-path }}
        REPORT_PATH: ${{ inputs.report-path }}
        HTML_REPORT_PATH: ${{ inputs.html-report-path }}
        METADATA_OUTPUT_PATH: ${{ inputs.metadata-output-path }}
        PATH_FILTERS: ${{ inputs.paths }}
      run: |
        python cookbook/mcp-impact-analysis/batch_analyzer.py "$DIFF_PATH" \
          --output "$REPORT_PATH" \
          --html-output "$HTML_REPORT_PATH" \
          --metadata-output "$METADATA_OUTPUT_PATH" \
          --paths "$PATH_FILTERS"

        echo "report-path=$REPORT_PATH" >> "$GITHUB_OUTPUT"
        echo "html-report-path=$HTML_REPORT_PATH" >> "$GITHUB_OUTPUT"
        # Risk level and affected count are read back from the JSON metadata
        # file written by the analyzer above.
        python - <<'PY' >> "$GITHUB_OUTPUT"
        import json
        import os
        from pathlib import Path

        metadata = json.loads(Path(os.environ["METADATA_OUTPUT_PATH"]).read_text())
        print(f"risk-level={metadata['risk_level']}")
        print(f"affected-count={metadata['affected_count']}")
        PY
keeping it for demos - # pull_request: - # paths: - # - 'cookbook/resources/demo-database/dbt/models/**' + pull_request: + paths: + - "cookbook/resources/demo-database/dbt/models/**/*.sql" + - "cookbook/resources/demo-database/dbt/models/**/*.yml" + - "cookbook/resources/demo-database/dbt/models/**/*.yaml" workflow_dispatch: concurrency: @@ -13,15 +14,13 @@ concurrency: permissions: contents: read + pull-requests: write + issues: write jobs: analyze: name: Impact Analysis runs-on: ubuntu-latest - environment: test - permissions: - contents: read - pull-requests: write steps: - name: Checkout code @@ -29,42 +28,59 @@ jobs: with: fetch-depth: 0 - - name: Setup Python - uses: actions/setup-python@v5 - with: - python-version: '3.11' - cache: 'pip' - cache-dependency-path: python/pyproject.toml - - - name: Install dependencies - run: | - pip install -e python/[langchain] - pip install langchain langchain-openai - - name: Generate diff + shell: bash run: | - git diff origin/main...HEAD > changes.diff - echo "--- Diff contents ---" - cat changes.diff + if [ "${{ github.event_name }}" = "pull_request" ]; then + git diff "${{ github.event.pull_request.base.sha }}...HEAD" > changes.diff + else + git fetch origin main --depth=1 + git diff origin/main...HEAD > changes.diff + fi - - name: Run impact analysis - working-directory: cookbook/mcp-impact-analysis - env: - AI_SDK_HOST: ${{ secrets.AI_SDK_HOST }} - AI_SDK_TOKEN: ${{ secrets.AI_SDK_TOKEN }} - OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} - run: python batch_analyzer.py ../../changes.diff > ../../impact_report.md + - name: Run OpenMetadata impact analysis + id: impact + uses: ./.github/actions/openmetadata-impact-analysis + with: + diff-path: changes.diff + metadata-host: ${{ secrets.AI_SDK_HOST }} + metadata-token: ${{ secrets.AI_SDK_TOKEN }} + openai-api-key: ${{ secrets.OPENAI_API_KEY }} + paths: 
"cookbook/resources/demo-database/dbt/models/**/*.sql,cookbook/resources/demo-database/dbt/models/**/*.yml,cookbook/resources/demo-database/dbt/models/**/*.yaml" + + - name: Find existing PR comment + if: github.event_name == 'pull_request' + uses: peter-evans/find-comment@v3 + id: find-comment + with: + issue-number: ${{ github.event.pull_request.number }} + comment-author: github-actions[bot] + body-includes: openmetadata-impact-analysis - name: Post PR comment if: github.event_name == 'pull_request' uses: peter-evans/create-or-update-comment@v4 with: + comment-id: ${{ steps.find-comment.outputs.comment-id }} issue-number: ${{ github.event.pull_request.number }} - body-path: impact_report.md + body-path: ${{ steps.impact.outputs.report-path }} + edit-mode: replace - - name: Write to step summary (manual trigger) - if: github.event_name == 'workflow_dispatch' + - name: Write to step summary + shell: bash run: | - echo "## Data Impact Analysis" >> $GITHUB_STEP_SUMMARY - echo "" >> $GITHUB_STEP_SUMMARY - cat impact_report.md >> $GITHUB_STEP_SUMMARY + echo "## Data Impact Analysis" >> "$GITHUB_STEP_SUMMARY" + echo "" >> "$GITHUB_STEP_SUMMARY" + echo "- Risk: \`${{ steps.impact.outputs.risk-level }}\`" >> "$GITHUB_STEP_SUMMARY" + echo "- Affected assets: \`${{ steps.impact.outputs.affected-count }}\`" >> "$GITHUB_STEP_SUMMARY" + echo "" >> "$GITHUB_STEP_SUMMARY" + cat "${{ steps.impact.outputs.report-path }}" >> "$GITHUB_STEP_SUMMARY" + + - name: Upload impact report artifact + uses: actions/upload-artifact@v4 + with: + name: openmetadata-impact-analysis + path: | + ${{ steps.impact.outputs.report-path }} + ${{ steps.impact.outputs.html-report-path }} + impact_metadata.json diff --git a/cookbook/mcp-impact-analysis/README.md b/cookbook/mcp-impact-analysis/README.md index a4aa8e8..9a972ac 100644 --- a/cookbook/mcp-impact-analysis/README.md +++ b/cookbook/mcp-impact-analysis/README.md @@ -248,29 +248,49 @@ Assets that could benefit from `loyalty_tier`: The 
[`batch_analyzer.py`](./batch_analyzer.py) script extends the interactive agent for automated use in pull requests. It: -1. Reads a `git diff` file and extracts every changed `.sql` file under a `models/` directory. -2. Spins up the same agent from `impact_analyzer.py`. -3. Iterates over each changed model and asks the agent for its downstream impact. -4. Prints a combined Markdown report suitable for posting as a PR comment. +1. Reads a `git diff` file and extracts changed dbt `.sql`, `.yml`, and `.yaml` files under a `models/` directory. +2. Uses the MCP-backed agent from `impact_analyzer.py` when credentials are available. +3. Falls back to a deterministic review report when AI credentials are missing, so CI still produces useful output. +4. Scores risk from affected assets, data-quality signals, PII/governance terms, and critical business asset mentions. +5. Writes Markdown, optional static HTML, and optional JSON metadata for GitHub Action outputs. Run it locally: ```bash git diff origin/main...HEAD > changes.diff -python batch_analyzer.py changes.diff +python batch_analyzer.py changes.diff \ + --output impact_report.md \ + --html-output impact_report.html \ + --metadata-output impact_metadata.json ``` ### GitHub Actions Integration -This repository includes a ready-to-use workflow at [`.github/workflows/impact-analysis.yml`](../../.github/workflows/impact-analysis.yml). It: +This repository includes a reusable composite action at [`.github/actions/openmetadata-impact-analysis/action.yml`](../../.github/actions/openmetadata-impact-analysis/action.yml) and a ready-to-use workflow at [`.github/workflows/impact-analysis.yml`](../../.github/workflows/impact-analysis.yml). The workflow: 1. **Triggers automatically** on PRs that modify dbt models under `cookbook/resources/demo-database/dbt/models/`. 2. **Supports manual dispatch** via the Actions tab — useful for demos or ad-hoc runs on any branch. 3. **Generates a diff** between the PR branch and `origin/main`. -4. 
**Runs the batch analyzer** against the diff to produce a Markdown impact report. -5. **Posts a PR comment** with the full report (or writes to the GitHub Step Summary for manual runs). +4. **Runs the reusable action** against the diff to produce Markdown, HTML, and JSON outputs. +5. **Posts or updates one PR comment** with the full report. +6. **Uploads the HTML report** as a workflow artifact for review and demos. -The workflow uses an HTML comment marker (``) to find and update its own comment on subsequent pushes, so you only ever see one impact analysis comment per PR. +The report uses an HTML comment marker (`<!-- openmetadata-impact-analysis -->`) to find and update its own comment on subsequent pushes, so you only ever see one impact analysis comment per PR. + +#### Reusable Action + +Add this action to another repository after checking out code and generating a diff: + +```yaml +- name: Run OpenMetadata impact analysis + uses: open-metadata/ai-sdk/.github/actions/openmetadata-impact-analysis@main + with: + diff-path: changes.diff + metadata-host: ${{ secrets.AI_SDK_HOST }} + metadata-token: ${{ secrets.AI_SDK_TOKEN }} + openai-api-key: ${{ secrets.OPENAI_API_KEY }} + paths: "**/models/**/*.sql,**/models/**/*.yml,**/models/**/*.yaml" +``` #### Required Secrets @@ -282,6 +302,8 @@ Configure these in your repository settings under **Settings > Secrets and varia | `AI_SDK_TOKEN` | A bot JWT token with read access to metadata | | `OPENAI_API_KEY` | OpenAI API key for the LLM | +If these secrets are not present, the action still completes and emits a fallback report that tells reviewers which changed models need manual OpenMetadata review. 
def get_changed_models(diff_output: str) -> dict[str, str]:
    """Backward-compatible helper returning changed model/schema diffs by asset name.

    Assets that share a name (file stem) collapse to a single entry, the last
    one parsed, because the mapping is keyed by ``asset.name``.
    """

    return {asset.name: asset.diff for asset in parse_changed_assets(diff_output)}


def build_change_prompt(asset: ChangedAsset) -> str:
    """Build the prompt sent to the OpenMetadata MCP-backed agent.

    Args:
        asset: The changed asset; its ``kind``, ``name``, ``change_type``,
            ``path`` and ``diff`` are embedded in the prompt text.

    Returns:
        The prompt string, with the diff inside a fenced ``diff`` block.
    """

    return (
        f"The dbt {asset.kind} '{asset.name}' has been {asset.change_type}. "
        f"The changed file is '{asset.path}'. Here is the git diff:\n\n"
        f"```diff\n{asset.diff}\n```\n\n"
        "Start with a brief explanation of what the diff is doing. Then analyze "
        "which downstream assets are affected, who owns them, whether data-quality "
        "tests may fail, and whether PII or governance tags require review."
    )


def main(argv: Sequence[str] | None = None) -> int:
    """Run the batch analyzer for one diff file.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``1`` when the diff file does not exist, ``2`` when
        ``--fail-on-critical`` is set and the score is critical, otherwise ``0``.
    """

    parser = _build_parser()
    args = parser.parse_args(argv)

    diff_path = Path(args.diff_file)
    if not diff_path.exists():
        print(f"Error: File not found: {diff_path}", file=sys.stderr)
        return 1

    diff_output = diff_path.read_text(encoding="utf-8")
    changed_assets = parse_changed_assets(diff_output, split_path_filters(args.paths))

    if not changed_assets:
        report = render_no_change_report()
        _write_outputs(report, args.output, args.html_output)
        _write_metadata_output(
            args.metadata_output,
            ImpactScore(RiskLevel.LOW, 0, ["no dbt model or schema changes detected"]),
        )
        # An empty --output is treated like a missing one, matching
        # _write_outputs, so the report is never silently discarded.
        if not args.output:
            print(report)
        return 0

    analyses, warnings = _analyze_assets(changed_assets, use_ai=not args.no_ai)
    # NOTE(review): when every asset uses the fallback text, the scored text is
    # that boilerplate, which itself mentions PII, sensitive tags and tests.
    # Confirm the resulting risk label is what reviewers should see.
    score = score_impact("\n\n".join(analyses.values()), changed_assets)
    report = render_markdown_report(changed_assets, analyses, score, warnings)

    _write_outputs(report, args.output, args.html_output)
    _write_metadata_output(args.metadata_output, score)

    if not args.output:
        print(report)
    else:
        print(f"Wrote impact report to {args.output}")

    return 2 if args.fail_on_critical and score.level is RiskLevel.CRITICAL else 0


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the batch analyzer."""

    parser = argparse.ArgumentParser(description="Analyze OpenMetadata impact from a git diff.")
    parser.add_argument("diff_file", help="Path to a unified git diff file.")
    parser.add_argument("--output", help="Write Markdown report to this path.")
    parser.add_argument("--html-output", help="Write a static HTML report artifact to this path.")
    parser.add_argument("--metadata-output", help="Write JSON metadata for GitHub Action outputs.")
    parser.add_argument(
        "--paths",
        help="Comma-separated path globs to analyze. Defaults to dbt models/**/*.sql,yml,yaml.",
    )
    parser.add_argument(
        "--no-ai",
        action="store_true",
        help="Skip the MCP/LLM analyzer and emit deterministic fallback sections.",
    )
    parser.add_argument(
        "--fail-on-critical",
        action="store_true",
        help="Exit with code 2 when deterministic scoring marks the PR critical.",
    )
    return parser


def _report_keys(changed_assets: Sequence[ChangedAsset]) -> list[str]:
    """Return one unique report-section key per asset, in input order.

    The asset name is used when it is unique in the change set. Assets that
    share a name (for example ``stg_orders.sql`` and ``stg_orders.yml``, or
    two ``_schema.yml`` files) are keyed by path so none overwrites another.
    """

    name_counts: dict[str, int] = {}
    for asset in changed_assets:
        name_counts[asset.name] = name_counts.get(asset.name, 0) + 1
    return [
        asset.name if name_counts[asset.name] == 1 else asset.path
        for asset in changed_assets
    ]


def _analyze_assets(
    changed_assets: Sequence[ChangedAsset],
    *,
    use_ai: bool,
) -> tuple[dict[str, str], list[str]]:
    """Produce one analysis text per changed asset.

    Args:
        changed_assets: Assets parsed from the diff.
        use_ai: When true, try the agent from ``impact_analyzer`` first.

    Returns:
        ``(analyses, warnings)``. ``analyses`` maps a report key (see
        ``_report_keys``) to analysis Markdown; ``warnings`` lists every
        reason the fallback text was used instead of the agent.
    """

    warnings: list[str] = []
    analyses: dict[str, str] = {}
    agent = None
    client = None
    analyze_change = None

    if use_ai:
        try:
            # Imported lazily so a missing dependency or missing credentials
            # degrade to the fallback report instead of failing the run.
            from impact_analyzer import analyze_change, create_impact_analyzer

            agent, client = create_impact_analyzer()
        except Exception as exc:  # noqa: BLE001 - degraded CI report is intentional.
            warnings.append(
                "AI analysis unavailable; deterministic fallback used. "
                f"Reason: {exc.__class__.__name__}: {exc}"
            )
            use_ai = False

    try:
        for key, asset in zip(_report_keys(changed_assets), changed_assets):
            if use_ai and agent is not None:
                try:
                    result = analyze_change(agent, build_change_prompt(asset))
                    analyses[key] = result.get("analysis") or _fallback_analysis(asset)
                    continue
                except Exception as exc:  # noqa: BLE001 - keep other assets reportable.
                    warnings.append(
                        f"AI analysis failed for {key}; fallback used. "
                        f"Reason: {exc.__class__.__name__}: {exc}"
                    )

            analyses[key] = _fallback_analysis(asset)
    finally:
        if client is not None:
            client.close()

    return analyses, warnings


def _fallback_analysis(asset: ChangedAsset) -> str:
    """Return the static review-prompt section used when AI analysis is unavailable."""

    return "\n".join(
        [
            "## What Changed",
            f"`{asset.path}` was {asset.change_type}.",
            "",
            "## Impact Summary",
            "OpenMetadata credentials or AI analysis were not available, so this report "
            "could not query live lineage. Treat this as a review prompt, not a final "
            "blast-radius assessment.",
            "",
            "## Affected Assets",
            "Downstream assets were not resolved in fallback mode.",
            "",
            "## Risk Assessment",
            "- Data Quality: Review tests for changed columns in this model or schema file.",
            "- Compliance: Check whether changed columns contain PII or sensitive tags.",
            "- Business: Review downstream dashboards and pipelines in OpenMetadata before merge.",
            "",
            "## Recommended Actions",
            "1. Run this action with `AI_SDK_HOST`, `AI_SDK_TOKEN`, and `OPENAI_API_KEY`.",
            "2. Review the OpenMetadata lineage graph for this asset.",
            "3. Ask affected owners to approve high-risk model changes.",
        ]
    )


def _write_text(target_path: str, text: str) -> None:
    """Write ``text`` as UTF-8 to ``target_path``, creating parent directories."""

    target = Path(target_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(text, encoding="utf-8")


def _write_outputs(
    markdown_report: str,
    output_path: str | None,
    html_output_path: str | None,
) -> None:
    """Write the Markdown report and its HTML rendering to the paths that are set."""

    if output_path:
        _write_text(output_path, markdown_report)
    if html_output_path:
        _write_text(html_output_path, render_html_report(markdown_report))


def _write_metadata_output(output_path: str | None, score: ImpactScore) -> None:
    """Write the risk level, affected count and reasons as JSON.

    Nothing is written when ``output_path`` is ``None`` or empty.
    """

    if not output_path:
        return
    _write_text(
        output_path,
        json.dumps(
            {
                "risk_level": score.level.value,
                "affected_count": score.affected_count,
                "reasons": score.reasons,
            },
            indent=2,
        ),
    )


if __name__ == "__main__":
    sys.exit(main())
MODEL_EXTENSIONS = {".sql"}
SCHEMA_EXTENSIONS = {".yml", ".yaml"}
DEFAULT_PATH_FILTERS = ("**/models/**/*.sql", "**/models/**/*.yml", "**/models/**/*.yaml")


@dataclass(frozen=True)
class ChangedAsset:
    """A dbt model or schema file changed in a pull request."""

    # File stem of the changed file, e.g. "stg_orders".
    name: str
    # Normalized path of the changed file, forward slashes, no leading slash.
    path: str
    # "model" for .sql files, "schema" for .yml/.yaml files.
    kind: str
    # One of "added", "deleted", "renamed" or "modified".
    change_type: str
    # The full per-file diff block, starting at its "diff --git" line.
    diff: str


def parse_changed_assets(
    diff_output: str,
    path_filters: Sequence[str] | None = None,
) -> list[ChangedAsset]:
    """Extract changed dbt model/schema assets from a unified git diff.

    Args:
        diff_output: Text of a ``git diff`` covering any number of files.
        path_filters: Glob patterns a path must match; ``None`` or an empty
            sequence selects ``DEFAULT_PATH_FILTERS``.

    Returns:
        One ``ChangedAsset`` per matching file, in diff order.
    """

    filters = tuple(path_filters or DEFAULT_PATH_FILTERS)
    assets: list[ChangedAsset] = []

    for block in _split_diff_blocks(diff_output):
        asset = _parse_diff_block(block, filters)
        if asset is not None:
            assets.append(asset)

    return assets


def split_path_filters(raw_filters: str | None) -> tuple[str, ...] | None:
    """Split a comma-separated path filter string for CLI/GitHub Action use.

    Returns ``None`` when the input is empty or contains only blank entries.
    """

    if not raw_filters:
        return None
    filters = tuple(item.strip() for item in raw_filters.split(",") if item.strip())
    return filters or None


def _split_diff_blocks(diff_output: str) -> list[str]:
    """Split a multi-file diff into per-file blocks at each ``diff --git`` line.

    Lines that precede the first ``diff --git`` line are discarded.
    """

    blocks: list[list[str]] = []
    current: list[str] = []

    for line in diff_output.splitlines():
        if line.startswith("diff --git "):
            if current:
                blocks.append(current)
            current = [line]
        elif current:
            current.append(line)

    if current:
        blocks.append(current)

    return ["\n".join(block).rstrip("\n") for block in blocks]


def _header_lines(block: str) -> list[str]:
    """Return the lines of a per-file block that precede its first hunk.

    File metadata is read from this header only. Hunk bodies hold arbitrary
    file content, which can look like metadata: a removed SQL comment
    ``-- note`` appears in the diff as ``--- note``.
    """

    header: list[str] = []
    for line in block.splitlines():
        if line.startswith("@@"):
            break
        header.append(line)
    return header


def _parse_diff_block(block: str, path_filters: Sequence[str]) -> ChangedAsset | None:
    """Convert one per-file block to a ``ChangedAsset``.

    Returns ``None`` when the path is missing, fails the filters, is not
    under a ``models`` directory, or has an unsupported extension.
    """

    old_path, new_path = _extract_paths(block)
    # Deleted files have "/dev/null" as the new path, so use the old one.
    selected_path = new_path if new_path and new_path != "/dev/null" else old_path
    if selected_path is None or selected_path == "/dev/null":
        return None

    normalized_path = _normalize_path(selected_path)
    if not _matches_path_filters(normalized_path, path_filters):
        return None

    path = PurePosixPath(normalized_path)
    if "models" not in path.parts:
        return None

    kind = _asset_kind(path)
    if kind is None:
        return None

    return ChangedAsset(
        name=path.stem,
        path=normalized_path,
        kind=kind,
        change_type=_change_type(block),
        diff=block,
    )


def _extract_paths(block: str) -> tuple[str | None, str | None]:
    """Return ``(old_path, new_path)`` from the block's ``---``/``+++`` header lines.

    Falls back to the ``diff --git a/... b/...`` line for any path the header
    does not provide.
    """

    old_path: str | None = None
    new_path: str | None = None

    for line in _header_lines(block):
        if line.startswith("--- "):
            old_path = _strip_diff_prefix(line[4:])
        elif line.startswith("+++ "):
            new_path = _strip_diff_prefix(line[4:])

    if old_path is None or new_path is None:
        match = re.match(r"diff --git a/(.*?) b/(.*)", block)
        if match:
            old_path = old_path or match.group(1)
            new_path = new_path or match.group(2)

    return old_path, new_path


def _strip_diff_prefix(path: str) -> str:
    """Drop git's ``a/`` or ``b/`` prefix and canonicalize the null path."""

    if path in {"/dev/null", "dev/null"}:
        return "/dev/null"
    if path.startswith("a/") or path.startswith("b/"):
        return path[2:]
    return path


def _normalize_path(path: str) -> str:
    """Convert backslashes to forward slashes and strip leading slashes."""

    return path.replace("\\", "/").lstrip("/")


def _asset_kind(path: PurePosixPath) -> str | None:
    """Classify a path by extension: ``model``, ``schema`` or ``None``."""

    suffix = path.suffix.lower()
    if suffix in MODEL_EXTENSIONS:
        return "model"
    if suffix in SCHEMA_EXTENSIONS:
        return "schema"
    return None


def _change_type(block: str) -> str:
    """Classify the change from the block header only, never from hunk content."""

    header = _header_lines(block)

    def has_line(prefix: str) -> bool:
        return any(line.startswith(prefix) for line in header)

    if has_line("deleted file mode") or has_line("+++ /dev/null"):
        return "deleted"
    if has_line("new file mode") or has_line("--- /dev/null"):
        return "added"
    if has_line("rename from ") and has_line("rename to "):
        return "renamed"
    return "modified"


def _matches_path_filters(path: str, path_filters: Sequence[str]) -> bool:
    """Return whether ``path`` matches any of the glob ``path_filters``.

    ``fnmatch`` has no notion of ``**`` matching zero directories, so two
    allowances are made: the path is also tried with a leading ``/`` (for a
    leading ``**/``), and each pattern is also tried with ``/**/`` collapsed
    to ``/`` (for files directly inside the directory before ``**``).
    """

    candidates = (path, f"/{path}")
    for pattern in path_filters:
        variants = {pattern, pattern.replace("/**/", "/")}
        if any(
            fnmatch.fnmatch(candidate, variant)
            for candidate in candidates
            for variant in variants
        ):
            return True
    return False
extensions.", + "", + ] + ) + + +def render_markdown_report( + changed_assets: Sequence[ChangedAsset], + analyses: Mapping[str, str], + score: ImpactScore, + warnings: Sequence[str] | None = None, +) -> str: + """Render the final Markdown report for a PR comment or step summary.""" + + lines = [ + COMMENT_MARKER, + "# OpenMetadata Impact Analysis", + "", + "## Impact Summary", + "", + f"**Overall risk:** `{score.level.value.upper()}`", + f"**Affected assets found:** `{score.affected_count}`", + "", + "**Risk signals:**", + *[f"- {reason}" for reason in score.reasons], + "", + "## Changed Assets", + "", + ] + + if changed_assets: + lines.extend( + f"- `{asset.name}` ({asset.kind}, {asset.change_type}) - `{asset.path}`" + for asset in changed_assets + ) + else: + lines.append("- No dbt model or schema changes detected.") + + if warnings: + lines.extend(["", "## Warnings", ""]) + lines.extend(f"- {warning}" for warning in warnings) + + for asset_name, analysis in analyses.items(): + lines.extend(["", f"## Analysis: {asset_name}", "", analysis.strip(), ""]) + + lines.extend( + [ + "", + "## Reviewer Checklist", + "", + "- [ ] Review downstream owners before merge.", + "- [ ] Confirm impacted dashboards and pipelines are expected.", + "- [ ] Update or add data-quality tests for changed columns.", + "- [ ] Check governance tags for PII or sensitive data changes.", + "- [ ] Link follow-up migration or rollback tasks if risk is high.", + "", + ] + ) + + return "\n".join(lines) + + +def render_html_report(markdown_report: str, title: str = "OpenMetadata Impact Analysis") -> str: + """Render a dependency-free static HTML artifact from the Markdown report.""" + + safe_title = html.escape(title) + safe_report = html.escape(markdown_report) + risk_class = _risk_class(markdown_report) + return f""" + + + + + {safe_title} + + + +
+
{safe_report}
+
class RiskLevel(str, Enum):
    """Human-facing risk labels for PR comments and workflow outputs."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass(frozen=True)
class ImpactScore:
    """Risk score summary derived from analysis text and changed assets."""

    # Overall label derived from the accumulated points.
    level: RiskLevel
    # Number of distinct asset links found in the analysis text.
    affected_count: int
    # Human-readable signals that contributed to the score.
    reasons: list[str]


def _keyword_pattern(*keywords: str) -> re.Pattern[str]:
    """Compile a case-insensitive pattern matching any keyword as a whole token.

    A token is delimited by anything that is not an ASCII letter or digit.
    ``\\b`` is deliberately not used: it treats ``_`` as a word character, so
    ``\\bemail\\b`` never matches inside ``customer_email``, and snake_case is
    the usual form of dbt column and model names.
    """

    alternatives = "|".join(re.escape(keyword) for keyword in keywords)
    return re.compile(
        rf"(?<![A-Za-z0-9])(?:{alternatives})(?![A-Za-z0-9])",
        re.IGNORECASE,
    )


PII_PATTERN = _keyword_pattern(
    "pii", "sensitive", "email", "ssn", "card", "ip_address", "phone", "address", "gdpr", "ccpa"
)
QUALITY_PATTERN = _keyword_pattern(
    "data quality", "dq", "test", "tests", "not_null", "unique", "accepted_values", "fail", "failing"
)
CRITICAL_BUSINESS_PATTERN = _keyword_pattern(
    "finance", "revenue", "executive", "board", "dashboard", "sla", "critical", "sox"
)
NO_DOWNSTREAM_PATTERN = re.compile(
    r"\b(no downstream assets|no assets are .*affected|no affected assets)\b",
    re.IGNORECASE,
)
MARKDOWN_LINK_PATTERN = re.compile(r"\[[^\]]+\]\(([^)]+)\)")


def score_impact(analysis_text: str, changed_assets: Sequence[ChangedAsset]) -> ImpactScore:
    """Score risk from report content without trusting the LLM for the label.

    Args:
        analysis_text: Combined analysis Markdown for all changed assets.
        changed_assets: Parsed assets; only ``change_type`` is read here.

    Returns:
        An ``ImpactScore`` holding the risk level, the number of distinct
        linked assets, and the list of signals that fired.
    """

    affected_count = _affected_count(analysis_text)
    points = 0
    reasons: list[str] = []

    if affected_count >= 5:
        points += 3
        reasons.append("many downstream assets detected")
    elif affected_count >= 3:
        points += 2
        reasons.append("multiple downstream assets detected")
    elif affected_count > 0:
        points += 1
        reasons.append("downstream assets detected")

    if any(asset.change_type == "deleted" for asset in changed_assets):
        points += 2
        reasons.append("model or schema deletion detected")

    if PII_PATTERN.search(analysis_text):
        points += 2
        reasons.append("PII or sensitive data mentioned")

    if QUALITY_PATTERN.search(analysis_text):
        points += 1
        reasons.append("data quality impact mentioned")

    if CRITICAL_BUSINESS_PATTERN.search(analysis_text):
        points += 2
        reasons.append("critical business asset mentioned")

    # With no linked assets and an explicit "no downstream" statement, the
    # score is capped at one point and the other reasons are replaced.
    if affected_count == 0 and NO_DOWNSTREAM_PATTERN.search(analysis_text):
        points = min(points, 1)
        reasons = ["no downstream assets detected"]

    return ImpactScore(
        level=_risk_level(points),
        affected_count=affected_count,
        reasons=reasons or ["no material risk signals detected"],
    )


def _affected_count(analysis_text: str) -> int:
    """Count distinct Markdown link targets that point at an asset page.

    A target counts when it contains ``/table/``, ``/dashboard/``,
    ``/pipeline/`` or ``/chart/``.
    """

    asset_markers = ("/table/", "/dashboard/", "/pipeline/", "/chart/")
    links = {
        match.group(1)
        for match in MARKDOWN_LINK_PATTERN.finditer(analysis_text)
        if any(marker in match.group(1) for marker in asset_markers)
    }
    return len(links)


def _risk_level(points: int) -> RiskLevel:
    """Map accumulated points to a label: 6+ critical, 4+ high, 2+ medium, else low."""

    if points >= 6:
        return RiskLevel.CRITICAL
    if points >= 4:
        return RiskLevel.HIGH
    if points >= 2:
        return RiskLevel.MEDIUM
    return RiskLevel.LOW
Path(__file__).resolve().parents[1] +if str(COOKBOOK_DIR) not in sys.path: + sys.path.insert(0, str(COOKBOOK_DIR)) diff --git a/cookbook/mcp-impact-analysis/tests/test_batch_analyzer.py b/cookbook/mcp-impact-analysis/tests/test_batch_analyzer.py new file mode 100644 index 0000000..8fd029b --- /dev/null +++ b/cookbook/mcp-impact-analysis/tests/test_batch_analyzer.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from pathlib import Path + +from batch_analyzer import main + + +def test_main_writes_no_change_report_without_creating_agent(tmp_path: Path) -> None: + diff_path = tmp_path / "changes.diff" + output_path = tmp_path / "impact_report.md" + html_path = tmp_path / "impact_report.html" + diff_path.write_text( + "diff --git a/README.md b/README.md\n" + "--- a/README.md\n" + "+++ b/README.md\n" + "@@ -1 +1 @@\n" + "-old\n" + "+new\n", + encoding="utf-8", + ) + + exit_code = main( + [ + str(diff_path), + "--output", + str(output_path), + "--html-output", + str(html_path), + ] + ) + + assert exit_code == 0 + assert "No dbt model or schema changes detected." in output_path.read_text(encoding="utf-8") + assert "No dbt model or schema changes detected." 
in html_path.read_text(encoding="utf-8") diff --git a/cookbook/mcp-impact-analysis/tests/test_diff_parser.py b/cookbook/mcp-impact-analysis/tests/test_diff_parser.py new file mode 100644 index 0000000..6cb2051 --- /dev/null +++ b/cookbook/mcp-impact-analysis/tests/test_diff_parser.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from diff_parser import ChangedAsset, parse_changed_assets + + +def test_parse_changed_assets_detects_sql_and_schema_files() -> None: + diff = """diff --git a/models/staging/stg_orders.sql b/models/staging/stg_orders.sql +index 111..222 100644 +--- a/models/staging/stg_orders.sql ++++ b/models/staging/stg_orders.sql +@@ -1,2 +1,2 @@ +-select id, status from raw.orders ++select id, status as order_status from raw.orders +diff --git a/models/staging/_schema.yml b/models/staging/_schema.yml +index 333..444 100644 +--- a/models/staging/_schema.yml ++++ b/models/staging/_schema.yml +@@ -1,3 +1,5 @@ + models: + - name: stg_orders ++ tests: ++ - not_null +diff --git a/README.md b/README.md +index 555..666 100644 +--- a/README.md ++++ b/README.md +@@ -1 +1 @@ +-old ++new +""" + + assets = parse_changed_assets(diff) + + assert assets == [ + ChangedAsset( + name="stg_orders", + path="models/staging/stg_orders.sql", + kind="model", + change_type="modified", + diff="diff --git a/models/staging/stg_orders.sql b/models/staging/stg_orders.sql\n" + "index 111..222 100644\n" + "--- a/models/staging/stg_orders.sql\n" + "+++ b/models/staging/stg_orders.sql\n" + "@@ -1,2 +1,2 @@\n" + "-select id, status from raw.orders\n" + "+select id, status as order_status from raw.orders", + ), + ChangedAsset( + name="_schema", + path="models/staging/_schema.yml", + kind="schema", + change_type="modified", + diff="diff --git a/models/staging/_schema.yml b/models/staging/_schema.yml\n" + "index 333..444 100644\n" + "--- a/models/staging/_schema.yml\n" + "+++ b/models/staging/_schema.yml\n" + "@@ -1,3 +1,5 @@\n" + " models:\n" + " - name: stg_orders\n" + "+ 
tests:\n" + "+ - not_null", + ), + ] + + +def test_parse_changed_assets_handles_deleted_models() -> None: + diff = """diff --git a/models/marts/fct_orders.sql b/models/marts/fct_orders.sql +deleted file mode 100644 +index 111..000 +--- a/models/marts/fct_orders.sql ++++ /dev/null +@@ -1,2 +0,0 @@ +-select * from int_orders +""" + + assets = parse_changed_assets(diff) + + assert len(assets) == 1 + assert assets[0].name == "fct_orders" + assert assets[0].path == "models/marts/fct_orders.sql" + assert assets[0].kind == "model" + assert assets[0].change_type == "deleted" diff --git a/cookbook/mcp-impact-analysis/tests/test_report_renderer.py b/cookbook/mcp-impact-analysis/tests/test_report_renderer.py new file mode 100644 index 0000000..ea009eb --- /dev/null +++ b/cookbook/mcp-impact-analysis/tests/test_report_renderer.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from diff_parser import ChangedAsset +from report_renderer import COMMENT_MARKER, render_html_report, render_markdown_report +from risk_scoring import ImpactScore, RiskLevel + + +def test_render_markdown_report_includes_marker_and_required_sections() -> None: + report = render_markdown_report( + changed_assets=[ChangedAsset("stg_orders", "models/staging/stg_orders.sql", "model", "modified", "")], + analyses={"stg_orders": "## What Changed\nStatus was renamed.\n"}, + score=ImpactScore(RiskLevel.MEDIUM, 1, ["downstream assets detected"]), + warnings=["AI analysis unavailable; deterministic fallback used."], + ) + + assert report.startswith(COMMENT_MARKER) + assert "## Impact Summary" in report + assert "## Changed Assets" in report + assert "## Analysis: stg_orders" in report + assert "## Reviewer Checklist" in report + assert "AI analysis unavailable" in report + + +def test_render_html_report_escapes_analysis_content() -> None: + html = render_html_report( + markdown_report="## Report\n", + title="OpenMetadata Impact Analysis", + ) + + assert "