Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions docs/docs/services/kms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@ kms
- [ ] create_custom_key_store
- [X] create_grant
- [X] create_key

The provided Policy currently does not need to be valid. If it is valid, Moto will perform authorization checks on key-related operations, just like AWS does.

These authorization checks are quite basic for now. Moto will only throw an AccessDeniedException if the following conditions are met:
- The principal is set to "*"
- The resource is set to "*"
- The Action matches `describe_key`


One may use two addition Moto-specific Tag keys to influence how Moto creates Keys:

- ``_custom_key_material_`` is a *base64* encoded means of providing the master key material rather than having Moto generate it

Be aware that this key material is still subject to the cryptographic constraints for the key use
- ``_custom_id_`` is the desired Key Id (this Tag key is actually not KMS specific but is useful in generate-key for the same deterministic behavior)

They may be used together or separate depending on the deterministic behavior required

- [X] decrypt
- [X] delete_alias
Expand Down Expand Up @@ -70,11 +78,11 @@ kms
- [X] rotate_key_on_demand
- [X] schedule_key_deletion
- [X] sign

Sign message using generated private key.

- grant_tokens are not implemented


- [X] tag_resource
- [X] untag_resource
Expand All @@ -83,12 +91,11 @@ kms
- [X] update_key_description
- [ ] update_primary_region
- [X] verify

Verify message using public key from generated private key.

- grant_tokens are not implemented
- The MessageType-parameter DIGEST is not yet implemented


- [X] verify_mac

- [X] verify_mac
35 changes: 33 additions & 2 deletions moto/kms/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import json
import os
from collections.abc import Iterable
Expand All @@ -14,6 +15,7 @@
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition

from ..utilities.id_generator import TAG_KEY_CUSTOM_ID
from .exceptions import (
InvalidKeyUsageException,
KMSInvalidMacException,
Expand All @@ -31,6 +33,11 @@
generate_private_key,
)

TAG_KEY_CUSTOM_KEY_MATERIAL = "_custom_key_material_"
"""Allow the caller to influence the behavior of ``create-key`` such that it is deterministic

This value must be base64 to survive the --tag serialization process"""


class Grant(BaseModel):
def __init__(
Expand Down Expand Up @@ -135,8 +142,29 @@ def __init__(
region: str,
multi_region: bool = False,
origin: str = "AWS_KMS",
tags: Optional[list[dict[str, str]]] = None,
):
self.id = generate_key_id(multi_region)
key_id = generate_key_id(multi_region)
key_material: Optional[bytes] = None
if tags is not None:
maybe_id = [
it["TagValue"].strip()
for it in tags
if it.get("TagKey", "") == TAG_KEY_CUSTOM_ID
]
if maybe_id:
# for this exercise, it is an error to not include the TagValue
key_id = maybe_id[0]
maybe_material = [
it["TagValue"].strip()
for it in tags
if it.get("TagKey", "") == TAG_KEY_CUSTOM_KEY_MATERIAL
]
if maybe_material:
key_material = base64.b64decode(maybe_material[0])
if key_material is None:
key_material = generate_master_key()
self.id = key_id
self.creation_date = unix_time()
self.account_id = account_id
self.region = region
Expand All @@ -157,7 +185,9 @@ def __init__(
}
self.key_rotation_status = False
self.deletion_date: Optional[datetime] = None
self.key_material = generate_master_key()
if key_material is None:
raise TypeError("not without key_material")
self.key_material = key_material
self.origin = origin
self.key_manager = "CUSTOMER"
self.key_spec = key_spec or "SYMMETRIC_DEFAULT"
Expand Down Expand Up @@ -399,6 +429,7 @@ def create_key(
self.region_name,
multi_region,
origin,
tags=tags or [],
)
self.keys[key.id] = key
if tags is not None and len(tags) > 0:
Expand Down
6 changes: 4 additions & 2 deletions moto/kms/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any

from moto.core.responses import BaseResponse
from moto.kms.utils import RESERVED_ALIASE_TARGET_KEY_IDS, RESERVED_ALIASES
from moto.kms.utils import RESERVED_ALIAS_TARGET_KEY_IDS, RESERVED_ALIASES
from moto.utilities.utils import get_partition

from .exceptions import (
Expand Down Expand Up @@ -287,7 +287,7 @@ def list_aliases(self) -> str:
"TargetKeyId": alias.target_key_id,
}
)
for reserved_alias, target_key_id in RESERVED_ALIASE_TARGET_KEY_IDS.items():
for reserved_alias, target_key_id in RESERVED_ALIAS_TARGET_KEY_IDS.items():
if key_id and target_key_id != key_id:
continue
existing = [a for a in response_aliases if a["AliasName"] == reserved_alias]
Expand Down Expand Up @@ -603,6 +603,8 @@ def generate_mac(self) -> str:
dry_run = self._get_param("DryRun")

self._validate_key_id(key_id)
if "alias/" in key_id:
key_id = self.kms_backend.get_key_id_from_alias(key_id)

