Skip to content

Commit c5eb36d

Browse files
committed
feat: New Container: Lowkey Vault
- Add Lowkey Vault container implementation - Cover the new container with tests - Add basic example for secret use Resolves #948
1 parent 9a97385 commit c5eb36d

File tree

8 files changed

+780
-7
lines changed

8 files changed

+780
-7
lines changed

modules/lowkey-vault/README.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.. autoclass:: testcontainers.lowkeyvault.LowkeyVaultContainer
2+
.. title:: testcontainers.lowkeyvault.LowkeyVaultContainer
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import urllib3
2+
from azure.core.pipeline.transport._requests_basic import RequestsTransport
3+
from azure.keyvault.secrets import SecretClient
4+
5+
from testcontainers.lowkeyvault import LowkeyVaultContainer
6+
7+
8+
def basic_example():
9+
with LowkeyVaultContainer() as lowkey_vault_container:
10+
# get connection details
11+
connection_url = lowkey_vault_container.get_connection_url()
12+
print(f"Lowkey Vault is running: {connection_url}")
13+
token = lowkey_vault_container.get_token()
14+
print("Obtained token")
15+
# prepare a transport ignoring self signed certificate issues
16+
transport = RequestsTransport(connection_verify=False)
17+
# make sure to turn off challenge resource verification
18+
secret_client: SecretClient = SecretClient(
19+
vault_url=connection_url, credential=token, verify_challenge_resource=False, transport=transport
20+
)
21+
22+
# set a secret
23+
secret_client.set_secret(name="test-secret", value="a secret message")
24+
print("The secret has been set.")
25+
26+
# get the value of the secret
27+
actual: str = secret_client.get_secret(name="test-secret").value
28+
print(f"The secret has been retrieved with value: '{actual}'")
29+
30+
# close the secret client
31+
secret_client.close()
32+
33+
34+
if __name__ == "__main__":
35+
# ignore cert errors
36+
urllib3.disable_warnings()
37+
# run the code
38+
basic_example()
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from enum import Enum
2+
from typing import Any, Optional
3+
4+
import requests
5+
from azure.core.credentials import AccessToken, TokenCredential
6+
7+
from testcontainers.core.container import DockerContainer
8+
from testcontainers.core.wait_strategies import LogMessageWaitStrategy
9+
10+
# This comment can be removed (Used for testing)
11+
12+
13+
class StaticTokenCredential(TokenCredential):
14+
def __init__(self, json_token: str):
15+
self.access_token = AccessToken(token=json_token.get("access_token"), expires_on=json_token.get("expires_on"))
16+
17+
def get_token(
18+
self,
19+
*scopes: str,
20+
claims: Optional[str] = None,
21+
tenant_id: Optional[str] = None,
22+
enable_cae: bool = False,
23+
**kwargs: Any,
24+
) -> AccessToken:
25+
return self.access_token
26+
27+
28+
class NetworkType(Enum):
29+
NETWORK = "network"
30+
LOCAL = "local"
31+
32+
33+
class LowkeyVaultContainer(DockerContainer):
34+
"""
35+
Container for a Lowkey Vault instance for emulating Azure Key Vault.
36+
37+
Example:
38+
39+
.. doctest::
40+
41+
>>> from azure.core.pipeline.transport._requests_basic import RequestsTransport
42+
>>> from azure.keyvault.secrets import SecretClient
43+
>>> from testcontainers.lowkeyvault import LowkeyVaultContainer
44+
45+
>>> with LowkeyVaultContainer() as lowkey_vault:
46+
... connection_url = lowkey_vault.get_connection_url()
47+
... token = lowkey_vault.get_token()
48+
... transport = RequestsTransport(connection_verify=False)
49+
... # make sure to turn off challenge resource verification
50+
... secret_client = SecretClient(
51+
... vault_url=connection_url,
52+
... credential=token,
53+
... verify_challenge_resource=False,
54+
... transport=transport
55+
... )
56+
"""
57+
58+
def __init__(
59+
self, image: str = "nagyesta/lowkey-vault:7.0.3-ubi10-minimal", container_alias: Optional[str] = None, **kwargs
60+
) -> None:
61+
super().__init__(image, **kwargs)
62+
self.api_port = 8443
63+
self.metadata_port = 8080
64+
self.with_exposed_ports(self.api_port, self.metadata_port)
65+
self.with_env("LOWKEY_VAULT_RELAXED_PORTS", "true")
66+
if container_alias is not None:
67+
self.with_network_aliases(container_alias)
68+
self.with_env("LOWKEY_VAULT_ALIASES", f"localhost={container_alias}:<port>")
69+
self.container_alias = container_alias
70+
self.waiting_for(LogMessageWaitStrategy("Started LowkeyVaultApp."))
71+
72+
def _configure(self) -> None:
73+
return
74+
75+
def get_connection_url(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
76+
if network_type == NetworkType.LOCAL:
77+
return f"https://{self.get_container_host_ip()}:{self.get_exposed_port(self.api_port)}"
78+
else:
79+
return f"https://{self.container_alias}:{self.api_port}"
80+
81+
def get_imds_endpoint(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
82+
if network_type == NetworkType.LOCAL:
83+
return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.metadata_port)}"
84+
else:
85+
return f"http://{self.container_alias}:{self.metadata_port}"
86+
87+
def get_token_url(self, network_type: NetworkType = NetworkType.LOCAL) -> str:
88+
base_url = self.get_imds_endpoint(network_type=network_type)
89+
return f"{base_url}/metadata/identity/oauth2/token"
90+
91+
def get_token(self, network_type: NetworkType = NetworkType.LOCAL) -> StaticTokenCredential:
92+
resource = self.get_connection_url(network_type=network_type)
93+
token_url = self.get_token_url(network_type=network_type)
94+
json_response = requests.get(f"{token_url}?resource={resource}").json()
95+
return StaticTokenCredential(json_token=json_response)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use an official Python runtime as a parent image
2+
FROM python:3.10-slim
3+
4+
# Set the working directory in the container
5+
WORKDIR /app
6+
7+
# Install dependencies and create a dummy key file for the authentication to work
8+
RUN \
9+
pip install azure-keyvault-certificates==4.10.0 && \
10+
pip install azure-keyvault-secrets==4.10.0 && \
11+
pip install azure-keyvault-keys==4.11.0 && \
12+
pip install cryptography==46.0.3 && \
13+
pip install azure-identity==1.25.1 && \
14+
mkdir -p /var/opt/azcmagent/tokens/ && \
15+
touch /var/opt/azcmagent/tokens/assumed-identity.key
16+
17+
COPY ./netowrk_container.py netowrk_container.py
18+
EXPOSE 80
19+
# Define the command to run the application
20+
CMD ["python", "netowrk_container.py"]
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
import base64
2+
import os
3+
4+
import urllib3
5+
from azure.core.pipeline.transport._requests_basic import RequestsTransport
6+
from azure.identity import DefaultAzureCredential
7+
from azure.keyvault.certificates import CertificateClient, CertificatePolicy
8+
from azure.keyvault.keys import KeyClient, KeyOperation
9+
from azure.keyvault.keys.crypto import CryptographyClient, EncryptionAlgorithm, EncryptResult, DecryptResult
10+
from azure.keyvault.secrets import SecretClient
11+
from cryptography import x509
12+
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
13+
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
14+
15+
API_VERSION = "7.6"
16+
17+
18+
def hello_secrets_from_an_external_container():
19+
"""
20+
Entry point function for a custom Docker container to test connectivity
21+
and "secrets" functionality with Lowkey Vault.
22+
23+
This function is designed to run inside a separate container within the
24+
same Docker network as a Lowkey Vault instance.
25+
"""
26+
vault_url = os.environ["CONNECTION_URL"]
27+
secret_message = os.environ["SECRET_VALUE"]
28+
secret_name = os.environ["SECRET_NAME"]
29+
# test secrets API to see that the container works
30+
secret_client = None
31+
try:
32+
# ignore SSL errors because we are using a self-signed certificate
33+
transport_secrets = RequestsTransport(connection_verify=False)
34+
# create the client we need
35+
secret_client = SecretClient(
36+
vault_url=vault_url,
37+
credential=DefaultAzureCredential(),
38+
verify_challenge_resource=False,
39+
transport=transport_secrets,
40+
api_version=API_VERSION,
41+
)
42+
# set the result as a secret
43+
secret_client.set_secret(name=secret_name, value=secret_message)
44+
# get back the value
45+
actual = secret_client.get_secret(name=secret_name).value
46+
47+
# verify the result
48+
assert actual == secret_message
49+
print("Lowkey Vault Container created.")
50+
except Exception as e:
51+
print(f"Something went wrong : {e}")
52+
finally:
53+
# close client
54+
if secret_client is not None:
55+
secret_client.close()
56+
57+
58+
def hello_keys_from_an_external_container():
59+
"""
60+
Entry point function for a custom Docker container to test connectivity
61+
and "keys" functionality with Lowkey Vault.
62+
63+
This function is designed to run inside a separate container within the
64+
same Docker network as a Lowkey Vault instance.
65+
"""
66+
vault_url = os.environ["CONNECTION_URL"]
67+
secret_message = os.environ["SECRET_VALUE"]
68+
key_name = os.environ["KEY_NAME"]
69+
# test key API to see that the container works
70+
key_client = None
71+
crypto_client = None
72+
try:
73+
# ignore SSL errors because we are using a self-signed certificate
74+
transport_keys = RequestsTransport(connection_verify=False)
75+
transport_crypto = RequestsTransport(connection_verify=False)
76+
# create the clients we need
77+
key_client = KeyClient(
78+
vault_url=vault_url,
79+
credential=DefaultAzureCredential(),
80+
verify_challenge_resource=False,
81+
transport=transport_keys,
82+
api_version=API_VERSION,
83+
)
84+
# create a new key
85+
key_client.create_rsa_key(
86+
name=key_name,
87+
size=2048,
88+
key_operations=[KeyOperation.encrypt, KeyOperation.decrypt, KeyOperation.wrap_key, KeyOperation.unwrap_key],
89+
)
90+
91+
crypto_client = CryptographyClient(
92+
key=key_client.get_key(name=key_name).id,
93+
credential=DefaultAzureCredential(),
94+
verify_challenge_resource=False,
95+
transport=transport_crypto,
96+
api_version=API_VERSION,
97+
)
98+
99+
# encode the text
100+
text_as_bytes: bytes = bytes(secret_message.encode("utf-8"))
101+
encrypted: EncryptResult = crypto_client.encrypt(
102+
algorithm=EncryptionAlgorithm.rsa_oaep_256, plaintext=text_as_bytes
103+
)
104+
cipher_text: bytes = encrypted.ciphertext
105+
106+
# decode the cipher text
107+
decrypted: DecryptResult = crypto_client.decrypt(
108+
algorithm=EncryptionAlgorithm.rsa_oaep_256, ciphertext=cipher_text
109+
)
110+
decrypted_text: str = decrypted.plaintext.decode("utf-8")
111+
112+
# verify the result
113+
assert decrypted_text == secret_message
114+
print("Lowkey Vault Container created.")
115+
except Exception as e:
116+
print(f"Something went wrong : {e}")
117+
finally:
118+
# close clients
119+
if key_client is not None:
120+
key_client.close()
121+
if crypto_client is not None:
122+
crypto_client.close()
123+
124+
125+
def hello_certificates_from_an_external_container():
126+
"""
127+
Entry point function for a custom Docker container to test connectivity
128+
and "certificates" functionality with Lowkey Vault.
129+
130+
This function is designed to run inside a separate container within the
131+
same Docker network as a Lowkey Vault instance.
132+
"""
133+
vault_url = os.environ["CONNECTION_URL"]
134+
cert_name = os.environ["CERT_NAME"]
135+
# test certificates API to see that the container works
136+
certificate_client = None
137+
secret_client = None
138+
try:
139+
# ignore SSL errors because we are using a self-signed certificate
140+
transport_certs = RequestsTransport(connection_verify=False)
141+
transport_secrets = RequestsTransport(connection_verify=False)
142+
# create the clients we need
143+
certificate_client = CertificateClient(
144+
vault_url=vault_url,
145+
credential=DefaultAzureCredential(),
146+
verify_challenge_resource=False,
147+
transport=transport_certs,
148+
api_version=API_VERSION,
149+
)
150+
secret_client = SecretClient(
151+
vault_url=vault_url,
152+
credential=DefaultAzureCredential(),
153+
verify_challenge_resource=False,
154+
transport=transport_secrets,
155+
api_version=API_VERSION,
156+
)
157+
158+
subject_name: str = "CN=example.com"
159+
policy: CertificatePolicy = CertificatePolicy(
160+
issuer_name="Self",
161+
subject=subject_name,
162+
key_curve_name="P-256",
163+
key_type="EC",
164+
validity_in_months=12,
165+
content_type="application/x-pkcs12",
166+
)
167+
certificate_client.begin_create_certificate(certificate_name=cert_name, policy=policy).wait()
168+
169+
cert_value = secret_client.get_secret(name=cert_name).value
170+
171+
# decode base64 secret
172+
decoded = base64.b64decode(cert_value)
173+
# open decoded secret as PKCS12 file
174+
pkcs12 = load_key_and_certificates(decoded, b"")
175+
176+
# get the components
177+
ec_key: EllipticCurvePrivateKey = pkcs12[0]
178+
x509_cert: x509.Certificate = pkcs12[1]
179+
180+
# verify the result
181+
assert subject_name == x509_cert.subject.rdns[0].rfc4514_string()
182+
assert "secp256r1" == ec_key.curve.name
183+
184+
print("Lowkey Vault Container created.")
185+
except Exception as e:
186+
print(f"Something went wrong : {e}")
187+
finally:
188+
# close clients
189+
if certificate_client is not None:
190+
certificate_client.close()
191+
if secret_client is not None:
192+
secret_client.close()
193+
194+
195+
if __name__ == "__main__":
196+
mode = os.getenv("TEST")
197+
# ignore cert errors
198+
urllib3.disable_warnings()
199+
if mode == "secrets":
200+
hello_secrets_from_an_external_container()
201+
elif mode == "keys":
202+
hello_keys_from_an_external_container()
203+
elif mode == "certificates":
204+
hello_certificates_from_an_external_container()
205+
else:
206+
print("The TEST env variable must be 'secrets', 'keys', or 'certificates'.")

0 commit comments

Comments
 (0)