diff --git a/datasette/utils/permissions.py b/datasette/utils/permissions.py index 6c30a12a27..4f4858d048 100644 --- a/datasette/utils/permissions.py +++ b/datasette/utils/permissions.py @@ -2,8 +2,7 @@ from __future__ import annotations import json -from typing import Any, Dict, Iterable, List, Sequence, Tuple -import sqlite3 +from typing import Any, Iterable, List from datasette.permissions import PermissionSQL from datasette.plugins import pm @@ -46,10 +45,11 @@ async def gather_permission_sql_from_hooks( for permission_sql in _iter_permission_sql_from_result(resolved, action=action): if not permission_sql.source: permission_sql.source = default_source - params = permission_sql.params or {} - params.setdefault("action", action) - params.setdefault("actor", actor_json) - params.setdefault("actor_id", actor_id) + if permission_sql.params is None: + permission_sql.params = {} + permission_sql.params.setdefault("action", action) + permission_sql.params.setdefault("actor", actor_json) + permission_sql.params.setdefault("actor_id", actor_id) collected.append(permission_sql) return collected @@ -82,358 +82,3 @@ def _iter_permission_sql_from_result( raise TypeError( "Plugin providers must return PermissionSQL instances, sequences, or callables" ) - - -# ----------------------------- -# Plugin interface & utilities -# ----------------------------- - - -def build_rules_union( - actor: dict | None, plugins: Sequence[PermissionSQL] -) -> Tuple[str, Dict[str, Any]]: - """ - Compose plugin SQL into a UNION ALL. - - Returns: - union_sql: a SELECT with columns (parent, child, allow, reason, source_plugin) - params: dict of bound parameters including :actor (JSON), :actor_id, and plugin params - - Note: Plugins are responsible for ensuring their parameter names don't conflict. - The system reserves these parameter names: :actor, :actor_id, :action, :filter_parent - Plugin parameters should be prefixed with a unique identifier (e.g., source name). - """ - parts: List[str] = [] - actor_json = json.dumps(actor) if actor else None - actor_id = actor.get("id") if actor else None - params: Dict[str, Any] = {"actor": actor_json, "actor_id": actor_id} - - for p in plugins: - # No namespacing - just use plugin params as-is - params.update(p.params or {}) - - # Skip plugins that only provide restriction_sql (no permission rules) - if p.sql is None: - continue - - parts.append( - f""" - SELECT parent, child, allow, reason, '{p.source}' AS source_plugin FROM ( - {p.sql} - ) - """.strip() - ) - - if not parts: - # Empty UNION that returns no rows - union_sql = "SELECT NULL parent, NULL child, NULL allow, NULL reason, 'none' source_plugin WHERE 0" - else: - union_sql = "\nUNION ALL\n".join(parts) - - return union_sql, params - - -# ----------------------------------------------- -# Core resolvers (no temp tables, no custom UDFs) -# ----------------------------------------------- - - -async def resolve_permissions_from_catalog( - db, - actor: dict | None, - plugins: Sequence[Any], - action: str, - candidate_sql: str, - candidate_params: Dict[str, Any] | None = None, - *, - implicit_deny: bool = True, -) -> List[Dict[str, Any]]: - """ - Resolve permissions by embedding the provided *candidate_sql* in a CTE. - - Expectations: - - candidate_sql SELECTs: parent TEXT, child TEXT - (Use child=NULL for parent-scoped actions like "execute-sql".) - - *db* exposes: rows = await db.execute(sql, params) - where rows is an iterable of sqlite3.Row - - plugins: hook results handled by await_me_maybe - can be sync/async, - single PermissionSQL, list, or callable returning PermissionSQL - - actor is the actor dict (or None), made available as :actor (JSON), :actor_id, and :action - - Decision policy: - 1) Specificity first: child (depth=2) > parent (depth=1) > root (depth=0) - 2) Within the same depth: deny (0) beats allow (1) - 3) If no matching rule: - - implicit_deny=True -> treat as allow=0, reason='implicit deny' - - implicit_deny=False -> allow=None, reason=None - - Returns: list of dict rows - - parent, child, allow, reason, source_plugin, depth - - resource (rendered "/parent/child" or "/parent" or "/") - """ - resolved_plugins: List[PermissionSQL] = [] - restriction_sqls: List[str] = [] - - for plugin in plugins: - if callable(plugin) and not isinstance(plugin, PermissionSQL): - resolved = plugin(action) # type: ignore[arg-type] - else: - resolved = plugin # type: ignore[assignment] - if not isinstance(resolved, PermissionSQL): - raise TypeError("Plugin providers must return PermissionSQL instances") - resolved_plugins.append(resolved) - - # Collect restriction SQL filters - if resolved.restriction_sql: - restriction_sqls.append(resolved.restriction_sql) - - union_sql, rule_params = build_rules_union(actor, resolved_plugins) - all_params = { - **(candidate_params or {}), - **rule_params, - "action": action, - } - - sql = f""" - WITH - cands AS ( - {candidate_sql} - ), - rules AS ( - {union_sql} - ), - matched AS ( - SELECT - c.parent, c.child, - r.allow, r.reason, r.source_plugin, - CASE - WHEN r.child IS NOT NULL THEN 2 -- child-level (most specific) - WHEN r.parent IS NOT NULL THEN 1 -- parent-level - ELSE 0 -- root/global - END AS depth - FROM cands c - JOIN rules r - ON (r.parent IS NULL OR r.parent = c.parent) - AND (r.child IS NULL OR r.child = c.child) - ), - ranked AS ( - SELECT *, - ROW_NUMBER() OVER ( - PARTITION BY parent, child - ORDER BY - depth DESC, -- specificity first - CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth - source_plugin -- stable tie-break - ) AS rn - FROM matched - ), - winner AS ( - SELECT parent, child, - allow, reason, source_plugin, depth - FROM ranked WHERE rn = 1 - ) - SELECT - c.parent, c.child, - COALESCE(w.allow, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) AS allow, - COALESCE(w.reason, CASE WHEN :implicit_deny THEN 'implicit deny' ELSE NULL END) AS reason, - w.source_plugin, - COALESCE(w.depth, -1) AS depth, - :action AS action, - CASE - WHEN c.parent IS NULL THEN '/' - WHEN c.child IS NULL THEN '/' || c.parent - ELSE '/' || c.parent || '/' || c.child - END AS resource - FROM cands c - LEFT JOIN winner w - ON ((w.parent = c.parent) OR (w.parent IS NULL AND c.parent IS NULL)) - AND ((w.child = c.child ) OR (w.child IS NULL AND c.child IS NULL)) - ORDER BY c.parent, c.child - """ - - # If there are restriction filters, wrap the query with INTERSECT - # This ensures only resources in the restriction allowlist are returned - if restriction_sqls: - # Start with the main query, but select only parent/child for the INTERSECT - main_query_for_intersect = f""" - WITH - cands AS ( - {candidate_sql} - ), - rules AS ( - {union_sql} - ), - matched AS ( - SELECT - c.parent, c.child, - r.allow, r.reason, r.source_plugin, - CASE - WHEN r.child IS NOT NULL THEN 2 -- child-level (most specific) - WHEN r.parent IS NOT NULL THEN 1 -- parent-level - ELSE 0 -- root/global - END AS depth - FROM cands c - JOIN rules r - ON (r.parent IS NULL OR r.parent = c.parent) - AND (r.child IS NULL OR r.child = c.child) - ), - ranked AS ( - SELECT *, - ROW_NUMBER() OVER ( - PARTITION BY parent, child - ORDER BY - depth DESC, -- specificity first - CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth - source_plugin -- stable tie-break - ) AS rn - FROM matched - ), - winner AS ( - SELECT parent, child, - allow, reason, source_plugin, depth - FROM ranked WHERE rn = 1 - ), - permitted_resources AS ( - SELECT c.parent, c.child - FROM cands c - LEFT JOIN winner w - ON ((w.parent = c.parent) OR (w.parent IS NULL AND c.parent IS NULL)) - AND ((w.child = c.child ) OR (w.child IS NULL AND c.child IS NULL)) - WHERE COALESCE(w.allow, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) = 1 - ) - SELECT parent, child FROM permitted_resources - """ - - # Build restriction list with INTERSECT (all must match) - # Then filter to resources that match hierarchically - # Wrap each restriction_sql in a subquery to avoid operator precedence issues - # with UNION ALL inside the restriction SQL statements - restriction_intersect = "\nINTERSECT\n".join( - f"SELECT * FROM ({sql})" for sql in restriction_sqls - ) - - # Combine: resources allowed by permissions AND in restriction allowlist - # Database-level restrictions (parent, NULL) should match all children (parent, *) - filtered_resources = f""" - WITH restriction_list AS ( - {restriction_intersect} - ), - permitted AS ( - {main_query_for_intersect} - ), - filtered AS ( - SELECT p.parent, p.child - FROM permitted p - WHERE EXISTS ( - SELECT 1 FROM restriction_list r - WHERE (r.parent = p.parent OR r.parent IS NULL) - AND (r.child = p.child OR r.child IS NULL) - ) - ) - """ - - # Now join back to get full results for only the filtered resources - sql = f""" - {filtered_resources} - , cands AS ( - {candidate_sql} - ), - rules AS ( - {union_sql} - ), - matched AS ( - SELECT - c.parent, c.child, - r.allow, r.reason, r.source_plugin, - CASE - WHEN r.child IS NOT NULL THEN 2 -- child-level (most specific) - WHEN r.parent IS NOT NULL THEN 1 -- parent-level - ELSE 0 -- root/global - END AS depth - FROM cands c - JOIN rules r - ON (r.parent IS NULL OR r.parent = c.parent) - AND (r.child IS NULL OR r.child = c.child) - ), - ranked AS ( - SELECT *, - ROW_NUMBER() OVER ( - PARTITION BY parent, child - ORDER BY - depth DESC, -- specificity first - CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth - source_plugin -- stable tie-break - ) AS rn - FROM matched - ), - winner AS ( - SELECT parent, child, - allow, reason, source_plugin, depth - FROM ranked WHERE rn = 1 - ) - SELECT - c.parent, c.child, - COALESCE(w.allow, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) AS allow, - COALESCE(w.reason, CASE WHEN :implicit_deny THEN 'implicit deny' ELSE NULL END) AS reason, - w.source_plugin, - COALESCE(w.depth, -1) AS depth, - :action AS action, - CASE - WHEN c.parent IS NULL THEN '/' - WHEN c.child IS NULL THEN '/' || c.parent - ELSE '/' || c.parent || '/' || c.child - END AS resource - FROM filtered c - LEFT JOIN winner w - ON ((w.parent = c.parent) OR (w.parent IS NULL AND c.parent IS NULL)) - AND ((w.child = c.child ) OR (w.child IS NULL AND c.child IS NULL)) - ORDER BY c.parent, c.child - """ - - rows_iter: Iterable[sqlite3.Row] = await db.execute( - sql, - {**all_params, "implicit_deny": 1 if implicit_deny else 0}, - ) - return [dict(r) for r in rows_iter] - - -async def resolve_permissions_with_candidates( - db, - actor: dict | None, - plugins: Sequence[Any], - candidates: List[Tuple[str, str | None]], - action: str, - *, - implicit_deny: bool = True, -) -> List[Dict[str, Any]]: - """ - Resolve permissions without any external candidate table by embedding - the candidates as a UNION of parameterized SELECTs in a CTE. - - candidates: list of (parent, child) where child can be None for parent-scoped actions. - actor: actor dict (or None), made available as :actor (JSON), :actor_id, and :action - """ - # Build a small CTE for candidates. - cand_rows_sql: List[str] = [] - cand_params: Dict[str, Any] = {} - for i, (parent, child) in enumerate(candidates): - pkey = f"cand_p_{i}" - ckey = f"cand_c_{i}" - cand_params[pkey] = parent - cand_params[ckey] = child - cand_rows_sql.append(f"SELECT :{pkey} AS parent, :{ckey} AS child") - candidate_sql = ( - "\nUNION ALL\n".join(cand_rows_sql) - if cand_rows_sql - else "SELECT NULL AS parent, NULL AS child WHERE 0" - ) - - return await resolve_permissions_from_catalog( - db, - actor, - plugins, - action, - candidate_sql=candidate_sql, - candidate_params=cand_params, - implicit_deny=implicit_deny, - ) diff --git a/permissions-notes.md b/permissions-notes.md new file mode 100644 index 0000000000..00695f08f4 --- /dev/null +++ b/permissions-notes.md @@ -0,0 +1,727 @@ +# SQL Permissions System - Deep Code Review Notes + +## Overview + +The SQL permissions system was introduced in Datasette 1.0a20 and subsequently refined through 1.0a24. It replaces the older plugin hook-based `permission_allowed` system with a SQL-driven approach where all permission decisions are resolved by executing SQL queries against the internal SQLite database. + +Key commits: +- `95a1fef` (1.0a20): Initial introduction of `permissions.py`, `utils/permissions.py`, `default_permissions.py` +- `23a640d`: `--default-deny` option +- `d814e81`: `skip_permission_checks` context variable, `actions_sql.py` +- `0a92452`: Split `default_permissions.py` into a package with 7 modules +- `66d2a03`: Ruff lint fixes + +## Architecture Summary + +### Permission Check Flow + +``` +Request → Authentication → Action Check + ↓ + permission_resources_sql hook + ↓ + Multiple PermissionSQL objects collected + ↓ + UNION ALL into rules CTE + ↓ + Cascading evaluation: + child(2) → parent(1) → global(0) + DENY beats ALLOW at same level + ↓ + restriction_sql INTERSECT filtering + ↓ + Boolean result (or resource list) +``` + +### Two Code Paths + +1. **Single resource check** (`check_permission_for_resource` in `actions_sql.py:494-587`): Uses `ROW_NUMBER() OVER (PARTITION BY ...)` with ORDER BY depth to pick a winner. Used by `datasette.allowed()`. + +2. **All resources check** (`_build_single_action_sql` in `actions_sql.py:130-425`): Uses separate `child_lvl`, `parent_lvl`, `global_lvl` CTEs with `MAX(CASE ...)` aggregates, then a cascading CASE statement. Used by `datasette.allowed_resources()`. + +These two code paths implement the **same cascading logic** but with completely different SQL structures. + +### Key Files + +| File | Lines | Purpose | +|------|-------|---------| +| `datasette/permissions.py` | 210 | Core abstractions: `Resource`, `Action`, `PermissionSQL`, `SkipPermissions` | +| `datasette/resources.py` | 91 | `DatabaseResource`, `TableResource`, `QueryResource` | +| `datasette/utils/actions_sql.py` | 587 | SQL builders for `allowed_resources()` and `allowed()` | +| `datasette/utils/permissions.py` | 439 | Hook gathering, `resolve_permissions_from_catalog()` (3rd implementation) | +| `datasette/default_permissions/__init__.py` | 59 | Package init, re-exports, CSRF skip, canned_queries | +| `datasette/default_permissions/config.py` | 442 | `ConfigPermissionProcessor` - datasette.yaml rules | +| `datasette/default_permissions/defaults.py` | 70 | `DEFAULT_ALLOW_ACTIONS`, `default_allow_sql` | +| `datasette/default_permissions/restrictions.py` | 195 | Actor `_r` allowlist handling | +| `datasette/default_permissions/helpers.py` | 85 | `PermissionRowCollector`, action name variants | +| `datasette/default_permissions/root.py` | 29 | Root user global allow | +| `datasette/default_permissions/tokens.py` | 95 | Signed API token auth | + +--- + +## Findings + +### Tests: All Pass + +263 tests pass, 3 xpassed. Test files: +- `test_permissions.py` (largest, 1713 lines) +- `test_config_permission_rules.py` (163 lines) +- `test_utils_permissions.py` (612 lines) +- `test_permission_endpoints.py` (501 lines) +- `test_default_deny.py` (129 lines) +- `test_restriction_sql.py` +- `test_allowed_resources.py` +- `test_actions_sql.py` + +--- + +## Issues Found + +### ISSUE 1 (Design Concern): Root user blocked by `allow:` blocks that don't include "root" + +**Severity: Medium (by design per #2509, but potentially surprising UX)** + +When a table has an `allow:` block in config like: +```yaml +databases: + mydb: + tables: + secrets: + allow: + id: admin +``` + +The root user (--root) is **denied access** to that table. This happens because: + +1. `root_user_permissions_sql()` returns a global (NULL, NULL) ALLOW +2. `config_permissions_sql()` generates a child-level (mydb, secrets) DENY for actors not matching `{id: admin}` (root's id is "root", not "admin") +3. The cascading logic says child-level beats global-level + +**Observed behavior:** +``` +curl -b [root-cookies] /test_perms/secrets.json → 403 Forbidden +``` + +**Rules visible in /-/rules.json:** +```json +[ + {"parent": null, "child": null, "allow": 1, "reason": "root user"}, + {"parent": "test_perms", "child": "secrets", "allow": 0, "reason": "config deny allow..."} +] +``` + +**This is intentional per issue #2509**: `test_root_user_respects_settings_deny` in `test_permission_endpoints.py:355` explicitly asserts that config deny rules override root. The same logic applies to `allow: {id: admin}` - since root's id doesn't match, it becomes a deny. + +**However, this is a UX concern**: An admin starting Datasette with `--root` may reasonably expect full access. With `allow: {id: admin}`, the workaround is `allow: {id: [admin, root]}`, but with `allow: false` there is no config-based workaround. + +**Recommendation**: Document this clearly in `--root` documentation. Consider whether a future `--root-bypass-config` flag or equivalent would be useful for debugging scenarios. + +--- + +### ISSUE 2 (Design): Three separate implementations of cascading logic + +The cascading permission resolution (child > parent > global, deny beats allow) is implemented in three different places: + +1. **`actions_sql.py:_build_single_action_sql()`** (lines 246-384): Uses separate CTEs (`child_lvl`, `parent_lvl`, `global_lvl`) each doing `LEFT JOIN` + `GROUP BY` with `MAX()` aggregates, then a CASE cascade in `decisions`. + +2. **`actions_sql.py:check_permission_for_resource()`** (lines 555-587): Uses `ROW_NUMBER() OVER (PARTITION BY parent, child ORDER BY depth DESC, ...)` to pick a single winner. + +3. **`permissions.py:resolve_permissions_from_catalog()`** (lines 141-397): Yet another implementation using `ROW_NUMBER()` like #2 but with different structure, including massive SQL duplication when restriction_sql is present (the entire query is repeated in the restriction case). + +**Important note**: `resolve_permissions_from_catalog()` is **only used in tests** (`test_utils_permissions.py`), not in any production code path. This means it's a test-only implementation of the same logic, which could drift out of sync with the actual production implementations (#1 and #2). If the production SQL is changed, these tests might still pass on the old test-only implementation while production behavior changes. + +The two production paths (#1 and #2) implement the same cascading logic but with different SQL patterns. This is fragile - a logic change must be applied in both places. + +--- + +### ISSUE 3 (Code Quality): Massive SQL duplication in `resolve_permissions_from_catalog()` + +In `utils/permissions.py:256-391`, when `restriction_sqls` is present, the **entire CTE chain** (cands, rules, matched, ranked, winner) is duplicated - once for the main query and once for the restriction filtering. This results in ~135 lines of nearly identical SQL being emitted twice. + +The restriction-with-restrictions path generates SQL that embeds the full resolution query inside a `permitted_resources` CTE, then creates a `filtered` CTE, and then re-creates cands/rules/matched/ranked/winner *again* to get the full output columns. This could be simplified significantly. + +--- + +### ISSUE 4 (Code Quality): Global `_reason_id` counter in `PermissionSQL` + +`permissions.py:157` has a module-level `_reason_id` counter that increments forever: + +```python +_reason_id = 1 + +class PermissionSQL: + @classmethod + def allow(cls, reason, _allow=True): + global _reason_id + i = _reason_id + _reason_id += 1 + ... +``` + +This means: +- Every `PermissionSQL.allow()` or `.deny()` call increments a process-global counter +- In a long-running server, param keys grow: `:reason_1`, `:reason_2`, ..., `:reason_100000` +- Not thread-safe (though Python's GIL provides some protection) +- Makes SQL non-deterministic between requests (harder to cache or compare) +- The counter never resets + +This isn't a memory leak per se (the SQL is transient), but it's an unusual pattern. A better approach would be to use a per-call counter or deterministic naming. + +--- + +### ISSUE 5 (Security): `source_plugin` name injected into SQL without parameterization + +In three places, the plugin name is interpolated directly into SQL: + +```python +# actions_sql.py:185 +f"SELECT parent, child, allow, reason, '{permission_sql.source}' AS source_plugin FROM ..." + +# actions_sql.py:484 +f"SELECT parent, child, allow, reason, '{permission_sql.source}' AS source_plugin FROM ..." + +# permissions.py:121 +f"SELECT parent, child, allow, reason, '{p.source}' AS source_plugin FROM ..." +``` + +The `source` field comes from `_plugin_name_from_hookimpl()` which extracts the Python module name. While unlikely to contain SQL injection payloads in practice, a malicious plugin with a single-quote in its name could inject SQL. This should use parameterized values. + +--- + +### ISSUE 6 (Security): `QueryResource.resources_sql()` uses manual quote escaping + +In `resources.py:82-88`: +```python +db_escaped = db_name.replace("'", "''") +query_escaped = query_name.replace("'", "''") +selects.append(f"SELECT '{db_escaped}' AS parent, '{query_escaped}' AS child") +``` + +This manually escapes single quotes by doubling them instead of using parameterized queries. While the double-quote escape is the correct SQLite approach, parameterized queries would be safer and more robust. + +The limitation here is that `resources_sql()` returns a SQL string, not (SQL, params) - so the API would need to change to support parameterization. + +--- + +### ISSUE 7 (Performance): `include_is_private` doubles the permission SQL + +When `include_is_private=True` is used (which is the default for database and index page views), the entire permission resolution is run twice: +1. Once for the actual actor +2. Once for `actor=None` (anonymous) + +This generates separate `anon_rules`, `anon_child_lvl`, `anon_parent_lvl`, `anon_global_lvl`, and `anon_decisions` CTEs - effectively doubling the size and cost of the query. + +Looking at the trace output for an anonymous user viewing the database page, the view-table permission query with `include_is_private=True` was the slowest query at ~4.2ms. For authenticated users with many rules, this would be worse. + +**Optimization opportunity**: When the actor IS anonymous (`actor=None`), the `is_private` computation is trivially 0 for all allowed resources since the actor and anonymous actor are the same. This case could be short-circuited. + +--- + +### ISSUE 8 (Performance): Homepage counts ALL tables, not just visible ones + +In the trace for the homepage, `table_counts` queries are issued for ALL tables: +``` +select count(*) from [posts] limit 10001 -- visible to anon +select count(*) from [secrets] limit 10001 -- NOT visible to anon +select count(*) from [users] limit 10001 -- NOT visible to anon +``` + +The count results for `secrets` and `users` are computed but then discarded because those tables aren't in the allowed set. This is wasteful, especially with large tables. The count queries should only be issued for tables the user can actually see. + +--- + +### ISSUE 9 (Design): `allow:` blocks generate DENYs, not restrictions + +The current design converts `allow: {id: admin}` blocks into **deny** rules for non-matching actors and **allow** rules for matching actors. This means: + +```yaml +tables: + secrets: + allow: + id: admin +``` + +Generates two separate rules depending on the actor: +- For admin: `(test_perms, secrets, allow=1, "config allow...")` +- For everyone else: `(test_perms, secrets, allow=0, "config deny...")` + +The deny rule is emitted at the child level, which means it **cannot be overridden by any global or parent-level allow**. This is the root cause of Issue 1. + +A more nuanced approach might: +- Only emit allow rules from `allow:` blocks +- Use a separate "last-resort" deny mechanism that doesn't interfere with higher-priority allows +- Or use a "priority" system where root > config > defaults + +--- + +### ISSUE 10 (Design): No explicit deny mechanism for specific actors + +The system has `allow:` blocks to restrict access to specific actors, but there's no explicit `deny:` block in config to deny specific actors while allowing everyone else. The only way to deny a specific actor is through the permission resolution system's cascading logic, which is indirect. + +A `deny:` block could be useful: +```yaml +databases: + mydb: + deny: + id: malicious_bot +``` + +--- + +### ISSUE 11 (Design Gap): `also_requires` only supports one level + +The `Action` dataclass has `also_requires: str | None` which links one action to another (e.g., `execute-sql` requires `view-database`). This only supports one level of dependency. If action A requires B which requires C, the system doesn't automatically chain these. + +Currently, `also_requires` is handled explicitly in both `allowed()` (recursive call) and `build_allowed_resources_sql()` (INNER JOIN of two queries). The recursive call in `allowed()` would handle chains, but `build_allowed_resources_sql()` only handles one level. + +--- + +### ISSUE 12 (Observability): Permission reason tracking loses deny information + +When a permission check results in a deny, the `allowed()` method logs the result as `result=False` but doesn't capture the reason. The `check_permission_for_resource()` function only returns a boolean, discarding the reason and source plugin information. + +For debugging, it would be valuable to know *why* access was denied - especially for the root user scenario in Issue 1. + +--- + +## Positive Observations + +1. **Clean separation of concerns**: The `default_permissions/` package split is well-organized with each module having a clear, focused responsibility. + +2. **Parameterized SQL throughout**: All user-controlled values (actor_id, action names, database names, table names in PermissionRowCollector) use parameterized queries. The exceptions noted above (source_plugin, QueryResource) are edge cases. + +3. **Comprehensive test coverage**: 263 tests covering a wide range of scenarios including cascading logic, restrictions, config rules, default deny, and endpoints. + +4. **Debuggability**: The `/-/rules.json` and `/-/allowed.json` endpoints make it straightforward to understand why a permission decision was made. The trace system exposes the actual SQL executed. + +5. **Extension points**: The `permission_resources_sql` hook is well-designed for plugins to contribute rules. The `restriction_sql` mechanism for actor allowlists is elegant. + +6. **Pagination**: `allowed_resources()` supports keyset pagination, which is important for instances with many tables/databases. + +--- + +## Recommendations (Priority Order) + +### P1: Document root user config interaction (Issue 1) + +The `--root` flag documentation should explicitly note that `allow:` blocks in config can override root access. For users who want root to bypass all restrictions, they should include "root" in their allow blocks: `allow: {id: [admin, root]}`. + +Consider in the future: a `--root-bypass-config` flag or similar for debugging scenarios where root truly needs unrestricted access. + +### P1: Consolidate cascading logic (Issue 2) + +Extract the cascading logic into a single shared SQL builder. Both `check_permission_for_resource()` and `_build_single_action_sql()` should call the same underlying function. The `resolve_permissions_from_catalog()` in `utils/permissions.py` should either be deprecated or aligned. + +### P1: Fix `source_plugin` SQL injection (Issue 5) + +Pass `source_plugin` as a parameter instead of interpolating it. This is a straightforward fix. + +### P2: Optimize `include_is_private` for anonymous users (Issue 7) + +Short-circuit when `actor=None` - the anonymous check is redundant. + +### P2: Only count visible tables (Issue 8) + +Pass the allowed table set to the counting logic to avoid wasted queries. + +### P3: Replace global `_reason_id` counter (Issue 4) + +Use a per-invocation counter or UUID-based naming for reason parameters. + +### P3: Simplify `resolve_permissions_from_catalog()` restriction handling (Issue 3) + +Refactor to avoid duplicating the entire CTE chain when restrictions are present. + +### P4: Add deny reason to permission check logging (Issue 12) + +Return `(allowed, reason)` tuples from `check_permission_for_resource()`. + +--- + +## Proposal: Consolidating the Cascading Logic (Issues 2 + 3) + +### The problem + +The cascading logic ("child > parent > global; deny beats allow at each level") is implemented three times in two files: + +| # | Function | File | Used by | SQL pattern | +|---|----------|------|---------|-------------| +| 1 | `_build_single_action_sql()` | `actions_sql.py:246-384` | `allowed_resources()` (production) | 3 separate CTEs (`child_lvl`, `parent_lvl`, `global_lvl`) with `LEFT JOIN` + `GROUP BY` + `MAX()`, then CASE cascade | +| 2 | `check_permission_for_resource()` | `actions_sql.py:555-587` | `allowed()` (production) | `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY depth DESC, ...)` + `LIMIT 1` | +| 3 | `resolve_permissions_from_catalog()` | `permissions.py:197-391` | Tests only | Same `ROW_NUMBER()` as #2, but with the entire CTE chain **tripled** when restrictions are present | + +These three implementations must all agree on the resolution semantics. A logic change (e.g., adding a priority tier) would need to be replicated in all three places. + +### Why three exist + +Each serves a different purpose with different requirements: + +- **#1 (bulk resources)**: Needs to evaluate every `(parent, child)` in the `base` CTE. Can't use `ROW_NUMBER()` as easily because it needs the per-resource aggregates available for the `include_is_private` anonymous pass too. Outputs `reason` as JSON array and `is_private`. +- **#2 (single resource)**: Only checks one `(parent, child)`. Much simpler — just filter matching rules, rank, pick winner. Returns boolean. +- **#3 (test utility)**: Returns full resolution details (allow, reason, source_plugin, depth) for every candidate. Used in tests to verify the cascading logic itself. + +### Proposed design: One SQL builder, three callers + +Introduce a single function `build_cascading_ctes()` that generates the shared CTE fragment, then each caller wraps it with its own `SELECT` and extras. + +#### Step 1: Extract `build_rules_union_from_permission_sqls()` + +Both production paths (#1 and #2) already have nearly identical code to iterate over `PermissionSQL` objects, collect params, collect `restriction_sqls`, and build the UNION ALL. Factor this into a single shared function: + +```python +# In actions_sql.py (or a new shared module) + +@dataclass +class CollectedRules: + """Result of collecting PermissionSQL objects into SQL fragments.""" + rules_union: str # UNION ALL of all rule SELECTs + params: dict[str, Any] # All collected params + restriction_sqls: list[str] # restriction_sql fragments + +def collect_permission_rules( + permission_sqls: list[PermissionSQL], +) -> CollectedRules | None: + """ + Iterate PermissionSQL objects, build the UNION ALL, collect params + and restriction_sqls. Returns None if no rule SQL was found. + """ + rule_parts = [] + all_params = {} + restriction_sqls = [] + + for i, psql in enumerate(permission_sqls): + all_params.update(psql.params or {}) + if psql.restriction_sql: + restriction_sqls.append(psql.restriction_sql) + if psql.sql is None: + continue + # Parameterize source_plugin instead of interpolating (fixes Issue 5) + source_key = f"_src_{i}" + all_params[source_key] = psql.source + rule_parts.append( + f"SELECT parent, child, allow, reason, :{source_key} AS source_plugin" + f" FROM ({psql.sql})" + ) + + if not rule_parts: + return None + + return CollectedRules( + rules_union=" UNION ALL ".join(rule_parts), + params=all_params, + restriction_sqls=restriction_sqls, + ) +``` + +This already fixes **Issue 5** (`source_plugin` injection) as a side effect. + +#### Step 2: Extract `build_cascading_ctes()` + +The core cascading logic — given a `base` CTE and an `all_rules` CTE, produce a `decisions` CTE — can be expressed as a single function that returns CTE SQL fragments: + +```python +def build_cascading_ctes( + *, + rules_alias: str = "all_rules", + base_alias: str = "base", + include_reasons: bool = False, +) -> str: + """ + Return CTE SQL for child_lvl, parent_lvl, global_lvl, decisions. + + Expects the caller to already have defined CTEs named `base_alias` + (with columns: parent, child) and `rules_alias` (with columns: + parent, child, allow, reason, source_plugin). + + The output `decisions` CTE has columns: + parent, child, is_allowed, reason + Where `reason` is either a json_group_array (include_reasons=True) + or the single winning reason text. + """ + # The three level CTEs + level_ctes = [] + for level_name, join_condition in [ + ("child_lvl", f"ar.parent = b.parent AND ar.child = b.child"), + ("parent_lvl", f"ar.parent = b.parent AND ar.child IS NULL"), + ("global_lvl", f"ar.parent IS NULL AND ar.child IS NULL"), + ]: + reason_cols = "" + if include_reasons: + reason_cols = ( + ",\n json_group_array(CASE WHEN ar.allow = 0 " + "THEN ar.source_plugin || ': ' || ar.reason END) AS deny_reasons" + ",\n json_group_array(CASE WHEN ar.allow = 1 " + "THEN ar.source_plugin || ': ' || ar.reason END) AS allow_reasons" + ) + level_ctes.append(f"""{level_name} AS ( + SELECT b.parent, b.child, + MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, + MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow{reason_cols} + FROM {base_alias} b + LEFT JOIN {rules_alias} ar ON {join_condition} + GROUP BY b.parent, b.child +)""") + + # The decisions CTE + if include_reasons: + reason_case = """ + CASE + WHEN cl.any_deny = 1 THEN cl.deny_reasons + WHEN cl.any_allow = 1 THEN cl.allow_reasons + WHEN pl.any_deny = 1 THEN pl.deny_reasons + WHEN pl.any_allow = 1 THEN pl.allow_reasons + WHEN gl.any_deny = 1 THEN gl.deny_reasons + WHEN gl.any_allow = 1 THEN gl.allow_reasons + ELSE '[]' + END AS reason""" + else: + reason_case = "'implicit deny' AS reason" # simple placeholder + + null_safe_join = ( + "b.parent = {a}.parent AND " + "(b.child = {a}.child OR (b.child IS NULL AND {a}.child IS NULL))" + ) + + decisions_cte = f"""decisions AS ( + SELECT + b.parent, b.child, + CASE + WHEN cl.any_deny = 1 THEN 0 + WHEN cl.any_allow = 1 THEN 1 + WHEN pl.any_deny = 1 THEN 0 + WHEN pl.any_allow = 1 THEN 1 + WHEN gl.any_deny = 1 THEN 0 + WHEN gl.any_allow = 1 THEN 1 + ELSE 0 + END AS is_allowed, + {reason_case} + FROM {base_alias} b + JOIN child_lvl cl ON {null_safe_join.format(a='cl')} + JOIN parent_lvl pl ON {null_safe_join.format(a='pl')} + JOIN global_lvl gl ON {null_safe_join.format(a='gl')} +)""" + + return ",\n".join(level_ctes) + ",\n" + decisions_cte +``` + +#### Step 3: Extract `build_restriction_filter()` + +Restriction handling is also duplicated. A single function can generate the restriction CTE and WHERE clause: + +```python +def build_restriction_filter(restriction_sqls: list[str]) -> tuple[str, str]: + """ + Returns (cte_sql, where_clause) for restriction filtering. + + cte_sql: ", restriction_list AS (...)" to append to WITH block + where_clause: "AND EXISTS (...)" to append to WHERE + """ + restriction_intersect = "\nINTERSECT\n".join( + f"SELECT * FROM ({sql})" for sql in restriction_sqls + ) + cte_sql = f",\nrestriction_list AS (\n {restriction_intersect}\n)" + where_clause = """ + AND EXISTS ( + SELECT 1 FROM restriction_list r + WHERE (r.parent = decisions.parent OR r.parent IS NULL) + AND (r.child = decisions.child OR r.child IS NULL) + )""" + return cte_sql, where_clause +``` + +#### Step 4: Rewrite the three callers + +**`_build_single_action_sql()` (bulk resources)** + +```python +async def _build_single_action_sql(datasette, actor, action, *, parent=None, + include_is_private=False): + action_obj = datasette.actions.get(action) + base_resources_sql = await action_obj.resource_class.resources_sql(datasette) + + permission_sqls = await gather_permission_sql_from_hooks(...) + if permission_sqls is SKIP_PERMISSION_CHECKS: + return ... # early return unchanged + + collected = collect_permission_rules(permission_sqls) + if collected is None: + return ... # empty result unchanged + + all_params = collected.params + + # Build WITH clause + cte_parts = [ + f"WITH\nbase AS (\n {base_resources_sql}\n)", + f"all_rules AS (\n {collected.rules_union}\n)", + ] + + # Anonymous rules for is_private (if needed) + if include_is_private: + anon_collected = ... # same anon logic as before, but using collect_permission_rules + cte_parts.append(f"anon_rules AS (\n {anon_collected.rules_union}\n)") + + # Core cascading logic — ONE call + cte_parts.append(build_cascading_ctes(include_reasons=True)) + + if include_is_private: + cte_parts.append(build_cascading_ctes( + rules_alias="anon_rules", + base_alias="base", + # Use different CTE names to avoid collision: + # This variant would need a prefix parameter, e.g. prefix="anon_" + )) + # ... or simpler: call a second time with aliased names + + # Restriction filter + restriction_cte = "" + restriction_where = "" + if collected.restriction_sqls: + restriction_cte, restriction_where = build_restriction_filter( + collected.restriction_sqls + ) + + # Final SELECT + select_cols = "parent, child, reason" + if include_is_private: + select_cols += ", is_private" + + query = ( + ",\n".join(cte_parts) + restriction_cte + + f"\nSELECT {select_cols}\nFROM decisions\nWHERE is_allowed = 1" + + restriction_where + + (f"\n AND parent = :filter_parent" if parent else "") + + "\nORDER BY parent, child" + ) + return query, all_params +``` + +**`check_permission_for_resource()` (single resource)** + +This can now be rewritten to use the same `build_cascading_ctes()` with a single-row `base`: + +```python +async def check_permission_for_resource(*, datasette, actor, action, parent, child): + rules_union, all_params, restriction_sqls = await build_permission_rules_sql( + datasette, actor, action + ) + if not rules_union: + return False + + all_params["_check_parent"] = parent + all_params["_check_child"] = child + + # Check restrictions first (unchanged fast-path) + if restriction_sqls: + ... # existing restriction check, unchanged + + # Use the shared cascading logic with a single-row base + base_sql = "SELECT :_check_parent AS parent, :_check_child AS child" + cascade = build_cascading_ctes() + + query = f""" +WITH +base AS ({base_sql}), +all_rules AS ({rules_union}), +{cascade} +SELECT COALESCE((SELECT is_allowed FROM decisions), 0) AS is_allowed +""" + result = await datasette.get_internal_database().execute(query, all_params) + return bool(result.rows[0][0]) if result.rows else False +``` + +This replaces the current depth/ROW_NUMBER approach with the same `child_lvl`/`parent_lvl`/`global_lvl` pattern, ensuring identical semantics. + +**`resolve_permissions_from_catalog()` (test utility)** + +This becomes a thin wrapper too. Since it's test-only, the main benefit is eliminating 250 lines of duplicated SQL: + +```python +async def resolve_permissions_from_catalog(db, actor, plugins, action, + candidate_sql, candidate_params=None, + *, implicit_deny=True): + # Resolve plugins (existing code, unchanged) + resolved_plugins, restriction_sqls = ... + + union_sql, rule_params = build_rules_union(actor, resolved_plugins) + all_params = {**(candidate_params or {}), **rule_params, "action": action} + + cascade = build_cascading_ctes( + include_reasons=True, + base_alias="cands", + rules_alias="rules", + ) + + # One query, no duplication + restriction_cte = "" + restriction_where = "" + if restriction_sqls: + restriction_cte, restriction_where = build_restriction_filter(restriction_sqls) + + sql = f""" + WITH + cands AS ({candidate_sql}), + rules AS ({union_sql}), + {cascade} + {restriction_cte} + SELECT + c.parent, c.child, + COALESCE(d.is_allowed, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) AS allow, + d.reason, :action AS action, + ... + FROM cands c + LEFT JOIN decisions d ON c.parent = d.parent AND c.child = d.child + {restriction_where} + ORDER BY c.parent, c.child + """ + + rows = await db.execute(sql, {**all_params, "implicit_deny": ...}) + return [dict(r) for r in rows] +``` + +This eliminates the 135-line SQL triplication entirely. + +### The `include_is_private` complication + +The `include_is_private` path is the one wrinkle. It needs to run the cascading logic twice: once for the real actor and once for anonymous. The current code duplicates all three level CTEs with `anon_` prefixes. + +With the shared builder, we'd need `build_cascading_ctes()` to accept a `prefix` parameter so it can generate `anon_child_lvl`, `anon_parent_lvl`, etc.: + +```python +def build_cascading_ctes(*, rules_alias="all_rules", base_alias="base", + include_reasons=False, prefix=""): + # Use prefix for all CTE names: + # f"{prefix}child_lvl", f"{prefix}parent_lvl", etc. +``` + +Then the caller does: + +```python +ctes = build_cascading_ctes(include_reasons=True) # -> child_lvl, decisions +ctes += build_cascading_ctes(rules_alias="anon_rules", # -> anon_child_lvl, anon_decisions + prefix="anon_", + include_reasons=False) +``` + +And the final SELECT joins both `decisions` and `anon_decisions`. + +### Impact summary + +| Before | After | +|--------|-------| +| `_build_single_action_sql`: ~180 lines of CTE construction | ~40 lines + shared builder | +| `check_permission_for_resource`: ~35 lines of cascading SQL | ~10 lines + shared builder | +| `resolve_permissions_from_catalog`: ~250 lines, SQL tripled when restrictions present | ~30 lines + shared builder | +| `source_plugin` interpolated unsafely in 3 places | Parameterized in `collect_permission_rules()` | +| `build_rules_union` in `permissions.py` (test-only duplicate) | Replaced by shared `collect_permission_rules()` | + +Total: ~465 lines of SQL-building code reduced to ~80 lines of callers + ~80 lines of shared builders. Three implementations of cascading logic become one. + +### Migration plan + +1. Add `collect_permission_rules()` and `build_cascading_ctes()` and `build_restriction_filter()` to `actions_sql.py` (or a new `datasette/utils/permission_sql_builder.py`) +2. Rewrite `check_permission_for_resource()` to use the shared builder +3. Rewrite `_build_single_action_sql()` to use the shared builder (including `include_is_private` prefix support) +4. Rewrite `resolve_permissions_from_catalog()` to use the shared builder +5. Delete `build_rules_union()` from `permissions.py` +6. Run the full test suite — all 263 tests must still pass since behavior is unchanged +7. Verify via `?_trace=1` that the generated SQL is correct and equivalently performant diff --git a/tests/test_utils_permissions.py b/tests/test_utils_permissions.py index b412de0f9a..b1ba309121 100644 --- a/tests/test_utils_permissions.py +++ b/tests/test_utils_permissions.py @@ -1,612 +1,665 @@ +""" +Tests for cascading permission resolution logic. + +These tests verify the core cascading semantics through the production +code paths (allowed_resources / allowed) rather than through a separate +test-only SQL builder. Every test registers a lightweight plugin via +``ds.pm.register`` and calls the real ``Datasette.allowed_resources()`` +and/or ``Datasette.allowed()`` methods. + +Cascading semantics tested: + 1. child (depth 2) > parent (depth 1) > global (depth 0) + 2. DENY beats ALLOW at the same depth + 3. No matching rule → implicit deny + 4. Multiple plugins can contribute rules with independent parameters + 5. :actor, :actor_id, :action are available in SQL +""" + import pytest +import pytest_asyncio from datasette.app import Datasette from datasette.permissions import PermissionSQL -from datasette.utils.permissions import resolve_permissions_from_catalog -from typing import Callable, List +from datasette.resources import TableResource, DatabaseResource +from datasette import hookimpl + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class PermissionRulesPlugin: + """Thin shim that delegates to a callback for permission_resources_sql.""" + + def __init__(self, rules_callback): + self.rules_callback = rules_callback + @hookimpl + def permission_resources_sql(self, datasette, actor, action): + return self.rules_callback(datasette, actor, action) -@pytest.fixture -def db(): - ds = Datasette() - import tempfile - from datasette.database import Database - path = tempfile.mktemp(suffix="demo.db") - db = ds.add_database(Database(ds, path=path)) - return db +@pytest_asyncio.fixture +async def ds(): + """ + Create a Datasette instance with catalog tables that mirror the + original test_utils_permissions layout: + + databases: perm_accounting, perm_hr, perm_analytics + tables per db: table01..table10 + special tables: perm_accounting/sales, perm_analytics/secret + + Uses default_deny=True so that only the test-registered plugins + determine permission outcomes (no built-in default-allow rules). + + Database names are prefixed with ``perm_`` to avoid collisions with + other test fixtures that create memory databases in the same process. + """ + instance = Datasette(default_deny=True) + await instance.invoke_startup() + + per_parent = 10 + parents = ["perm_accounting", "perm_hr", "perm_analytics"] + specials = { + "perm_accounting": ["sales"], + "perm_analytics": ["secret"], + "perm_hr": [], + } + + for parent in parents: + db = instance.add_memory_database(parent) + base_tables = [f"table{i:02d}" for i in range(1, per_parent + 1)] + for s in specials.get(parent, []): + if s not in base_tables: + base_tables[0] = s + for tbl in base_tables: + await db.execute_write( + f"CREATE TABLE IF NOT EXISTS [{tbl}] (id INTEGER PRIMARY KEY)" + ) + + await instance._refresh_schemas() + yield instance + # Cleanup: remove databases to avoid polluting other tests + for parent in parents: + instance.remove_database(parent) -NO_RULES_SQL = ( - "SELECT NULL AS parent, NULL AS child, NULL AS allow, NULL AS reason WHERE 0" -) +# --------------------------------------------------------------------------- +# Plugin factories — return callables suitable for PermissionRulesPlugin +# --------------------------------------------------------------------------- -def plugin_allow_all_for_user(user: str) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: +def _cb_allow_all_for_user(user): + """Global allow for a specific user.""" + + def cb(datasette, actor, action): + if not actor or actor.get("id") != user: + return None return PermissionSQL( - """ - SELECT NULL AS parent, NULL AS child, 1 AS allow, - 'global allow for ' || :allow_all_user || ' on ' || :allow_all_action AS reason - WHERE :actor_id = :allow_all_user - """, - {"allow_all_user": user, "allow_all_action": action}, + sql=( + "SELECT NULL AS parent, NULL AS child, 1 AS allow, " + "'global allow for ' || :_aau_user || ' on ' || :action AS reason" + ), + params={"_aau_user": user}, ) - return provider + return cb -def plugin_deny_specific_table( - user: str, parent: str, child: str -) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: +def _cb_deny_specific_table(user, parent, child): + """Child-level deny for a specific user + table.""" + + def cb(datasette, actor, action): + if not actor or actor.get("id") != user: + return None return PermissionSQL( - """ - SELECT :deny_specific_table_parent AS parent, :deny_specific_table_child AS child, 0 AS allow, - 'deny ' || :deny_specific_table_parent || '/' || :deny_specific_table_child || ' for ' || :deny_specific_table_user || ' on ' || :deny_specific_table_action AS reason - WHERE :actor_id = :deny_specific_table_user - """, - { - "deny_specific_table_parent": parent, - "deny_specific_table_child": child, - "deny_specific_table_user": user, - "deny_specific_table_action": action, - }, + sql=( + "SELECT :_dst_parent AS parent, :_dst_child AS child, 0 AS allow, " + "'deny ' || :_dst_parent || '/' || :_dst_child || ' for ' || :_dst_user AS reason" + ), + params={"_dst_parent": parent, "_dst_child": child, "_dst_user": user}, ) - return provider + return cb -def plugin_org_policy_deny_parent(parent: str) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: +def _cb_org_policy_deny_parent(parent): + """Unconditional parent-level deny (applies to all actors).""" + + def cb(datasette, actor, action): return PermissionSQL( - """ - SELECT :org_policy_parent_deny_parent AS parent, NULL AS child, 0 AS allow, - 'org policy: parent ' || :org_policy_parent_deny_parent || ' denied on ' || :org_policy_parent_deny_action AS reason - """, - { - "org_policy_parent_deny_parent": parent, - "org_policy_parent_deny_action": action, - }, + sql=( + "SELECT :_opd_parent AS parent, NULL AS child, 0 AS allow, " + "'org policy: deny ' || :_opd_parent AS reason" + ), + params={"_opd_parent": parent}, ) - return provider + return cb + +def _cb_allow_parent_for_user(user, parent): + """Parent-level allow for a specific user.""" -def plugin_allow_parent_for_user( - user: str, parent: str -) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: + def cb(datasette, actor, action): + if not actor or actor.get("id") != user: + return None return PermissionSQL( - """ - SELECT :allow_parent_parent AS parent, NULL AS child, 1 AS allow, - 'allow full parent for ' || :allow_parent_user || ' on ' || :allow_parent_action AS reason - WHERE :actor_id = :allow_parent_user - """, - { - "allow_parent_parent": parent, - "allow_parent_user": user, - "allow_parent_action": action, - }, + sql=( + "SELECT :_apu_parent AS parent, NULL AS child, 1 AS allow, " + "'allow parent ' || :_apu_parent || ' for ' || :_apu_user AS reason" + ), + params={"_apu_parent": parent, "_apu_user": user}, ) - return provider + return cb + +def _cb_child_allow_for_user(user, parent, child): + """Child-level allow for a specific user.""" -def plugin_child_allow_for_user( - user: str, parent: str, child: str -) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: + def cb(datasette, actor, action): + if not actor or actor.get("id") != user: + return None return PermissionSQL( - """ - SELECT :allow_child_parent AS parent, :allow_child_child AS child, 1 AS allow, - 'allow child for ' || :allow_child_user || ' on ' || :allow_child_action AS reason - WHERE :actor_id = :allow_child_user - """, - { - "allow_child_parent": parent, - "allow_child_child": child, - "allow_child_user": user, - "allow_child_action": action, - }, + sql=( + "SELECT :_cau_parent AS parent, :_cau_child AS child, 1 AS allow, " + "'allow child ' || :_cau_parent || '/' || :_cau_child || ' for ' || :_cau_user AS reason" + ), + params={"_cau_parent": parent, "_cau_child": child, "_cau_user": user}, ) - return provider + return cb + +def _cb_root_deny_for_all(): + """Unconditional global deny.""" -def plugin_root_deny_for_all() -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: + def cb(datasette, actor, action): return PermissionSQL( - """ - SELECT NULL AS parent, NULL AS child, 0 AS allow, 'root deny for all on ' || :root_deny_action AS reason - """, - {"root_deny_action": action}, + sql=( + "SELECT NULL AS parent, NULL AS child, 0 AS allow, " + "'root deny for all' AS reason" + ), ) - return provider + return cb + +def _cb_conflicting_same_child_rules(user, parent, child): + """Two plugins: one allow + one deny at the same child level.""" -def plugin_conflicting_same_child_rules( - user: str, parent: str, child: str -) -> List[Callable[[str], PermissionSQL]]: - def allow_provider(action: str) -> PermissionSQL: + def cb_allow(datasette, actor, action): + if not actor or actor.get("id") != user: + return None return PermissionSQL( - """ - SELECT :conflict_child_allow_parent AS parent, :conflict_child_allow_child AS child, 1 AS allow, - 'team grant at child for ' || :conflict_child_allow_user || ' on ' || :conflict_child_allow_action AS reason - WHERE :actor_id = :conflict_child_allow_user - """, - { - "conflict_child_allow_parent": parent, - "conflict_child_allow_child": child, - "conflict_child_allow_user": user, - "conflict_child_allow_action": action, - }, + sql=( + "SELECT :_csca_parent AS parent, :_csca_child AS child, 1 AS allow, " + "'team grant at child' AS reason" + ), + params={"_csca_parent": parent, "_csca_child": child}, ) - def deny_provider(action: str) -> PermissionSQL: + def cb_deny(datasette, actor, action): + if not actor or actor.get("id") != user: + return None return PermissionSQL( - """ - SELECT :conflict_child_deny_parent AS parent, :conflict_child_deny_child AS child, 0 AS allow, - 'exception deny at child for ' || :conflict_child_deny_user || ' on ' || :conflict_child_deny_action AS reason - WHERE :actor_id = :conflict_child_deny_user - """, - { - "conflict_child_deny_parent": parent, - "conflict_child_deny_child": child, - "conflict_child_deny_user": user, - "conflict_child_deny_action": action, - }, + sql=( + "SELECT :_cscd_parent AS parent, :_cscd_child AS child, 0 AS allow, " + "'exception deny at child' AS reason" + ), + params={"_cscd_parent": parent, "_cscd_child": child}, ) - return [allow_provider, deny_provider] + return cb_allow, cb_deny + +def _cb_allow_all_for_action(user, allowed_action): + """Global allow for a specific user on a specific action only.""" -def plugin_allow_all_for_action( - user: str, allowed_action: str -) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: + def cb(datasette, actor, action): if action != allowed_action: - return PermissionSQL(NO_RULES_SQL) - # Sanitize parameter names by replacing hyphens with underscores - param_prefix = action.replace("-", "_") + return None + if not actor or actor.get("id") != user: + return None return PermissionSQL( - f""" - SELECT NULL AS parent, NULL AS child, 1 AS allow, - 'global allow for ' || :{param_prefix}_user || ' on ' || :{param_prefix}_action AS reason - WHERE :actor_id = :{param_prefix}_user - """, - {f"{param_prefix}_user": user, f"{param_prefix}_action": action}, + sql=( + "SELECT NULL AS parent, NULL AS child, 1 AS allow, " + "'global allow for ' || :_aafa_user || ' on ' || :action AS reason" + ), + params={"_aafa_user": user}, ) - return provider + return cb -VIEW_TABLE = "view-table" +# --------------------------------------------------------------------------- +# Helpers for asserting results +# --------------------------------------------------------------------------- -# ---------- Catalog DDL (from your schema) ---------- -CATALOG_DDL = """ -CREATE TABLE IF NOT EXISTS catalog_databases ( - database_name TEXT PRIMARY KEY, - path TEXT, - is_memory INTEGER, - schema_version INTEGER -); -CREATE TABLE IF NOT EXISTS catalog_tables ( - database_name TEXT, - table_name TEXT, - rootpage INTEGER, - sql TEXT, - PRIMARY KEY (database_name, table_name), - FOREIGN KEY (database_name) REFERENCES catalog_databases(database_name) -); -""" - -PARENTS = ["accounting", "hr", "analytics"] -SPECIALS = {"accounting": ["sales"], "analytics": ["secret"], "hr": []} - -TABLE_CANDIDATES_SQL = ( - "SELECT database_name AS parent, table_name AS child FROM catalog_tables" -) -PARENT_CANDIDATES_SQL = ( - "SELECT database_name AS parent, NULL AS child FROM catalog_databases" -) - - -# ---------- Helpers ---------- -async def seed_catalog(db, per_parent: int = 10) -> None: - await db.execute_write_script(CATALOG_DDL) - # databases - db_rows = [(p, f"/{p}.db", 0, 1) for p in PARENTS] - await db.execute_write_many( - "INSERT OR REPLACE INTO catalog_databases(database_name, path, is_memory, schema_version) VALUES (?,?,?,?)", - db_rows, - ) +def _allowed_set(resources): + """Convert PaginatedResources.resources to {(parent, child), ...}.""" + return {(r.parent, r.child) for r in resources} - # tables - def tables_for(parent: str, n: int): - base = [f"table{i:02d}" for i in range(1, n + 1)] - for s in SPECIALS.get(parent, []): - if s not in base: - base[0] = s - return base - - table_rows = [] - for p in PARENTS: - for t in tables_for(p, per_parent): - table_rows.append((p, t, 0, f"CREATE TABLE {t} (id INTEGER PRIMARY KEY)")) - await db.execute_write_many( - "INSERT OR REPLACE INTO catalog_tables(database_name, table_name, rootpage, sql) VALUES (?,?,?,?)", - table_rows, - ) +def _allowed_set_for_parent(resources, parent): + return {(r.parent, r.child) for r in resources if r.parent == parent} -def res_allowed(rows, parent=None): - return sorted( - r["resource"] - for r in rows - if r["allow"] == 1 and (parent is None or r["parent"] == parent) - ) +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- -def res_denied(rows, parent=None): - return sorted( - r["resource"] - for r in rows - if r["allow"] == 0 and (parent is None or r["parent"] == parent) - ) +VIEW_TABLE = "view-table" -# ---------- Tests ---------- @pytest.mark.asyncio -async def test_alice_global_allow_with_specific_denies_catalog(db): - await seed_catalog(db) - plugins = [ - plugin_allow_all_for_user("alice"), - plugin_deny_specific_table("alice", "accounting", "sales"), - plugin_org_policy_deny_parent("hr"), - ] - rows = await resolve_permissions_from_catalog( - db, - {"id": "alice"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - # Alice can see everything except accounting/sales and hr/* - assert "/accounting/sales" in res_denied(rows) - for r in rows: - if r["parent"] == "hr": - assert r["allow"] == 0 - elif r["resource"] == "/accounting/sales": - assert r["allow"] == 0 - else: - assert r["allow"] == 1 +async def test_alice_global_allow_with_specific_denies(ds): + """ + Alice has global allow, but: + - accounting/sales is denied at child level + - hr/* is denied at parent level + She should see everything except those. + """ + # Combine three plugin callbacks into one that returns a list + deny_table_cb = _cb_deny_specific_table("alice", "perm_accounting", "sales") + deny_parent_cb = _cb_org_policy_deny_parent("perm_hr") + allow_cb = _cb_allow_all_for_user("alice") + + def combined(datasette, actor, action): + results = [] + for cb in (allow_cb, deny_table_cb, deny_parent_cb): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "alice"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + allowed = _allowed_set(result.resources) + + # accounting/sales should be denied (child deny beats global allow) + assert ("perm_accounting", "sales") not in allowed + + # All hr tables should be denied (parent deny beats global allow) + hr_tables = {(p, c) for p, c in allowed if p == "perm_hr"} + assert len(hr_tables) == 0 + + # analytics tables should all be allowed + analytics_tables = {(p, c) for p, c in allowed if p == "perm_analytics"} + assert len(analytics_tables) == 10 + + # accounting tables (except sales) should be allowed + accounting_tables = {(p, c) for p, c in allowed if p == "perm_accounting"} + assert len(accounting_tables) == 9 # 10 - 1 denied + + # Verify with allowed() single-resource checks + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "sales"), + actor=actor, + ) + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_hr", "table01"), + actor=actor, + ) + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_carol_parent_allow_but_child_conflict_deny_wins_catalog(db): - await seed_catalog(db) - plugins = [ - plugin_org_policy_deny_parent("hr"), - plugin_allow_parent_for_user("carol", "analytics"), - *plugin_conflicting_same_child_rules("carol", "analytics", "secret"), - ] - rows = await resolve_permissions_from_catalog( - db, - {"id": "carol"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, +async def test_parent_allow_but_child_conflict_deny_wins(ds): + """ + Carol has parent-level allow on analytics, but there are conflicting + child-level allow + deny on analytics/secret. DENY should win. + hr is parent-level denied. + """ + cb_allow, cb_deny = _cb_conflicting_same_child_rules( + "carol", "perm_analytics", "secret" ) - allowed_analytics = res_allowed(rows, parent="analytics") - denied_analytics = res_denied(rows, parent="analytics") - - assert "/analytics/secret" in denied_analytics - # 10 analytics children total, 1 denied - assert len(allowed_analytics) == 9 + allow_parent_cb = _cb_allow_parent_for_user("carol", "perm_analytics") + deny_parent_cb = _cb_org_policy_deny_parent("perm_hr") + + def combined(datasette, actor, action): + results = [] + for cb in (deny_parent_cb, allow_parent_cb, cb_allow, cb_deny): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "carol"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + analytics_allowed = _allowed_set_for_parent(result.resources, "perm_analytics") + + # analytics/secret should be denied (child deny beats child allow) + assert ("perm_analytics", "secret") not in analytics_allowed + # 10 analytics tables, 1 denied + assert len(analytics_allowed) == 9 + + # Verify via allowed() + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "secret"), + actor=actor, + ) + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "table02"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_specificity_child_allow_overrides_parent_deny_catalog(db): - await seed_catalog(db) - plugins = [ - plugin_allow_all_for_user("alice"), - plugin_org_policy_deny_parent("analytics"), # parent-level deny - plugin_child_allow_for_user( - "alice", "analytics", "table02" - ), # child allow beats parent deny - ] - rows = await resolve_permissions_from_catalog( - db, - {"id": "alice"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - - # table02 allowed, other analytics tables denied - assert any(r["resource"] == "/analytics/table02" and r["allow"] == 1 for r in rows) - assert all( - (r["parent"] != "analytics" or r["child"] == "table02" or r["allow"] == 0) - for r in rows - ) +async def test_specificity_child_allow_overrides_parent_deny(ds): + """ + analytics is parent-level denied, but alice has a child-level allow + on analytics/table02. Child beats parent. + """ + allow_cb = _cb_allow_all_for_user("alice") + deny_parent_cb = _cb_org_policy_deny_parent("perm_analytics") + child_allow_cb = _cb_child_allow_for_user("alice", "perm_analytics", "table02") + + def combined(datasette, actor, action): + results = [] + for cb in (allow_cb, deny_parent_cb, child_allow_cb): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "alice"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + analytics_allowed = _allowed_set_for_parent(result.resources, "perm_analytics") + + # table02 should be allowed (child allow beats parent deny) + assert ("perm_analytics", "table02") in analytics_allowed + # All other analytics tables should be denied (parent deny, no child rule) + assert len(analytics_allowed) == 1 + + # Verify via allowed() + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "table02"), + actor=actor, + ) + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_root_deny_all_but_parent_allow_rescues_specific_parent_catalog(db): - await seed_catalog(db) - plugins = [ - plugin_root_deny_for_all(), # root deny - plugin_allow_parent_for_user( - "bob", "accounting" - ), # parent allow (more specific) - ] - rows = await resolve_permissions_from_catalog( - db, {"id": "bob"}, plugins, VIEW_TABLE, TABLE_CANDIDATES_SQL, implicit_deny=True - ) - for r in rows: - if r["parent"] == "accounting": - assert r["allow"] == 1 - else: - assert r["allow"] == 0 +async def test_root_deny_all_but_parent_allow_rescues_specific_parent(ds): + """ + Global deny for all, but bob has parent-level allow on accounting. + Parent beats global. + """ + deny_cb = _cb_root_deny_for_all() + allow_cb = _cb_allow_parent_for_user("bob", "perm_accounting") + + def combined(datasette, actor, action): + results = [] + for cb in (deny_cb, allow_cb): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "bob"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + allowed = _allowed_set(result.resources) + + # Only accounting tables should be allowed + assert all(p == "perm_accounting" for p, c in allowed) + assert len(allowed) == 10 + + # Verify via allowed() + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_hr", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_parent_scoped_candidates(db): - await seed_catalog(db) - plugins = [ - plugin_org_policy_deny_parent("hr"), - plugin_allow_parent_for_user("carol", "analytics"), - ] - rows = await resolve_permissions_from_catalog( - db, - {"id": "carol"}, - plugins, - VIEW_TABLE, - PARENT_CANDIDATES_SQL, - implicit_deny=True, - ) - d = {r["resource"]: r["allow"] for r in rows} - assert d["/analytics"] == 1 - assert d["/hr"] == 0 +async def test_parent_scoped_action(ds): + """ + For parent-scoped resources (databases), verify cascading. + analytics allowed, hr denied, accounting implicitly denied. + """ + deny_cb = _cb_org_policy_deny_parent("perm_hr") + allow_cb = _cb_allow_parent_for_user("carol", "perm_analytics") + + def combined(datasette, actor, action): + results = [] + for cb in (deny_cb, allow_cb): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "carol"} + result = await ds.allowed_resources("view-database", actor) + allowed = {r.parent for r in result.resources} + + assert "perm_analytics" in allowed + # hr is explicitly denied, accounting has no matching rule → implicit deny + assert "perm_hr" not in allowed + + # Verify via allowed() + assert await ds.allowed( + action="view-database", + resource=DatabaseResource("perm_analytics"), + actor=actor, + ) + assert not await ds.allowed( + action="view-database", + resource=DatabaseResource("perm_hr"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_implicit_deny_behavior(db): - await seed_catalog(db) - plugins = [] # no rules at all - - # implicit_deny=True -> everything denied with reason 'implicit deny' - rows = await resolve_permissions_from_catalog( - db, - {"id": "erin"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - assert all(r["allow"] == 0 and r["reason"] == "implicit deny" for r in rows) - - # implicit_deny=False -> no winner => allow is None, reason is None - rows2 = await resolve_permissions_from_catalog( - db, - {"id": "erin"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=False, - ) - assert all(r["allow"] is None and r["reason"] is None for r in rows2) +async def test_implicit_deny_when_no_rules(ds): + """ + When no plugins return any rules, everything is denied (implicit deny). + """ + def no_rules(datasette, actor, action): + return None -@pytest.mark.asyncio -async def test_candidate_filters_via_params(db): - await seed_catalog(db) - # Add some metadata to test filtering - # Mark 'hr' as is_memory=1 and increment analytics schema_version - await db.execute_write( - "UPDATE catalog_databases SET is_memory=1 WHERE database_name='hr'" - ) - await db.execute_write( - "UPDATE catalog_databases SET schema_version=2 WHERE database_name='analytics'" - ) + plugin = PermissionRulesPlugin(no_rules) + ds.pm.register(plugin, name="test_plugin") - # Candidate SQL that filters by db metadata via params - candidate_sql = """ - SELECT t.database_name AS parent, t.table_name AS child - FROM catalog_tables t - JOIN catalog_databases d ON d.database_name = t.database_name - WHERE (:exclude_memory = 1 AND d.is_memory = 1) IS NOT 1 - AND (:min_schema_version IS NULL OR d.schema_version >= :min_schema_version) - """ + try: + actor = {"id": "erin"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + assert len(result.resources) == 0 - plugins = [ - plugin_root_deny_for_all(), - plugin_allow_parent_for_user( - "dev", "analytics" - ), # analytics rescued if included by candidates - ] - - # Case 1: exclude memory dbs, require schema_version >= 2 -> only analytics appear, and thus are allowed - rows = await resolve_permissions_from_catalog( - db, - {"id": "dev"}, - plugins, - VIEW_TABLE, - candidate_sql, - candidate_params={"exclude_memory": 1, "min_schema_version": 2}, - implicit_deny=True, - ) - assert rows and all(r["parent"] == "analytics" for r in rows) - assert all(r["allow"] == 1 for r in rows) - - # Case 2: include memory dbs, min_schema_version = None -> accounting/hr/analytics appear, - # but root deny wins except where specifically allowed (none except analytics parent allow doesn’t apply to table depth if candidate includes children; still fine—policy is explicit). - rows2 = await resolve_permissions_from_catalog( - db, - {"id": "dev"}, - plugins, - VIEW_TABLE, - candidate_sql, - candidate_params={"exclude_memory": 0, "min_schema_version": None}, - implicit_deny=True, - ) - assert any(r["parent"] == "accounting" for r in rows2) - assert any(r["parent"] == "hr" for r in rows2) - # For table-scoped candidates, the parent-level allow does not override root deny unless you have child-level rules - assert all(r["allow"] in (0, 1) for r in rows2) + # Single resource check too + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_action_specific_rules(db): - await seed_catalog(db) - plugins = [plugin_allow_all_for_action("dana", VIEW_TABLE)] - - view_rows = await resolve_permissions_from_catalog( - db, - {"id": "dana"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - assert view_rows and all(r["allow"] == 1 for r in view_rows) - assert all(r["action"] == VIEW_TABLE for r in view_rows) - - insert_rows = await resolve_permissions_from_catalog( - db, - {"id": "dana"}, - plugins, - "insert-row", - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - assert insert_rows and all(r["allow"] == 0 for r in insert_rows) - assert all(r["reason"] == "implicit deny" for r in insert_rows) - assert all(r["action"] == "insert-row" for r in insert_rows) +async def test_action_specific_rules(ds): + """ + Rules that only apply to view-table should not grant insert-row. + """ + cb = _cb_allow_all_for_action("dana", VIEW_TABLE) + plugin = PermissionRulesPlugin(cb) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "dana"} + + # view-table should be allowed + result = await ds.allowed_resources(VIEW_TABLE, actor) + assert len(result.resources) == 30 # 3 dbs x 10 tables + + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) + + # insert-row should be denied (no rules for it) + assert not await ds.allowed( + action="insert-row", + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_actor_actor_id_action_parameters_available(db): - """Test that :actor (JSON), :actor_id, and :action are all available in SQL""" - await seed_catalog(db) - - def plugin_using_all_parameters() -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - return PermissionSQL( - """ +async def test_actor_parameters_available_in_sql(ds): + """ + Test that :actor (JSON), :actor_id, and :action are all available in plugin SQL. + """ + + def cb(datasette, actor, action): + return PermissionSQL( + sql=""" SELECT NULL AS parent, NULL AS child, 1 AS allow, 'Actor ID: ' || COALESCE(:actor_id, 'null') || - ', Actor JSON: ' || COALESCE(:actor, 'null') || ', Action: ' || :action AS reason WHERE :actor_id = 'test_user' AND :action = 'view-table' AND json_extract(:actor, '$.role') = 'admin' - """ - ) + """, + params={}, # :actor_id, :actor, :action are added by the framework + ) - return provider + plugin = PermissionRulesPlugin(cb) + ds.pm.register(plugin, name="test_plugin") - plugins = [plugin_using_all_parameters()] + try: + actor = {"id": "test_user", "role": "admin"} - # Test with full actor dict - rows = await resolve_permissions_from_catalog( - db, - {"id": "test_user", "role": "admin"}, - plugins, - "view-table", - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) + # Should be allowed because the SQL conditions are met + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) - # Should have allowed rows with reason containing all the info - allowed = [r for r in rows if r["allow"] == 1] - assert len(allowed) > 0 + # Different actor should be denied + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor={"id": "other_user", "role": "admin"}, + ) - # Check that the reason string contains evidence of all parameters - reason = allowed[0]["reason"] - assert "test_user" in reason - assert "view-table" in reason - # The :actor parameter should be the JSON string - assert "Actor JSON:" in reason + # Verify allowed_resources also works + result = await ds.allowed_resources(VIEW_TABLE, actor) + assert len(result.resources) == 30 # all tables allowed + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_multiple_plugins_with_own_parameters(db): +async def test_multiple_plugins_with_own_parameters(ds): """ - Test that multiple plugins can use their own parameter names without conflict. - - This verifies that the parameter naming convention works: plugins prefix their - parameters (e.g., :plugin1_pattern, :plugin2_message) and both sets of parameters - are successfully bound in the SQL queries. + Multiple plugins can use their own parameter names without conflict. """ - await seed_catalog(db) - - def plugin_one() -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - if action != "view-table": - return PermissionSQL("plugin_one", "SELECT NULL WHERE 0", {}) - return PermissionSQL( - """ + + def cb_one(datasette, actor, action): + if action != VIEW_TABLE: + return None + return PermissionSQL( + sql=""" SELECT database_name AS parent, table_name AS child, - 1 AS allow, 'Plugin one used param: ' || :plugin1_param AS reason + 1 AS allow, 'Plugin one: ' || :p1_param AS reason FROM catalog_tables - WHERE database_name = 'accounting' - """, - { - "plugin1_param": "value1", - }, - ) - - return provider + WHERE database_name = 'perm_accounting' + """, + params={"p1_param": "value1"}, + ) - def plugin_two() -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - if action != "view-table": - return PermissionSQL("plugin_two", "SELECT NULL WHERE 0", {}) - return PermissionSQL( - """ + def cb_two(datasette, actor, action): + if action != VIEW_TABLE: + return None + return PermissionSQL( + sql=""" SELECT database_name AS parent, table_name AS child, - 1 AS allow, 'Plugin two used param: ' || :plugin2_param AS reason + 1 AS allow, 'Plugin two: ' || :p2_param AS reason FROM catalog_tables - WHERE database_name = 'hr' - """, - { - "plugin2_param": "value2", - }, - ) - - return provider - - plugins = [plugin_one(), plugin_two()] - - rows = await resolve_permissions_from_catalog( - db, - {"id": "test_user"}, - plugins, - "view-table", - TABLE_CANDIDATES_SQL, - implicit_deny=False, - ) + WHERE database_name = 'perm_hr' + """, + params={"p2_param": "value2"}, + ) - # Both plugins should contribute results with their parameters successfully bound - plugin_one_rows = [ - r for r in rows if r.get("reason") and "Plugin one" in r["reason"] - ] - plugin_two_rows = [ - r for r in rows if r.get("reason") and "Plugin two" in r["reason"] - ] - - assert len(plugin_one_rows) > 0, "Plugin one should contribute rules" - assert len(plugin_two_rows) > 0, "Plugin two should contribute rules" - - # Verify each plugin's parameters were successfully bound in the SQL - assert any( - "value1" in r.get("reason", "") for r in plugin_one_rows - ), "Plugin one's :plugin1_param should be bound" - assert any( - "value2" in r.get("reason", "") for r in plugin_two_rows - ), "Plugin two's :plugin2_param should be bound" + plugin_one = PermissionRulesPlugin(cb_one) + plugin_two = PermissionRulesPlugin(cb_two) + ds.pm.register(plugin_one, name="test_plugin_one") + ds.pm.register(plugin_two, name="test_plugin_two") + + try: + actor = {"id": "test_user"} + result = await ds.allowed_resources(VIEW_TABLE, actor, include_reasons=True) + allowed = _allowed_set(result.resources) + + # Both plugins should contribute — accounting from plugin one, hr from plugin two + accounting_allowed = {(p, c) for p, c in allowed if p == "perm_accounting"} + hr_allowed = {(p, c) for p, c in allowed if p == "perm_hr"} + + assert len(accounting_allowed) == 10 + assert len(hr_allowed) == 10 + + # Check reasons contain the parameterized values + for r in result.resources: + if r.parent == "perm_accounting": + assert any("value1" in reason for reason in r.reasons) + elif r.parent == "perm_hr": + assert any("value2" in reason for reason in r.reasons) + finally: + ds.pm.unregister(plugin_one, name="test_plugin_one") + ds.pm.unregister(plugin_two, name="test_plugin_two")