mac_algorithms = {
"HMAC_SHA_224",
Expand Down
8 changes: 5 additions & 3 deletions moto/kms/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
CIPHERTEXT_HEADER_FORMAT = f">{KEY_ID_LEN}s{IV_LEN}s{TAG_LEN}s"
Ciphertext = namedtuple("Ciphertext", ("key_id", "iv", "ciphertext", "tag"))

RESERVED_ALIASE_TARGET_KEY_IDS = {
RESERVED_ALIAS_TARGET_KEY_IDS = {
# NOTE: These would technically differ across account, but in that they are
# out of customer control, testing that they are different would be redundant.
"alias/aws/acm": "4f58743d-e279-4214-9270-8cc28277958d",
Expand All @@ -52,8 +52,10 @@
"alias/aws/ssm": "cb3f6250-5078-48c0-a75f-0290bf47694e",
"alias/aws/xray": "e9b758eb-6230-4744-93d1-ad3b7d71f2f6",
}
"""Maps from the key alias to the KeyId"""

RESERVED_ALIASES = list(RESERVED_ALIASE_TARGET_KEY_IDS.keys())
RESERVED_ALIASES = list(RESERVED_ALIAS_TARGET_KEY_IDS.keys())
"""The list of `create-alias` that should not succeed since they are reserved"""


class KeySpec(str, Enum):
Expand Down Expand Up @@ -137,7 +139,7 @@ def generate_data_key(number_of_bytes: int) -> bytes:


def generate_master_key() -> bytes:
"""Generate a master key."""
"""Generate a master key which is ``MASTER_KEY_LEN`` bytes long"""
return generate_data_key(MASTER_KEY_LEN)


Expand Down
49 changes: 49 additions & 0 deletions tests/test_kms/test_kms_encrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from botocore.exceptions import ClientError

from moto import mock_aws
from moto.kms.models import TAG_KEY_CUSTOM_KEY_MATERIAL, Key
from moto.kms.utils import encrypt
from moto.utilities.id_generator import TAG_KEY_CUSTOM_ID
from tests import aws_verified

from .test_kms import PLAINTEXT_VECTORS, _get_encoded_value
Expand Down Expand Up @@ -213,3 +216,49 @@ def test_re_encrypt_to_invalid_destination():
CiphertextBlob=encrypt_response["CiphertextBlob"],
DestinationKeyId="alias/DoesNotExist",
)


@mock_aws
def test_create_key_custom_key_material_symmetric_decrypt():
# Arrange
custom_key_material = b"editor dog green pencils"
custom_key_tag_value = base64.b64encode(custom_key_material).decode("utf-8")
algo = "SYMMETRIC_DEFAULT"
message = b"test message 123 !%$@ 1234567890"

# thanks to the deterministic behavior, we can actually make our own Key for use in encrypt()
# regrettably this needs to test the custom KeyId behavior, too, since encrypt()
# includes the KeyId in the ciphertext
tmp_key = Key(
policy=None,
key_usage="",
key_spec="",
description="",
account_id="",
region="",
)
tmp_key.id = "00000001-0002-0003-0004-000000000005"
tmp_key.key_material = custom_key_material
expected_ciphertext_blob = encrypt(
master_keys={tmp_key.id: tmp_key},
key_id=tmp_key.id,
plaintext=message,
encryption_context={},
)

# Act
client = boto3.client("kms", region_name="us-west-2")
key_id = client.create_key(
Tags=[
{"TagKey": TAG_KEY_CUSTOM_ID, "TagValue": tmp_key.id},
{"TagKey": TAG_KEY_CUSTOM_KEY_MATERIAL, "TagValue": custom_key_tag_value},
]
)["KeyMetadata"]["KeyId"]

plaintext = client.decrypt(
KeyId=key_id,
CiphertextBlob=expected_ciphertext_blob,
EncryptionAlgorithm=algo,
)["Plaintext"]

assert plaintext == message
64 changes: 64 additions & 0 deletions tests/test_kms/test_kms_mac.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Unit tests for kms-supported APIs."""

import base64
import hmac

import boto3
import pytest

from moto import mock_aws
from moto.kms.models import TAG_KEY_CUSTOM_KEY_MATERIAL

# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
Expand Down Expand Up @@ -133,3 +135,65 @@ def test_verify_mac_fails_for_another_key_id():
Message=base64.b64encode(b"Hello World"),
Mac=mac,
)


@mock_aws
def test_generate_mac_allows_aliases():
# Arrange
my_alias = "alias/test-1234"
key_id = create_hmac_key()

# Act + Assert
client = boto3.client("kms", region_name="eu-central-1")
client.create_alias(
AliasName=my_alias,
TargetKeyId=key_id,
)
mac_key_id = client.generate_mac(
KeyId=my_alias,
MacAlgorithm="HMAC_SHA_256",
Message=base64.b64encode(b"aliases work fine"),
)["KeyId"]

assert mac_key_id == key_id


@mock_aws
def test_create_key_custom_key_material_hmac():
# Arrange
custom_key_material = b"custom test key material"
custom_key_tag_value = base64.b64encode(custom_key_material).decode("utf-8")
message = "some important message"
key_spec = "HMAC_256"
mac_algo = "HMAC_SHA_256"

h = hmac.HMAC(custom_key_material, message.encode("utf-8"), "SHA256")
expected_mac = h.digest()

# Act
client = boto3.client("kms", region_name="eu-central-1")

key_id = client.create_key(
KeySpec=key_spec,
KeyUsage="GENERATE_VERIFY_MAC",
Tags=[
{"TagKey": TAG_KEY_CUSTOM_KEY_MATERIAL, "TagValue": custom_key_tag_value}
],
)["KeyMetadata"]["KeyId"]

mac = client.generate_mac(
KeyId=key_id,
Message=message,
MacAlgorithm=mac_algo,
)["Mac"]
verify_mac_response = client.verify_mac(
KeyId=key_id,
Message=message,
MacAlgorithm=mac_algo,
Mac=expected_mac,
)

# Assert
assert mac == expected_mac

assert verify_mac_response["MacValid"]