diff --git a/cloudmapper.py b/cloudmapper.py index 44047d186..a8be8b880 100755 --- a/cloudmapper.py +++ b/cloudmapper.py @@ -30,7 +30,7 @@ import pkgutil import importlib -__version__ = "2.8.2" +__version__ = "2.8.3" def show_help(commands): diff --git a/commands/access_check.py b/commands/access_check.py index 0997ca41f..1f447de7d 100644 --- a/commands/access_check.py +++ b/commands/access_check.py @@ -17,13 +17,16 @@ __description__ = "[proof-of-concept] Check who has access to a resource" + def replace_principal_variables(reference, principal): """ Given a resource reference string (ie. the Resource string from an IAM policy) and a prinicipal, replace any variables in the resource string that are principal related. """ reference = reference.lower() for tag in principal.tags: - reference = reference.replace("${aws:principaltag/"+tag["Key"].lower()+"}", tag["Value"].lower()) + reference = reference.replace( + "${aws:principaltag/" + tag["Key"].lower() + "}", tag["Value"].lower() + ) reference = reference.replace("${aws:principaltype}", principal.mytype.lower()) return reference @@ -36,7 +39,7 @@ def apply_condition_function(condition_function, left_side, right_side): # "t2.*", # "m3.*" # ]}} - # + # # or # # "Condition": { @@ -48,7 +51,7 @@ def apply_condition_function(condition_function, left_side, right_side): # ] # } # } - # + # # or # # "Condition": { @@ -60,27 +63,28 @@ def apply_condition_function(condition_function, left_side, right_side): # } # } - if condition_function == 'StringEquals': + if condition_function == "StringEquals": return left_side == right_side - elif condition_function == 'StringNotEquals': + elif condition_function == "StringNotEquals": return left_side != right_side - elif condition_function == 'StringEqualsIgnoreCase': + elif condition_function == "StringEqualsIgnoreCase": return left_side.lower() == right_side.lower() - elif condition_function == 'StringNotEqualsIgnoreCase': + elif condition_function == "StringNotEqualsIgnoreCase": return left_side.lower() != right_side.lower() - - elif condition_function == 'StringLike': + + elif condition_function == "StringLike": right_side.replace("*", ".*") matcher = re.compile("^{}$".format(right_side)) return matcher.match(left_side) - elif condition_function == 'StringNotLike': + elif condition_function == "StringNotLike": right_side.replace("*", ".*") matcher = re.compile("^{}$".format(right_side)) return not matcher.match(left_side) - + # TODO Need to handle other operators from https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_condition_operators.html return None + def get_condition_result(condition_function, condition_values, resource_arn, principal): """ Given a condition_function such as: 'StringEquals' @@ -93,9 +97,12 @@ def get_condition_result(condition_function, condition_values, resource_arn, pri for k in condition_values: if k.startswith("aws:PrincipalTag/"): for tag in principal.tags: - if k == "aws:PrincipalTag/"+tag["Key"]: - results.append(apply_condition_function(condition_function, tag["Value"], condition_values[k])) - + if k == "aws:PrincipalTag/" + tag["Key"]: + results.append( + apply_condition_function( + condition_function, tag["Value"], condition_values[k] + ) + ) # The array results should now look something like [True, False, True], # although more commonly is just [], [False], or [True] @@ -114,7 +121,10 @@ def get_condition_result(condition_function, condition_values, resource_arn, pri return None -def get_privilege_statements(policy_doc, privilege_matches, resource_arn, principal): + +def get_privilege_statements( + policy_doc, privilege_matches, resource_arn, principal, policy_identifier +): policy = parliament.policy.Policy(policy_doc) policy.analyze() @@ -138,10 +148,16 @@ def get_privilege_statements(policy_doc, privilege_matches, resource_arn, princi stmts = references[reference] condition_allowed_stmts = [] for stmt in stmts: + stmt.set_policy_identifier(policy_identifier) allowed_by_conditions = True for condition_function in stmt.stmt.get("Condition", {}): condition_values = stmt.stmt["Condition"][condition_function] - condition_result = get_condition_result(condition_function, condition_values, resource_arn, principal) + condition_result = get_condition_result( + condition_function, + condition_values, + resource_arn, + principal, + ) # TODO Need to do something different for Deny, to avoid false negatives if condition_result is not None: if condition_result == False: @@ -255,7 +271,11 @@ def access_check_command(accounts, config, args): policy_doc = get_managed_policy(iam, policy["PolicyArn"]) privileged_statements.extend( get_privilege_statements( - policy_doc, privilege_matches, args.resource_arn, principal + policy_doc, + privilege_matches, + args.resource_arn, + principal, + policy["PolicyArn"], ) ) @@ -264,7 +284,11 @@ def access_check_command(accounts, config, args): policy_doc = policy["PolicyDocument"] privileged_statements.extend( get_privilege_statements( - policy_doc, privilege_matches, args.resource_arn, principal + policy_doc, + privilege_matches, + args.resource_arn, + principal, + role["Arn"] + ":" + policy["PolicyName"], ) ) @@ -282,7 +306,11 @@ def access_check_command(accounts, config, args): if boundary is not None: policy_doc = get_managed_policy(iam, boundary["PermissionsBoundaryArn"]) boundary_statements = get_privilege_statements( - policy_doc, privilege_matches, args.resource_arn, principal + policy_doc, + privilege_matches, + args.resource_arn, + principal, + boundary["PermissionsBoundaryArn"], ) # Find the allowed privileges @@ -290,11 +318,12 @@ def access_check_command(accounts, config, args): privilege_matches, privileged_statements, boundary_statements ) for priv in allowed_privileges: - print( - "{} - {}:{}".format( - role["Arn"], priv["privilege_prefix"], priv["privilege_name"] - ) - ) + priv_object = { + "principal": role["Arn"], + "privilege": f"{priv['privilege_prefix']}:{priv['privilege_name']}", + "references": list(priv["references"]), + } + print(json.dumps(priv_object)) # Check the users for user in iam["UserDetailList"]: @@ -307,7 +336,11 @@ def access_check_command(accounts, config, args): policy_doc = get_managed_policy(iam, policy["PolicyArn"]) privileged_statements.extend( get_privilege_statements( - policy_doc, privilege_matches, args.resource_arn, principal + policy_doc, + privilege_matches, + args.resource_arn, + principal, + policy["PolicyArn"], ) ) @@ -316,7 +349,11 @@ def access_check_command(accounts, config, args): policy_doc = policy["PolicyDocument"] privileged_statements.extend( get_privilege_statements( - policy_doc, privilege_matches, args.resource_arn, principal + policy_doc, + privilege_matches, + args.resource_arn, + principal, + user["Arn"] + ":" + policy["PolicyName"], ) ) @@ -333,6 +370,7 @@ def access_check_command(accounts, config, args): privilege_matches, args.resource_arn, principal, + policy["PolicyArn"], ) ) @@ -344,6 +382,7 @@ def access_check_command(accounts, config, args): privilege_matches, args.resource_arn, principal, + group["Arn"] + ":" + policy["PolicyName"], ) ) @@ -361,7 +400,11 @@ def access_check_command(accounts, config, args): if boundary is not None: policy_doc = get_managed_policy(iam, boundary["PermissionsBoundaryArn"]) boundary_statements = get_privilege_statements( - policy_doc, privilege_matches, args.resource_arn, principal + policy_doc, + privilege_matches, + args.resource_arn, + principal, + boundary["PermissionsBoundaryArn"], ) # Find the allowed privileges @@ -369,11 +412,12 @@ def access_check_command(accounts, config, args): privilege_matches, privileged_statements, boundary_statements ) for priv in allowed_privileges: - print( - "{} - {}:{}".format( - user["Arn"], priv["privilege_prefix"], priv["privilege_name"] - ) - ) + priv_object = { + "principal": user["Arn"], + "privilege": f"{priv['privilege_prefix']}:{priv['privilege_name']}", + "references": list(priv["references"]), + } + print(json.dumps(priv_object)) def get_managed_policy(iam, policy_arn): @@ -406,7 +450,7 @@ def is_allowed(privilege_prefix, privilege_name, statements): is_allowed = False if is_allowed: - return True + return stmts_for_privilege return False @@ -425,11 +469,15 @@ def get_allowed_privileges( ): continue - if is_allowed( + allowed_stmts = is_allowed( privilege["privilege_prefix"], privilege["privilege_name"], privileged_statements, - ): + ) + if allowed_stmts: + privilege["references"] = set() + for stmt in allowed_stmts: + privilege["references"].add(stmt.policy_id) allowed_privileges.append(privilege) return allowed_privileges diff --git a/requirements.txt b/requirements.txt index c052e7c83..7fcf64cf9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,8 +18,8 @@ mccabe==0.6.1 mock==4.0.2 netaddr==0.7.19 nose==1.3.7 +parliament==0.4.14 pandas==1.0.3 -parliament==0.3.6 policyuniverse==1.1.0.1 pycodestyle==2.5.0 pyflakes==2.2.0 diff --git a/tests/unit/test_access_check.py b/tests/unit/test_access_check.py index 73f32c6b9..818c049f1 100644 --- a/tests/unit/test_access_check.py +++ b/tests/unit/test_access_check.py @@ -93,7 +93,15 @@ def test_get_privilege_statements(self): } ] assert_true( - len(get_privilege_statements(policy_doc, privilege_matches, "*", principal)) + len( + get_privilege_statements( + policy_doc, + privilege_matches, + "*", + principal, + policy_identifier="unit_test", + ) + ) > 0 ) @@ -106,7 +114,15 @@ def test_get_privilege_statements(self): } ] assert_true( - len(get_privilege_statements(policy_doc, privilege_matches, "*", principal)) + len( + get_privilege_statements( + policy_doc, + privilege_matches, + "*", + principal, + policy_identifier="unit_test", + ) + ) == 0 ) @@ -119,7 +135,15 @@ def test_get_privilege_statements(self): } ] assert_true( - len(get_privilege_statements(policy_doc, privilege_matches, "*", principal)) + len( + get_privilege_statements( + policy_doc, + privilege_matches, + "*", + principal, + policy_identifier="unit_test", + ) + ) > 0 ) @@ -127,7 +151,11 @@ def test_get_privilege_statements(self): assert_true( len( get_privilege_statements( - policy_doc, privilege_matches, "arn:aws:sns:*:*:prod-*", principal + policy_doc, + privilege_matches, + "arn:aws:sns:*:*:prod-*", + principal, + policy_identifier="unit_test", ) ) > 0 @@ -136,7 +164,11 @@ def test_get_privilege_statements(self): assert_true( len( get_privilege_statements( - policy_doc, privilege_matches, "arn:aws:sns:*:*:dev-*", principal + policy_doc, + privilege_matches, + "arn:aws:sns:*:*:dev-*", + principal, + policy_identifier="unit_test", ) ) == 0 @@ -180,7 +212,9 @@ def test_get_allowed_privileges(self): "resource_type": "object", } ] - stmts = get_privilege_statements(policy_doc, privilege_matches, "*", principal) + stmts = get_privilege_statements( + policy_doc, privilege_matches, "*", principal, policy_identifier="unit_test" + ) # Ensure we have allowed statements when there is no boundary assert_true(len(get_allowed_privileges(privilege_matches, stmts, None)) > 0) @@ -201,19 +235,23 @@ def test_get_allowed_privileges(self): }""" policy_doc = json.loads(policy_doc) boundary = get_privilege_statements( - policy_doc, privilege_matches, "*", principal + policy_doc, privilege_matches, "*", principal, policy_identifier="unit_test" ) # Ensure nothing is allowed when the boundary denies all assert_true( len(get_allowed_privileges(privilege_matches, stmts, boundary)) == 0 ) - + def test_conditions_on_principal_tags(self): # Example from https://aws.amazon.com/blogs/security/working-backward-from-iam-policies-and-principal-tags-to-standardized-names-and-tags-for-your-aws-resources/ principal = Principal( "AssumedRole", - [{"Key": "project", "Value": "web"}, {"Key": "access-team", "Value": "eng"}, {"Key": "cost-center", "Value": "987654"}], + [ + {"Key": "project", "Value": "web"}, + {"Key": "access-team", "Value": "eng"}, + {"Key": "cost-center", "Value": "987654"}, + ], username="role", userid="", ) @@ -243,20 +281,27 @@ def test_conditions_on_principal_tags(self): "resource_type": "object", } ] - stmts = get_privilege_statements(policy_doc, privilege_matches, "*", principal) + stmts = get_privilege_statements( + policy_doc, privilege_matches, "*", principal, policy_identifier="unit_test" + ) assert_true(len(get_allowed_privileges(privilege_matches, stmts, None)) > 0) principal = Principal( "AssumedRole", - [{"Key": "project", "Value": "database"}, {"Key": "access-team", "Value": "eng"}, {"Key": "cost-center", "Value": "987654"}], + [ + {"Key": "project", "Value": "database"}, + {"Key": "access-team", "Value": "eng"}, + {"Key": "cost-center", "Value": "987654"}, + ], username="role", userid="", ) - stmts = get_privilege_statements(policy_doc, privilege_matches, "*", principal) + stmts = get_privilege_statements( + policy_doc, privilege_matches, "*", principal, policy_identifier="unit_test" + ) assert_true(len(get_allowed_privileges(privilege_matches, stmts, None)) == 0) - # def test_conditions_on_resource_tags_ec2(self): # # TODO # # Example from https://aws.amazon.com/blogs/security/working-backward-from-iam-policies-and-principal-tags-to-standardized-names-and-tags-for-your-aws-resources/ @@ -298,7 +343,6 @@ def test_conditions_on_principal_tags(self): # print("get_privilege_statements: {}".format(stmts)) # assert_true(len(get_allowed_privileges(privilege_matches, stmts, None)) > 0) - # def test_conditions_on_resource_and_principal_tags_ec2(self): # # TODO # # Example from https://aws.amazon.com/blogs/security/working-backward-from-iam-policies-and-principal-tags-to-standardized-names-and-tags-for-your-aws-resources/ @@ -339,7 +383,6 @@ def test_conditions_on_principal_tags(self): # # TODO Need to get resource info # print("get_privilege_statements: {}".format(stmts)) # assert_true(len(get_allowed_privileges(privilege_matches, stmts, None)) > 0) - # def test_conditions_on_resource_and_principal_tags_rds(self): # # TODO @@ -395,7 +438,11 @@ def test_conditions_on_principal_tags(self): def test_conditions_on_resource_and_principal_tags_complex_secrets(self): principal = Principal( "AssumedRole", - [{"Key": "access-project", "Value": "peg"}, {"Key": "access-team", "Value": "eng"}, {"Key": "cost-center", "Value": "987654"}], + [ + {"Key": "access-project", "Value": "peg"}, + {"Key": "access-team", "Value": "eng"}, + {"Key": "cost-center", "Value": "987654"}, + ], username="role", userid="", ) @@ -484,7 +531,9 @@ def test_conditions_on_resource_and_principal_tags_complex_secrets(self): "resource_type": "secret", } ] - stmts = get_privilege_statements(policy_doc, privilege_matches, "*", principal) + stmts = get_privilege_statements( + policy_doc, privilege_matches, "*", principal, policy_identifier="unit_test" + ) assert_true(len(get_allowed_privileges(privilege_matches, stmts, None)) > 0) # Ensure the Deny in the policy works @@ -495,14 +544,13 @@ def test_conditions_on_resource_and_principal_tags_complex_secrets(self): "resource_type": "secret", } ] - stmts = get_privilege_statements(policy_doc, privilege_matches, "*", principal) + stmts = get_privilege_statements( + policy_doc, privilege_matches, "*", principal, policy_identifier="unit_test" + ) assert_true(len(get_allowed_privileges(privilege_matches, stmts, None)) == 0) # TODO Testing these conditions requires getting resource tags - - - # for stmt in stmts: # for m in stmt["matching_statements"]: # print(m) @@ -511,7 +559,6 @@ def test_conditions_on_resource_and_principal_tags_complex_secrets(self): # len(get_privilege_statements(policy_doc, privilege_matches, "*", principal)) # > 0 # ) - def test_access_check(self): # This test calls the access_check command across the demo data,