|
37 | 37 | get_iam_path_prefix, |
38 | 38 | make_iam_name, |
39 | 39 | get_client, |
| 40 | + get_main_user_id, |
| 41 | + get_alt_client, |
40 | 42 | get_alt_user_id, |
41 | 43 | get_config_endpoint, |
42 | 44 | get_new_bucket_name, |
| 45 | + get_new_bucket, |
43 | 46 | get_parameter_name, |
44 | 47 | get_main_aws_access_key, |
45 | 48 | get_main_aws_secret_key, |
|
55 | 58 | get_azp, |
56 | 59 | get_user_token |
57 | 60 | ) |
| 61 | +from .utils import (assert_raises, _get_status) |
58 | 62 |
|
59 | 63 | log = logging.getLogger(__name__) |
60 | 64 |
|
@@ -394,6 +398,97 @@ def test_assume_role_allow_head_nonexistent(): |
394 | 398 | status = e.response['ResponseMetadata']['HTTPStatusCode'] |
395 | 399 | assert status == 404 |
396 | 400 |
|
| 401 | +@pytest.mark.test_of_sts |
| 402 | +@pytest.mark.fails_on_dbstore |
| 403 | +def test_assume_role_owner_allow(): |
| 404 | + iam_client=get_iam_client() |
| 405 | + sts_client=get_sts_client() |
| 406 | + sts_user_id=get_alt_user_id() |
| 407 | + default_endpoint=get_config_endpoint() |
| 408 | + role_name=get_parameter_name() |
| 409 | + role_session_name=get_parameter_name() |
| 410 | + |
| 411 | + policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' |
| 412 | + role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document) |
| 413 | + |
| 414 | + resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name) |
| 415 | + |
| 416 | + s3_client = boto3.client('s3', |
| 417 | + aws_access_key_id = resp['Credentials']['AccessKeyId'], |
| 418 | + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], |
| 419 | + aws_session_token = resp['Credentials']['SessionToken'], |
| 420 | + endpoint_url=default_endpoint, |
| 421 | + region_name='') |
| 422 | + |
| 423 | + # create a bucket with the alt user |
| 424 | + bucket_name = get_new_bucket(get_alt_client()) |
| 425 | + |
| 426 | + # access allowed from role assumed by alt user |
| 427 | + s3_client.get_bucket_location(Bucket=bucket_name) |
| 428 | + |
| 429 | +@pytest.mark.test_of_sts |
| 430 | +@pytest.mark.fails_on_dbstore |
| 431 | +def test_assume_role_owner_deny(): |
| 432 | + iam_client=get_iam_client() |
| 433 | + sts_client=get_sts_client() |
| 434 | + sts_user_id=get_alt_user_id() |
| 435 | + default_endpoint=get_config_endpoint() |
| 436 | + role_name=get_parameter_name() |
| 437 | + role_session_name=get_parameter_name() |
| 438 | + |
| 439 | + policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' |
| 440 | + role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document) |
| 441 | + |
| 442 | + resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name) |
| 443 | + |
| 444 | + s3_client = boto3.client('s3', |
| 445 | + aws_access_key_id = resp['Credentials']['AccessKeyId'], |
| 446 | + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], |
| 447 | + aws_session_token = resp['Credentials']['SessionToken'], |
| 448 | + endpoint_url=default_endpoint, |
| 449 | + region_name='') |
| 450 | + |
| 451 | + # create a bucket with the main user |
| 452 | + main_client = get_client() |
| 453 | + bucket_name = get_new_bucket(main_client) |
| 454 | + |
| 455 | + # access denied from role assumed by alt user |
| 456 | + e = assert_raises(ClientError, s3_client.get_bucket_location, Bucket=bucket_name) |
| 457 | + assert 403 == _get_status(e.response) |
| 458 | + |
| 459 | +@pytest.mark.test_of_sts |
| 460 | +@pytest.mark.fails_on_dbstore |
| 461 | +def test_assume_role_acl_allow(): |
| 462 | + iam_client=get_iam_client() |
| 463 | + sts_client=get_sts_client() |
| 464 | + main_user_id=get_main_user_id() |
| 465 | + sts_user_id=get_alt_user_id() |
| 466 | + default_endpoint=get_config_endpoint() |
| 467 | + role_name=get_parameter_name() |
| 468 | + role_session_name=get_parameter_name() |
| 469 | + |
| 470 | + policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' |
| 471 | + role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document) |
| 472 | + |
| 473 | + resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name) |
| 474 | + |
| 475 | + s3_client = boto3.client('s3', |
| 476 | + aws_access_key_id = resp['Credentials']['AccessKeyId'], |
| 477 | + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], |
| 478 | + aws_session_token = resp['Credentials']['SessionToken'], |
| 479 | + endpoint_url=default_endpoint, |
| 480 | + region_name='') |
| 481 | + |
| 482 | + # create a bucket with the main user and grant read acl to alt user |
| 483 | + main_client = get_client() |
| 484 | + bucket_name = get_new_bucket(main_client) |
| 485 | + main_client.put_bucket_acl(Bucket=bucket_name, |
| 486 | + GrantFullControl=f'id={main_user_id}', |
| 487 | + GrantReadACP=f'id={sts_user_id}') |
| 488 | + |
| 489 | + # access allowed from role assumed by alt user |
| 490 | + s3_client.get_bucket_location(Bucket=bucket_name) |
| 491 | + |
397 | 492 |
|
398 | 493 | @pytest.mark.webidentity_test |
399 | 494 | @pytest.mark.token_claims_trust_policy_test |
|
0 commit comments