Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 85 additions & 23 deletions src/apm_cli/deps/plugin_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import json
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional # noqa: F401, UP035
Expand All @@ -25,6 +26,38 @@
_logger = logging.getLogger(__name__)


class PluginIntegrityError(RuntimeError):
"""Raised when a plugin destination tree contains a pre-existing symlink.

Refusing to copy through a symlinked destination is defense-in-depth
for the data-loss-adjacent ``shutil.copytree(..., dirs_exist_ok=True)``
flow in ``_map_plugin_artifacts``. A malicious package shipping
``.apm/skills/<name>`` (or any other target_* subtree) as a symlink to
an external path (e.g. ``/etc``, ``$HOME/.ssh``) would otherwise
redirect writes outside the plugin root.
"""


def _assert_no_symlink_descendants(target: Path) -> None:
"""Refuse to copy when *target* or any of its descendants is a symlink.

Uses ``lstat``/``os.walk(followlinks=False)`` so the check itself does
not traverse a hostile symlink. No-op when *target* does not exist.
"""
if not target.exists() and not target.is_symlink():
return
if target.is_symlink():
raise PluginIntegrityError(f"Refusing to copy into symlinked plugin destination: {target}")
for root, dirs, files in os.walk(target, followlinks=False):
root_path = Path(root)
for name in dirs + files:
entry = root_path / name
if entry.is_symlink():
raise PluginIntegrityError(
f"Refusing to copy into plugin destination containing symlinked entry: {entry}"
)


