Skip to content

Commit 7054528

Browse files
author
m00s3c0d3
committed
continue building NanoTDF support
1 parent 67e1468 commit 7054528

10 files changed

Lines changed: 1620 additions & 178 deletions

File tree

src/otdf_python/ecc_mode.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ class ECCMode:
66
"secp256r1": 0,
77
"secp384r1": 1,
88
"secp521r1": 2,
9+
"secp256k1": 3,
910
}
1011

1112
def __init__(self, curve_mode: int = 0, use_ecdsa_binding: bool = False):
@@ -24,15 +25,25 @@ def set_elliptic_curve(self, curve_mode: int):
2425
def get_elliptic_curve_type(self) -> int:
2526
return self.curve_mode
2627

28+
def get_curve_name(self) -> str:
29+
"""Get the curve name as a string (e.g., 'secp256r1')."""
30+
for name, mode in self._CURVE_MAP.items():
31+
if mode == self.curve_mode:
32+
return name
33+
# Default to secp256r1 if not found
34+
return "secp256r1"
35+
2736
@staticmethod
2837
def get_ec_compressed_pubkey_size(curve_type: int) -> int:
29-
# 0: secp256r1, 1: secp384r1, 2: secp521r1
38+
# 0: secp256r1, 1: secp384r1, 2: secp521r1, 3: secp256k1
3039
if curve_type == 0:
31-
return 33
40+
return 33 # secp256r1
3241
elif curve_type == 1:
33-
return 49
42+
return 49 # secp384r1
3443
elif curve_type == 2:
35-
return 67
44+
return 67 # secp521r1
45+
elif curve_type == 3:
46+
return 33 # secp256k1 (same size as secp256r1)
3647
else:
3748
raise ValueError("Unsupported ECC algorithm.")
3849

