Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- MCP server installation now respects the `targets:` whitelist exactly like `apm install`: drop a non-listed runtime even when its `.cursor/`, `.codex/`, or other on-disk signal exists. Previously the MCP install path called `active_targets()` reading the singular `target:` key only, so projects whitelisting `targets: [copilot]` could still receive `~/.codex/config.toml` and `.cursor/mcp.json` writes from foreign signals. The fix audits both paths: (a) the call site at `local_bundle_handler.py` now forwards the canonical plural list; (b) the gate now delegates to the same `resolve_targets` resolver that backs `apm install` skills, so a malformed `targets:` field (conflicting `target:` + `targets:`, `targets: []`, or unknown target name) fails closed with the same `[x]` red error voice + remediation block. The same delegation closes a related asymmetry: a greenfield project (no `targets:`, no `--target` flag, no detected signals) used to silently fall back to `[copilot]` for MCP-only invocations, while `apm install` raised `NoHarnessError` on the same input -- both surfaces now error consistently. Drop lines now use the `[i] Skipped MCP config for X (active targets: Y)` format mirroring the canonical `Targets: X (source: Y)` provenance line. The `-g`/`--global` carve-out is unchanged: `apm install -g --mcp NAME` writes to user-scope (`~/.config/...`, `~/.codex/`, etc.) bypassing the project-scope gate by design. (#1335)
- Gemini CLI: `apm install -g --mcp NAME` now correctly writes to `~/.gemini/settings.json` (user scope) and `apm install` from outside the target project writes to `<project_root>/.gemini/settings.json` instead of `cwd`. Previously `--global` had no effect on Gemini and project-scope writes silently landed in the wrong directory. The matching opt-in gate and cleanup paths in `MCPIntegrator` are aligned in the same change. (#1299)
- Root `.apm/hooks/*.json` installs now use a stable local hook source id and heal stale same-content merged hook entries left behind by older checkout-derived `_apm_source` values, keeping Claude/Codex/Cursor/Gemini/Windsurf hook configs idempotent. (#1329)
- `apm install --target claude` now preserves self-defined stdio MCP `env` values from `apm.yml` and writes non-string values such as `PORT: 3000` and `DEBUG: false` as MCP-compatible strings. (#1222)
- Non-skill integrators (agent, instruction, prompt, command, hook script-copy) silently adopt byte-identical pre-existing files so a degraded `deployed_files=[]` lockfile no longer permanently blocks installs gated by `required-packages-deployed`. (#1313)
- `apm audit` drift check now returns skip-with-info (`passed=True`) when the install cache is cold, instead of failing the audit; bare `apm audit` surfaces the skip reason on stderr so CI pipelines that have not yet run `apm install` are not incorrectly red-marked. (#1289)
Expand Down
303 changes: 293 additions & 10 deletions src/apm_cli/integration/hook_integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,14 @@
from pathlib import Path
from typing import Dict, List, Optional, Tuple # noqa: F401, UP035

import yaml

from apm_cli.integration.base_integrator import BaseIntegrator, IntegrationResult
from apm_cli.utils.path_security import ensure_path_within
from apm_cli.utils.path_security import (
PathTraversalError,
ensure_path_within,
validate_path_segments,
)
from apm_cli.utils.paths import portable_relpath

_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -506,17 +512,271 @@ def _rewrite_hooks_data(

return rewritten, unique_scripts

def _get_package_name(self, package_info) -> str:
@staticmethod
def _is_root_local_package(package_info, project_root: Path | None) -> bool:
"""Return True when *package_info* represents the project's own .apm content."""
if project_root is None:
return False
try:
return Path(package_info.install_path).resolve() == Path(project_root).resolve()
except (OSError, RuntimeError):
return False

@staticmethod
def _safe_source_name(value: str | None, fallback: str = "_local") -> str:
"""Return a stable source marker that is also safe for hook script paths."""
if not isinstance(value, str) or not value:
return fallback
safe = re.sub(r"[^A-Za-z0-9._-]+", "-", value.strip()).strip(".-_")
if not safe or safe in {".", ".."}:
return fallback
return safe

def _get_root_local_package_name(self, package_info, project_root: Path) -> str:
"""Get the stable source marker for root .apm content."""
apm_yml = Path(project_root) / "apm.yml"
if apm_yml.exists():
try:
from apm_cli.utils.yaml_io import load_yaml

data = load_yaml(apm_yml)
if isinstance(data, dict):
manifest_name = self._safe_source_name(data.get("name"))
if manifest_name != "_local":
return manifest_name
except (OSError, ValueError, yaml.YAMLError):
pass

package = getattr(package_info, "package", None)
package_name = self._safe_source_name(getattr(package, "name", None))
if package_name != "_local":
return package_name
return "_local"

def _get_package_name(self, package_info, project_root: Path | None = None) -> str:
"""Get a short package name for use in file/directory naming.

Args:
package_info: PackageInfo object

Returns:
str: Package name derived from install path
str: Package name used as hook source marker and script namespace
"""
if self._is_root_local_package(package_info, project_root):
return self._get_root_local_package_name(package_info, Path(project_root))
return package_info.install_path.name

def _get_hook_source_marker(
self,
package_info,
project_root: Path,
package_name: str,
) -> str:
"""Get the marker stored in merged hook JSON for ownership cleanup."""
if self._is_root_local_package(package_info, project_root):
if package_name == "_local":
return "_local"
return f"_local/{package_name}"
return package_name

@staticmethod
def _hook_entry_content_key(entry: dict) -> str:
"""Build a stable comparison key excluding APM ownership metadata."""
comparable = {k: v for k, v in sorted(entry.items()) if k != "_apm_source"}
return json.dumps(comparable, sort_keys=True, separators=(",", ":"))

@staticmethod
def _dependency_hook_sources(project_root: Path) -> set[str]:
"""Return source markers that correspond to installed dependency dirs."""
apm_modules = project_root / "apm_modules"
if not apm_modules.is_dir():
return set()

lockfile_paths, lockfile_readable = HookIntegrator._lockfile_dependency_paths(project_root)
if lockfile_readable:
sources: set[str] = set()
for rel_path in lockfile_paths:
package_path = HookIntegrator._safe_dependency_path(apm_modules, rel_path)
if package_path is None:
continue
HookIntegrator._add_dependency_source(sources, package_path)
return sources

return HookIntegrator._bounded_dependency_hook_sources(apm_modules)

@staticmethod
def _lockfile_dependency_paths(project_root: Path) -> tuple[list[str], bool]:
"""Return installed dependency paths from a readable lockfile, if present."""
try:
from apm_cli.deps.lockfile import LEGACY_LOCKFILE_NAME, LockFile, get_lockfile_path

lockfile_path = get_lockfile_path(project_root)
if not lockfile_path.exists():
legacy_path = project_root / LEGACY_LOCKFILE_NAME
if legacy_path.exists():
lockfile_path = legacy_path
if not lockfile_path.exists():
return [], False
lockfile = LockFile.read(lockfile_path)
if lockfile is None:
return [], False
return lockfile.get_installed_paths(project_root / "apm_modules"), True
except (AttributeError, OSError, TypeError, ValueError, KeyError):
return [], False

@staticmethod
def _safe_dependency_path(apm_modules: Path, rel_path: str) -> Path | None:
"""Return a lockfile dependency path without escaping apm_modules."""
try:
validate_path_segments(
rel_path,
context="lockfile dependency path",
reject_empty=True,
)
package_path = apm_modules / Path(rel_path)
ensure_path_within(package_path, apm_modules)
if HookIntegrator._has_symlink_component(apm_modules, package_path):
return None
return package_path
except (OSError, PathTraversalError, RuntimeError, TypeError):
return None

@staticmethod
def _has_symlink_component(apm_modules: Path, package_path: Path) -> bool:
"""Return True when any component below apm_modules is a symlink."""
try:
relative = package_path.relative_to(apm_modules)
current = apm_modules
for part in relative.parts:
current = current / part
if current.is_symlink():
return True
return False
except (OSError, ValueError):
return True

@staticmethod
def _is_dependency_package_dir(path: Path) -> bool:
"""Return True when *path* looks like an installed package root."""
try:
hooks = path / "hooks"
apm_hooks = path / ".apm" / "hooks"
apm_yml = path / "apm.yml"
skill_md = path / "SKILL.md"
return (
(hooks.is_dir() and not hooks.is_symlink())
or (apm_hooks.is_dir() and not apm_hooks.is_symlink())
or (apm_yml.is_file() and not apm_yml.is_symlink())
or (skill_md.is_file() and not skill_md.is_symlink())
)
except OSError:
return False

@staticmethod
def _add_dependency_source(sources: set[str], package_path: Path) -> bool:
"""Add package_path.name to sources when package_path is a package root."""
try:
if (
not package_path.is_dir()
or package_path.is_symlink()
or not HookIntegrator._is_dependency_package_dir(package_path)
):
return False
except OSError:
return False
sources.add(package_path.name)
return True

@staticmethod
def _child_dependency_dirs(path: Path) -> list[Path]:
"""Return direct non-hidden child dirs without following symlink roots."""
try:
if path.is_symlink() or not path.is_dir():
return []
return sorted(
[
child
for child in path.iterdir()
if not child.is_symlink() and child.is_dir() and not child.name.startswith(".")
],
key=lambda child: child.name,
)
except OSError:
return []

@staticmethod
def _collect_known_subdirectory_sources(sources: set[str], repo_root: Path) -> None:
"""Collect dependency sources from known virtual subdirectory layouts."""
for namespace in ("collections", "skills"):
for package_path in HookIntegrator._child_dependency_dirs(repo_root / namespace):
HookIntegrator._add_dependency_source(sources, package_path)

apm_dir = repo_root / ".apm"
try:
if apm_dir.is_symlink() or not apm_dir.is_dir():
return
except OSError:
return
for primitive in ("agents", "commands", "hooks", "instructions", "prompts", "skills"):
for package_path in HookIntegrator._child_dependency_dirs(apm_dir / primitive):
HookIntegrator._add_dependency_source(sources, package_path)

@staticmethod
def _collect_remote_dependency_sources(sources: set[str], namespace: Path) -> None:
"""Collect fallback sources from explicit remote install layouts."""
if HookIntegrator._add_dependency_source(sources, namespace):
return

for repo_or_project in HookIntegrator._child_dependency_dirs(namespace):
if HookIntegrator._add_dependency_source(sources, repo_or_project):
continue

HookIntegrator._collect_known_subdirectory_sources(sources, repo_or_project)

for ado_repo in HookIntegrator._child_dependency_dirs(repo_or_project):
if HookIntegrator._add_dependency_source(sources, ado_repo):
continue
HookIntegrator._collect_known_subdirectory_sources(sources, ado_repo)

@staticmethod
def _collect_local_dependency_sources(sources: set[str], local_namespace: Path) -> None:
"""Collect apm_modules/_local/<name> package roots only."""
for local_package in HookIntegrator._child_dependency_dirs(local_namespace):
HookIntegrator._add_dependency_source(sources, local_package)

@staticmethod
def _bounded_dependency_hook_sources(apm_modules: Path) -> set[str]:
"""Fallback source scan limited to known apm_modules package layouts."""
sources: set[str] = set()

for package_root in HookIntegrator._child_dependency_dirs(apm_modules):
if package_root.name == "_local":
HookIntegrator._collect_local_dependency_sources(sources, package_root)
continue

HookIntegrator._collect_remote_dependency_sources(sources, package_root)
return sources
Comment thread
imk1t marked this conversation as resolved.

def _should_remove_prior_merged_entry(
self,
entry,
*,
source_marker: str,
fresh_content_keys: set[str],
heal_stale_root_source: bool,
dependency_sources: set[str],
remove_current_source: bool,
) -> bool:
"""Return True when an existing merged-hook entry should be replaced."""
if not isinstance(entry, dict):
return False
source = entry.get("_apm_source")
if remove_current_source and source == source_marker:
return True
if not heal_stale_root_source or not source or source in dependency_sources:
return False
return self._hook_entry_content_key(entry) in fresh_content_keys
Comment on lines +776 to +778

def integrate_package_hooks(
self,
package_info,
Expand Down Expand Up @@ -556,7 +816,7 @@ def integrate_package_hooks(
hooks_dir = project_root / root_dir / "hooks"
hooks_dir.mkdir(parents=True, exist_ok=True)

package_name = self._get_package_name(package_info)
package_name = self._get_package_name(package_info, project_root)
hooks_integrated = 0
scripts_copied = 0
scripts_adopted = 0
Expand Down Expand Up @@ -662,7 +922,12 @@ def _integrate_merged_hooks(
if not hook_files:
return _empty

package_name = self._get_package_name(package_info)
package_name = self._get_package_name(package_info, project_root)
source_marker = self._get_hook_source_marker(package_info, project_root, package_name)
heal_stale_root_source = self._is_root_local_package(package_info, project_root)
dependency_sources = (
self._dependency_hook_sources(project_root) if heal_stale_root_source else set()
)
hooks_integrated = 0
scripts_copied = 0
scripts_adopted = 0
Expand Down Expand Up @@ -724,7 +989,12 @@ def _integrate_merged_hooks(
# Mark each entry with APM source for sync/cleanup
for entry in entries:
if isinstance(entry, dict):
entry["_apm_source"] = package_name
entry["_apm_source"] = source_marker
fresh_content_keys = {
self._hook_entry_content_key(entry)
for entry in entries
if isinstance(entry, dict)
}

# Idempotent upsert: drop any prior entries owned by this
# package before appending fresh ones. Without this, every
Expand All @@ -734,12 +1004,20 @@ def _integrate_merged_hooks(
# with multiple hook files targeting the same event
# contributes each file's entries in turn, and stripping
# on every iteration would erase earlier files' work.
if event_name not in cleared_events:
remove_current_source = event_name not in cleared_events
if remove_current_source or heal_stale_root_source:
# Clear from the normalised event
json_config["hooks"][event_name] = [
e
for e in json_config["hooks"][event_name]
if not (isinstance(e, dict) and e.get("_apm_source") == package_name)
if not self._should_remove_prior_merged_entry(
e,
source_marker=source_marker,
fresh_content_keys=fresh_content_keys,
heal_stale_root_source=heal_stale_root_source,
dependency_sources=dependency_sources,
remove_current_source=remove_current_source,
)
]
# Also clear from any alias events that map to
# this normalised name (handles migration from
Expand All @@ -749,8 +1027,13 @@ def _integrate_merged_hooks(
json_config["hooks"][alias] = [
e
for e in json_config["hooks"][alias]
if not (
isinstance(e, dict) and e.get("_apm_source") == package_name
if not self._should_remove_prior_merged_entry(
e,
source_marker=source_marker,
fresh_content_keys=fresh_content_keys,
heal_stale_root_source=heal_stale_root_source,
dependency_sources=dependency_sources,
remove_current_source=remove_current_source,
)
]
# Remove the alias key entirely if now empty
Expand Down
Loading