def _surface_warning(message: str, logger: logging.Logger) -> None:
"""Emit a warning to both the stdlib logger and the rich console.

Expand Down Expand Up @@ -445,60 +478,77 @@ def _resolve_sources(component: str, default_dir: str):
return [default]
return []

# Helper: True when *src* and *dst* resolve to the same filesystem path
# (e.g. a manifest entry pointing at a file already inside the target).
# Copying onto self raises ``shutil.SameFileError`` and ``shutil.copytree``
# over identical directories triggers it per-file, so callers must skip.
def _is_same_path(src: Path, dst: Path) -> bool:
try:
return src.resolve() == dst.resolve()
except OSError:
return False

# Map agents/
# Unlike skills (which are named directories containing SKILL.md), agents
# are flat files -- each .md is one agent. So we always merge directory
# contents directly into .apm/agents/ (no nesting by dir name).
agent_sources = _resolve_sources("agents", "agents")
if agent_sources:
target_agents = apm_dir / "agents"
if target_agents.exists():
shutil.rmtree(target_agents)
_assert_no_symlink_descendants(target_agents)
agent_dirs = [s for s in agent_sources if s.is_dir()]
agent_files = [s for s in agent_sources if s.is_file()]
if agent_dirs:
shutil.copytree(agent_dirs[0], target_agents, ignore=ignore_non_content)
for extra in agent_dirs[1:]:
shutil.copytree(extra, target_agents, dirs_exist_ok=True, ignore=ignore_non_content)
for d in agent_dirs:
if _is_same_path(d, target_agents):
continue
shutil.copytree(d, target_agents, dirs_exist_ok=True, ignore=ignore_non_content)
if agent_files:
target_agents.mkdir(parents=True, exist_ok=True)
for f in agent_files:
shutil.copy2(f, target_agents / f.name)
dst = target_agents / f.name
if _is_same_path(f, dst):
continue
shutil.copy2(f, dst)

# Map skills/
skill_sources = _resolve_sources("skills", "skills")
if skill_sources:
target_skills = apm_dir / "skills"
if target_skills.exists():
shutil.rmtree(target_skills)
_assert_no_symlink_descendants(target_skills)
skill_dirs = [s for s in skill_sources if s.is_dir()]
skill_files = [s for s in skill_sources if s.is_file()]

is_custom_list = isinstance(manifest.get("skills"), list)
if is_custom_list and skill_dirs:
target_skills.mkdir(parents=True, exist_ok=True)
for d in skill_dirs:
nested = target_skills / d.name
if _is_same_path(d, nested):
continue
shutil.copytree(
d,
target_skills / d.name,
nested,
ignore=ignore_non_content,
dirs_exist_ok=True,
)
elif skill_dirs:
shutil.copytree(skill_dirs[0], target_skills, ignore=ignore_non_content)
for extra in skill_dirs[1:]:
shutil.copytree(extra, target_skills, dirs_exist_ok=True, ignore=ignore_non_content)
for d in skill_dirs:
if _is_same_path(d, target_skills):
continue
shutil.copytree(d, target_skills, dirs_exist_ok=True, ignore=ignore_non_content)
if skill_files:
target_skills.mkdir(parents=True, exist_ok=True)
for f in skill_files:
shutil.copy2(f, target_skills / f.name)
dst = target_skills / f.name
if _is_same_path(f, dst):
continue
shutil.copy2(f, dst)

# Map commands/ -> .apm/prompts/ (normalize .md -> .prompt.md)
command_sources = _resolve_sources("commands", "commands")
if command_sources:
target_prompts = apm_dir / "prompts"
if target_prompts.exists():
shutil.rmtree(target_prompts)
_assert_no_symlink_descendants(target_prompts)
target_prompts.mkdir(parents=True, exist_ok=True)

def _copy_command_file(source_file: Path, dest_dir: Path, rel_to: Path = None): # noqa: RUF013
Expand All @@ -511,6 +561,8 @@ def _copy_command_file(source_file: Path, dest_dir: Path, rel_to: Path = None):
if not source_file.name.endswith(".prompt.md") and source_file.suffix == ".md":
target_path = target_path.with_name(f"{source_file.stem}.prompt.md")
target_path.parent.mkdir(parents=True, exist_ok=True)
if _is_same_path(source_file, target_path):
return
shutil.copy2(source_file, target_path)

for source in command_sources:
Expand All @@ -528,6 +580,7 @@ def _copy_command_file(source_file: Path, dest_dir: Path, rel_to: Path = None):
if isinstance(hooks_value, dict):
# Inline hooks object -> write as .apm/hooks/hooks.json
target_hooks = apm_dir / "hooks"
_assert_no_symlink_descendants(target_hooks)
target_hooks.mkdir(parents=True, exist_ok=True)
(target_hooks / "hooks.json").write_text(json.dumps(hooks_value, indent=2))
elif isinstance(hooks_value, str) and (plugin_path / hooks_value).is_file():
Expand All @@ -537,24 +590,33 @@ def _copy_command_file(source_file: Path, dest_dir: Path, rel_to: Path = None):
pass
else:
target_hooks = apm_dir / "hooks"
_assert_no_symlink_descendants(target_hooks)
target_hooks.mkdir(parents=True, exist_ok=True)
shutil.copy2(src_file, target_hooks / "hooks.json")
dst = target_hooks / "hooks.json"
if not _is_same_path(src_file, dst):
shutil.copy2(src_file, dst)
else:
# Directory path(s) -- standard flow
hook_sources = _resolve_sources("hooks", "hooks")
if hook_sources:
target_hooks = apm_dir / "hooks"
if target_hooks.exists():
shutil.rmtree(target_hooks)
shutil.copytree(hook_sources[0], target_hooks, ignore=ignore_non_content)
for extra in hook_sources[1:]:
shutil.copytree(extra, target_hooks, dirs_exist_ok=True, ignore=ignore_non_content)
_assert_no_symlink_descendants(target_hooks)
for d in hook_sources:
if _is_same_path(d, target_hooks):
continue
shutil.copytree(d, target_hooks, dirs_exist_ok=True, ignore=ignore_non_content)

# Pass-through files required for MCP/LSP plugins to function
for passthrough in (".mcp.json", ".lsp.json", "settings.json"):
source_file = plugin_path / passthrough
if source_file.exists() and not source_file.is_symlink():
shutil.copy2(source_file, apm_dir / passthrough)
dst = apm_dir / passthrough
if dst.is_symlink():
raise PluginIntegrityError(
f"Refusing to copy through symlinked plugin pass-through file: {dst}"
)
if not _is_same_path(source_file, dst):
shutil.copy2(source_file, dst)


def _generate_apm_yml(manifest: dict[str, Any]) -> str:
Expand Down
Loading
Loading