diff --git a/s3tests_boto3/functional/test_sts.py b/s3tests_boto3/functional/test_sts.py index 8969167c4..89d02a94b 100644 --- a/s3tests_boto3/functional/test_sts.py +++ b/s3tests_boto3/functional/test_sts.py @@ -1,36 +1,11 @@ import boto3 -import botocore.session from botocore.exceptions import ClientError -from botocore.exceptions import ParamValidationError import pytest -import isodate -import email.utils -import datetime -import threading -import re -import pytz -from collections import OrderedDict -import requests import json -import base64 -import hmac -import hashlib -import xml.etree.ElementTree as ET import time -import operator -import os -import string -import random -import socket -import ssl import logging -from collections import namedtuple -from email.header import decode_header - -from . import( - configfile, - setup_teardown, +from . import ( get_iam_client, get_sts_client, get_client, @@ -54,61 +29,83 @@ log = logging.getLogger(__name__) -def create_role(iam_client,path,rolename,policy_document,description,sessionduration,permissionboundary,tag_list=None): - role_err=None + +def create_role( + iam_client, + path, + rolename, + policy_document, + description, + sessionduration, + permissionboundary, + tag_list=None, +): + role_err = None role_response = None if rolename is None: - rolename=get_parameter_name() + rolename = get_parameter_name() if tag_list is None: tag_list = [] try: - role_response = iam_client.create_role(Path=path,RoleName=rolename,AssumeRolePolicyDocument=policy_document,Tags=tag_list) + role_response = iam_client.create_role( + Path=path, + RoleName=rolename, + AssumeRolePolicyDocument=policy_document, + Tags=tag_list, + ) except ClientError as e: - role_err = e.response['Code'] - return (role_err,role_response,rolename) + role_err = e.response['Code'] + return role_err, role_response, rolename -def put_role_policy(iam_client,rolename,policyname,role_policy): - role_err=None + +def put_role_policy(iam_client, rolename, policyname, role_policy): + role_err = None role_response = None if policyname is None: - policyname=get_parameter_name() + policyname = get_parameter_name() try: - role_response = iam_client.put_role_policy(RoleName=rolename,PolicyName=policyname,PolicyDocument=role_policy) + role_response = iam_client.put_role_policy( + RoleName=rolename, + PolicyName=policyname, + PolicyDocument=role_policy, + ) except ClientError as e: - role_err = e.response['Code'] - return (role_err,role_response) + role_err = e.response['Code'] + return role_err, role_response + -def put_user_policy(iam_client,username,policyname,policy_document): - role_err=None +def put_user_policy(iam_client, username, policyname, policy_document): + role_err = None role_response = None if policyname is None: - policyname=get_parameter_name() + policyname = get_parameter_name() try: - role_response = iam_client.put_user_policy(UserName=username,PolicyName=policyname,PolicyDocument=policy_document) + role_response = iam_client.put_user_policy( + UserName=username, + PolicyName=policyname, + PolicyDocument=policy_document, + ) except ClientError as e: role_err = e.response['Code'] - return (role_err,role_response,policyname) - -def get_s3_client_using_iam_creds(): - iam_access_key = get_iam_access_key() - iam_secret_key = get_iam_secret_key() - default_endpoint = get_config_endpoint() + return role_err, role_response, policyname - s3_client_iam_creds = boto3.client('s3', - aws_access_key_id = iam_access_key, - aws_secret_access_key = iam_secret_key, - endpoint_url=default_endpoint, - region_name='', - ) +def get_s3_client_using_iam_creds() -> boto3: + s3_client_iam_creds = boto3.client( + 's3', + aws_access_key_id=get_iam_access_key(), + aws_secret_access_key=get_iam_secret_key(), + endpoint_url=get_config_endpoint(), + region_name='', + ) return s3_client_iam_creds + def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist): - oidc_arn = None oidc_error = None clientids = [] if clientidlist is None: - clientidlist=clientids + clientidlist = clientids try: oidc_response = iam_client.create_open_id_connect_provider( Url=url, @@ -116,13 +113,13 @@ def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist): ThumbprintList=thumbprintlist, ) oidc_arn = oidc_response['OpenIDConnectProviderArn'] - print (oidc_arn) + print(oidc_arn) except ClientError as e: oidc_error = e.response['Code'] - print (oidc_error) + print(oidc_error) try: oidc_error = None - print (url) + print(url) if url.startswith('http://'): url = url[len('http://'):] elif url.startswith('https://'): @@ -130,210 +127,272 @@ def create_oidc_provider(iam_client, url, clientidlist, thumbprintlist): elif url.startswith('www.'): url = url[len('www.'):] oidc_arn = 'arn:aws:iam:::oidc-provider/{}'.format(url) - print (url) - print (oidc_arn) - oidc_response = iam_client.get_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) - except ClientError as e: + print(url) + print(oidc_arn) + iam_client.get_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) + except ClientError: oidc_arn = None - return (oidc_arn, oidc_error) - -def get_s3_resource_using_iam_creds(): - iam_access_key = get_iam_access_key() - iam_secret_key = get_iam_secret_key() - default_endpoint = get_config_endpoint() + return oidc_arn, oidc_error - s3_res_iam_creds = boto3.resource('s3', - aws_access_key_id = iam_access_key, - aws_secret_access_key = iam_secret_key, - endpoint_url=default_endpoint, - region_name='', - ) +def get_s3_resource_using_iam_creds(): + s3_res_iam_creds = boto3.resource( + 's3', + aws_access_key_id=get_iam_access_key(), + aws_secret_access_key=get_iam_secret_key(), + endpoint_url=get_config_endpoint(), + region_name='', + ) return s3_res_iam_creds + @pytest.mark.test_of_sts @pytest.mark.fails_on_dbstore def test_get_session_token(): - iam_client=get_iam_client() - sts_client=get_sts_client() - sts_user_id=get_alt_user_id() - default_endpoint=get_config_endpoint() - + iam_client = get_iam_client() + sts_client = get_sts_client() + sts_user_id = get_alt_user_id() + default_endpoint = get_config_endpoint() + user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}" - (resp_err,resp,policy_name)=put_user_policy(iam_client,sts_user_id,None,user_policy) + resp_err, resp, policy_name = put_user_policy( + iam_client, + sts_user_id, + None, + user_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - - response=sts_client.get_session_token() + + response = sts_client.get_session_token() assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - - s3_client=boto3.client('s3', - aws_access_key_id = response['Credentials']['AccessKeyId'], - aws_secret_access_key = response['Credentials']['SecretAccessKey'], - aws_session_token = response['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + + s3_client = boto3.client( + 's3', + aws_access_key_id=response['Credentials']['AccessKeyId'], + aws_secret_access_key=response['Credentials']['SecretAccessKey'], + aws_session_token=response['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_name = get_new_bucket_name() try: s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 - finish=s3_client.delete_bucket(Bucket=bucket_name) - finally: # clean up user policy even if create_bucket/delete_bucket fails - iam_client.delete_user_policy(UserName=sts_user_id,PolicyName=policy_name) + s3_client.delete_bucket(Bucket=bucket_name) + finally: # clean up user policy even if create_bucket/delete_bucket fails + iam_client.delete_user_policy(UserName=sts_user_id, PolicyName=policy_name) + @pytest.mark.test_of_sts @pytest.mark.fails_on_dbstore def test_get_session_token_permanent_creds_denied(): - s3bucket_error=None - iam_client=get_iam_client() - sts_client=get_sts_client() - sts_user_id=get_alt_user_id() - default_endpoint=get_config_endpoint() - s3_main_access_key=get_main_aws_access_key() - s3_main_secret_key=get_main_aws_secret_key() - + s3bucket_error = None + iam_client = get_iam_client() + sts_client = get_sts_client() + sts_user_id = get_alt_user_id() + default_endpoint = get_config_endpoint() + s3_main_access_key = get_main_aws_access_key() + s3_main_secret_key = get_main_aws_secret_key() + user_policy = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"],\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}},{\"Effect\":\"Allow\",\"Action\":\"sts:GetSessionToken\",\"Resource\":\"*\",\"Condition\":{\"BoolIfExists\":{\"sts:authentication\":\"false\"}}}]}" - (resp_err,resp,policy_name)=put_user_policy(iam_client,sts_user_id,None,user_policy) + resp_err, resp, policy_name = put_user_policy(iam_client, sts_user_id, None, user_policy) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - - response=sts_client.get_session_token() + + response = sts_client.get_session_token() assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - - s3_client=boto3.client('s3', - aws_access_key_id = s3_main_access_key, - aws_secret_access_key = s3_main_secret_key, - aws_session_token = response['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + + s3_client = boto3.client( + 's3', + aws_access_key_id=s3_main_access_key, + aws_secret_access_key=s3_main_secret_key, + aws_session_token=response['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_name = get_new_bucket_name() try: - s3bucket = s3_client.create_bucket(Bucket=bucket_name) + s3_client.create_bucket(Bucket=bucket_name) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") assert s3bucket_error == 'AccessDenied' - iam_client.delete_user_policy(UserName=sts_user_id,PolicyName=policy_name) + iam_client.delete_user_policy(UserName=sts_user_id, PolicyName=policy_name) + @pytest.mark.test_of_sts @pytest.mark.fails_on_dbstore def test_assume_role_allow(): - iam_client=get_iam_client() - sts_client=get_sts_client() - sts_user_id=get_alt_user_id() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - + iam_client = get_iam_client() + sts_client = get_sts_client() + sts_user_id = get_alt_user_id() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) if role_response: assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' else: assert False, role_error - + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) if response: assert response['ResponseMetadata']['HTTPStatusCode'] == 200 else: assert False, role_err - - resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) + + resp = sts_client.assume_role( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 + @pytest.mark.test_of_sts @pytest.mark.fails_on_dbstore def test_assume_role_deny(): - s3bucket_error=None - iam_client=get_iam_client() - sts_client=get_sts_client() - sts_user_id=get_alt_user_id() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - + s3bucket_error = None + iam_client = get_iam_client() + sts_client = get_sts_client() + sts_user_id = get_alt_user_id() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) if role_response: assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' else: assert False, role_error - + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) if response: assert response['ResponseMetadata']['HTTPStatusCode'] == 200 else: assert False, role_err - - resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) + + resp = sts_client.assume_role( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_name = get_new_bucket_name() try: - s3bucket = s3_client.create_bucket(Bucket=bucket_name) + s3_client.create_bucket(Bucket=bucket_name) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") assert s3bucket_error == 'AccessDenied' + @pytest.mark.test_of_sts @pytest.mark.fails_on_dbstore def test_assume_role_creds_expiry(): - iam_client=get_iam_client() - sts_client=get_sts_client() - sts_user_id=get_alt_user_id() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - + iam_client = get_iam_client() + sts_client = get_sts_client() + sts_user_id = get_alt_user_id() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/"+sts_user_id+"\"]},\"Action\":[\"sts:AssumeRole\"]}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) if role_response: assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' else: assert False, role_error - + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) if response: assert response['ResponseMetadata']['HTTPStatusCode'] == 200 else: assert False, role_err - - resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,DurationSeconds=900) + + resp = sts_client.assume_role( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + DurationSeconds=900, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 time.sleep(900) - - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_name = get_new_bucket_name() try: - s3bucket = s3_client.create_bucket(Bucket=bucket_name) + s3_client.create_bucket(Bucket=bucket_name) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") - assert s3bucket_error == 'AccessDenied' + assert s3bucket_error == 'AccessDenied' + @pytest.mark.test_of_sts @pytest.mark.fails_on_dbstore @@ -342,14 +401,22 @@ def test_assume_role_deny_head_nonexistent(): bucket_name = get_new_bucket_name() get_client().create_bucket(Bucket=bucket_name) - iam_client=get_iam_client() - sts_client=get_sts_client() - sts_user_id=get_alt_user_id() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + sts_user_id = get_alt_user_id() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) if role_response: assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name else: @@ -357,28 +424,39 @@ def test_assume_role_deny_head_nonexistent(): # allow GetObject but deny ListBucket role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":"s3:GetObject","Principal":"*","Resource":"arn:aws:s3:::*"}}' - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) if response: assert response['ResponseMetadata']['HTTPStatusCode'] == 200 else: assert False, role_err - resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) + resp = sts_client.assume_role( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='') - status=200 + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + status = 200 try: s3_client.head_object(Bucket=bucket_name, Key='nonexistent') except ClientError as e: status = e.response['ResponseMetadata']['HTTPStatusCode'] assert status == 403 + @pytest.mark.test_of_sts @pytest.mark.fails_on_dbstore def test_assume_role_allow_head_nonexistent(): @@ -386,14 +464,22 @@ def test_assume_role_allow_head_nonexistent(): bucket_name = get_new_bucket_name() get_client().create_bucket(Bucket=bucket_name) - iam_client=get_iam_client() - sts_client=get_sts_client() - sts_user_id=get_alt_user_id() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + sts_user_id = get_alt_user_id() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) if role_response: assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name else: @@ -401,22 +487,32 @@ def test_assume_role_allow_head_nonexistent(): # allow GetObject and ListBucket role_policy = '{"Version":"2012-10-17","Statement":{"Effect":"Allow","Action":["s3:GetObject","s3:ListBucket"],"Principal":"*","Resource":"arn:aws:s3:::*"}}' - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) if response: assert response['ResponseMetadata']['HTTPStatusCode'] == 200 else: assert False, role_err - resp=sts_client.assume_role(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name) + resp = sts_client.assume_role( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='') - status=200 + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) + status = 200 try: s3_client.head_object(Bucket=bucket_name, Key='nonexistent') except ClientError as e: @@ -429,53 +525,70 @@ def test_assume_role_allow_head_nonexistent(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() - + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() + oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[thumbprint], ) - + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' - + role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) if response: assert response['ResponseMetadata']['HTTPStatusCode'] == 200 else: assert False, role_err - - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) + + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 - - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] + + iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"], ) + ''' @pytest.mark.webidentity_test def test_assume_role_with_web_identity_invalid_webtoken(): @@ -523,73 +636,93 @@ def test_assume_role_with_web_identity_invalid_webtoken(): # Session Policy Tests ####################### + @pytest.mark.webidentity_test @pytest.mark.session_policy @pytest.mark.fails_on_dbstore def test_session_policy_check_on_different_buckets(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"arn:aws:s3:::test2\",\"arn:aws:s3:::test2/*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy_new, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_name_1 = 'test1' + s3bucket_error_1 = 'AccessGranted' try: - s3bucket = s3_client.create_bucket(Bucket=bucket_name_1) + s3_client.create_bucket(Bucket=bucket_name_1) except ClientError as e: - s3bucket_error = e.response.get("Error", {}).get("Code") - assert s3bucket_error == 'AccessDenied' + s3bucket_error_1 = e.response.get("Error", {}).get("Code") + assert s3bucket_error_1 == 'AccessDenied' + s3bucket_error_2 = 'AccessGranted' bucket_name_2 = 'test2' try: - s3bucket = s3_client.create_bucket(Bucket=bucket_name_2) + s3_client.create_bucket(Bucket=bucket_name_2) except ClientError as e: - s3bucket_error = e.response.get("Error", {}).get("Code") - assert s3bucket_error == 'AccessDenied' + s3bucket_error_2 = e.response.get("Error", {}).get("Code") + assert s3bucket_error_2 == 'AccessDenied' bucket_body = 'please-write-something' - #body.encode(encoding='utf_8') + s3_put_obj_error = 'BucketExists' try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") assert s3_put_obj_error == 'NoSuchBucket' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) @pytest.mark.webidentity_test @@ -597,28 +730,41 @@ def test_session_policy_check_on_different_buckets(): @pytest.mark.fails_on_dbstore def test_session_policy_check_on_same_bucket(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy_new, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3_client_iam_creds = get_s3_client_using_iam_creds() @@ -629,24 +775,28 @@ def test_session_policy_check_on_same_bucket(): session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) @pytest.mark.webidentity_test @@ -654,30 +804,41 @@ def test_session_policy_check_on_same_bucket(): @pytest.mark.fails_on_dbstore def test_session_policy_check_put_obj_denial(): check_webidentity() - iam_client=get_iam_client() - iam_access_key=get_iam_access_key() - iam_secret_key=get_iam_secret_key() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy_new, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3_client_iam_creds = get_s3_client_using_iam_creds() @@ -688,27 +849,32 @@ def test_session_policy_check_put_obj_denial(): session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' + s3_put_obj_error = "AccessGranted" try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") assert s3_put_obj_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) @pytest.mark.webidentity_test @@ -716,30 +882,41 @@ def test_session_policy_check_put_obj_denial(): @pytest.mark.fails_on_dbstore def test_swapping_role_policy_and_session_policy(): check_webidentity() - iam_client=get_iam_client() - iam_access_key=get_iam_access_key() - iam_secret_key=get_iam_secret_key() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\",\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy_new, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3_client_iam_creds = get_s3_client_using_iam_creds() @@ -750,53 +927,69 @@ def test_swapping_role_policy_and_session_policy(): session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) + @pytest.mark.webidentity_test @pytest.mark.session_policy @pytest.mark.fails_on_dbstore def test_session_policy_check_different_op_permissions(): check_webidentity() - iam_client=get_iam_client() - iam_access_key=get_iam_access_key() - iam_secret_key=get_iam_secret_key() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy_new, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3_client_iam_creds = get_s3_client_using_iam_creds() @@ -807,27 +1000,32 @@ def test_session_policy_check_different_op_permissions(): session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' + s3_put_obj_error = 'AccessGranted' try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") assert s3_put_obj_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) @pytest.mark.webidentity_test @@ -835,30 +1033,41 @@ def test_session_policy_check_different_op_permissions(): @pytest.mark.fails_on_dbstore def test_session_policy_check_with_deny_effect(): check_webidentity() - iam_client=get_iam_client() - iam_access_key=get_iam_access_key() - iam_secret_key=get_iam_secret_key() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy_new, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3_client_iam_creds = get_s3_client_using_iam_creds() @@ -869,26 +1078,31 @@ def test_session_policy_check_with_deny_effect(): session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' + s3_put_obj_error = 'AccessGranted' try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") assert s3_put_obj_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) @pytest.mark.webidentity_test @@ -896,30 +1110,41 @@ def test_session_policy_check_with_deny_effect(): @pytest.mark.fails_on_dbstore def test_session_policy_check_with_deny_on_same_op(): check_webidentity() - iam_client=get_iam_client() - iam_access_key=get_iam_access_key() - iam_secret_key=get_iam_secret_key() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy_new = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy_new) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy_new, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3_client_iam_creds = get_s3_client_using_iam_creds() @@ -930,54 +1155,73 @@ def test_session_policy_check_with_deny_on_same_op(): session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Deny\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' + s3_put_obj_error = 'AccessGranted' try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") assert s3_put_obj_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) + @pytest.mark.webidentity_test @pytest.mark.session_policy @pytest.mark.fails_on_dbstore def test_session_policy_bucket_policy_role_arn(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3client_iamcreds = get_s3_client_using_iam_creds() @@ -988,72 +1232,91 @@ def test_session_policy_bucket_policy_role_arn(): resource1 = "arn:aws:s3:::" + bucket_name_1 resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" rolearn = "arn:aws:iam:::role/" + general_role_name - bucket_policy = json.dumps( - { + bucket_policy = json.dumps({ "Version": "2012-10-17", - "Statement": [{ - "Effect": "Allow", - "Principal": {"AWS": "{}".format(rolearn)}, - "Action": ["s3:GetObject","s3:PutObject"], - "Resource": [ - "{}".format(resource1), - "{}".format(resource2) - ] - }] + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": "{}".format(rolearn), + }, + "Action": ["s3:GetObject", "s3:PutObject"], + "Resource": ["{}".format(resource1), "{}".format(resource2)], + }, + ], }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 + s3object_error = 'AccessGranted' try: - obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") + s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3object_error = e.response.get("Error", {}).get("Code") assert s3object_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) + @pytest.mark.webidentity_test @pytest.mark.session_policy @pytest.mark.fails_on_dbstore def test_session_policy_bucket_policy_session_arn(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3client_iamcreds = get_s3_client_using_iam_creds() @@ -1064,70 +1327,87 @@ def test_session_policy_bucket_policy_session_arn(): resource1 = "arn:aws:s3:::" + bucket_name_1 resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name - bucket_policy = json.dumps( - { + bucket_policy = json.dumps({ "Version": "2012-10-17", - "Statement": [{ - "Effect": "Allow", - "Principal": {"AWS": "{}".format(rolesessionarn)}, - "Action": ["s3:GetObject","s3:PutObject"], - "Resource": [ - "{}".format(resource1), - "{}".format(resource2) - ] - }] + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": "{}".format(rolesessionarn), + }, + "Action": ["s3:GetObject", "s3:PutObject"], + "Resource": ["{}".format(resource1), "{}".format(resource2)], + }, + ], }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) + @pytest.mark.webidentity_test @pytest.mark.session_policy @pytest.mark.fails_on_dbstore def test_session_policy_copy_object(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3client_iamcreds = get_s3_client_using_iam_creds() @@ -1138,40 +1418,46 @@ def test_session_policy_copy_object(): resource1 = "arn:aws:s3:::" + bucket_name_1 resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name - print (rolesessionarn) - bucket_policy = json.dumps( - { + print(rolesessionarn) + bucket_policy = json.dumps({ "Version": "2012-10-17", - "Statement": [{ - "Effect": "Allow", - "Principal": {"AWS": "{}".format(rolesessionarn)}, - "Action": ["s3:GetObject","s3:PutObject"], - "Resource": [ - "{}".format(resource1), - "{}".format(resource2) - ] - }] + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": "{}".format(rolesessionarn), + }, + "Action": ["s3:GetObject", "s3:PutObject"], + "Resource": ["{}".format(resource1), "{}".format(resource2)], + }, + ], }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 copy_source = { - 'Bucket': bucket_name_1, - 'Key': 'test-1.txt' + 'Bucket': bucket_name_1, + 'Key': 'test-1.txt' } s3_client.copy(copy_source, bucket_name_1, "test-2.txt") @@ -1179,32 +1465,39 @@ def test_session_policy_copy_object(): s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-2.txt") assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) + @pytest.mark.webidentity_test @pytest.mark.session_policy @pytest.mark.fails_on_dbstore def test_session_policy_no_bucket_role_policy(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' s3client_iamcreds = get_s3_client_using_iam_creds() @@ -1214,53 +1507,72 @@ def test_session_policy_no_bucket_role_policy(): session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\",\"s3:GetObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' + s3putobj_error = 'AccessGranted' try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3putobj_error = e.response.get("Error", {}).get("Code") assert s3putobj_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_arn) + @pytest.mark.webidentity_test @pytest.mark.session_policy @pytest.mark.fails_on_dbstore def test_session_policy_bucket_policy_deny(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - aud=get_aud() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + aud = get_aud() + token = get_token() + realm = get_realm_name() url = 'http://localhost:8080/auth/realms/{}'.format(realm) thumbprintlist = [thumbprint] - (oidc_arn,oidc_error) = create_oidc_provider(iam_client, url, None, thumbprintlist) + oidc_arn, oidc_error = create_oidc_provider(iam_client, url, None, thumbprintlist) if oidc_error is not None: raise RuntimeError('Unable to create/get openid connect provider {}'.format(oidc_error)) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_arn+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":app_id\":\""+aud+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":[\"*\"]}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 s3client_iamcreds = get_s3_client_using_iam_creds() @@ -1271,141 +1583,179 @@ def test_session_policy_bucket_policy_deny(): resource1 = "arn:aws:s3:::" + bucket_name_1 resource2 = "arn:aws:s3:::" + bucket_name_1 + "/*" rolesessionarn = "arn:aws:iam:::assumed-role/" + general_role_name + "/" + role_session_name - bucket_policy = json.dumps( - { + bucket_policy = json.dumps({ "Version": "2012-10-17", - "Statement": [{ - "Effect": "Deny", - "Principal": {"AWS": "{}".format(rolesessionarn)}, - "Action": ["s3:GetObject","s3:PutObject"], - "Resource": [ - "{}".format(resource1), - "{}".format(resource2) - ] - }] + "Statement": [ + { + "Effect": "Deny", + "Principal": {"AWS": "{}".format(rolesessionarn)}, + "Action": ["s3:GetObject", "s3:PutObject"], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2), + ], + }, + ], }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + Policy=session_policy, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], - endpoint_url=default_endpoint, - region_name='', - ) + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='', + ) bucket_body = 'this is a test file' - + s3putobj_error = 'AccessGranted' try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") + s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") except ClientError as e: s3putobj_error = e.response.get("Error", {}).get("Code") assert s3putobj_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_arn - ) @pytest.mark.webidentity_test @pytest.mark.token_claims_trust_policy_test @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_with_sub(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - sub=get_sub() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + sub = get_sub() + token = get_token() + realm = get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":sub\":\""+sub+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.token_claims_trust_policy_test @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_with_azp(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - azp=get_azp() - token=get_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + azp = get_azp() + token = get_token() + realm = get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\"],\"Condition\":{\"StringEquals\":{\"localhost:8080/auth/realms/"+realm+":azp\":\""+azp+"\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1413,48 +1763,65 @@ def test_assume_role_with_web_identity_with_azp(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_with_request_tag(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\"}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1462,48 +1829,65 @@ def test_assume_role_with_web_identity_with_request_tag(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_with_principal_tag(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:PrincipalTag/Department\":\"Engineering\"}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1511,48 +1895,65 @@ def test_assume_role_with_web_identity_with_principal_tag(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_for_all_values(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\",\"Marketing\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1560,50 +1961,68 @@ def test_assume_role_with_web_identity_for_all_values(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_for_all_values_deny(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' - #ForAllValues: The condition returns true if every key value in the request matches at least one value in the policy + # ForAllValues: The condition returns true if every key value in the request matches at least one value in the policy role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAllValues:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_name = get_new_bucket_name() + s3bucket_error = 'AccessGranted' try: - s3bucket = s3_client.create_bucket(Bucket=bucket_name) + s3_client.create_bucket(Bucket=bucket_name) except ClientError as e: s3bucket_error = e.response.get("Error", {}).get("Code") assert s3bucket_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1611,48 +2030,65 @@ def test_assume_role_with_web_identity_for_all_values_deny(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_tag_keys_trust_policy(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:TagKeys\":\"Department\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"ForAnyValue:StringEquals\":{\"aws:PrincipalTag/Department\":[\"Engineering\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1660,48 +2096,65 @@ def test_assume_role_with_web_identity_tag_keys_trust_policy(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_tag_keys_role_policy(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"aws:TagKeys\":[\"Department\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_name = get_new_bucket_name() s3bucket = s3_client.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bkt = s3_client.delete_bucket(Bucket=bucket_name) assert bkt['ResponseMetadata']['HTTPStatusCode'] == 204 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1709,13 +2162,13 @@ def test_assume_role_with_web_identity_tag_keys_role_policy(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_resource_tag(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() s3_res_iam_creds = get_s3_resource_using_iam_creds() @@ -1726,41 +2179,70 @@ def test_assume_role_with_web_identity_resource_tag(): assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) - Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]}) + bucket_tagging.put( + Tagging={ + 'TagSet': [ + { + 'Key': 'Department', + 'Value': 'Engineering', + }, + { + 'Key': 'Department', + 'Value': 'Marketing', + }, + ], + }, + ) oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) - bucket_body = 'this is a test file' - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt") + s3_put_obj = s3_client.put_object(Body='this is a test file', Bucket=bucket_name, Key="test-1.txt") assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1768,13 +2250,13 @@ def test_assume_role_with_web_identity_resource_tag(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_resource_tag_deny(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() s3_res_iam_creds = get_s3_resource_using_iam_creds() @@ -1785,41 +2267,58 @@ def test_assume_role_with_web_identity_resource_tag_deny(): assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) - bucket_body = 'this is a test file' + s3_put_obj_error = 'AccessGranted' try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt") + s3_client.put_object(Body='this is a test file', Bucket=bucket_name, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") assert s3_put_obj_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1827,13 +2326,13 @@ def test_assume_role_with_web_identity_resource_tag_deny(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_wrong_resource_tag_deny(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() s3_res_iam_creds = get_s3_resource_using_iam_creds() @@ -1844,44 +2343,69 @@ def test_assume_role_with_web_identity_wrong_resource_tag_deny(): assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) - Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'WrongResourcetag'}]}) + bucket_tagging.put(Tagging={ + 'TagSet': [ + { + 'Key': 'Department', + 'Value': 'WrongResourcetag', + }, + ], + }, + ) oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy( + iam_client, + general_role_name, + None, + role_policy, + ) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) - bucket_body = 'this is a test file' + s3_put_obj_error = 'AccessGranted' try: - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt") + s3_client.put_object(Body='this is a test file', Bucket=bucket_name, Key="test-1.txt") except ClientError as e: s3_put_obj_error = e.response.get("Error", {}).get("Code") assert s3_put_obj_error == 'AccessDenied' - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1889,13 +2413,13 @@ def test_assume_role_with_web_identity_wrong_resource_tag_deny(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_resource_tag_princ_tag(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() s3_res_iam_creds = get_s3_resource_using_iam_creds() @@ -1906,33 +2430,54 @@ def test_assume_role_with_web_identity_resource_tag_princ_tag(): assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) - Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]}) + bucket_tagging.put(Tagging={ + 'TagSet': [ + { + 'Key': 'Department', + 'Value': 'Engineering', + }, + ], + }, + ) oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy(iam_client, general_role_name, None, role_policy) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_body = 'this is a test file' tags = 'Department=Engineering&Department=Marketing' @@ -1943,9 +2488,8 @@ def test_assume_role_with_web_identity_resource_tag_princ_tag(): s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key=key) assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -1953,58 +2497,87 @@ def test_assume_role_with_web_identity_resource_tag_princ_tag(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_resource_tag_copy_obj(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() s3_res_iam_creds = get_s3_resource_using_iam_creds() s3_client_iam_creds = s3_res_iam_creds.meta.client - #create two buckets and add same tags to both + # create two buckets and add same tags to both bucket_name = get_new_bucket_name() s3bucket = s3_client_iam_creds.create_bucket(Bucket=bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) - Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]}) + bucket_tagging.put(Tagging={ + 'TagSet': [ + { + 'Key': 'Department', + 'Value': 'Engineering', + }, + ], + }, + ) copy_bucket_name = get_new_bucket_name() s3bucket = s3_client_iam_creds.create_bucket(Bucket=copy_bucket_name) assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bucket_tagging = s3_res_iam_creds.BucketTagging(copy_bucket_name) - Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'}]}) + bucket_tagging.put(Tagging={ + 'TagSet': [ + { + 'Key': 'Department', + 'Value': 'Engineering', + }, + ], + }, + ) oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"aws:RequestTag/Department\":\"Engineering\"}}}]}" - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None) + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"${aws:PrincipalTag/Department}\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy(iam_client, general_role_name, None, role_policy) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) bucket_body = 'this is a test file' tags = 'Department=Engineering' @@ -2012,10 +2585,10 @@ def test_assume_role_with_web_identity_resource_tag_copy_obj(): s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key=key, Tagging=tags) assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - #copy to same bucket + # copy to same bucket copy_source = { - 'Bucket': bucket_name, - 'Key': 'test-1.txt' + 'Bucket': bucket_name, + 'Key': 'test-1.txt' } s3_client.copy(copy_source, bucket_name, "test-2.txt") @@ -2023,10 +2596,10 @@ def test_assume_role_with_web_identity_resource_tag_copy_obj(): s3_get_obj = s3_client.get_object(Bucket=bucket_name, Key="test-2.txt") assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - #copy to another bucket + # copy to another bucket copy_source = { - 'Bucket': bucket_name, - 'Key': 'test-1.txt' + 'Bucket': bucket_name, + 'Key': 'test-1.txt' } s3_client.copy(copy_source, copy_bucket_name, "test-1.txt") @@ -2034,9 +2607,8 @@ def test_assume_role_with_web_identity_resource_tag_copy_obj(): s3_get_obj = s3_client.get_object(Bucket=copy_bucket_name, Key="test-1.txt") assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"]) + @pytest.mark.webidentity_test @pytest.mark.abac_test @@ -2044,13 +2616,13 @@ def test_assume_role_with_web_identity_resource_tag_copy_obj(): @pytest.mark.fails_on_dbstore def test_assume_role_with_web_identity_role_resource_tag(): check_webidentity() - iam_client=get_iam_client() - sts_client=get_sts_client() - default_endpoint=get_config_endpoint() - role_session_name=get_parameter_name() - thumbprint=get_thumbprint() - user_token=get_user_token() - realm=get_realm_name() + iam_client = get_iam_client() + sts_client = get_sts_client() + default_endpoint = get_config_endpoint() + role_session_name = get_parameter_name() + thumbprint = get_thumbprint() + user_token = get_user_token() + realm = get_realm_name() s3_res_iam_creds = get_s3_resource_using_iam_creds() @@ -2061,44 +2633,73 @@ def test_assume_role_with_web_identity_role_resource_tag(): assert s3bucket['ResponseMetadata']['HTTPStatusCode'] == 200 bucket_tagging = s3_res_iam_creds.BucketTagging(bucket_name) - Set_Tag = bucket_tagging.put(Tagging={'TagSet':[{'Key':'Department', 'Value': 'Engineering'},{'Key':'Department', 'Value': 'Marketing'}]}) + bucket_tagging.put(Tagging={ + 'TagSet': [ + { + 'Key': 'Department', + 'Value': 'Engineering', + }, + { + 'Key': 'Department', + 'Value': 'Marketing', + }, + ], + }, + ) oidc_response = iam_client.create_open_id_connect_provider( - Url='http://localhost:8080/auth/realms/{}'.format(realm), - ThumbprintList=[ - thumbprint, - ], + Url='http://localhost:8080/auth/realms/{}'.format(realm), + ThumbprintList=[ + thumbprint, + ], ) - #iam:ResourceTag refers to the tag attached to role, hence the role is allowed to be assumed only when it has a tag matching the policy. + # iam:ResourceTag refers to the tag attached to role, hence the role is allowed to be assumed only when it has a tag matching the policy. policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Federated\":[\""+oidc_response["OpenIDConnectProviderArn"]+"\"]},\"Action\":[\"sts:AssumeRoleWithWebIdentity\",\"sts:TagSession\"],\"Condition\":{\"StringEquals\":{\"iam:ResourceTag/Department\":\"Engineering\"}}}]}" tags_list = [ - {'Key':'Department','Value':'Engineering'}, - {'Key':'Department','Value':'Marketing'} - ] - - (role_error,role_response,general_role_name)=create_role(iam_client,'/',None,policy_document,None,None,None,tags_list) + { + 'Key': 'Department', + 'Value': 'Engineering', + }, + { + 'Key': 'Department', + 'Value': 'Marketing', + }, + ] + + role_error, role_response, general_role_name = create_role( + iam_client, + '/', + None, + policy_document, + None, + None, + None, + tags_list, + ) assert role_response['Role']['Arn'] == 'arn:aws:iam:::role/'+general_role_name+'' role_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":\"s3:*\",\"Resource\":\"arn:aws:s3:::*\",\"Condition\":{\"StringEquals\":{\"s3:ResourceTag/Department\":[\"Engineering\"]}}}}" - (role_err,response)=put_role_policy(iam_client,general_role_name,None,role_policy) + role_err, response = put_role_policy(iam_client, general_role_name, None, role_policy) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 - resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=user_token) + resp = sts_client.assume_role_with_web_identity( + RoleArn=role_response['Role']['Arn'], + RoleSessionName=role_session_name, + WebIdentityToken=user_token, + ) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 - s3_client = boto3.client('s3', - aws_access_key_id = resp['Credentials']['AccessKeyId'], - aws_secret_access_key = resp['Credentials']['SecretAccessKey'], - aws_session_token = resp['Credentials']['SessionToken'], + s3_client = boto3.client( + 's3', + aws_access_key_id=resp['Credentials']['AccessKeyId'], + aws_secret_access_key=resp['Credentials']['SecretAccessKey'], + aws_session_token=resp['Credentials']['SessionToken'], endpoint_url=default_endpoint, region_name='', - ) + ) - bucket_body = 'this is a test file' - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name, Key="test-1.txt") + s3_put_obj = s3_client.put_object(Body='this is a test file', Bucket=bucket_name, Key="test-1.txt") assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - oidc_remove=iam_client.delete_open_id_connect_provider( - OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"] - ) + iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_response["OpenIDConnectProviderArn"])