Skip to content

Commit 21b9386

Browse files
committed
CopyObject: add test for encrypted objects
Add test for copy on sse-s3 and sse-c encrypted objects. Signed-off-by: Seena Fallah <[email protected]>
1 parent 08df935 commit 21b9386

File tree

1 file changed

+172
-0
lines changed

1 file changed

+172
-0
lines changed

s3tests_boto3/functional/test_s3.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13915,3 +13915,175 @@ def test_post_object_upload_checksum():
1391513915

1391613916
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
1391713917
assert r.status_code == 400
13918+
13919+
13920+
#########################
13921+
# COPY ENCRYPTION TESTS #
13922+
#########################
13923+
_copy_enc_source_modes = {
13924+
'unencrypted': {
13925+
'marks': [pytest.mark.fails_on_aws],
13926+
},
13927+
'sse-s3': {
13928+
'args': {'ServerSideEncryption': 'AES256'},
13929+
'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
13930+
'marks': [pytest.mark.sse_s3],
13931+
},
13932+
'sse-c': {
13933+
'args': {
13934+
'SSECustomerAlgorithm': 'AES256',
13935+
'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
13936+
'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
13937+
},
13938+
'source_copy_args': {
13939+
'CopySourceSSECustomerAlgorithm': 'AES256',
13940+
'CopySourceSSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
13941+
'CopySourceSSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw==',
13942+
},
13943+
'assert': lambda r: (
13944+
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and
13945+
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-MD5'] == 'DWygnHRtgiJ77HCm+1rvHw=='
13946+
)
13947+
},
13948+
'sse-kms': {
13949+
'args': {
13950+
'ServerSideEncryption': 'aws:kms',
13951+
'SSEKMSKeyId': lambda: get_main_kms_keyid()
13952+
},
13953+
'assert': lambda r: (
13954+
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and
13955+
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_main_kms_keyid()
13956+
)
13957+
}
13958+
}
13959+
_copy_enc_dest_modes = {
13960+
'unencrypted': {
13961+
'marks': [pytest.mark.fails_on_aws],
13962+
},
13963+
'sse-s3': {
13964+
'args': {'ServerSideEncryption': 'AES256'},
13965+
'assert': lambda r: r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256',
13966+
'marks': [pytest.mark.sse_s3],
13967+
},
13968+
'sse-c': {
13969+
'args': {
13970+
'SSECustomerAlgorithm': 'AES256',
13971+
'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
13972+
'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
13973+
},
13974+
'get_args': {
13975+
'SSECustomerAlgorithm': 'AES256',
13976+
'SSECustomerKey': '6b+WOZ1T3cqZMxgThRcXAQBrS5mXKdDUphvpxptl9/4=',
13977+
'SSECustomerKeyMD5': 'arxBvwY2V4SiOne6yppVPQ=='
13978+
},
13979+
'assert': lambda r: (
13980+
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-algorithm'] == 'AES256' and
13981+
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-customer-key-MD5'] == 'arxBvwY2V4SiOne6yppVPQ=='
13982+
)
13983+
},
13984+
'sse-kms': {
13985+
'args': {
13986+
'ServerSideEncryption': 'aws:kms',
13987+
'SSEKMSKeyId': lambda: get_secondary_kms_keyid()
13988+
},
13989+
'assert': lambda r: (
13990+
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'aws:kms' and
13991+
r['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption-aws-kms-key-id'] == get_secondary_kms_keyid()
13992+
)
13993+
}
13994+
}
13995+
13996+
def _test_copy_enc(file_size, source_mode_key, dest_mode_key):
13997+
source_args = _copy_enc_source_modes[source_mode_key]
13998+
dest_args = _copy_enc_dest_modes[dest_mode_key]
13999+
14000+
bucket_name = get_new_bucket()
14001+
client = get_client()
14002+
14003+
# upload original file with source encryption
14004+
data = 'A'*file_size
14005+
args = {key: value() if callable(value) else value for key, value in source_args.get('args', {}).items()}
14006+
response = client.put_object(Bucket=bucket_name, Key='testobj', Body=data, **args)
14007+
assert source_args.get('assert', lambda r: True)(response)
14008+
14009+
# copy the object to a new key, with destination encryption
14010+
dest_bucket_name = get_new_bucket()
14011+
copy_args = {key: value() if callable(value) else value for key, value in dest_args.get('args', {}).items()}
14012+
copy_args.update(source_args.get('source_copy_args', {}))
14013+
response = client.copy_object(Bucket=dest_bucket_name, Key='testobj2', CopySource={'Bucket': bucket_name, 'Key': 'testobj'}, **copy_args)
14014+
assert dest_args.get('assert', lambda r: True)(response)
14015+
14016+
# verify the copy is encrypted
14017+
get_args = dest_args.get('get_args', {})
14018+
response = client.get_object(Bucket=dest_bucket_name, Key='testobj2', **get_args)
14019+
assert dest_args.get('assert', lambda r: True)(response)
14020+
body = _get_body(response)
14021+
assert body == data
14022+
14023+
@pytest.mark.encryption
14024+
@pytest.mark.fails_on_dbstore
14025+
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
14026+
pytest.param(
14027+
source_key,
14028+
dest_key,
14029+
marks=[
14030+
*_copy_enc_source_modes[source_key].get('marks', []),
14031+
*_copy_enc_dest_modes[dest_key].get('marks', [])
14032+
]
14033+
)
14034+
for source_key in _copy_enc_source_modes.keys()
14035+
for dest_key in _copy_enc_dest_modes.keys()
14036+
])
14037+
def test_copy_enc_1b(source_mode_key, dest_mode_key):
14038+
_test_copy_enc(1, source_mode_key, dest_mode_key)
14039+
14040+
@pytest.mark.encryption
14041+
@pytest.mark.fails_on_dbstore
14042+
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
14043+
pytest.param(
14044+
source_key,
14045+
dest_key,
14046+
marks=[
14047+
*_copy_enc_source_modes[source_key].get('marks', []),
14048+
*_copy_enc_dest_modes[dest_key].get('marks', [])
14049+
]
14050+
)
14051+
for source_key in _copy_enc_source_modes.keys()
14052+
for dest_key in _copy_enc_dest_modes.keys()
14053+
])
14054+
def test_copy_enc_1kb(source_mode_key, dest_mode_key):
14055+
_test_copy_enc(1024, source_mode_key, dest_mode_key)
14056+
14057+
@pytest.mark.encryption
14058+
@pytest.mark.fails_on_dbstore
14059+
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
14060+
pytest.param(
14061+
source_key,
14062+
dest_key,
14063+
marks=[
14064+
*_copy_enc_source_modes[source_key].get('marks', []),
14065+
*_copy_enc_dest_modes[dest_key].get('marks', [])
14066+
]
14067+
)
14068+
for source_key in _copy_enc_source_modes.keys()
14069+
for dest_key in _copy_enc_dest_modes.keys()
14070+
])
14071+
def test_copy_enc_1mb(source_mode_key, dest_mode_key):
14072+
_test_copy_enc(1024*1024, source_mode_key, dest_mode_key)
14073+
14074+
@pytest.mark.encryption
14075+
@pytest.mark.fails_on_dbstore
14076+
@pytest.mark.parametrize("source_mode_key, dest_mode_key", [
14077+
pytest.param(
14078+
source_key,
14079+
dest_key,
14080+
marks=[
14081+
*_copy_enc_source_modes[source_key].get('marks', []),
14082+
*_copy_enc_dest_modes[dest_key].get('marks', [])
14083+
]
14084+
)
14085+
for source_key in _copy_enc_source_modes.keys()
14086+
for dest_key in _copy_enc_dest_modes.keys()
14087+
])
14088+
def test_copy_enc_8mb(source_mode_key, dest_mode_key):
14089+
_test_copy_enc(8*1024*1024, source_mode_key, dest_mode_key)

0 commit comments

Comments
 (0)