diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c79d18b..983e8fc 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,18 @@ +## v2.17.1 (Patch — security hardening) + +Five must-fix findings from a draconian security/logic review of the existing code (no new features), each verified against source and pinned by a regression test. + +### Security + +- **Path traversal via client-certificate Common Name.** `create_client_certificate` built the on-disk identifier straight from the caller-supplied Common Name (only spaces replaced) and ran `mkdir(parents=True)` before any other check, so a CN such as `../../../../tmp/evil` created — and wrote a CA-signed key, certificate and metadata into — a directory outside the managed cert tree (single and batch issuance, operator role). The pre-existing `_validate_identifier` guard was only ever applied on retrieval, never to the value that constructs the identifier. The Common Name is now slugified at the construction site (covering every caller) to a filesystem-safe token, with a containment assertion before `mkdir`. Benign Common Names slugify to themselves so existing identifiers and paths are unchanged, and the full Common Name is still preserved verbatim in the certificate subject and metadata. +- **Backup restore downgraded private keys to world-readable.** `restore_unified_backup` set `0600` only for the exact filename `privkey.pem` and `0644` for everything else, but certbot keeps the real key bytes in `archive//privkeyN.pem` (the `live/privkey.pem` symlink points at them) and the ACME account key in `accounts/.../private_key.json` — both retained in the unified backup — so a restore actively rewrote served key material and the account key to `0644`. Restore now mirrors certbot's own permissions: every private-key filename is locked to `0600` and public material (cert/chain/fullchain plus metadata json) stays `0644`. +- **`GET /api/metrics` was reachable unauthenticated.** The endpoint carried no auth decorator while its sibling info endpoints (`/api/cache`, `/api/backups`) require the `viewer` role. It now requires a viewer credential to match its peers; the Prometheus scrape target remains the separate, intentionally public `/metrics` route. +- **A disabled, demoted, or deleted user kept their live session.** The role was snapshotted into the session at login and session validation only checked expiry, so disabling, demoting, or deleting a user left an already-issued session valid — with the old role — for up to the session lifetime (default 8h), with no kill switch for a compromised or just-removed account. Those transitions now invalidate the user's in-memory sessions, forcing re-authentication under the new state; an unrelated edit (e.g. email) does not log the user out. + +### Fixes + +- **Webhook secrets clobbered on a *generic* settings save.** v2.15.0 restored masked webhook secrets on the dedicated `/api/notifications/config` route, but the generic `POST /api/settings` path (the full-settings round-trip used by the UI and integration tests) still merged the `notifications` subtree without restoring secrets nested in the `channels.webhooks` list — `_deep_merge_dict` replaces lists wholesale and `_strip_masked_values` only walks dicts — writing the mask sentinel over the real HMAC secret / bot token. The corruption was silent: deliveries then signed with `********`. `atomic_update` now restores masked list secrets generically for every deep-merge key, so both paths behave identically. + ## v2.17.0 (Feature — third-party-verifiable audit trail) Completes the agentic audit trail (v2.16.0 added attribution + the tamper-evident hash chain): the record can now be verified by a third party, off the box, without running or trusting CertMate. diff --git a/modules/__init__.py b/modules/__init__.py index 611f9d9..7973800 100644 --- a/modules/__init__.py +++ b/modules/__init__.py @@ -1,2 +1,2 @@ -__version__ = '2.17.0' +__version__ = '2.17.1' diff --git a/modules/api/resources.py b/modules/api/resources.py index 42b738d..3fab9f7 100644 --- a/modules/api/resources.py +++ b/modules/api/resources.py @@ -263,6 +263,12 @@ def get(self): # Metrics endpoints class MetricsList(Resource): + # Gated like its sibling info endpoints (CacheStats, BackupList): the + # Prometheus scrape target is the separate public '/metrics' route — + # this JSON summary lives in the authenticated API and must require at + # least a viewer credential, not be reachable unauthenticated. + @api.doc(security='Bearer') + @auth_manager.require_role('viewer') def get(self): """Get available metrics information""" try: diff --git a/modules/core/auth.py b/modules/core/auth.py index e408023..b670bd8 100644 --- a/modules/core/auth.py +++ b/modules/core/auth.py @@ -515,6 +515,7 @@ def update_user(self, username, password=None, role=None, email=None, enabled=No return False, "User not found" user = users[username] + original_role = self._normalize_role(user.get('role', 'operator')) # SSO-managed accounts authenticate against the IdP and carry an # empty password_hash on purpose. Refuse to set a local password so @@ -555,6 +556,13 @@ def update_user(self, username, password=None, role=None, email=None, enabled=No if self._save_users(users): logger.info(f"User '{username}' updated successfully") + # A privilege change must not be outlived by a live session: + # disabling the account or changing its role revokes any + # in-memory sessions so the change takes effect immediately — + # the user re-authenticates under the new role, or is blocked. + role_changed = role is not None and self._normalize_role(role) != original_role + if enabled is False or role_changed: + self._invalidate_sessions_for_user(username) return True, "User updated successfully" return False, "Failed to save user" except (OSError, ValueError, KeyError) as e: @@ -579,6 +587,8 @@ def delete_user(self, username): if self._save_users(users): logger.info(f"User '{username}' deleted successfully") + # Kill any live sessions so a deleted account can't keep acting. + self._invalidate_sessions_for_user(username) return True, "User deleted successfully" return False, "Failed to delete user" except (OSError, ValueError, KeyError) as e: @@ -692,6 +702,25 @@ def invalidate_session(self, session_id): return True return False + def _invalidate_sessions_for_user(self, username): + """Drop every in-memory session belonging to ``username``. + + Called when a user is disabled, demoted, or deleted so a live session + cannot outlive the privilege change. The role is otherwise snapshotted + into the session at login (see ``create_session``) and stays valid + until ``SESSION_TIMEOUT_HOURS``, which would leave a just-disabled / + just-demoted / deleted account fully privileged for up to the session + lifetime. Returns the number of sessions dropped. + """ + with self._session_lock: + stale = [sid for sid, data in self._sessions.items() + if data.get('user') == username] + for sid in stale: + del self._sessions[sid] + if stale: + logger.info(f"Invalidated {len(stale)} active session(s) for user '{username}'") + return len(stale) + def _cleanup_sessions(self): """Remove expired sessions (caller must hold _session_lock)""" current_time = time.time() diff --git a/modules/core/client_certificates.py b/modules/core/client_certificates.py index d066a6e..83c3a35 100644 --- a/modules/core/client_certificates.py +++ b/modules/core/client_certificates.py @@ -6,6 +6,7 @@ import logging import json import os +import re from pathlib import Path from datetime import datetime, timedelta from .utils import utc_now @@ -24,6 +25,32 @@ logger = logging.getLogger(__name__) +# Characters allowed verbatim in the on-disk identifier slug. Anything else +# (path separators, NUL, unicode, ...) is collapsed to '-'; runs of '.' are +# then collapsed so a '..' traversal token can never survive. The allowed set +# mirrors _validate_identifier's _SAFE_IDENTIFIER_RE, so a benign CN such as +# 'svc.example.com' slugifies to itself — byte-for-byte the same identifier the +# old `.lower().replace(' ', '-')` produced — while '/', '\\' and '..' cannot. +_CN_SLUG_STRIP_RE = re.compile(r'[^a-z0-9._-]+') + + +def _slugify_common_name(common_name: str) -> str: + """Reduce a certificate Common Name to a filesystem-safe slug. + + Lowercases, replaces every run of disallowed characters with a single '-', + collapses any run of dots to one (killing the '..' traversal token), and + strips leading separators so the result starts with an alphanumeric. The + output therefore matches ``_SAFE_IDENTIFIER_RE`` (or falls back to 'client' + when the CN slugifies to empty) and can never contain '/', '\\', '..' or + NUL. The full Common Name is still preserved verbatim in the certificate + subject and metadata — only the directory/file naming uses the slug. + """ + slug = _CN_SLUG_STRIP_RE.sub('-', (common_name or '').strip().lower()) + slug = re.sub(r'\.{2,}', '.', slug) # collapse '..'+ so no traversal token + slug = slug.lstrip('.-_') # _SAFE_IDENTIFIER_RE needs alnum first + return slug or 'client' + + class ClientCertificateManager: """ Manages client certificates for mTLS, VPN, and user authentication. @@ -130,12 +157,23 @@ def create_client_certificate( if not isinstance(days_valid, int) or days_valid < MIN_CERTIFICATE_VALIDITY_DAYS or days_valid > MAX_CERTIFICATE_VALIDITY_DAYS: return False, f"days_valid must be between {MIN_CERTIFICATE_VALIDITY_DAYS} and {MAX_CERTIFICATE_VALIDITY_DAYS}", None - # Generate unique identifier - identifier = f"{common_name.lower().replace(' ', '-')}-{uuid4().hex[:8]}" + # Generate unique identifier. The Common Name is caller-controlled + # (anyone with cert-issuance rights), so slugify it to a + # filesystem-safe token before it ever touches a path — otherwise a + # CN like '../../../tmp/evil' would make cert_subdir.mkdir() create, + # and write a CA-signed key + cert into, a directory outside the tree. + identifier = f"{_slugify_common_name(common_name)}-{uuid4().hex[:8]}" # Determine storage directory cert_dir = self._get_cert_subdir(cert_usage) cert_subdir = cert_dir / identifier + # Defence in depth: refuse to create/write anything outside the + # managed cert tree even if the slug logic above were ever weakened. + cert_root = cert_dir.resolve() + resolved_subdir = cert_subdir.resolve() + if cert_root != resolved_subdir and cert_root not in resolved_subdir.parents: + logger.error("Refusing to create client cert outside the cert tree: %r", identifier) + return False, "Invalid certificate identifier", None cert_subdir.mkdir(parents=True, exist_ok=True) # Handle CSR or generate CSR + key diff --git a/modules/core/file_operations.py b/modules/core/file_operations.py index 1064610..dacbd23 100644 --- a/modules/core/file_operations.py +++ b/modules/core/file_operations.py @@ -6,6 +6,7 @@ import base64 import io import os +import re import json import tempfile import zipfile @@ -29,6 +30,20 @@ # the domain config dir for future renewals after a restore. _BACKUP_EXCLUDE_DIRS = frozenset({'logs', 'work'}) +# Private-key material that certbot/CertMate writes into the cert tree. On +# restore every one of these must get 0600 — not just the live `privkey.pem`. +# certbot keeps the real key bytes in archive//privkeyN.pem (the +# live/privkey.pem symlink points at them) and the ACME account key in +# accounts/.../private_key.json; both are retained in the unified backup +# (accounts/ and archive/ are intentionally kept — see _BACKUP_EXCLUDE_DIRS). +# Matching only the exact name 'privkey.pem' left those world-readable (0644). +# Public material (cert/chain/fullchain + *.json metadata) stays 0644 so +# external consumers can still read it, mirroring certbot's own permissions. +_PRIVATE_KEY_FILE_RE = re.compile( + r'(?:^|[._-])privkey\d*\.pem$|^private_key\.json$|\.key$', + re.IGNORECASE, +) + # --- Backup encryption at rest -------------------------------------------- # Unified backups embed every certificate private key (privkey.pem). With # CERTMATE_BACKUP_PASSPHRASE set, the whole zip is encrypted (Fernet: @@ -701,8 +716,10 @@ def restore_unified_backup(self, backup_file_path): logger.error(f"Error extracting {file_info.filename}: {e}") continue - # Set appropriate permissions - if target_path.name == 'privkey.pem': + # Set appropriate permissions: lock down every private + # key (live, archived, and the ACME account key), leave + # public cert material world-readable. + if _PRIVATE_KEY_FILE_RE.search(target_path.name): os.chmod(target_path, 0o600) else: os.chmod(target_path, 0o644) diff --git a/modules/core/settings.py b/modules/core/settings.py index 0001082..b3f3009 100644 --- a/modules/core/settings.py +++ b/modules/core/settings.py @@ -286,6 +286,28 @@ def _identity(d): return new_list +def _restore_masked_list_secrets_deep(old_subtree, new_subtree): + """Recursively apply ``_restore_masked_list_secrets`` across a merged + settings subtree. + + ``_deep_merge_dict`` replaces lists wholesale, so a secret the UI masked + inside a list-of-dicts (e.g. ``notifications.channels.webhooks``) survives + the merge only as the sentinel. Walking the merged subtree in lockstep with + the on-disk one and restoring every list lets the *generic* settings POST + path preserve list-nested secrets exactly like the dedicated notifications + route already does. Mutates and returns ``new_subtree``. + """ + if not isinstance(new_subtree, dict) or not isinstance(old_subtree, dict): + return new_subtree + for key, value in new_subtree.items(): + old_value = old_subtree.get(key) + if isinstance(value, list): + _restore_masked_list_secrets(old_value, value) + elif isinstance(value, dict): + _restore_masked_list_secrets_deep(old_value, value) + return new_subtree + + # Top-level settings keys whose value is a nested dict that should be # deep-merged rather than wholesale-replaced on save. Each of these stores # multiple secret-bearing subtrees (per-backend storage credentials, per-CA @@ -558,7 +580,15 @@ def atomic_update(self, incoming: dict, protected_keys=('users', 'api_keys', 'lo if (key in _DEEP_MERGE_SETTINGS_KEYS and isinstance(existing.get(key), dict) and isinstance(value, dict)): - merged[key] = _deep_merge_dict(existing[key], value) + merged_subtree = _deep_merge_dict(existing[key], value) + # _deep_merge_dict replaces lists wholesale, so a secret the + # UI masked inside a list-of-dicts (e.g. + # notifications.channels.webhooks) would otherwise be written + # back as the sentinel. Restore those from the on-disk + # subtree so the generic settings POST path preserves them + # like the dedicated notifications route does. + _restore_masked_list_secrets_deep(existing[key], merged_subtree) + merged[key] = merged_subtree for key in protected_keys: if key in existing: merged[key] = existing[key] diff --git a/package.json b/package.json index 5cdac96..d205f40 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "private": true, "name": "certmate", - "version": "2.17.0", + "version": "2.17.1", "description": "CertMate SSL Certificate Manager - frontend assets", "scripts": { "css:build": "npx tailwindcss -i static/css/input.css -o static/css/tailwind.min.css --minify", diff --git a/tests/test_backup_restore_key_permissions.py b/tests/test_backup_restore_key_permissions.py new file mode 100644 index 0000000..de24a6c --- /dev/null +++ b/tests/test_backup_restore_key_permissions.py @@ -0,0 +1,68 @@ +"""Regression: restoring a unified backup must not leave private keys 0644. + +restore_unified_backup chmod'd only the exact name 'privkey.pem' to 0600 and +everything else to 0644. certbot keeps the real key bytes in +archive//privkeyN.pem (live/privkey.pem symlinks to them) and the ACME +account key in accounts/.../private_key.json — both retained in the backup +(accounts/ and archive/ are intentionally kept). Restore therefore actively +downgraded served key material to world-readable. + +The fix locks down every private-key filename while leaving public cert material +(cert/chain/fullchain + metadata json) at 0644, mirroring certbot's own perms. +""" +import os +import stat +import zipfile + +import pytest + +from modules.core.file_operations import FileOperations, _PRIVATE_KEY_FILE_RE + +pytestmark = [pytest.mark.unit] + + +@pytest.fixture +def file_ops(tmp_path): + cert_dir = tmp_path / "certificates" + data_dir = tmp_path / "data" + backup_dir = tmp_path / "backups" + logs_dir = tmp_path / "logs" + for d in (cert_dir, data_dir, backup_dir, logs_dir): + d.mkdir() + return FileOperations(cert_dir=cert_dir, data_dir=data_dir, + backup_dir=backup_dir, logs_dir=logs_dir) + + +def _mode(path): + return stat.S_IMODE(os.stat(path).st_mode) + + +_PRIVATE = ["privkey.pem", "archive/d.com/privkey1.pem", + "accounts/x/private_key.json", "extra.key"] +_PUBLIC = ["cert.pem", "fullchain.pem", "chain.pem", + "archive/d.com/fullchain1.pem", "metadata.json", + "accounts/x/regr.json"] + + +def test_restore_locks_down_every_private_key(file_ops, tmp_path): + backup = tmp_path / "backup_test.zip" + with zipfile.ZipFile(backup, "w") as zf: + for rel in _PRIVATE + _PUBLIC: + zf.writestr(f"certificates/d.com/{rel}", b"secret-or-not") + + assert file_ops.restore_unified_backup(str(backup)) is True + + base = file_ops.cert_dir / "d.com" + for rel in _PRIVATE: + assert _mode(base / rel) == 0o600, f"{rel} must be 0600, got {oct(_mode(base / rel))}" + for rel in _PUBLIC: + assert _mode(base / rel) == 0o644, f"{rel} must be 0644, got {oct(_mode(base / rel))}" + + +def test_private_key_pattern_matches_real_certbot_names(): + for name in ("privkey.pem", "privkey1.pem", "privkey42.pem", + "private_key.json", "client.key"): + assert _PRIVATE_KEY_FILE_RE.search(name), f"{name} should be treated as private" + for name in ("fullchain.pem", "cert.pem", "chain1.pem", + "metadata.json", "regr.json", "README"): + assert not _PRIVATE_KEY_FILE_RE.search(name), f"{name} should stay public" diff --git a/tests/test_client_cert_path_traversal.py b/tests/test_client_cert_path_traversal.py new file mode 100644 index 0000000..ac12953 --- /dev/null +++ b/tests/test_client_cert_path_traversal.py @@ -0,0 +1,81 @@ +"""Regression: a malicious Common Name cannot escape the client-cert tree. + +modules/core/client_certificates.py built the on-disk identifier straight from +the CN (only spaces replaced) and ran ``cert_subdir.mkdir(parents=True)`` before +anything else. A CN like ``../../../../tmp/evil`` therefore created — and wrote +a CA-signed key + cert + metadata into — a directory OUTSIDE the managed tree. +The pre-existing ``_validate_identifier`` guard was only ever applied on +*retrieval*, never to the value that *constructs* the identifier. + +The fix slugifies the CN at the construction site (covering single + batch + +any future caller) and asserts containment before mkdir. Benign CNs keep their +old identifier byte-for-byte. +""" +from pathlib import Path + +import pytest + +from modules.core.client_certificates import ( + ClientCertificateManager, + _slugify_common_name, +) +from modules.core.private_ca import PrivateCAGenerator + +pytestmark = [pytest.mark.unit] + + +@pytest.fixture(scope="module") +def ca(tmp_path_factory): + pca = PrivateCAGenerator(tmp_path_factory.mktemp("ca")) + assert pca.initialize() is True + return pca + + +@pytest.fixture +def mgr(ca, tmp_path): + return ClientCertificateManager(tmp_path / "client-certs", ca) + + +@pytest.mark.parametrize("cn", [ + "../../../../tmp/cm_pwned/evil", + "../escape", + "a/b/c", + "..\\..\\win", + "x\x00y", + "..", +]) +def test_malicious_common_name_stays_inside_tree(mgr, cn, tmp_path): + ok, _err, data = mgr.create_client_certificate(common_name=cn) + root = (tmp_path / "client-certs").resolve() + if ok: + ident = data["identifier"] + assert "/" not in ident and "\\" not in ident and ".." not in ident + for p in data["paths"].values(): + if p: + assert Path(p).resolve().is_relative_to(root), f"escaped tree: {p}" + # In no case may anything be written outside the client-cert root: the + # only child of tmp_path must remain the manager's own directory. + assert not (tmp_path / "cm_pwned").exists() + assert {p.name for p in tmp_path.iterdir()} == {"client-certs"} + + +def test_slug_is_filesystem_safe(): + assert _slugify_common_name("../../etc/passwd") == "etc-passwd" + assert _slugify_common_name("..\\..\\win") == "win" + assert _slugify_common_name("a..b") == "a.b" # traversal token killed + assert _slugify_common_name("///") == "client" + assert _slugify_common_name("") == "client" + assert _slugify_common_name("x\x00y") == "x-y" + + +def test_benign_cn_unchanged_and_retrievable(mgr): + # Dots / spaces behave exactly as the old code did, so the identifier and + # therefore existing retrieval paths are unaffected. + assert _slugify_common_name("svc.example.com") == "svc.example.com" + assert _slugify_common_name("John Doe") == "john-doe" + + ok, err, data = mgr.create_client_certificate(common_name="svc.example.com") + assert ok, err + ident = data["identifier"] + assert ident.startswith("svc.example.com-") + assert mgr.get_certificate_file(ident, "crt") diff --git a/tests/test_metrics_endpoint_auth.py b/tests/test_metrics_endpoint_auth.py new file mode 100644 index 0000000..f5989fb --- /dev/null +++ b/tests/test_metrics_endpoint_auth.py @@ -0,0 +1,60 @@ +"""Regression: GET /api/metrics must require at least a viewer credential. + +MetricsList carried no auth decorator at all, while its sibling info endpoints +(CacheStats, BackupList) are require_role('viewer'). An unauthenticated GET hit +the handler. The Prometheus scrape target is the SEPARATE public '/metrics' +route; this JSON summary lives in the authenticated API and must be gated to +match its peers. +""" +from collections import defaultdict +from unittest.mock import MagicMock + +import pytest +from flask import Flask, abort +from flask_restx import Api, Namespace + +from modules.api.models import create_api_models +from modules.api.resources import create_api_resources +from modules.core.auth import ROLE_HIERARCHY + +pytestmark = [pytest.mark.unit] + + +def _build_app(current_user): + def require_role_factory(min_role): + def deco(fn): + def wrapped(*a, **k): + role = (current_user or {}).get("role") + if ROLE_HIERARCHY.get(role, -1) < ROLE_HIERARCHY.get(min_role, 999): + abort(401) + return fn(*a, **k) + return wrapped + return deco + + auth_manager = MagicMock() + auth_manager.require_role = MagicMock(side_effect=require_role_factory) + + managers = defaultdict(MagicMock) + managers["auth"] = auth_manager + + app = Flask(__name__) + app.config["TESTING"] = True + api = Api(app, prefix="/api") + models = create_api_models(api) + resources = create_api_resources(api, models, managers) + + ns = Namespace("metrics", description="metrics") + api.add_namespace(ns) + ns.add_resource(resources["MetricsList"], "") + return app + + +def test_metrics_blocked_when_unauthenticated(): + resp = _build_app(current_user=None).test_client().get("/api/metrics") + assert resp.status_code == 401 + + +def test_metrics_not_blocked_for_viewer(): + # Auth passes; body may be 200 or 503 (prometheus availability) — never 401. + resp = _build_app(current_user={"role": "viewer"}).test_client().get("/api/metrics") + assert resp.status_code != 401 diff --git a/tests/test_session_revocation_on_user_change.py b/tests/test_session_revocation_on_user_change.py new file mode 100644 index 0000000..159d7dd --- /dev/null +++ b/tests/test_session_revocation_on_user_change.py @@ -0,0 +1,75 @@ +"""Regression: disabling, demoting, or deleting a user revokes live sessions. + +create_session() snapshots the role at login; validate_session() returned that +frozen snapshot with only an expiry check. update_user(enabled=False) / a role +change / delete_user mutated settings.json but left the in-memory session valid +for up to SESSION_TIMEOUT_HOURS — a just-disabled / demoted / deleted account +stayed privileged with no kill switch. The fix invalidates a user's sessions on +those transitions; an unrelated edit (e.g. email) must NOT log the user out. +""" +import copy +from unittest.mock import MagicMock + +import pytest + +from modules.core.auth import AuthManager + +pytestmark = [pytest.mark.unit] + + +def _mk_settings_manager(initial): + sm = MagicMock() + sm.load_settings.side_effect = lambda: copy.deepcopy(initial) + + def _update(fn, _audit_label): + state = copy.deepcopy(initial) + fn(state) + initial.clear() + initial.update(state) + return True + + sm.update.side_effect = _update + return sm + + +@pytest.fixture +def auth(): + initial = {} + am = AuthManager(_mk_settings_manager(initial)) + # A standing admin so the last-admin guards never block our mutations. + ok, _ = am.create_user("root", "rootpw", role="admin") + assert ok + return am + + +def _session_for(am, username, role="operator"): + ok, _ = am.create_user(username, "pw", role=role) + assert ok + sid = am.create_session(username) + assert am.validate_session(sid) is not None + return sid + + +def test_disable_revokes_session(auth): + sid = _session_for(auth, "alice") + assert auth.update_user("alice", enabled=False)[0] + assert auth.validate_session(sid) is None + + +def test_role_change_revokes_session(auth): + sid = _session_for(auth, "bob") + assert auth.update_user("bob", role="viewer")[0] + assert auth.validate_session(sid) is None + + +def test_delete_revokes_session(auth): + sid = _session_for(auth, "carol") + assert auth.delete_user("carol")[0] + assert auth.validate_session(sid) is None + + +def test_unrelated_update_keeps_session(auth): + sid = _session_for(auth, "dave") + assert auth.update_user("dave", email="dave@example.com")[0] + # An email-only change is not a privilege change — session must survive. + assert auth.validate_session(sid) is not None diff --git a/tests/test_settings_list_secret_generic_restore.py b/tests/test_settings_list_secret_generic_restore.py new file mode 100644 index 0000000..a80eb94 --- /dev/null +++ b/tests/test_settings_list_secret_generic_restore.py @@ -0,0 +1,74 @@ +"""Regression: webhook secrets survive a GENERIC settings round-trip. + +notifications is a _DEEP_MERGE_SETTINGS_KEYS subtree, but its secrets live in a +list-of-dicts (channels.webhooks). _deep_merge_dict replaces lists wholesale and +_strip_masked_values only walks dicts, so a GET (which masks the secret to +'********') followed by a POST of the whole settings blob through the GENERIC +/api/settings path persisted '********' over the real secret. The dedicated +/api/notifications/config route restored list secrets; the generic path did not. + +atomic_update now restores masked list secrets generically for every deep-merge +key, so both paths behave the same. +""" +import pytest + +from modules.core.file_operations import FileOperations +from modules.core.settings import SettingsManager, SECRET_MASK_SENTINEL as MASK + +pytestmark = [pytest.mark.unit] + + +@pytest.fixture +def sm(tmp_path): + cert_dir = tmp_path / "certificates" + data_dir = tmp_path / "data" + backup_dir = tmp_path / "backups" + logs_dir = tmp_path / "logs" + for d in (cert_dir, data_dir, backup_dir, logs_dir): + d.mkdir() + file_ops = FileOperations(cert_dir=cert_dir, data_dir=data_dir, + backup_dir=backup_dir, logs_dir=logs_dir) + return SettingsManager(file_ops=file_ops, settings_file=data_dir / "settings.json") + + +def _seed(sm, secret="REAL-HMAC-SECRET", token="REAL-BOT-TOKEN"): + assert sm.atomic_update({ + "notifications": {"channels": {"webhooks": [ + {"name": "wh", "type": "generic", "url": "https://h", "secret": secret}, + {"name": "tg", "type": "telegram", "url": "", "token": token, "chat_id": "1"}, + ]}} + }) is True + + +def _webhooks_by_name(sm): + whs = sm.load_settings()["notifications"]["channels"]["webhooks"] + return {w["name"]: w for w in whs} + + +def test_masked_list_secret_not_clobbered_on_generic_roundtrip(sm): + _seed(sm) + # GET masks the secrets; the whole blob is POSTed back via the generic path. + assert sm.atomic_update({ + "notifications": {"channels": {"webhooks": [ + {"name": "wh", "type": "generic", "url": "https://h", "secret": MASK}, + {"name": "tg", "type": "telegram", "url": "", "token": MASK, "chat_id": "1"}, + ]}} + }) is True + + by_name = _webhooks_by_name(sm) + assert by_name["wh"]["secret"] == "REAL-HMAC-SECRET" # not '********' + assert by_name["tg"]["token"] == "REAL-BOT-TOKEN" + + +def test_retyped_secret_still_overrides(sm): + _seed(sm) + assert sm.atomic_update({ + "notifications": {"channels": {"webhooks": [ + {"name": "wh", "type": "generic", "url": "https://h", "secret": "NEW-SECRET"}, + {"name": "tg", "type": "telegram", "url": "", "token": MASK, "chat_id": "1"}, + ]}} + }) is True + + by_name = _webhooks_by_name(sm) + assert by_name["wh"]["secret"] == "NEW-SECRET" + assert by_name["tg"]["token"] == "REAL-BOT-TOKEN"