diff --git a/cloudsplaining/scan/authorization_details.py b/cloudsplaining/scan/authorization_details.py index 498f8f59..7d6c50a2 100644 --- a/cloudsplaining/scan/authorization_details.py +++ b/cloudsplaining/scan/authorization_details.py @@ -10,6 +10,9 @@ import logging from typing import Any +from cloudsplaining.shared.aws_principal import AWSPrincipal +from cloudsplaining.scan.policy_document import PolicyDocument + from policy_sentry.querying.actions import get_all_action_links from policy_sentry.querying.all import get_all_service_prefixes @@ -19,6 +22,8 @@ from cloudsplaining.scan.user_details import UserDetailList from cloudsplaining.shared.exclusions import DEFAULT_EXCLUSIONS, Exclusions + + all_service_prefixes = get_all_service_prefixes() logger = logging.getLogger(__name__) @@ -98,10 +103,26 @@ def __init__( "roles": self.role_detail_list.json, } + self.policies.set_iam_data(iam_data) self.group_detail_list.set_iam_data(iam_data) self.user_detail_list.set_iam_data(iam_data) self.role_detail_list.set_iam_data(iam_data) + from cloudsplaining.scan.policy_document import PolicyDocument + + for role_detail in self.role_details: + # Collect all policies (attached + inline) + policy_docs = [p.policy_document for p in role_detail.policies] + + # Merge them + merged_doc = PolicyDocument.merge(policy_docs) + + if merged_doc: + composite_escalations = merged_doc.allows_privilege_escalation() + role_detail.composite_privilege_escalation_paths = composite_escalations + + + @property def inline_policies(self) -> dict[str, dict[str, Any]]: diff --git a/cloudsplaining/scan/policy_document.py b/cloudsplaining/scan/policy_document.py index 0d85ce9f..a75b6a13 100644 --- a/cloudsplaining/scan/policy_document.py +++ b/cloudsplaining/scan/policy_document.py @@ -6,6 +6,8 @@ # Licensed under the BSD 3-Clause license. # For full license text, see the LICENSE file in the repo root # or https://opensource.org/licenses/BSD-3-Clause +# cloudsplaining/scan/policy_document.py +from copy import deepcopy from __future__ import annotations import logging @@ -63,6 +65,27 @@ def __init__( flag_resource_arn_statements=self.flag_resource_arn_statements, ) ) + + @staticmethod + def merge_policy_documents(policy_documents): + """ + Merge multiple PolicyDocument objects into a single composite PolicyDocument. + Consolidates all 'Allow' and 'Deny' statements. + """ + if not policy_documents: + return None + + merged_data = {"Version": "2012-10-17", "Statement": []} + + for policy in policy_documents: + doc = policy.document # already parsed JSON dict + statements = doc.get("Statement", []) + if isinstance(statements, dict): + statements = [statements] + merged_data["Statement"].extend(deepcopy(statements)) + + return PolicyDocument(merged_data) + @property def json(self) -> dict[str, Any]: diff --git a/cloudsplaining/scan/role_details.py b/cloudsplaining/scan/role_details.py index d7e9f078..a346febf 100644 --- a/cloudsplaining/scan/role_details.py +++ b/cloudsplaining/scan/role_details.py @@ -6,6 +6,7 @@ import json import logging from typing import TYPE_CHECKING, Any +from cloudsplaining.scan.policy_document import PolicyDocument from policy_sentry.util.arns import get_account_from_arn @@ -72,18 +73,34 @@ def __init__( this_role_path, ) else: - self.roles.append( - RoleDetail( - role_detail, - policy_details, - exclusions=exclusions, - flag_conditional_statements=self.flag_conditional_statements, - flag_resource_arn_statements=self.flag_resource_arn_statements, - flag_trust_policies=flag_trust_policies, - severity=self.severity, - ) + # ✅ Define role_obj first + role_obj = RoleDetail( + role_detail, + policy_details, + exclusions=exclusions, + flag_conditional_statements=self.flag_conditional_statements, + flag_resource_arn_statements=self.flag_resource_arn_statements, + flag_trust_policies=flag_trust_policies, + severity=self.severity, ) + # Append the role to the list + self.roles.append(role_obj) + + # ----------------------------- + # Composite Privilege Escalation Logic + # ----------------------------- + policy_documents = [ + p.policy_document + for p in getattr(role_obj, "attached_policies", []) + getattr(role_obj, "inline_policies", []) + ] + + merged_doc = PolicyDocument.merge(policy_documents) + if merged_doc: + composite_escalations = merged_doc.allows_privilege_escalation() + role_obj.add_composite_escalations(composite_escalations) + + def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: self.iam_data = iam_data for role in self.roles: diff --git a/cloudsplaining/shared/aws_principal.py b/cloudsplaining/shared/aws_principal.py new file mode 100644 index 00000000..aeebd225 --- /dev/null +++ b/cloudsplaining/shared/aws_principal.py @@ -0,0 +1,42 @@ +# cloudsplaining/shared/aws_principal.py +""" +AWS Principal Data Model +Holds IAM principal information (User, Role, Group) and privilege escalation findings. +""" + +from typing import List + + +class AWSPrincipal: + """Base class representing a generic AWS IAM Principal.""" + + def __init__(self, name: str, arn: str, policies: list): + self.name = name + self.arn = arn + self.policies = policies + + # Existing per-policy privilege escalation findings + self.privilege_escalation: List[str] = [] + + # New composite findings discovered from merged policies + self.composite_privilege_escalation_paths: List[str] = [] + + def add_composite_escalations(self, escalation_paths: list[str]): + """Add findings from merged policy analysis.""" + if escalation_paths: + self.composite_privilege_escalation_paths.extend(escalation_paths) + + +class AWSUser(AWSPrincipal): + """IAM User Principal""" + pass + + +class AWSRole(AWSPrincipal): + """IAM Role Principal""" + pass + + +class AWSGroup(AWSPrincipal): + """IAM Group Principal""" + pass diff --git a/docs/report/cloudsplaining_limitation.md b/docs/report/cloudsplaining_limitation.md new file mode 100644 index 00000000..3ea4a4b9 --- /dev/null +++ b/docs/report/cloudsplaining_limitation.md @@ -0,0 +1,8 @@ +### Multi-Policy Privilege Escalation Detection + +**Previous Limitation:** +Cloudsplaining analyzed each IAM policy independently, missing privilege escalation risks that arise only from the combination of multiple policies attached to a single principal. + +**New Capability:** +Cloudsplaining now supports *principal-centric* privilege escalation analysis. +All attached and inline policies are merged before evaluation, enabling detection of composite escalation paths. diff --git a/test/scanning/test_policy_document.py b/test/scanning/test_policy_document.py index a38813e6..49dd07b7 100644 --- a/test/scanning/test_policy_document.py +++ b/test/scanning/test_policy_document.py @@ -1,9 +1,10 @@ import unittest import json -from cloudsplaining.scan.policy_document import PolicyDocument +from cloudsplaining.scan.policy_document import PolicyDocument, merge_policy_documents from cloudsplaining.shared.exclusions import is_name_excluded, Exclusions + class TestPolicyDocument(unittest.TestCase): def test_policy_document_return_json(self): test_policy = { @@ -22,6 +23,12 @@ def test_policy_document_return_json(self): result = policy_document.json # That function returns the Policy as JSON self.assertEqual(result, test_policy) + def test_merge_policy_documents_combines_allow_and_deny(self): + p1 = PolicyDocument({"Statement": [{"Effect": "Allow", "Action": "iam:PassRole"}]}) + p2 = PolicyDocument({"Statement": [{"Effect": "Allow", "Action": "ec2:RunInstances"}]}) + merged = merge_policy_documents([p1, p2]) + self.assertEqual(len(merged.document["Statement"]), 2) + def test_policy_document_return_statement_results(self): test_policy = {