Feat(plugins): Add "unique_keys" in avdschema. (aristanetworks#3725)
ClausHolbechArista authored Mar 17, 2024
1 parent b48d3e3 commit cbbac76
Showing 8 changed files with 184 additions and 5 deletions.
@@ -235,6 +235,7 @@ The meta-schema does not allow for other keys to be set in the schema.
| <samp>min_length</samp> | Integer | | | | Minimum length |
| <samp>primary_key</samp> | String | | | Pattern: `^[a-z][a-z0-9_]*$` | Name of a primary key in a list of dictionaries.<br>The configured key is implicitly required and must have unique values between the list elements |
| <samp>secondary_key</samp> | String | | | Pattern: `^[a-z][a-z0-9_]*$` | Name of a secondary key, which is used with `convert_types:['dict']` in case of values not being dictionaries |
| <samp>unique_keys</samp> | List, items: String | | | Item Pattern: `^[a-z][a-z0-9_]*$` | Name of a key in a list of dictionaries.<br>The configured key must have unique values between the list elements.<br>This can also be a variable path using dot-notation like `parent_key.child_key` in case of nested lists of dictionaries. |
| <samp>display_name</samp> | String | | | Regex Pattern: `"^[^\n]+$"` | Free text display name for forms and documentation (single line) |
| <samp>description</samp> | String | | | Minimum Length: 1 | Free text description for forms and documentation (multi line) |
| <samp>required</samp> | Boolean | | | | Set if variable is required |
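For orientation, here is a minimal sketch of how a schema author could use `unique_keys`, mirroring the unit-test schema and data added further down in this commit (illustrative fragment only; the variable names are not part of the changed files):

```python
# Hypothetical schema fragment, assuming the same shape as the UNIQUE_KEYS_SCHEMAS test constant below.
schema = {
    "type": "list",
    "unique_keys": ["key", "nested_list.nested_list_key"],
    "items": {
        "type": "dict",
        "keys": {
            "key": {"type": "str"},
            "nested_list": {"type": "list", "items": {"type": "dict", "keys": {"nested_list_key": {"type": "int"}}}},
        },
    },
}

# Valid: 'key' values and the nested 'nested_list_key' values are unique across list items.
valid_data = [
    {"key": "a", "nested_list": [{"nested_list_key": 1}, {"nested_list_key": 2}]},
    {"key": "b", "nested_list": [{"nested_list_key": 3}, {"nested_list_key": 4}]},
]

# Invalid: 'key' is duplicated, so validation reports an error for each offending list index.
invalid_data = [
    {"key": "a", "nested_list": [{"nested_list_key": 1}, {"nested_list_key": 2}]},
    {"key": "a", "nested_list": [{"nested_list_key": 3}, {"nested_list_key": 4}]},
]
```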
@@ -214,6 +214,16 @@
"pattern": "^[a-z][a-z0-9_]*$",
"description": "Name of a secondary key, which is used with `convert_types:[dict]` in case of values not being dictionaries."
},
"unique_keys": {
"type": "array",
"description": "List of types to auto-convert from.\nFor 'list of dicts' auto-conversion is supported from 'dict' if 'primary_key' is set on the list schema\nFor other list item types conversion from dict will use the keys as list items.",
"items": {
"type": "string",
"$comment": "The regex here matches valid key names",
"pattern": "^[a-z][a-z0-9_\\.]*$",
"description": "Name of a key in a list of dictionaries.\nThe configured key must have unique values between the list elements.\nThis can also be a variable path using dot-notation like 'parent_key.child_key' in case of nested lists of dictionaries."
}
},
"display_name": { "$ref": "#/$defs/display_name" },
"description": { "$ref": "#/$defs/description" },
"required": { "$ref": "#/$defs/required" },
@@ -5,7 +5,7 @@

from ansible_collections.arista.avd.plugins.plugin_utils.errors import AristaAvdError
from ansible_collections.arista.avd.plugins.plugin_utils.schema.refresolver import create_refresolver
from ansible_collections.arista.avd.plugins.plugin_utils.utils import get_all
from ansible_collections.arista.avd.plugins.plugin_utils.utils import get_all, get_all_with_path, get_indices_of_duplicate_items

try:
import jsonschema
@@ -19,11 +19,42 @@
    JSONSCHEMA_IMPORT_ERROR = None


def _unique_keys_validator(validator, unique_keys: list[str], instance: list, schema: dict):
    if not validator.is_type(unique_keys, "list"):
        return

    if not validator.is_type(instance, "list") or not instance:
        return

    if not all(validator.is_type(element, "dict") for element in instance):
        return

    for unique_key in unique_keys:
        if not (paths_and_values := tuple(get_all_with_path(instance, unique_key))):
            # No values matching the unique key, check the next unique_key
            continue

        # Separate all paths and values
        paths, values = zip(*paths_and_values)

        key = unique_key.split(".")[-1]
        is_nested_key = unique_key != key

        # Find any duplicate values and emit errors for each index.
        for duplicate_value, duplicate_indices in get_indices_of_duplicate_items(values):
            for duplicate_index in duplicate_indices:
                yield jsonschema.ValidationError(
                    f"The value '{duplicate_value}' is not unique between all {'nested ' if is_nested_key else ''}list items as required.",
                    path=[*paths[duplicate_index], key],
                    schema_path=["items"],
                )


def _primary_key_validator(validator, primary_key: str, instance: list, schema: dict):
    if not validator.is_type(primary_key, "str"):
        return

    if not validator.is_type(instance, "list"):
    if not validator.is_type(instance, "list") or not instance:
        return

    if not all(validator.is_type(element, "dict") for element in instance):
@@ -32,8 +63,8 @@ def _primary_key_validator(validator, primary_key: str, instance: list, schema:
    if not all(element.get(primary_key) is not None for element in instance):
        yield jsonschema.ValidationError(f"Primary key '{primary_key}' is not set on all items as required.")

    if len(set(element.get(primary_key) for element in instance)) < len(instance):
        yield jsonschema.ValidationError(f"Values of Primary key '{primary_key}' are not unique as required.")
    # Reusing the unique keys validator
    yield from _unique_keys_validator(validator, [primary_key], instance, schema)


def _keys_validator(validator, keys: dict, instance: dict, schema: dict):
@@ -140,6 +171,7 @@ def __new__(cls, schema, store):
                "pattern": jsonschema._validators.pattern,
                "items": jsonschema._validators.items,
                "primary_key": _primary_key_validator,
                "unique_keys": _unique_keys_validator,
                "keys": _keys_validator,
                "dynamic_keys": _dynamic_keys_validator,
            },
@@ -8,7 +8,8 @@
from .cprofile_decorator import cprofile
from .default import default
from .get import get
from .get_all import get_all
from .get_all import get_all, get_all_with_path
from .get_indices_of_duplicate_items import get_indices_of_duplicate_items
from .get_ip_from_pool import get_ip_from_pool
from .get_item import get_item
from .get_templar import get_templar
@@ -33,6 +34,8 @@
    "default",
    "get",
    "get_all",
    "get_all_with_path",
    "get_indices_of_duplicate_items",
    "get_ip_from_pool",
    "get_item",
    "get_templar",
@@ -1,6 +1,8 @@
# Copyright (c) 2023-2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
from typing import Any, Generator

from ansible_collections.arista.avd.plugins.plugin_utils.errors import AristaAvdMissingVariableError


@@ -60,3 +62,48 @@ def get_all(data, path: str, required: bool = False, org_path=None):
        return [value]

    return []


def get_all_with_path(data, path: str, _current_path: list[str | int] | None = None) -> Generator[tuple[list[str | int], Any], None, None]:
    """
    Get all values from data matching a data path, including the path where each value was found.
    Path supports dot-notation like "foo.bar" to do deeper lookups. Lists will be unpacked recursively.
    Yields nothing if the path is not found.

    Parameters
    ----------
    data : any
        Data to walk through
    path : str
        Data Path - supporting dot-notation for nested dictionaries/lists
    _current_path : list[str|int]
        Internal variable used for tracking the full path even when called recursively

    Returns
    -------
    Generator yielding tuples (<path>, <value>) for all values from data matching the data path.
    """
    if _current_path is None:
        _current_path = []

    path_elements = str(path).split(".")
    if isinstance(data, list):
        for index, data_item in enumerate(data):
            yield from get_all_with_path(data_item, path, _current_path=[*_current_path, index])

    elif isinstance(data, dict):
        value = data.get(path_elements[0])

        if value is None:
            return

        if len(path_elements) > 1:
            yield from get_all_with_path(value, ".".join(path_elements[1:]), _current_path=[*_current_path, path_elements[0]])
            return

        else:
            yield (_current_path, value)

    return
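A short usage sketch of `get_all_with_path` (the data below is an illustrative example resembling the unit-test fixtures in this commit, not part of the changed files): each match is yielded as a `(path, value)` tuple, where the path lists the indices and keys walked to reach the value.

```python
# Illustrative nested data; 'nested_list.nested_list_key' is resolved through list indices and dict keys.
data = [
    {"key": "a", "nested_list": [{"nested_list_key": 1}, {"nested_list_key": 2}]},
]

print(list(get_all_with_path(data, "nested_list.nested_list_key")))
# Expected output: [([0, 'nested_list', 0], 1), ([0, 'nested_list', 1], 2)]
```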
@@ -0,0 +1,15 @@
# Copyright (c) 2024 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
from collections import defaultdict
from typing import Any, Generator


def get_indices_of_duplicate_items(values: list) -> Generator[tuple[Any, list[int]], None, None]:
    """
    Returns a Generator of Tuples with (<value>, [<indices of duplicate items>])
    """
    counters = defaultdict(list)
    for index, item in enumerate(values):
        counters[item].append(index)
    return ((value, indices) for value, indices in counters.items() if len(indices) > 1)
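A quick sketch of what this helper yields (illustrative values only): each value that occurs more than once is returned together with all of the indices where it appears.

```python
# Unique values ('c' here) are filtered out; only duplicates are yielded.
print(list(get_indices_of_duplicate_items(["a", "b", "a", "c", "b"])))
# Expected output: [('a', [0, 2]), ('b', [1, 4])]
```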
@@ -73,6 +73,47 @@
]


UNIQUE_KEYS_SCHEMAS = [
    {
        "type": "list",
        "unique_keys": ["key", "nested_list.nested_list_key"],
        "items": {
            "type": "dict",
            "keys": {"key": {"type": "str"}, "nested_list": {"type": "list", "items": {"type": "dict", "keys": {"nested_list_key": {"type": "int"}}}}},
        },
    }
]

UNIQUE_KEYS_VALID_DATA = [
    [
        {"key": "a", "nested_list": [{"nested_list_key": 1}, {"nested_list_key": 2}]},
        {"key": "b", "nested_list": [{"nested_list_key": 3}, {"nested_list_key": 4}]},
    ],
    [
        {"key": "a", "nested_list": [{"nested_list_key": 1}, {}]},
        {"nested_list": [{"nested_list_key": 3}, {"nested_list_key": 4}]},
    ],
    [],
    [{}],
    [{"nested_list": []}],
]

UNIQUE_KEYS_INVALID_DATA = [
    [
        {"key": "a", "nested_list": [{"nested_list_key": 1}, {"nested_list_key": 2}]},
        {"key": "b", "nested_list": [{"nested_list_key": 3}, {"nested_list_key": 3}]},
    ],
    [
        {"key": "a", "nested_list": [{"nested_list_key": 1}, {"nested_list_key": 2}]},
        {"key": "b", "nested_list": [{"nested_list_key": 1}, {"nested_list_key": 4}]},
    ],
    [
        {"key": "a", "nested_list": [{"nested_list_key": 1}, {"nested_list_key": 2}]},
        {"key": "a", "nested_list": [{"nested_list_key": 3}, {"nested_list_key": 4}]},
    ],
]


class TestAvdSchema:
    def test_avd_schema_init_without_schema(self):
        avdschema = AvdSchema()
@@ -198,3 +239,27 @@ def test_avd_schema_subschema_with_ref_to_store_schemas(self):
        subschema = avdschema.subschema([id])
        assert subschema.get("type") == "dict"
        assert subschema.get("keys") is not None

    @pytest.mark.parametrize("TEST_SCHEMA", UNIQUE_KEYS_SCHEMAS)
    @pytest.mark.parametrize("TEST_DATA", UNIQUE_KEYS_VALID_DATA)
    def test_avd_schema_validate_unique_keys_valid_data(self, TEST_SCHEMA, TEST_DATA):
        try:
            for validation_error in AvdSchema(TEST_SCHEMA).validate(TEST_DATA):
                assert False, f"Validation Error '{validation_error.message}' returned"
        except Exception as e:
            assert False, f"AvdSchema(UNIQUE_KEYS_SCHEMAS).validate(UNIQUE_KEYS_VALID_DATA) raised an exception: {e}"
        assert True

    @pytest.mark.parametrize("TEST_SCHEMA", UNIQUE_KEYS_SCHEMAS)
    @pytest.mark.parametrize("INVALID_DATA", UNIQUE_KEYS_INVALID_DATA)
    def test_avd_schema_validate_unique_keys_invalid_data(self, TEST_SCHEMA, INVALID_DATA):
        try:
            validation_errors = tuple(AvdSchema(TEST_SCHEMA).validate(INVALID_DATA))
            if not validation_errors:
                assert False, "did NOT fail validation"
            for validation_error in validation_errors:
                assert isinstance(validation_error, AvdValidationError)
                assert validation_error.path.endswith((".key", ".nested_list_key"))

        except Exception as e:
            assert False, f"AvdSchema(UNIQUE_KEYS_SCHEMAS).validate(UNIQUE_KEYS_INVALID_DATA) raised an exception: {e}"
6 changes: 6 additions & 0 deletions python-avd/schema_tools/metaschema/meta_schema_model.py
@@ -383,6 +383,12 @@ class ConvertType(str, Enum):
"""
secondary_key: str | None = Field(None, pattern=KEY_PATTERN)
"""Name of a secondary key, which is used with `convert_types:[dict]` in case of values not being dictionaries."""
unique_keys: list[str] | None = None
"""
Name of a key in a list of dictionaries.
The configured key must have unique values between the list elements.
This can also be a variable path using dot-notation like 'parent_key.child_key' in case of nested lists of dictionaries.
"""

# Type of schema docs generators to use for this schema field.
_table_row_generator = TableRowGenList
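Putting the pieces together, a hedged end-to-end sketch of how the new keyword surfaces through validation, based on the unit tests above. The `AvdSchema` import path is not shown in the visible hunks and is assumed here; the `schema` and `data` values are illustrative; the error attributes `path` and `message` are the ones asserted in the tests.

```python
# Import path assumed; the unit tests in this commit import AvdSchema, but the import line is outside the shown hunks.
from ansible_collections.arista.avd.plugins.plugin_utils.schema.avdschema import AvdSchema

schema = {
    "type": "list",
    "unique_keys": ["key"],
    "items": {"type": "dict", "keys": {"key": {"type": "str"}}},
}
data = [{"key": "a"}, {"key": "a"}]

for validation_error in AvdSchema(schema).validate(data):
    # One error per duplicate index; the path points at the offending 'key' entry.
    print(validation_error.path, validation_error.message)
```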
