+
Successfully logged into StackStorm using SSO!
+
Please check your terminal
+
You may now close this page
+
+
+ """,
+ "utf-8",
+ )
+ )
+
+ def log_message(self, format, *args):
+ LOG.debug("%s " + format, "SSO Proxy: ", *args)
+ return
+
+ return SSOProxyServer
diff --git a/st2client/tests/unit/test_auth.py b/st2client/tests/unit/test_auth.py
index e59b31dfaf..bb1d951214 100644
--- a/st2client/tests/unit/test_auth.py
+++ b/st2client/tests/unit/test_auth.py
@@ -15,6 +15,8 @@
from __future__ import absolute_import
import os
+import re
+from time import sleep, time
import uuid
import json
import mock
@@ -22,6 +24,8 @@
import requests
import argparse
import logging
+from threading import Thread
+from datetime import datetime, timedelta
import six
@@ -29,12 +33,16 @@
from st2client import shell
from st2client.models.core import add_auth_token_to_kwargs_from_env
from st2client.commands.resource import add_auth_token_to_kwargs_from_cli
+from st2client.utils.crypto import (
+ AESKey,
+ read_crypto_key_from_dict,
+ symmetric_encrypt,
+)
from st2client.utils.httpclient import (
add_auth_token_to_headers,
add_json_content_type_to_headers,
)
-
LOG = logging.getLogger(__name__)
if six.PY3:
@@ -165,6 +173,159 @@ def runTest(self):
)
+class TestLoginSSO(TestLoginBase):
+
+ ORIGINAL_POST_FN = requests.post
+
+ CONFIG_FILE_NAME = "logintest.cfg"
+
+ LOGIN_REQUEST_MOCK_KEY = read_crypto_key_from_dict(
+ {
+ "hmacKey": {
+ "hmacKeyString": "-qdRklvhm4xvzIfaL6Z2nmQ-2N-c4IUtNa1_BowCVfg",
+ "size": 256,
+ },
+ "aesKeyString": "0UyXFjBTQ9PMyHZ0mqrvuqCSzesuFup1d6m-4Vi3vdo",
+ "mode": "CBC",
+ "size": 256,
+ }
+ )
+
+ TOKEN = {
+ "user": "stanley",
+ "token": "44583f15945b4095afbf57058535ca64",
+ "expiry": "2017-02-12T00:53:09.632783Z",
+ "id": "589e607532ed3535707f10eb",
+ "metadata": {},
+ }
+
+ ENCRYPTED_TOKEN = symmetric_encrypt(
+ LOGIN_REQUEST_MOCK_KEY, json.dumps(TOKEN)
+ ).decode("utf-8")
+
+ LOGIN_REQUEST_RESPONSE = {
+ # This is just a placeholder name, it's all mocked :)
+ "sso_url": "http://keycloak/realms/StackStorm/protocol/saml?SAMLRequest=fZFRS8MwFIX%2FSsl7TJPV1Ya1MB3iYOJYqw%2B%2BSJpFF2yTmXsr%2Bu%2FNplNU2OM53HNzvtyJg1ROB9y4lXkZDGDy1ncOZLRLMgQnvQIbpeoNSNSynl4vpDhJ5TZ49Np35DvAjwcUgAlovSPJfFYSu34QOs9yk2f0LB8rmqm2oAUfjempUCLX3Ohx25LkzgSIqZLEJTEKMJi5A1QOo5UKQdOC8qLhuRRCZuKeJLOIYZ3CfWqDuJWMdV6rbuMB5SjlnAWjuh5YjUo%2F1%2BhDzw48DFQfoZZf8ty6tXVPx9HazyGQV02zpMubuiHJ9IB74R0MvQm1Ca9Wm9vV4n8ppuIFGIBn0ejaWIpUk%2Fijco8bksvYUOHxEjvHrunjflQahxbfSfX3pQn7WVvtxO%2FrVx8%3D&RelayState=%7B%22referer%22%3A+%22http%3A%2F%2Flocalhost%3A34000%2Fcallback%22%7D",
+ "expiry": (datetime.now() + timedelta(hours=3)).strftime(
+ "%Y-%m-%dT%H:%M:%S.%f"
+ )[:-3]
+ + "000+00:00",
+ }
+
+ @mock.patch.object(AESKey, "generate", return_value=LOGIN_REQUEST_MOCK_KEY)
+ @mock.patch(
+ "requests.post",
+ return_value=base.FakeResponse(json.dumps(LOGIN_REQUEST_RESPONSE), 200, "OK"),
+ )
+ def runTest(self, mock_aeskey_generate, mock_post):
+ """Test 'st2 login --sso' functionality"""
+
+ expected_username = self.TOKEN["user"]
+ args = [
+ "--config",
+ self.CONFIG_FILE,
+ "login",
+ "--sso",
+ "--no-sso-browser",
+ "--sso-port",
+ "34000",
+ ]
+
+ def handle_sso_flow():
+ # Waiting for SSO link on the CLI
+ LOG.debug("Waiting for SSO link")
+ match = None
+ timeout_at = time() + 5
+ while not match and timeout_at > time():
+ sleep(1)
+ self.stdout.seek(0)
+ buffer = self.stdout.read()
+ LOG.debug("STDOUT buffer has: %s", buffer)
+ match = re.search(r"http://localhost:34000/\S+", buffer, re.MULTILINE)
+ self.assertIsNotNone(match)
+
+ # Hitting the localhost login url
+ login_url = match[0]
+ LOG.debug("GETting SSO login to %s", login_url)
+ response = requests.get(login_url, allow_redirects=False)
+ self.assertEquals(response.status_code, 307)
+ self.assertEquals(
+ response.headers["Location"], self.LOGIN_REQUEST_RESPONSE["sso_url"]
+ )
+
+ # Ignoring IDP flow and just hittin callback with proper response :)
+ LOG.debug("Calling back to local server")
+ response = requests.get(
+ "http://localhost:34000/callback",
+ params={"response": self.ENCRYPTED_TOKEN},
+ allow_redirects=False,
+ )
+ self.assertEquals(response.status_code, 302)
+ self.assertEquals(response.headers["Location"], "/success")
+ LOG.debug("Finished SSO flow")
+
+ def run_shell():
+ self.shell.run(args)
+
+ shellThread = Thread(target=run_shell)
+ shellThread.start()
+
+ handle_sso_flow()
+
+ shellThread.join()
+
+ with open(self.CONFIG_FILE, "r") as config_file:
+ for line in config_file.readlines():
+ print(line)
+ # Make sure certain values are not present
+ self.assertNotIn("password", line)
+ self.assertNotIn("olduser", line)
+
+ # Make sure configured username is what we expect
+ if "username" in line:
+ self.assertEqual(line.split(" ")[2][:-1], expected_username)
+
+ # validate token was created
+ self.assertTrue(
+ os.path.isfile("%stoken-%s" % (self.DOTST2_PATH, expected_username))
+ )
+
+
+class TestLoginWithMissingUsername(TestLoginBase):
+
+ CONFIG_FILE_NAME = "logintest.cfg"
+
+ TOKEN = {
+ "user": "st2admin",
+ "token": "44583f15945b4095afbf57058535ca64",
+ "expiry": "2017-02-12T00:53:09.632783Z",
+ "id": "589e607532ed3535707f10eb",
+ "metadata": {},
+ }
+
+ @mock.patch.object(
+ requests,
+ "post",
+ mock.MagicMock(return_value=base.FakeResponse(json.dumps(TOKEN), 200, "OK")),
+ )
+ def runTest(self):
+ """Test 'st2 login' functionality missing the username and should fail"""
+
+ expected_username = self.TOKEN["user"] # noqa
+ args = [
+ "--config",
+ self.CONFIG_FILE,
+ "login",
+ "--password",
+ "Password1!",
+ ]
+
+ self.shell.run(args)
+ self.assertIn(
+ "Username expected when not using SSO login", self.stdout.getvalue()
+ )
+
+
class TestLoginIntPwdAndConfig(TestLoginBase):
CONFIG_FILE_NAME = "logintest.cfg"
diff --git a/st2client/tests/unit/test_shell.py b/st2client/tests/unit/test_shell.py
index 5eb27714ca..425347db97 100644
--- a/st2client/tests/unit/test_shell.py
+++ b/st2client/tests/unit/test_shell.py
@@ -37,6 +37,7 @@
from st2common.models.db.auth import TokenDB
from tests import base
+
LOG = logging.getLogger(__name__)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
diff --git a/st2common/st2common/exceptions/auth.py b/st2common/st2common/exceptions/auth.py
index 5eab1915f5..15a04a5680 100644
--- a/st2common/st2common/exceptions/auth.py
+++ b/st2common/st2common/exceptions/auth.py
@@ -31,6 +31,7 @@
"AmbiguousUserError",
"NotServiceUserError",
"SSOVerificationError",
+ "SSORequestNotFoundError",
]
@@ -38,6 +39,10 @@ class TokenNotProvidedError(StackStormBaseException):
pass
+class SSORequestNotFoundError(StackStormBaseException):
+ pass
+
+
class TokenNotFoundError(StackStormDBObjectNotFoundError):
pass
diff --git a/st2common/st2common/models/db/auth.py b/st2common/st2common/models/db/auth.py
index 2531ecb11a..ccb8558cea 100644
--- a/st2common/st2common/models/db/auth.py
+++ b/st2common/st2common/models/db/auth.py
@@ -16,6 +16,7 @@
from __future__ import absolute_import
import copy
+from enum import Enum
import mongoengine as me
from st2common.constants.secrets import MASKED_ATTRIBUTE_VALUE
@@ -25,7 +26,7 @@
from st2common.rbac.backends import get_rbac_backend
from st2common.util import date as date_utils
-__all__ = ["UserDB", "TokenDB", "ApiKeyDB"]
+__all__ = ["UserDB", "TokenDB", "ApiKeyDB", "SSORequestDB"]
class UserDB(stormbase.StormFoundationDB):
@@ -85,6 +86,29 @@ class TokenDB(stormbase.StormFoundationDB):
service = me.BooleanField(required=True, default=False)
+class SSORequestDB(stormbase.StormFoundationDB):
+ class Type(Enum):
+ CLI = "cli"
+ WEB = "web"
+
+ """
+ An entity representing a SSO request.
+
+ Attribute:
+ request_id: Reference to the SSO request unique ID
+ expiry: Time at which this request expires.
+ type: What type of SSO request is this? web/cli
+
+ -- cli --
+ key: Symmetric key used to encrypt/decrypt contents from/to the CLI.
+ """
+
+ request_id = me.StringField(required=True)
+ key = me.StringField(required=False, unique=False)
+ expiry = me.DateTimeField(required=True)
+ type = me.EnumField(Type, required=True)
+
+
class ApiKeyDB(stormbase.StormFoundationDB, stormbase.UIDFieldMixin):
"""
An entity representing an API key object.
@@ -127,4 +151,4 @@ def mask_secrets(self, value):
return result
-MODELS = [UserDB, TokenDB, ApiKeyDB]
+MODELS = [UserDB, TokenDB, ApiKeyDB, SSORequestDB]
diff --git a/st2common/st2common/openapi.yaml b/st2common/st2common/openapi.yaml
index e86e42727d..de2c693a27 100644
--- a/st2common/st2common/openapi.yaml
+++ b/st2common/st2common/openapi.yaml
@@ -4526,10 +4526,11 @@ paths:
schema:
$ref: '#/definitions/Error'
security: []
- /auth/v1/sso/request:
+
+ /auth/v1/sso/request/web:
get:
- operationId: st2auth.controllers.v1.sso:sso_request_controller.get
- description: Redirects to the SSO Idp login page.
+ operationId: st2auth.controllers.v1.sso:sso_request_controller.get_web
+ description: Redirects to the SSO Idp login page from a user that's using the browser.
parameters:
- name: referer
in: header
@@ -4539,6 +4540,31 @@ paths:
'307':
description: Temporary redirect
security: []
+
+ /auth/v1/sso/request/cli:
+ post:
+ operationId: st2auth.controllers.v1.sso:sso_request_controller.post_cli
+ description: Issues an encrypted SSO login request for a CLI
+ parameters:
+ - name: response
+ in: body
+ description: SSO request with callback and key encryption
+ schema:
+ type: object
+ required:
+ - key
+ - callback_url
+ properties:
+ key:
+ type: string
+ description: The symmetric key to be used to encrypt contents of callback
+ callback_url:
+ type: string
+ description: What URL to be called back once the response from SSO is received
+ responses:
+ '200':
+ description: SSO request valid
+ security: []
/auth/v1/sso/callback:
post:
operationId: st2auth.controllers.v1.sso:idp_callback_controller.post
@@ -4552,6 +4578,8 @@ paths:
responses:
'200':
description: SSO response valid
+ '302':
+ description: SSO response valid and callback URL returned
'401':
description: Invalid or missing credentials has been provided
schema:
diff --git a/st2common/st2common/openapi.yaml.j2 b/st2common/st2common/openapi.yaml.j2
index f053f0f3d0..9c2177bc41 100644
--- a/st2common/st2common/openapi.yaml.j2
+++ b/st2common/st2common/openapi.yaml.j2
@@ -4522,10 +4522,11 @@ paths:
schema:
$ref: '#/definitions/Error'
security: []
- /auth/v1/sso/request:
+
+ /auth/v1/sso/request/web:
get:
- operationId: st2auth.controllers.v1.sso:sso_request_controller.get
- description: Redirects to the SSO Idp login page.
+ operationId: st2auth.controllers.v1.sso:sso_request_controller.get_web
+ description: Redirects to the SSO Idp login page from a user that's using the browser.
parameters:
- name: referer
in: header
@@ -4535,6 +4536,31 @@ paths:
'307':
description: Temporary redirect
security: []
+
+ /auth/v1/sso/request/cli:
+ post:
+ operationId: st2auth.controllers.v1.sso:sso_request_controller.post_cli
+ description: Issues an encrypted SSO login request for a CLI
+ parameters:
+ - name: response
+ in: body
+ description: SSO request with callback and key encryption
+ schema:
+ type: object
+ required:
+ - key
+ - callback_url
+ properties:
+ key:
+ type: string
+ description: The symmetric key to be used to encrypt contents of callback
+ callback_url:
+ type: string
+ description: What URL to be called back once the response from SSO is received
+ responses:
+ '200':
+ description: SSO request valid
+ security: []
/auth/v1/sso/callback:
post:
operationId: st2auth.controllers.v1.sso:idp_callback_controller.post
@@ -4548,6 +4574,8 @@ paths:
responses:
'200':
description: SSO response valid
+ '302':
+ description: SSO response valid and callback URL returned
'401':
description: Invalid or missing credentials has been provided
schema:
diff --git a/st2common/st2common/persistence/auth.py b/st2common/st2common/persistence/auth.py
index a8fad7488f..78b168c5da 100644
--- a/st2common/st2common/persistence/auth.py
+++ b/st2common/st2common/persistence/auth.py
@@ -15,6 +15,7 @@
from __future__ import absolute_import
from st2common.exceptions.auth import (
+ SSORequestNotFoundError,
TokenNotFoundError,
ApiKeyNotFoundError,
UserNotFoundError,
@@ -22,7 +23,7 @@
NoNicknameOriginProvidedError,
)
from st2common.models.db import MongoDBAccess
-from st2common.models.db.auth import UserDB, TokenDB, ApiKeyDB
+from st2common.models.db.auth import SSORequestDB, UserDB, TokenDB, ApiKeyDB
from st2common.persistence.base import Access
from st2common.util import hash as hash_utils
@@ -59,6 +60,44 @@ def _get_by_object(cls, object):
return cls.get_by_name(name)
+class SSORequest(Access):
+ impl = MongoDBAccess(SSORequestDB)
+
+ @classmethod
+ def _get_impl(cls):
+ return cls.impl
+
+ @classmethod
+ def add_or_update(cls, model_object, publish=True, validate=True):
+ if not getattr(model_object, "request_id", None):
+ raise ValueError("SSO Request ID is not provided in the object.")
+ if not getattr(model_object, "type", None):
+ raise ValueError("SSO request type is not defined in the object")
+ if not getattr(model_object, "expiry", None):
+ raise ValueError("SSO request expiry is not provided in the object.")
+ return super(SSORequest, cls).add_or_update(
+ model_object, publish=publish, validate=validate
+ )
+
+ @classmethod
+ def get(cls, value):
+ result = cls.query(id=value).first()
+
+ if not result:
+ raise SSORequestNotFoundError()
+
+ return result
+
+ @classmethod
+ def get_by_request_id(cls, value):
+ result = cls.query(request_id=value).first()
+
+ if not result:
+ raise SSORequestNotFoundError()
+
+ return result
+
+
class Token(Access):
impl = MongoDBAccess(TokenDB)
diff --git a/st2common/st2common/services/access.py b/st2common/st2common/services/access.py
index 9d88c39c42..73433ee849 100644
--- a/st2common/st2common/services/access.py
+++ b/st2common/st2common/services/access.py
@@ -21,16 +21,27 @@
from st2common.util import isotime
from st2common.util import date as date_utils
-from st2common.exceptions.auth import TokenNotFoundError, UserNotFoundError
+from st2common.exceptions.auth import (
+ TokenNotFoundError,
+ UserNotFoundError,
+)
from st2common.exceptions.auth import TTLTooLargeException
-from st2common.models.db.auth import TokenDB, UserDB
-from st2common.persistence.auth import Token, User
+from st2common.models.db.auth import SSORequestDB, TokenDB, UserDB
+from st2common.persistence.auth import SSORequest, Token, User
from st2common import log as logging
-__all__ = ["create_token", "delete_token"]
+__all__ = [
+ "create_token",
+ "delete_token",
+ "create_cli_sso_request",
+ "create_web_sso_request",
+ "get_sso_request_by_request_id",
+]
LOG = logging.getLogger(__name__)
+DEFAULT_SSO_REQUEST_TTL = 120
+
def create_token(
username, ttl=None, metadata=None, add_missing_user=True, service=False
@@ -105,3 +116,52 @@ def delete_token(token):
pass
except Exception:
raise
+
+
+def create_cli_sso_request(request_id, key, ttl=DEFAULT_SSO_REQUEST_TTL):
+ """
+ :param request_id: ID of the SSO request that is being created (usually uuid format prepended by _)
+ :type request_id: ``str``
+
+ :param key: Symmetric key used to encrypt/decrypt the request between the CLI and the server
+ :type key: ``str``
+
+ :param ttl: SSO request TTL (in seconds).
+ :type ttl: ``int``
+ """
+
+ return _create_sso_request(request_id, ttl, SSORequestDB.Type.CLI, key=key)
+
+
+def create_web_sso_request(request_id, ttl=DEFAULT_SSO_REQUEST_TTL):
+ """
+ :param request_id: ID of the SSO request that is being created (usually uuid format prepended by _)
+ :type request_id: ``str``
+
+ :param ttl: SSO request TTL (in seconds).
+ :type ttl: ``int``
+ """
+
+ return _create_sso_request(request_id, ttl, SSORequestDB.Type.WEB)
+
+
+def _create_sso_request(request_id, ttl, type, **kwargs) -> SSORequestDB:
+
+ expiry = date_utils.get_datetime_utc_now() + datetime.timedelta(seconds=ttl)
+
+ request = SSORequestDB(request_id=request_id, expiry=expiry, type=type, **kwargs)
+ SSORequest.add_or_update(request)
+
+ expire_string = isotime.format(expiry, offset=False)
+
+ LOG.audit(
+ 'Created SAML request with ID "%s" set to expire at "%s" of type "%s".'
+ % (request_id, expire_string, type)
+ )
+
+ return request
+
+
+def get_sso_request_by_request_id(request_id) -> SSORequestDB:
+ request_db = SSORequest.get_by_request_id(request_id)
+ return request_db
diff --git a/st2common/st2common/util/crypto.py b/st2common/st2common/util/crypto.py
index 0aea24763c..e6be862101 100644
--- a/st2common/st2common/util/crypto.py
+++ b/st2common/st2common/util/crypto.py
@@ -184,16 +184,33 @@ def read_crypto_key(key_path):
content = json_decode(content)
+ try:
+ return read_crypto_key_from_dict(content)
+ except KeyError as e:
+ msg = 'Invalid or malformed key file "%s": %s' % (key_path, six.text_type(e))
+ raise KeyError(msg)
+
+
+def read_crypto_key_from_dict(key_dict):
+ """
+ Read crypto key from provided Keyczar JSON-format dict and return parsed AESKey object.
+
+ :param key_dict: A dictionary with a key in Keyczar format (same keys as the JSON).
+ :type key_dict: ``dict``
+
+ :rtype: :class:`AESKey`
+ """
+
try:
aes_key = AESKey(
- aes_key_string=content["aesKeyString"],
- hmac_key_string=content["hmacKey"]["hmacKeyString"],
- hmac_key_size=content["hmacKey"]["size"],
- mode=content["mode"].upper(),
- size=content["size"],
+ aes_key_string=key_dict["aesKeyString"],
+ hmac_key_string=key_dict["hmacKey"]["hmacKeyString"],
+ hmac_key_size=key_dict["hmacKey"]["size"],
+ mode=key_dict["mode"].upper(),
+ size=key_dict["size"],
)
except KeyError as e:
- msg = 'Invalid or malformed key file "%s": %s' % (key_path, six.text_type(e))
+ msg = "Invalid or malformed AES key dictionary: %s" % (six.text_type(e))
raise KeyError(msg)
return aes_key
diff --git a/st2common/tests/unit/services/test_access.py b/st2common/tests/unit/services/test_access.py
index 4f7d8169b4..7ca61b358b 100644
--- a/st2common/tests/unit/services/test_access.py
+++ b/st2common/tests/unit/services/test_access.py
@@ -18,6 +18,7 @@
import uuid
from oslo_config import cfg
+from st2common.models.db.auth import SSORequestDB
from st2tests.base import DbTestCase
from st2common.util import isotime
from st2common.util import date as date_utils
@@ -30,6 +31,8 @@
USERNAME = "manas"
+SSO_REQUEST_ID = "a58fa0cd-61c8-4bd9-a2e7-a4497d6aca68"
+
class AccessServiceTest(DbTestCase):
@classmethod
@@ -106,3 +109,37 @@ def test_create_token_service_token_can_use_arbitrary_ttl(self):
self.assertRaises(
TTLTooLargeException, access.create_token, USERNAME, ttl=ttl, service=False
)
+
+ def test_create_cli_sso_request(self):
+ request = access.create_cli_sso_request(SSO_REQUEST_ID, None, 20)
+ self.assertIsNotNone(request)
+ self.assertEqual(request.type, SSORequestDB.Type.CLI)
+ self.assertEqual(request.request_id, SSO_REQUEST_ID)
+ self.assertLessEqual(
+ abs(
+ request.expiry.timestamp()
+ - date_utils.get_datetime_utc_now().timestamp()
+ - 20
+ ),
+ 2,
+ )
+
+ def test_create_web_sso_request(self):
+ request = access.create_web_sso_request(SSO_REQUEST_ID, 20)
+ self.assertIsNotNone(request)
+ self.assertEqual(request.type, SSORequestDB.Type.WEB)
+ self.assertEqual(request.request_id, SSO_REQUEST_ID)
+ self.assertLessEqual(
+ abs(
+ request.expiry.timestamp()
+ - date_utils.get_datetime_utc_now().timestamp()
+ - 20
+ ),
+ 2,
+ )
+
+ def test_get_sso_request_by_id(self):
+ access.create_web_sso_request(SSO_REQUEST_ID, 20)
+ request = access.get_sso_request_by_request_id(SSO_REQUEST_ID)
+ self.assertIsNotNone(request)
+ self.assertEqual(request.request_id, SSO_REQUEST_ID)
diff --git a/st2common/tests/unit/test_db_auth.py b/st2common/tests/unit/test_db_auth.py
index b159580505..3cae8be5cb 100644
--- a/st2common/tests/unit/test_db_auth.py
+++ b/st2common/tests/unit/test_db_auth.py
@@ -14,14 +14,16 @@
# limitations under the License.
from __future__ import absolute_import
-from st2common.models.db.auth import UserDB
+import datetime
+from st2common.models.db.auth import SSORequestDB, UserDB
from st2common.models.db.auth import TokenDB
from st2common.models.db.auth import ApiKeyDB
-from st2common.persistence.auth import User
+from st2common.persistence.auth import SSORequest, User
from st2common.persistence.auth import Token
from st2common.persistence.auth import ApiKey
-from st2common.util.date import get_datetime_utc_now
+from st2common.util.date import add_utc_tz, get_datetime_utc_now
from st2tests import DbTestCase
+from mongoengine.errors import ValidationError
from tests.unit.base import BaseDBModelCRUDTestCase
@@ -58,3 +60,61 @@ class ApiKeyDBModelCRUDTestCase(BaseDBModelCRUDTestCase, DbTestCase):
persistance_class = ApiKey
model_class_kwargs = {"user": "pony", "key_hash": "token-token-token-token"}
update_attribute_name = "user"
+
+
+class SSORequestDBModelCRUDTestCase(BaseDBModelCRUDTestCase, DbTestCase):
+ model_class = SSORequestDB
+ persistance_class = SSORequest
+ model_class_kwargs = {
+ "request_id": "48144c2b-7969-4708-ba1d-96fd7d05393f",
+ "expiry": add_utc_tz(
+ datetime.datetime.strptime("2050-01-05T10:00:00", "%Y-%m-%dT%H:%M:%S")
+ ),
+ "type": SSORequestDB.Type.CLI,
+ }
+ update_attribute_name = "request_id"
+
+ def _save_model(self, **kwargs):
+ model_db = self.model_class(**kwargs)
+ self.persistance_class.add_or_update(model_db)
+
+ def test_missing_parameters(self):
+
+ self.assertRaises(
+ ValueError,
+ self._save_model,
+ **{
+ "request_id": self.model_class_kwargs["request_id"],
+ "expiry": self.model_class_kwargs["expiry"],
+ },
+ )
+
+ self.assertRaises(
+ ValueError,
+ self._save_model,
+ **{
+ "request_id": self.model_class_kwargs["request_id"],
+ "type": self.model_class_kwargs["type"],
+ },
+ )
+
+ self.assertRaises(
+ ValueError,
+ self._save_model,
+ **{
+ "type": self.model_class_kwargs["type"],
+ "expiry": self.model_class_kwargs["expiry"],
+ },
+ )
+
+ def test_invalid_parameters(self):
+
+ self.assertRaises(
+ ValidationError,
+ self._save_model,
+ **{
+ "type": "invalid",
+ "expiry": self.model_class_kwargs["expiry"],
+ "request_id": self.model_class_kwargs["request_id"],
+ },
+ )