src/otdf_python/ecdh.py

Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
"""
2+
ECDH (Elliptic Curve Diffie-Hellman) key exchange for NanoTDF.
3+
4+
This module implements the ECDH key exchange protocol with HKDF key derivation
5+
as specified in the NanoTDF spec. It supports the following curves:
6+
- secp256r1 (NIST P-256)
7+
- secp384r1 (NIST P-384)
8+
- secp521r1 (NIST P-521)
9+
- secp256k1 (Bitcoin curve)
10+
11+
The protocol follows ECIES methodology similar to S/MIME and GPG:
12+
1. Generate ephemeral keypair
13+
2. Perform ECDH with recipient's public key to get shared secret
14+
3. Use HKDF to derive symmetric encryption key from shared secret
15+
"""
16+
17+
from cryptography.hazmat.backends import default_backend
18+
from cryptography.hazmat.primitives import hashes, serialization
19+
from cryptography.hazmat.primitives.asymmetric import ec
20+
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
21+
from cryptography.hazmat.primitives.serialization import (
22+
Encoding,
23+
PrivateFormat,
24+
PublicFormat,
25+
)
26+
27+
# Mapping from curve names to cryptography curve objects
28+
CURVE_MAP = {
29+
"secp256r1": ec.SECP256R1(),
30+
"secp384r1": ec.SECP384R1(),
31+
"secp521r1": ec.SECP521R1(),
32+
"secp256k1": ec.SECP256K1(),
33+
}
34+
35+
# Compressed public key sizes for each curve
36+
COMPRESSED_KEY_SIZES = {
37+
"secp256r1": 33, # 1 byte prefix + 32 bytes
38+
"secp384r1": 49, # 1 byte prefix + 48 bytes
39+
"secp521r1": 67, # 1 byte prefix + 66 bytes
40+
"secp256k1": 33, # 1 byte prefix + 32 bytes
41+
}
42+
43+
# HKDF salt for NanoTDF key derivation
44+
# Per spec: "salt value derived from magic number/version"
45+
# This is the SHA-256 hash of the NanoTDF magic number and version
46+
NANOTDF_HKDF_SALT = bytes.fromhex("3de3ca1e50cf62d8b6aba603a96fca6761387a7ac86c3d3afe85ae2d1812edfc")
47+
48+
49+
class ECDHError(Exception):
50+
"""Base exception for ECDH operations."""
51+
52+
pass
53+
54+
55+
class UnsupportedCurveError(ECDHError):
56+
"""Raised when an unsupported curve is specified."""
57+
58+
pass
59+
60+
61+
class InvalidKeyError(ECDHError):
62+
"""Raised when a key is invalid or malformed."""
63+
64+
pass
65+
66+
67+
def get_curve(curve_name: str) -> ec.EllipticCurve:
68+
"""
69+
Get the cryptography curve object for a given curve name.
70+
71+
Args:
72+
curve_name: Name of the curve (e.g., "secp256r1")
73+
74+
Returns:
75+
ec.EllipticCurve: The curve object
76+
77+
Raises:
78+
UnsupportedCurveError: If the curve is not supported
79+
"""
80+
curve_name_lower = curve_name.lower()
81+
if curve_name_lower not in CURVE_MAP:
82+
raise UnsupportedCurveError(
83+
f"Unsupported curve: {curve_name}. "
84+
f"Supported curves: {', '.join(CURVE_MAP.keys())}"
85+
)
86+
return CURVE_MAP[curve_name_lower]
87+
88+
89+
def get_compressed_key_size(curve_name: str) -> int:
90+
"""
91+
Get the size of a compressed public key for a given curve.
92+
93+
Args:
94+
curve_name: Name of the curve (e.g., "secp256r1")
95+
96+
Returns:
97+
int: Size in bytes of the compressed public key
98+
99+
Raises:
100+
UnsupportedCurveError: If the curve is not supported
101+
"""
102+
curve_name_lower = curve_name.lower()
103+
if curve_name_lower not in COMPRESSED_KEY_SIZES:
104+
raise UnsupportedCurveError(f"Unsupported curve: {curve_name}")
105+
return COMPRESSED_KEY_SIZES[curve_name_lower]
106+
107+
108+
def generate_ephemeral_keypair(
109+
curve_name: str,
110+
) -> tuple[ec.EllipticCurvePrivateKey, ec.EllipticCurvePublicKey]:
111+
"""
112+
Generate an ephemeral keypair for ECDH.
113+
114+
Args:
115+
curve_name: Name of the curve (e.g., "secp256r1")
116+
117+
Returns:
118+
tuple: (private_key, public_key)
119+
120+
Raises:
121+
UnsupportedCurveError: If the curve is not supported
122+
"""
123+
curve = get_curve(curve_name)
124+
private_key = ec.generate_private_key(curve, default_backend())
125+
public_key = private_key.public_key()
126+
return private_key, public_key
127+
128+
129+
def compress_public_key(public_key: ec.EllipticCurvePublicKey) -> bytes:
130+
"""
131+
Compress an EC public key to compressed point format.
132+
133+
Args:
134+
public_key: The EC public key to compress
135+
136+
Returns:
137+
bytes: Compressed public key (33-67 bytes depending on curve)
138+
"""
139+
return public_key.public_bytes(
140+
encoding=Encoding.X962, format=PublicFormat.CompressedPoint
141+
)
142+
143+
144+
def decompress_public_key(
145+
compressed_key: bytes, curve_name: str
146+
) -> ec.EllipticCurvePublicKey:
147+
"""
148+
Decompress a public key from compressed point format.
149+
150+
Args:
151+
compressed_key: The compressed public key bytes
152+
curve_name: Name of the curve (e.g., "secp256r1")
153+
154+
Returns:
155+
ec.EllipticCurvePublicKey: The decompressed public key
156+
157+
Raises:
158+
InvalidKeyError: If the key cannot be decompressed
159+
UnsupportedCurveError: If the curve is not supported
160+
"""
161+
try:
162+
curve = get_curve(curve_name)
163+
# Verify the size matches expected compressed size
164+
expected_size = get_compressed_key_size(curve_name)
165+
if len(compressed_key) != expected_size:
166+
raise InvalidKeyError(
167+
f"Invalid compressed key size for {curve_name}: "
168+
f"expected {expected_size} bytes, got {len(compressed_key)} bytes"
169+
)
170+
171+
return ec.EllipticCurvePublicKey.from_encoded_point(curve, compressed_key)
172+
except (ValueError, TypeError) as e:
173+
raise InvalidKeyError(f"Failed to decompress public key: {e}")
174+
175+
176+
def derive_shared_secret(
177+
private_key: ec.EllipticCurvePrivateKey, public_key: ec.EllipticCurvePublicKey
178+
) -> bytes:
179+
"""
180+
Derive a shared secret using ECDH.
181+
182+
Args:
183+
private_key: The private key (can be ephemeral or recipient's key)
184+
public_key: The public key (recipient's or ephemeral key)
185+
186+
Returns:
187+
bytes: The raw shared secret (x-coordinate of the ECDH point)
188+
189+
Raises:
190+
ECDHError: If ECDH fails
191+
"""
192+
try:
193+
shared_secret = private_key.exchange(ec.ECDH(), public_key)
194+
return shared_secret
195+
except Exception as e:
196+
raise ECDHError(f"Failed to derive shared secret: {e}")
197+
198+
199+
def derive_key_from_shared_secret(
200+
shared_secret: bytes, key_length: int = 32, salt: bytes | None = None, info: bytes = b""
201+
) -> bytes:
202+
"""
203+
Derive a symmetric encryption key from the ECDH shared secret using HKDF.
204+
205+
Args:
206+
shared_secret: The raw ECDH shared secret
207+
key_length: Length of the derived key in bytes (default: 32 for AES-256)
208+
salt: Optional salt for HKDF (default: NANOTDF_HKDF_SALT)
209+
info: Optional context/application-specific info (default: empty)
210+
211+
Returns:
212+
bytes: Derived symmetric encryption key
213+
214+
Raises:
215+
ECDHError: If key derivation fails
216+
"""
217+
if salt is None:
218+
salt = NANOTDF_HKDF_SALT
219+
220+
try:
221+
hkdf = HKDF(
222+
algorithm=hashes.SHA256(),
223+
length=key_length,
224+
salt=salt,
225+
info=info,
226+
backend=default_backend(),
227+
)
228+
return hkdf.derive(shared_secret)
229+
except Exception as e:
230+
raise ECDHError(f"Failed to derive key from shared secret: {e}")
231+
232+
233+
def encrypt_key_with_ecdh(
234+
recipient_public_key_pem: str, curve_name: str = "secp256r1"
235+
) -> tuple[bytes, bytes]:
236+
"""
237+
High-level function: Generate ephemeral keypair and derive encryption key.
238+
239+
This is used during NanoTDF encryption to derive the key that will be used
240+
to encrypt the payload. The ephemeral public key must be stored in the
241+
NanoTDF header so the recipient can derive the same key.
242+
243+
Args:
244+
recipient_public_key_pem: Recipient's public key in PEM format (e.g., KAS public key)
245+
curve_name: Name of the curve to use (default: "secp256r1")
246+
247+
Returns:
248+
tuple: (derived_key, compressed_ephemeral_public_key)
249+
- derived_key: 32-byte AES-256 key for encrypting the payload
250+
- compressed_ephemeral_public_key: Ephemeral public key to store in header
251+
252+
Raises:
253+
ECDHError: If key derivation fails
254+
InvalidKeyError: If recipient's public key is invalid
255+
UnsupportedCurveError: If the curve is not supported
256+
"""
257+
# Load recipient's public key
258+
try:
259+
recipient_public_key = serialization.load_pem_public_key(
260+
recipient_public_key_pem.encode(), backend=default_backend()
261+
)
262+
if not isinstance(recipient_public_key, ec.EllipticCurvePublicKey):
263+
raise InvalidKeyError("Recipient's public key is not an EC key")
264+
except Exception as e:
265+
raise InvalidKeyError(f"Failed to load recipient's public key: {e}")
266+
267+
# Generate ephemeral keypair
268+
ephemeral_private_key, ephemeral_public_key = generate_ephemeral_keypair(
269+
curve_name
270+
)
271+
272+
# Derive shared secret
273+
shared_secret = derive_shared_secret(ephemeral_private_key, recipient_public_key)
274+
275+
# Derive encryption key from shared secret
276+
derived_key = derive_key_from_shared_secret(shared_secret, key_length=32)
277+
278+
# Compress ephemeral public key for storage in header
279+
compressed_ephemeral_key = compress_public_key(ephemeral_public_key)
280+
281+
return derived_key, compressed_ephemeral_key
282+
283+
284+
def decrypt_key_with_ecdh(
285+
recipient_private_key_pem: str,
286+
compressed_ephemeral_public_key: bytes,
287+
curve_name: str = "secp256r1",
288+
) -> bytes:
289+
"""
290+
High-level function: Derive decryption key from ephemeral public key and recipient's private key.
291+
292+
This is used during NanoTDF decryption to derive the same key that was used
293+
to encrypt the payload. The ephemeral public key is extracted from the
294+
NanoTDF header.
295+
296+
Args:
297+
recipient_private_key_pem: Recipient's private key in PEM format (e.g., KAS private key)
298+
compressed_ephemeral_public_key: Ephemeral public key from NanoTDF header
299+
curve_name: Name of the curve (default: "secp256r1")
300+
301+
Returns:
302+
bytes: 32-byte AES-256 key for decrypting the payload
303+
304+
Raises:
305+
ECDHError: If key derivation fails
306+
InvalidKeyError: If keys are invalid
307+
UnsupportedCurveError: If the curve is not supported
308+
"""
309+
# Load recipient's private key
310+
try:
311+
recipient_private_key = serialization.load_pem_private_key(
312+
recipient_private_key_pem.encode(), password=None, backend=default_backend()
313+
)
314+
if not isinstance(recipient_private_key, ec.EllipticCurvePrivateKey):
315+
raise InvalidKeyError("Recipient's private key is not an EC key")
316+
except Exception as e:
317+
raise InvalidKeyError(f"Failed to load recipient's private key: {e}")
318+
319+
# Decompress ephemeral public key
320+
ephemeral_public_key = decompress_public_key(
321+
compressed_ephemeral_public_key, curve_name
322+
)
323+
324+
# Derive shared secret
325+
shared_secret = derive_shared_secret(recipient_private_key, ephemeral_public_key)
326+
327+
# Derive decryption key from shared secret
328+
derived_key = derive_key_from_shared_secret(shared_secret, key_length=32)
329+
330+
return derived_key

0 commit comments

Comments
 (0)