diff --git a/.env.keycloak.example b/.env.keycloak.example index f57e29e4..08db8575 100644 --- a/.env.keycloak.example +++ b/.env.keycloak.example @@ -18,4 +18,7 @@ KUKKUU_TICKET_VERIFICATION_URL=http://localhost:3000/ticket-verification-endpoin MAIL_MAILGUN_KEY MAIL_MAILGUN_DOMAIN=hel.fi MAIL_MAILGUN_API=https://api.eu.mailgun.net/v3 -KUKKUU_NOTIFICATIONS_SHEET_ID=1TkdQsO50DHOg5pi1JhzudOL1GKpiK-V2DCIoAipKj-M \ No newline at end of file +KUKKUU_NOTIFICATIONS_SHEET_ID=1TkdQsO50DHOg5pi1JhzudOL1GKpiK-V2DCIoAipKj-M +TOKEN_AUTH_BROWSER_TEST_ENABLED=1 +TOKEN_AUTH_BROWSER_TEST_JWT_256BIT_SIGN_SECRET=your-256-bit-secret +TOKEN_AUTH_BROWSER_TEST_JWT_ISSUER=https://kukkuu-ui.test.hel.ninja,https://kukkuu-admin-ui.test.hel.ninja \ No newline at end of file diff --git a/browser-tests/.env.example b/browser-tests/.env.example index 3b85040d..17615ce0 100644 --- a/browser-tests/.env.example +++ b/browser-tests/.env.example @@ -1,5 +1,5 @@ BROWSER_TESTS_API_URL=https://localhost BROWSER_TESTS_API_ADMIN_USER_NAME=admin BROWSER_TESTS_API_ADMIN_PASSWORD=admin -BROWSER_TESTS_USER_NAME=kukkuu.exampleuser@gmail.com +BROWSER_TESTS_USER_NAME=kukkuu.browsertest-example@kummilapset.hel.fi BROWSER_TESTS_USER_PASSWORD=examplePassword \ No newline at end of file diff --git a/kukkuu/middleware.py b/kukkuu/middleware.py index c2684929..e37f19de 100644 --- a/kukkuu/middleware.py +++ b/kukkuu/middleware.py @@ -1,4 +1,4 @@ -from helusers.oidc import RequestJWTAuthentication +from kukkuu.oidc import BrowserTestAwareJWTAuthentication # copied from https://github.com/City-of-Helsinki/open-city-profile/blob/4f46f9f9f195c4254f79f5dfbd97d03b7fa87a5b/open_city_profile/middleware.py#L6 # noqa @@ -9,7 +9,7 @@ def __init__(self, get_response): def __call__(self, request): if not request.user.is_authenticated: try: - authenticator = RequestJWTAuthentication() + authenticator = BrowserTestAwareJWTAuthentication() user_auth = authenticator.authenticate(request) if user_auth is not None: request.user_auth = user_auth diff --git a/kukkuu/oidc.py b/kukkuu/oidc.py new file mode 100644 index 00000000..4e359360 --- /dev/null +++ b/kukkuu/oidc.py @@ -0,0 +1,145 @@ +import logging +from typing import Optional + +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured +from helusers.authz import UserAuthorization +from helusers.jwt import JWT, ValidationError +from helusers.oidc import AuthenticationError, RequestJWTAuthentication +from helusers.settings import api_token_auth_settings +from helusers.user_utils import get_or_create_user +from jose import jwt as jose_jwt + +from kukkuu.tests.utils.jwt_utils import is_valid_256_bit_key + +logger = logging.getLogger(__name__) + + +class ApiTokenAuthSettings: + def __init__(self, **entries): + self.__dict__.update(entries) + + +class BrowserTestAwareJWTAuthentication(RequestJWTAuthentication): + def __init__(self): + super().__init__() + combined_settings = { + **api_token_auth_settings._settings, + **settings.OIDC_BROWSER_TEST_API_TOKEN_AUTH, + } + self._api_token_auth_settings = ApiTokenAuthSettings(**combined_settings) + self.algorithms = ["HS256"] + + if self._api_token_auth_settings.ENABLED: + if not self._api_token_auth_settings.ISSUER: + raise ImproperlyConfigured( + "ISSUER must be configured when test JWT auth is enabled." + ) + if not is_valid_256_bit_key(self._api_token_auth_settings.JWT_SIGN_SECRET): + raise ImproperlyConfigured( + "JWT_SIGN_SECRET (JWT secret key) must be 256 bits" + ) + + def _get_auth_header_jwt(self, request): + """Looks for a JWT from the request's "Authorization" header. + + If the header is not found, or it doesn't contain a JWT, returns None. + If the header is found and contains a JWT then returns a JWT. + + Args: + request: the request object + + Returns: + JWT|None: JWT if the Authorization header contains one. Otherwise None. + """ + auth_header = request.headers["Authorization"] + + if not auth_header: + return None + + auth_scheme, jwt_value = auth_header.split() + if auth_scheme.lower() != "bearer": + return None + + return JWT(jwt_value, self._api_token_auth_settings) + + def _validate_symmetrically_signed_jwt(self, jwt: JWT): + """ + Validate a symmetrically signed JWT that is signed by a shared secret. + + NOTE: This function is implemented since the `django_helusers` + does not verify symmetrically signed JWT that are signed by a shared secret. + The `helusers` always uses a issuer specific `OIDCConfig` that fetches the + keys from a server (from a path "/.well-known/openid-configuration"). + """ + logger.debug("Validating a symmetrically signed test JWT", extra=jwt) + try: + jwt.validate_issuer() + except ValidationError as e: + raise AuthenticationError(str(e)) from e + try: + jose_jwt.decode( + token=jwt._encoded_jwt, + key=self._api_token_auth_settings.JWT_SIGN_SECRET, + audience=jwt.claims.get("aud"), + issuer=jwt.claims.get("iss"), + subject=jwt.claims.get("sub"), + algorithms=self.algorithms, + ) + except ValidationError as e: + raise AuthenticationError(str(e)) from e + except Exception: + raise AuthenticationError("JWT verification failed.") + + def has_auth_token_for_testing(self, request) -> Optional[JWT]: + """Checks whether the request contains a JWT which is + issued for end-to-end browser testing use only. + + Args: + request: the request object. + + Returns: + Optional[JWT]: JWT if it is issued for brower test use. Otherwise None. + """ + jwt = self._get_auth_header_jwt(request) + if jwt.claims.get("iss") not in self._api_token_auth_settings.ISSUER: + return None + return jwt + + def authenticate_test_user(self, jwt: JWT): + """Authenticate a user who is sending the browser test request. + + Args: + jwt (JWT): the JWT issued for browser testing use that is + attached into the request. + + Returns: + UserAuthorization: user authorization instance. + """ + logger.info("Authenticating with a test JWT!") + self._validate_symmetrically_signed_jwt(jwt) + logger.debug("The symmetrically signed JWT was valid.", extra=jwt) + user = get_or_create_user(jwt.claims, oidc=True) + logger.debug("The user %s returned from get_or_create_user", user, extra=user) + return UserAuthorization(user, jwt.claims) + + def authenticate(self, request): + """ + Looks for a JWT from the request's "Authorization" header. + If the header is not found, or it doesn't contain a JWT, returns None. + If the header is found and contains a JWT then the JWT gets verified. + + Test whether the JWT is issued for the end-to-end browser test use. + IF the JWT is for test use, then handle it with `authenticate_test_user`, + since the `django_helusers` does not support symmetrically signed JWT. + + If verification passes, takes a user's id from the JWT's "sub" claim. + Creates a User if it doesn't already exist. + On success returns a UserAuthorization object. + Raises an AuthenticationError on authentication failure. + + """ + if self._api_token_auth_settings.ENABLED: + if jwt := self.has_auth_token_for_testing(request): + return self.authenticate_test_user(jwt) + return super().authenticate(request) diff --git a/kukkuu/settings.py b/kukkuu/settings.py index 4f918ebf..4ba09b12 100644 --- a/kukkuu/settings.py +++ b/kukkuu/settings.py @@ -7,6 +7,8 @@ from django.utils.translation import gettext_lazy as _ from sentry_sdk.integrations.django import DjangoIntegration +from kukkuu.tests.utils.jwt_utils import is_valid_256_bit_key + checkout_dir = environ.Path(__file__) - 2 assert os.path.exists(checkout_dir("manage.py")) @@ -44,10 +46,13 @@ SENTRY_ENVIRONMENT=(str, ""), CORS_ORIGIN_WHITELIST=(list, []), CORS_ORIGIN_ALLOW_ALL=(bool, False), - TOKEN_AUTH_ACCEPTED_SCOPE_PREFIX=(str, "kukkuu"), - TOKEN_AUTH_REQUIRE_SCOPE_PREFIX=(bool, True), + TOKEN_AUTH_ACCEPTED_SCOPE_PREFIX=(str, ""), + TOKEN_AUTH_REQUIRE_SCOPE_PREFIX=(bool, False), TOKEN_AUTH_ACCEPTED_AUDIENCE=(list, ["https://api.hel.fi/auth/kukkuu"]), - TOKEN_AUTH_AUTHSERVER_URL=(list, ["https://tunnistamo.test.hel.ninja/openid"]), + TOKEN_AUTH_AUTHSERVER_URL=( + list, + ["https://tunnistus.test.hel.ninja/auth/realms/helsinkitunnistus"], + ), ILMOITIN_QUEUE_NOTIFICATIONS=(bool, True), DEFAULT_FILE_STORAGE=(str, "django.core.files.storage.FileSystemStorage"), GS_BUCKET_NAME=(str, ""), @@ -71,18 +76,13 @@ VERIFICATION_TOKEN_LENGTH=(int, 8), SUBSCRIPTIONS_AUTH_TOKEN_VALID_MINUTES=(int, 30 * 24 * 60), # 30 days SUBSCRIPTIONS_AUTH_TOKEN_LENGTH=(int, 16), - # NOTE: TOKEN_AUTH_ACCEPTED_SCOPE_PREFIX sets a prefix - # for `GDPR_API_QUERY_SCOPE` and `GDPR_API_DELETE_SCOPE`. - GDPR_API_QUERY_SCOPE=(str, "kukkuu.gdprquery"), - GDPR_API_DELETE_SCOPE=(str, "kukkuu.gdprdelete"), - GDPR_API_AUTHORIZATION_FIELD=(str, "https://api.hel.fi/auth"), - # NOTE: For a Keycloak, the following GDPR variables are needed - # TOKEN_AUTH_ACCEPTED_SCOPE_PREFIX=(str, ""), - # TOKEN_AUTH_REQUIRE_SCOPE_PREFIX=(bool, False), - # GDPR_API_QUERY_SCOPE=(str, "gdprquery"), - # GDPR_API_DELETE_SCOPE=(str, "gdprdelete"), - # GDPR_API_AUTHORIZATION_FIELD=(str, "authorization.permissions.scopes"), + GDPR_API_QUERY_SCOPE=(str, "gdprquery"), + GDPR_API_DELETE_SCOPE=(str, "gdprdelete"), + GDPR_API_AUTHORIZATION_FIELD=(str, "authorization.permissions.scopes"), HELUSERS_BACK_CHANNEL_LOGOUT_ENABLED=(bool, False), + TOKEN_AUTH_BROWSER_TEST_JWT_256BIT_SIGN_SECRET=(str, None), + TOKEN_AUTH_BROWSER_TEST_JWT_ISSUER=(list, None), + TOKEN_AUTH_BROWSER_TEST_ENABLED=(bool, False), ) if os.path.exists(env_file): @@ -276,6 +276,30 @@ OIDC_AUTH = {"OIDC_LEEWAY": 60 * 60} +OIDC_BROWSER_TEST_API_TOKEN_AUTH = { + "ENABLED": env.bool("TOKEN_AUTH_BROWSER_TEST_ENABLED"), + "JWT_SIGN_SECRET": env.str("TOKEN_AUTH_BROWSER_TEST_JWT_256BIT_SIGN_SECRET"), + "ISSUER": env.list("TOKEN_AUTH_BROWSER_TEST_JWT_ISSUER"), + "AUDIENCE": env.list("TOKEN_AUTH_ACCEPTED_AUDIENCE"), + "API_SCOPE_PREFIX": env.str("TOKEN_AUTH_ACCEPTED_SCOPE_PREFIX"), + "REQUIRE_API_SCOPE_FOR_AUTHENTICATION": env.bool("TOKEN_AUTH_REQUIRE_SCOPE_PREFIX"), + "API_AUTHORIZATION_FIELD": env.str("GDPR_API_AUTHORIZATION_FIELD"), +} + +# Ensure that the browser test JWT authentication is configured properly. +if OIDC_BROWSER_TEST_API_TOKEN_AUTH["ENABLED"]: + if not OIDC_BROWSER_TEST_API_TOKEN_AUTH["ISSUER"]: + raise ImproperlyConfigured( + "API token authentication is enabled, but no issuer is configured. " + "Set OIDC_BROWSER_TEST_API_TOKEN_AUTH['ISSUER']." + ) + if not is_valid_256_bit_key(OIDC_BROWSER_TEST_API_TOKEN_AUTH["JWT_SIGN_SECRET"]): + raise ImproperlyConfigured( + "JWT secret key for BrowserTestAwareJWTAuthentication must be 256-bits. " + "Set OIDC_BROWSER_TEST_API_TOKEN_AUTH['JWT_SIGN_SECRET']." + ) + + SITE_ID = 1 PARLER_LANGUAGES = {SITE_ID: ({"code": "fi"}, {"code": "sv"}, {"code": "en"})} diff --git a/kukkuu/tests/conftest.py b/kukkuu/tests/conftest.py index 9a3cb867..e55e4130 100644 --- a/kukkuu/tests/conftest.py +++ b/kukkuu/tests/conftest.py @@ -1,8 +1,40 @@ +import secrets + import pytest from kukkuu.service import get_hashid_service +from kukkuu.tests.utils.jwt_utils import generate_symmetric_test_jwt +from users.factories import UserFactory @pytest.fixture def hashids(): return get_hashid_service() + + +@pytest.fixture(autouse=True) +def oidc_browser_test_api_token_auth_settings(settings): + settings.OIDC_BROWSER_TEST_API_TOKEN_AUTH = { + "ENABLED": True, + "AUDIENCE": ["kukkuu-api-dev", "profile-api-test", "kukkuu-admin-ui-test"], + "API_SCOPE_PREFIX": "", + "REQUIRE_API_SCOPE_FOR_AUTHENTICATION": False, + "API_AUTHORIZATION_FIELD": "authorization.permissions.scopes", + "ISSUER": "https://kukkuu-ui.test.hel.ninja", + "JWT_SIGN_SECRET": secrets.token_bytes(32).hex(), + } + + +@pytest.fixture +def get_browser_test_bearer_token_for_user(oidc_browser_test_api_token_auth_settings): + """Returns a test JWT token generator function. + + The generator function returns a signed bearer token to authenticate through + the authentcation made for browser testing.""" + + default_user = UserFactory.build() + + def generate_test_jwt_token(user=default_user): + return generate_symmetric_test_jwt(user) + + return generate_test_jwt_token diff --git a/kukkuu/tests/test_authentication.py b/kukkuu/tests/test_authentication.py index 263a34ba..c0eda316 100644 --- a/kukkuu/tests/test_authentication.py +++ b/kukkuu/tests/test_authentication.py @@ -1,17 +1,26 @@ from contextlib import contextmanager -from unittest import mock +from datetime import timedelta +from unittest.mock import Mock, patch +import pytest import requests from django.contrib.auth.models import AnonymousUser +from django.core.exceptions import ImproperlyConfigured +from django.http import HttpRequest +from django.utils import timezone +from freezegun import freeze_time from helusers.authz import UserAuthorization +from helusers.jwt import JWT from helusers.oidc import AuthenticationError -from jose import ExpiredSignatureError +from jose import ExpiredSignatureError, JWTError from common.tests.utils import assert_match_error_code, assert_permission_denied from kukkuu.consts import AUTHENTICATION_ERROR, AUTHENTICATION_EXPIRED_ERROR +from kukkuu.oidc import BrowserTestAwareJWTAuthentication +from kukkuu.tests.utils.jwt_utils import TEST_JWT_EXP_TIME_IN_SECONDS from users.factories import GuardianFactory -HELUSERS_AUTHENTICATE = "helusers.oidc.RequestJWTAuthentication.authenticate" +KUKKUU_AUTHENTICATE = "kukkuu.oidc.BrowserTestAwareJWTAuthentication.authenticate" SENTRY_CAPTURE_EXCEPTION = "sentry_sdk.capture_exception" MY_PROFILE_QUERY = """ @@ -23,22 +32,36 @@ """ +@pytest.fixture +def request_factory(get_browser_test_bearer_token_for_user): + def _request_factory( + auth_header=get_browser_test_bearer_token_for_user(), + ): + request = HttpRequest() + request.headers = {"Authorization": auth_header} + return request + + return _request_factory + + @contextmanager def set_authenticated_user(user): - with mock.patch( - HELUSERS_AUTHENTICATE, + with patch( + KUKKUU_AUTHENTICATE, return_value=UserAuthorization(user=user, api_token_payload={}), ): yield -def graphql_request(live_server, query=MY_PROFILE_QUERY): - return requests.post(live_server.url + "/graphql", json={"query": query}) +def graphql_request(live_server, query=MY_PROFILE_QUERY, headers=None): + return requests.post( + live_server.url + "/graphql", json={"query": query}, headers=headers + ) def test_authentication_unauthenticated(live_server): with set_authenticated_user(AnonymousUser()): - with mock.patch(SENTRY_CAPTURE_EXCEPTION) as sentry: + with patch(SENTRY_CAPTURE_EXCEPTION) as sentry: response = graphql_request(live_server) assert_permission_denied(response.json()) @@ -58,9 +81,9 @@ def test_authentication_authenticated(live_server): def test_authentication_error(live_server): - with mock.patch(SENTRY_CAPTURE_EXCEPTION) as sentry: - with mock.patch( - HELUSERS_AUTHENTICATE, + with patch(SENTRY_CAPTURE_EXCEPTION) as sentry: + with patch( + KUKKUU_AUTHENTICATE, side_effect=AuthenticationError("JWT verification failed."), ): response = graphql_request(live_server) @@ -75,11 +98,122 @@ def expired_token_authenticate(*args): except Exception: raise AuthenticationError("JWT verification failed.") - with mock.patch(SENTRY_CAPTURE_EXCEPTION) as sentry: - with mock.patch( - HELUSERS_AUTHENTICATE, + with patch(SENTRY_CAPTURE_EXCEPTION) as sentry: + with patch( + KUKKUU_AUTHENTICATE, side_effect=expired_token_authenticate, ): response = graphql_request(live_server) assert_match_error_code(response.json(), AUTHENTICATION_EXPIRED_ERROR) sentry.assert_not_called() + + +def test_browser_test_authentication_using_live_server( + live_server, get_browser_test_bearer_token_for_user +): + """The test JWT should be valid for authentication.""" + guardian = GuardianFactory(email="gustavo.guardian@example.com") + response = graphql_request( + live_server, + headers={ + "authorization": get_browser_test_bearer_token_for_user(guardian.user) + }, + ) + assert ( + response.json()["data"]["myProfile"]["email"] == "gustavo.guardian@example.com" + ) + + +@patch("kukkuu.oidc.get_or_create_user") +@patch( + "kukkuu.oidc.BrowserTestAwareJWTAuthentication._validate_symmetrically_signed_jwt" +) +@pytest.mark.django_db() +def test_authenticate_test_user(mock_validate_jwt, mock_get_or_create_user): + """The test JWT validation should be called when a test token is used.""" + jwt_claims = { + "iss": "test_issuer", + "sub": "test_user_id", + } + jwt = Mock(spec=JWT, claims=jwt_claims) + mock_get_or_create_user.return_value = "test_user" + auth = BrowserTestAwareJWTAuthentication() + result = auth.authenticate_test_user(jwt) + assert isinstance(result, UserAuthorization) + assert result.user == "test_user" + mock_validate_jwt.assert_called_once_with(jwt) + + +def test_get_auth_header_jwt_valid(request_factory): + request = request_factory() + auth = BrowserTestAwareJWTAuthentication() + jwt = auth._get_auth_header_jwt(request) + assert isinstance(jwt, JWT) + + +def test_get_auth_header_jwt_invalid_scheme(request_factory): + request = request_factory("Basic invalid") + auth = BrowserTestAwareJWTAuthentication() + jwt = auth._get_auth_header_jwt(request) + assert jwt is None + + +def test_get_auth_header_jwt_invalid_bearer(request_factory): + request = request_factory("bearer invalid.jwt.structure") + auth = BrowserTestAwareJWTAuthentication() + with pytest.raises(JWTError): + auth._get_auth_header_jwt(request) + + +def test_get_auth_header_jwt_no_header(request_factory): + request = request_factory(None) + auth = BrowserTestAwareJWTAuthentication() + jwt = auth._get_auth_header_jwt(request) + assert jwt is None + + +def test_browser_test_auth_enabled_without_issuer_should_raise(settings): + """Issuer is a mandatory config when test auth is enabled.""" + settings.OIDC_BROWSER_TEST_API_TOKEN_AUTH["ENABLED"] = True + settings.OIDC_BROWSER_TEST_API_TOKEN_AUTH["ISSUER"] = None + with pytest.raises(ImproperlyConfigured): + BrowserTestAwareJWTAuthentication() + + +@patch("helusers.oidc.RequestJWTAuthentication.authenticate") +def test_browser_test_auth_disabled_should_always_call_helusers_authenticate( + mock_helusers_authenticate, + settings, + request_factory, +): + """When browser test authentication is disabled, + the helusers authentication is used for test JWTs. + """ + settings.OIDC_BROWSER_TEST_API_TOKEN_AUTH["ENABLED"] = False + request = request_factory() + auth = BrowserTestAwareJWTAuthentication() + auth.authenticate(request) + mock_helusers_authenticate.assert_called_once_with(request) + + +@pytest.mark.django_db() +def test_browser_test_auth_with_expired_token(request_factory): + """Advance time after issuing a JWT so that the AuthenticationError + is thrown for using an expired token + """ + datetime_for_expired_token = timezone.now() + timedelta( + TEST_JWT_EXP_TIME_IN_SECONDS + 1 + ) + # advance time so that the JWT expires + with freeze_time(datetime_for_expired_token.isoformat()): + request = request_factory() + with pytest.raises(AuthenticationError): + auth = BrowserTestAwareJWTAuthentication() + auth.authenticate(request) + + +@pytest.mark.django_db() +def test_browser_test_auth_with_valid_token(request_factory): + request = request_factory() + auth = BrowserTestAwareJWTAuthentication() + assert auth.authenticate(request) is not None diff --git a/kukkuu/tests/test_utils.py b/kukkuu/tests/test_utils.py index 5553cf7e..ee4b9b39 100644 --- a/kukkuu/tests/test_utils.py +++ b/kukkuu/tests/test_utils.py @@ -1,6 +1,54 @@ import random +import pytest + +from kukkuu.tests.utils.jwt_utils import is_valid_256_bit_key + def test_hashids(hashids): for x in random.sample(range(1, 99999), 10): assert hashids.decode(hashids.encode(x))[0] == x + + +@pytest.mark.parametrize( + "key_string, expected_result", + [ + ( + "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + True, + ), # Valid key + ( + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", + True, + ), # All Fs + ( + "0000000000000000000000000000000000000000000000000000000000000000", + True, + ), # All 0s + ( + "abcdef0123456789abcdef0123456789abcdef0123456789abcdef01234567", + False, + ), # Too short + ( + "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0", + False, + ), # Too long + ( + "0123456789abcdef0123456789abcdef0123456789abcdeF0123456789abcdef", + True, + ), # Upper and lowercase + ( + "0123456789abcdefg0123456789abcdef0123456789abcdef0123456789abcde", + False, + ), # Invalid char 'g' + ("", False), # Empty string + (" ", False), # Whitespace + (None, False), # None + (12345, False), # Integer + ("dda26ea70e53b156594d97b97c1e50c4e0e3687bec29f3463e86764b258dd5b6", True), + ], +) +def test_is_valid_256_bit_key(key_string, expected_result): + """Test the is_valid_256_bit_key function with various inputs.""" + result = is_valid_256_bit_key(key_string) + assert result == expected_result diff --git a/kukkuu/tests/utils/__init__.py b/kukkuu/tests/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kukkuu/tests/utils/jwt_utils.py b/kukkuu/tests/utils/jwt_utils.py new file mode 100644 index 00000000..e8c8b7ae --- /dev/null +++ b/kukkuu/tests/utils/jwt_utils.py @@ -0,0 +1,71 @@ +import time +import uuid +from typing import Optional, TYPE_CHECKING + +from django.conf import settings +from jose import jwt + +if TYPE_CHECKING: + from users.models import User as UserType + + +TEST_JWT_EXP_TIME_IN_SECONDS = 60 + + +def get_epoch_timeframe_for_test_jwt(): + """Get test JWT valid timeframe as epoch times + + Returns: + tuple[int, int]: issued at (epoch), expiration (epoch) + """ + epoch_time = int(time.time()) + return epoch_time, epoch_time + TEST_JWT_EXP_TIME_IN_SECONDS + + +def generate_symmetric_test_jwt( + user: "UserType", + shared_secret_for_signature: Optional[str] = None, + issuer="https://kukkuu-ui.test.hel.ninja", + prefix="bearer", +): + headers = { + "alg": "HS256", + "typ": "JWT", + } + epoch_time, exp_epoch = get_epoch_timeframe_for_test_jwt() + payload = { + "iat": epoch_time, + "auth_time": epoch_time, + "exp": exp_epoch, + "jti": str(uuid.uuid4()), + "iss": issuer, + "aud": "kukkuu-api-test", + "sub": str(user.uuid), + "typ": "Bearer", + "authorization": {"permissions": [{"scopes": ["access"]}]}, + "scope": "profile email", + "email_verified": False, + "amr": ["helsinki_tunnus"], + "name": f"{user.first_name} {user.last_name}", + "preferred_username": user.username, + "given_name": user.first_name, + "family_name": user.last_name, + "email": user.email, + "loa": "low", + } + token = jwt.encode( + claims=payload, + key=shared_secret_for_signature + or settings.OIDC_BROWSER_TEST_API_TOKEN_AUTH["JWT_SIGN_SECRET"], + headers=headers, + ) + return f"{prefix} {token}" + + +def is_valid_256_bit_key(key): + """Checks if the provided key is a 256-bit hexadecimal string.""" + return ( + isinstance(key, str) + and len(key) == 64 # 256-bit key is 64 hexadecimal digits + and all(c in "0123456789abcdefABCDEF" for c in key) # Is hexadecimal + )