From 8365a498b90d6421a09027fbf6212aef4d01aeb7 Mon Sep 17 00:00:00 2001 From: Suran de Silva Date: Tue, 12 Dec 2023 12:46:11 -0800 Subject: [PATCH] cloudvision/cvlibs: add methods for interface tag operations Change-Id: I72f91778731a8ba330f0ede0d66362b4b7706269 --- cloudvision/cvlib/context.py | 27 +- cloudvision/cvlib/device.py | 68 ++ cloudvision/cvlib/exceptions.py | 14 +- cloudvision/cvlib/tags.py | 301 ++++++-- test/cvlib/tags/test_interfaceTags.py | 1002 +++++++++++++++++++++++++ test/cvlib/tags/test_tags.py | 311 +++++++- 6 files changed, 1661 insertions(+), 62 deletions(-) create mode 100644 test/cvlib/tags/test_interfaceTags.py diff --git a/cloudvision/cvlib/context.py b/cloudvision/cvlib/context.py index 1d029a1d..ab1fe1bb 100644 --- a/cloudvision/cvlib/context.py +++ b/cloudvision/cvlib/context.py @@ -26,7 +26,7 @@ STUDIO_IDS_ARG, WORKSPACE_ID_ARG, ) -from .device import Device +from .device import Device, Interface from .execution import Execution from .exceptions import ( ConnectionFailed, @@ -803,3 +803,28 @@ def getDevicesByTag(self, tag: Tag, inTopology: bool = True): elif not inTopology: devices.append(Device(deviceId=devId)) return devices + + def getInterfacesByTag(self, tag: Tag, inTopology: bool = True): + ''' + Returns list of interfaces that have the user tag assigned to them. + If tag.value is unspecified then returns interfaces having that label assigned. + By default only interfaces in the topology are returned. + ''' + interfaces = [] + # Note use list instead of .items() + # parallel thread might add/delete tags + for devId in list(allTags := self.tags._getAllInterfaceTags()): + for intfId in list(devIntfTags := allTags.get(devId, {})): + tags = devIntfTags.get(intfId, {}) + if tags.get(tag.label) and ( + not tag.value or tag.value in tags.get(tag.label, [])): + if dev := self.topology._deviceMap.get(devId) if self.topology else None: + if intf := dev.getInterface(intfId): + interfaces.append(intf) + elif not inTopology: + interfaces.append(Interface(name=intfId, device=dev)) + elif not inTopology: + interfaces.append( + Interface(name=intfId, + device=Device(deviceId=devId))) + return interfaces diff --git a/cloudvision/cvlib/device.py b/cloudvision/cvlib/device.py index d759c27d..4034e16c 100644 --- a/cloudvision/cvlib/device.py +++ b/cloudvision/cvlib/device.py @@ -110,6 +110,25 @@ def _unassignTag(self, ctx, tag: Tag): else: ctx.tags._unassignDeviceTagLabel(self.id, tag.label) + def getInterfacesByTag(self, ctx, tag: Tag, inTopology: bool = True): + ''' + Returns list of interfaces that have the user tag assigned to them. + If tag.value is unspecified then returns interfaces having that label assigned. + By default only interfaces in the topology are returned. + ''' + interfaces = [] + # Note use list instead of .items() + # parallel thread might add/delete tags + for intfId in list(devIntfTags := ctx.tags._getAllInterfaceTags().get(self.id, {})): + tags = devIntfTags.get(intfId, {}) + if tags.get(tag.label) and ( + not tag.value or tag.value in tags.get(tag.label, [])): + if intf := self.getInterface(intfId): + interfaces.append(intf) + elif not inTopology: + interfaces.append(Interface(name=intfId, device=self)) + return interfaces + # Interfaces and devices are defined together to avoid circular imports class Interface: @@ -143,3 +162,52 @@ def setPeerInfo(self, device: Device, interface): def getPeerInfo(self): return self._peerDevice, self._peerInterface + + def getSingleTag(self, ctx, label: str, required: bool = True): + ''' + Returns a Tag of the label assigned to the interface. + Raises TagTooManyValuesException if there are multiple tags of the label assigned. + Raises TagMissingException if required is True and the tag is missing. + Returns None if required is False and the tag is missing. + ''' + devName = str(self._device.hostName) if self._device.hostName else str(self._device.id) + values = ctx.tags._getInterfaceTags(self._device.id, self.name).get(label) + if values and len(values) > 1: + raise TagTooManyValuesException(label, devName, values, self.name) + if required and not values: + raise TagMissingException(label, devName, self.name) + return Tag(label, values[0]) if values else None + + def getTags(self, ctx, label: str = None): + ''' + Returns a list of Tags matching the specified label assigned to the interface. + If label is unspecified then it returns all Tags assigned to the interface. + ''' + tags: List[Tag] = [] + if not (ctxTags := ctx.tags._getInterfaceTags(self._device.id, self.name)): + return tags + # Note use list instead of .items() + # parallel thread might add/delete tags + for tagLabel in list(ctxTags): + if label and label != tagLabel: + continue + for value in ctxTags.get(tagLabel, []): + tags.append(Tag(tagLabel, value)) + return tags + + def _assignTag(self, ctx, tag: Tag, replaceValue: bool = True): + ''' + Assign a Tag to an interface. + If replaceValue is True ensures only one value of label is assigned. + ''' + ctx.tags._assignInterfaceTag(self._device.id, self.name, tag.label, tag.value, replaceValue) + + def _unassignTag(self, ctx, tag: Tag): + ''' + Unassign a Tag from an interface. + If tag.value is unspecified unassign all tags of label. + ''' + if tag.value: + ctx.tags._unassignInterfaceTag(self._device.id, self.name, tag.label, tag.value) + else: + ctx.tags._unassignInterfaceTagLabel(self._device.id, self.name, tag.label) diff --git a/cloudvision/cvlib/exceptions.py b/cloudvision/cvlib/exceptions.py index d12184d3..2924ff9e 100644 --- a/cloudvision/cvlib/exceptions.py +++ b/cloudvision/cvlib/exceptions.py @@ -311,11 +311,14 @@ class TagOperationException(TagErrorException): Exception raised when an attempted tag operation is invalid """ - def __init__(self, label: str, value: str, operation: str, devId: str = None): + def __init__(self, label: str, value: str, operation: str, + devId: str = None, intfId: str = None): message = (f"invalid attempt to {operation} tag " f"{self.expTagField(label)}:{self.expTagField(value)}") if devId: message = message + " for device " + devId + if intfId: + message = message + " on interface " + intfId super().__init__(message) @@ -324,9 +327,11 @@ class TagMissingException(TagErrorException): Exception raised when a tag is missing from a device """ - def __init__(self, label: str, devId: str): + def __init__(self, label: str, devId: str, intfId: str = None): message = (f"{self.expTagField(label)} tag missing" f" for device {devId}") + if intfId: + message = message + " on interface " + intfId super().__init__(message) @@ -335,9 +340,12 @@ class TagTooManyValuesException(TagErrorException): Exception raised when a tag has too many values assigned to a device """ - def __init__(self, label: str, devId: str, currVals: List[str] = None): + def __init__(self, label: str, devId: str, currVals: List[str] = None, + intfId: str = None): message = (f"{self.expTagField(label)} tag has too many values" f" assigned to device {devId}") + if intfId: + message = message + " on interface " + intfId if currVals: message = message + ", assigned values: " + ", ".join(currVals) super().__init__(message) diff --git a/cloudvision/cvlib/tags.py b/cloudvision/cvlib/tags.py index 922cd1d6..62b0577b 100644 --- a/cloudvision/cvlib/tags.py +++ b/cloudvision/cvlib/tags.py @@ -17,6 +17,7 @@ TagAssignment, TagAssignmentConfig, ELEMENT_TYPE_DEVICE, + ELEMENT_TYPE_INTERFACE, CREATOR_TYPE_USER ) @@ -65,18 +66,68 @@ class Tags: Note that a tag is of the form label:value, where the same label may be associated with many values. Device tags are assigned to devices. + Interface tags are assigned to devices' interfaces. - ctx: Context in which the studio build is run - - relevantTagAssigns: Dictionary of relevant tags, - of the form map[deviceId]map[label]=[value1,value2,..], + - relevantTagAssigns: Dictionary of relevant device tags, of the form: + {deviceId: {label: [value1,value2,..]}}, + works like a cache + - relevantIntfTagAssigns: Dictionary of relevant interface tags, of the form: + {deviceId: {intfId: {label: [value1,value2,..]}}}, works like a cache ''' - def __init__(self, - context): + def __init__(self, context): self.ctx = context self.relevantTagAssigns: Dict = {} + self.relevantIntfTagAssigns: Dict = {} + + def _getTagUpdatesFromWorkspace(self, etype=ELEMENT_TYPE_DEVICE): + ''' + _getTagUpdatesFromWorkspace returns a list of tags updates, + of the specified ElementType, in the workspace. + The returned list is of the form: + list[(deviceId, interfaceId, label, value, remove)] + ''' + workspaceTagUpdates = [] + tagClient = self.ctx.getApiClient(TagAssignmentConfigServiceStub) + tagRequest = TagAssignmentConfigStreamRequest() + tagFilter = TagAssignmentConfig() + tagFilter.key.element_type = etype + tagFilter.key.workspace_id.value = self.ctx.getWorkspaceId() + tagRequest.partial_eq_filter.append(tagFilter) + for resp in tagClient.GetAll(tagRequest): + workspaceTagUpdates.append((resp.value.key.device_id.value, + resp.value.key.interface_id.value, + resp.value.key.label.value, + resp.value.key.value.value, + resp.value.remove.value)) + return workspaceTagUpdates + + def _createTag(self, etype: int, label: str, value: str): + ''' + _createTag creates a tag if it doesn't already exist, + of the specified ElementType, in the workspace. + ''' + if not label or not value: + raise TagOperationException(label, value, 'create') + # check if the tag exists + if etype == ELEMENT_TYPE_DEVICE and self._deviceTagExists(label, value): + return + if etype == ELEMENT_TYPE_INTERFACE and self._interfaceTagExists(label, value): + return + # create the tag + setRequest = TagConfigSetRequest() + wsID = self.ctx.getWorkspaceId() + setRequest.value.key.workspace_id.value = wsID + setRequest.value.key.element_type = etype + setRequest.value.key.label.value = label + setRequest.value.key.value.value = value + tagClient = self.ctx.getApiClient(TagConfigServiceStub) + tagClient.Set(setRequest) + + # The following are methods for device Tags - def _tagExists(self, label: str, value: str): + def _deviceTagExists(self, label: str, value: str): # Note use list instead of .items() # parallel thread might add/delete tags for dev in list(allTags := self._getAllDeviceTags()): @@ -85,7 +136,7 @@ def _tagExists(self, label: str, value: str): return True return False - def _tagAssigned(self, deviceId: str, label: str, value: str): + def _deviceTagAssigned(self, deviceId: str, label: str, value: str): if value in self._getDeviceTags(deviceId).get(label, []): return True return False @@ -145,32 +196,12 @@ def _getAllDeviceTagsFromMainline(self): self.relevantTagAssigns[deviceId][label].append(value) return self.relevantTagAssigns - def _getTagUpdatesFromWorkspace(self): - ''' - _getTagUpdatesFromWorkspace returns a list of tags updates - in the workspace. - The returned list is of the form: list[(deviceId, label, value, remove)] - ''' - workspaceTagUpdates = [] - tagClient = self.ctx.getApiClient(TagAssignmentConfigServiceStub) - tagRequest = TagAssignmentConfigStreamRequest() - tagFilter = TagAssignmentConfig() - tagFilter.key.element_type = ELEMENT_TYPE_DEVICE - tagFilter.key.workspace_id.value = self.ctx.getWorkspaceId() - tagRequest.partial_eq_filter.append(tagFilter) - for resp in tagClient.GetAll(tagRequest): - workspaceTagUpdates.append((resp.value.key.device_id.value, - resp.value.key.label.value, - resp.value.key.value.value, - resp.value.remove.value)) - return workspaceTagUpdates - def _assignDeviceTagSet(self, deviceId: str, label: str, value: str): ''' _assignDeviceTagSet assigns a device tag if it isn't already assigned ''' # check if the tag is already assigned to this device - if self._tagAssigned(deviceId, label, value): + if self._deviceTagAssigned(deviceId, label, value): return # create the tag self._createTag(ELEMENT_TYPE_DEVICE, label, value) @@ -212,33 +243,13 @@ def _getAllDeviceTags(self): return self.relevantTagAssigns self._getAllDeviceTagsFromMainline() workspaceUpdates = self._getTagUpdatesFromWorkspace() - for (deviceId, label, value, remove) in workspaceUpdates: + for (deviceId, _, label, value, remove) in workspaceUpdates: if remove: self._unassignDevTagInCache(deviceId, label, value) else: self._assignDevTagInCache(deviceId, label, value) return self.relevantTagAssigns - def _createTag(self, etype: int, label: str, value: str): - ''' - _createTag creates a tag if it doesn't already exist - etype is a tags ElementType - ''' - if not label or not value: - raise TagOperationException(label, value, 'create') - # check if the tag exists - if self._tagExists(label, value): - return - # create the tag - setRequest = TagConfigSetRequest() - wsID = self.ctx.getWorkspaceId() - setRequest.value.key.workspace_id.value = wsID - setRequest.value.key.element_type = etype - setRequest.value.key.label.value = label - setRequest.value.key.value.value = value - tagClient = self.ctx.getApiClient(TagConfigServiceStub) - tagClient.Set(setRequest) - def _assignDeviceTag(self, deviceId: str, label: str, value: str, replaceValue: bool = True): ''' _assignDeviceTag assigns a device tag if it isn't already assigned, @@ -265,7 +276,7 @@ def _unassignDeviceTag(self, deviceId: str, label: str, value: str): if not label or not value or not deviceId: raise TagOperationException(label, value, 'unassign', deviceId) # check if the tag is assigned to this device - if not self._tagAssigned(deviceId, label, value): + if not self._deviceTagAssigned(deviceId, label, value): return # unassign the tag setRequest = TagAssignmentConfigSetRequest() @@ -290,3 +301,195 @@ def _unassignDeviceTagLabel(self, deviceId: str, label: str): raise TagOperationException(label, '', 'unassign', deviceId) for cvalue in current_values: self._unassignDeviceTag(deviceId, label, cvalue) + + # The following are methods for interface Tags + + def _interfaceTagExists(self, label: str, value: str): + # Note use list instead of .items() + # parallel thread might add/delete tags + for dev in list(allTags := self._getAllInterfaceTags()): + for intf in list(devIntfTags := allTags.get(dev, {})): + tags = devIntfTags.get(intf, {}) + if value in tags.get(label, []): + return True + return False + + def _interfaceTagAssigned(self, deviceId: str, interfaceId: str, label: str, value: str): + if value in self._getInterfaceTags(deviceId, interfaceId).get(label, []): + return True + return False + + def _assignIntfTagInCache(self, deviceId: str, interfaceId: str, label: str, value: str): + ''' + _assignIntfTagInCache modifies relevantIntfAssigns for the device tag + ensuring the tag is assigned to the interface in the local cache + ''' + if deviceId not in self.relevantIntfTagAssigns: + self.relevantIntfTagAssigns[deviceId] = {} + if interfaceId not in self.relevantIntfTagAssigns[deviceId]: + self.relevantIntfTagAssigns[deviceId][interfaceId] = {} + if label not in self.relevantIntfTagAssigns[deviceId][interfaceId]: + self.relevantIntfTagAssigns[deviceId][interfaceId][label] = [] + if value not in self.relevantIntfTagAssigns[deviceId][interfaceId][label]: + self.relevantIntfTagAssigns[deviceId][interfaceId][label].append(value) + + def _unassignIntfTagInCache(self, deviceId: str, interfaceId: str, label: str, value: str): + ''' + _unassignIntfTagInCache modifies relevantIntfTagAssigns for the interface tag + ensuring the tag is not assigned to the interface in the local cache + ''' + if deviceId not in self.relevantIntfTagAssigns: + return + if interfaceId not in self.relevantIntfTagAssigns[deviceId]: + return + if label not in self.relevantIntfTagAssigns[deviceId][interfaceId]: + return + if value not in self.relevantIntfTagAssigns[deviceId][interfaceId][label]: + return + self.relevantIntfTagAssigns[deviceId][interfaceId][label].remove(value) + if not self.relevantIntfTagAssigns[deviceId][interfaceId][label]: + self.relevantIntfTagAssigns[deviceId][interfaceId].pop(label, None) + if not self.relevantIntfTagAssigns[deviceId][interfaceId]: + self.relevantIntfTagAssigns[deviceId].pop(interfaceId, None) + if not self.relevantIntfTagAssigns[deviceId]: + self.relevantIntfTagAssigns.pop(deviceId, None) + + def _getAllInterfaceTagsFromMainline(self): + ''' + _getAllInterfaceTagsFromMainline returns a map of all assigned interface tags + available in the mainline. Also sets the local cache to this map. + The returned map is of the form: + map[deviceId]map[interfaceId]map[label]=[value1,value2,..] + ''' + self.relevantIntfTagAssigns = {} + tagClient = self.ctx.getApiClient(TagAssignmentServiceStub) + tagRequest = TagAssignmentStreamRequest() + tagFilter = TagAssignment() + tagFilter.tag_creator_type = CREATOR_TYPE_USER + tagFilter.key.element_type = ELEMENT_TYPE_INTERFACE + tagFilter.key.workspace_id.value = MAINLINE_ID + tagRequest.partial_eq_filter.append(tagFilter) + for resp in tagClient.GetAll(tagRequest): + label = resp.value.key.label.value + value = resp.value.key.value.value + deviceId = resp.value.key.device_id.value + interfaceId = resp.value.key.interface_id.value + if deviceId not in self.relevantIntfTagAssigns: + self.relevantIntfTagAssigns[deviceId] = {} + if interfaceId not in self.relevantIntfTagAssigns[deviceId]: + self.relevantIntfTagAssigns[deviceId][interfaceId] = {} + if label not in self.relevantIntfTagAssigns[deviceId][interfaceId]: + self.relevantIntfTagAssigns[deviceId][interfaceId][label] = [] + if value not in self.relevantIntfTagAssigns[deviceId][interfaceId][label]: + self.relevantIntfTagAssigns[deviceId][interfaceId][label].append(value) + return self.relevantIntfTagAssigns + + def _setRelevantInterfaceTagAssigns(self, tags: Dict): + ''' + Sets the relevantIntfTagAssigns of the context. + Called during context initialisation during script execution as optimization + Does not need to be called by the script writers + ''' + self.relevantIntfTagAssigns = tags + + def _getInterfaceTags(self, deviceId: str, interfaceId: str): + ''' + _getInterfaceTags returns the relevant assigned tags for the interface. + The returned map is of the form: map[label]=[value1,value2,..] + ''' + return self._getAllInterfaceTags().get(deviceId, {}).get(interfaceId, {}) + + def _getAllInterfaceTags(self): + ''' + _getAllInterfaceTags returns a map of all assigned device tags available + in the workspace. The returned map is of the form: + map[deviceId]map[interfaceId]map[label]=[value1,value2,..] + ''' + if self.relevantIntfTagAssigns: + return self.relevantIntfTagAssigns + self._getAllInterfaceTagsFromMainline() + workspaceUpdates = self._getTagUpdatesFromWorkspace(ELEMENT_TYPE_INTERFACE) + for (deviceId, interfaceId, label, value, remove) in workspaceUpdates: + if remove: + self._unassignIntfTagInCache(deviceId, interfaceId, label, value) + else: + self._assignIntfTagInCache(deviceId, interfaceId, label, value) + return self.relevantIntfTagAssigns + + def _assignInterfaceTagSet(self, deviceId: str, interfaceId: str, label: str, value: str): + ''' + _assignInterfaceTagSet assigns a interface tag if it isn't already assigned + ''' + # check if the tag is already assigned to this device + if self._interfaceTagAssigned(deviceId, interfaceId, label, value): + return + # create the tag + self._createTag(ELEMENT_TYPE_INTERFACE, label, value) + # assign the tag + setRequest = TagAssignmentConfigSetRequest() + wsID = self.ctx.getWorkspaceId() + setRequest.value.key.workspace_id.value = wsID + setRequest.value.key.element_type = ELEMENT_TYPE_INTERFACE + setRequest.value.key.label.value = label + setRequest.value.key.value.value = value + setRequest.value.key.device_id.value = deviceId + setRequest.value.key.interface_id.value = interfaceId + setRequest.value.remove.value = False + tagClient = self.ctx.getApiClient(TagAssignmentConfigServiceStub) + tagClient.Set(setRequest) + # assign the tag in cache + self._assignIntfTagInCache(deviceId, interfaceId, label, value) + + def _assignInterfaceTag(self, deviceId: str, interfaceId: str, label: str, value: str, + replaceValue: bool = True): + ''' + _assignInterfaceTag assigns a interface tag if it isn't already assigned, + enforcing that only one value of the tag is assigned to the interface, + unless the replaceValue argument is set to False + ''' + # first make sure this device's tags have been loaded in cache + self._getInterfaceTags(deviceId, interfaceId) + if not label or not value or not deviceId or not interfaceId: + raise TagOperationException(label, value, 'assign', deviceId, interfaceId) + if replaceValue: + current_values = list(self._getInterfaceTags(deviceId, interfaceId).get(label, [])) + for cvalue in current_values: + if cvalue != value: + self._unassignInterfaceTag(deviceId, interfaceId, label, cvalue) + self._assignInterfaceTagSet(deviceId, interfaceId, label, value) + + def _unassignInterfaceTag(self, deviceId: str, interfaceId: str, label: str, value: str): + ''' + _unassignInterfaceTag unassigns a interface tag if it is assigned + ''' + # first make sure this device's tags have been loaded in cache + self._getInterfaceTags(deviceId, interfaceId) + if not label or not value or not deviceId or not interfaceId: + raise TagOperationException(label, value, 'unassign', deviceId, interfaceId) + # check if the tag is assigned to this interface + if not self._interfaceTagAssigned(deviceId, interfaceId, label, value): + return + # unassign the tag + setRequest = TagAssignmentConfigSetRequest() + wsID = self.ctx.getWorkspaceId() + setRequest.value.key.workspace_id.value = wsID + setRequest.value.key.element_type = ELEMENT_TYPE_INTERFACE + setRequest.value.key.label.value = label + setRequest.value.key.value.value = value + setRequest.value.key.device_id.value = deviceId + setRequest.value.key.interface_id.value = interfaceId + setRequest.value.remove.value = True + tagClient = self.ctx.getApiClient(TagAssignmentConfigServiceStub) + tagClient.Set(setRequest) + # unassign the tag in cache + self._unassignIntfTagInCache(deviceId, interfaceId, label, value) + + def _unassignInterfaceTagLabel(self, deviceId: str, interfaceId: str, label: str): + ''' + _unassignInterfaceTagLabel unassigns all interface tags of a label + ''' + current_values = list(self._getInterfaceTags(deviceId, interfaceId).get(label, [])) + if not label or not deviceId or not interfaceId: + raise TagOperationException(label, '', 'unassign', deviceId, interfaceId) + for cvalue in current_values: + self._unassignInterfaceTag(deviceId, interfaceId, label, cvalue) diff --git a/test/cvlib/tags/test_interfaceTags.py b/test/cvlib/tags/test_interfaceTags.py new file mode 100644 index 00000000..3b9e6b36 --- /dev/null +++ b/test/cvlib/tags/test_interfaceTags.py @@ -0,0 +1,1002 @@ +# Copyright (c) 2023 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the COPYING file. + +"""tests for the template access to interface class tags methods""" + +import pytest + +from cloudvision.cvlib import ( + Context, + Device, + Topology, + Tags, + Tag, + TagErrorException, + TagOperationException, + TagMissingException, + TagTooManyValuesException +) + +from arista.tag.v2.services import ( + TagAssignmentServiceStub, + TagAssignmentConfigServiceStub, + TagAssignmentStreamResponse, + TagAssignmentConfigStreamResponse +) + + +def convertListToStream(assignmentList): + stream = [] + for assign in assignmentList: + device, interface, tag, value = assign + item = TagAssignmentStreamResponse() + item.value.key.device_id.value = device + item.value.key.interface_id.value = interface + item.value.key.label.value = tag + item.value.key.value.value = value + stream.append(item) + return stream + + +def convertListToConfigStream(assignmentConfigList): + stream = [] + for assign in assignmentConfigList: + device, interface, tag, value, remove = assign + item = TagAssignmentConfigStreamResponse() + item.value.key.device_id.value = device + item.value.key.interface_id.value = interface + item.value.key.label.value = tag + item.value.key.value.value = value + item.value.remove.value = remove + stream.append(item) + return stream + + +class mockStudio: + def __init__(self): + self.workspaceId = "workspace1" + + +class mockClient: + def __init__(self): + self.stub = None + self.tagResponse = convertListToStream([]) + self.tagConfigResponse = convertListToConfigStream([]) + self.numGetAlls = 0 + self.numSets = 0 + + def GetAll(self, request): + labelFilters = [] + for afilter in request.partial_eq_filter: + if afilter.key.label.value: + labelFilters.append(afilter.key.label.value) + if self.stub == TagAssignmentServiceStub: + self.numGetAlls += 1 + response = self.tagResponse + elif self.stub == TagAssignmentConfigServiceStub: + self.numGetAlls += 1 + response = self.tagConfigResponse + if labelFilters: + for item in list(response): + if item.value.key.label.value not in labelFilters: + response.remove(item) + return response + + def SetGetAllResponse(self, response): + self.tagResponse = response + + def SetGetAllConfigResponse(self, response): + self.tagConfigResponse = response + + def Set(self, request): + self.numSets += 1 + return + + +class mockCtx(Context): + def __init__(self): + super().__init__('user') + self.client = mockClient() + self.studio = mockStudio() + self.device = Device() + + def getApiClient(self, stub): + self.client.stub = stub + return self.client + + +getSingleTagCases = [ + # name + # cached tags + # tagv2 GetAll response + # topology + # device to get + # interface to get + # tag label + # required + # expected num GetAlls (includes mainline and workspace) + # expected Tag + # err + [ + "get tag that is assigned correctly to interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet2', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet2', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + 'DC', + True, + 2, + Tag('DC', 'DC1'), + None + ], + [ + "get tag that is assigned to interface with too many values", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC', 'DC2'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet2', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet2', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + 'DC', + True, + 2, + None, + TagTooManyValuesException('DC', 'J1', ['DC1', 'DC2'], 'Ethernet1') + ], + [ + "get required tag that isn't assigned to interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet2', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet2', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + 'Role', + True, + 2, + None, + TagMissingException('Role', 'J1', 'Ethernet1') + ], + [ + "get unrequired tag that isn't assigned to interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet2', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet2', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + 'Leaf-Domain', + False, + 2, + None, + None, + ], + [ + "try get required tag for interface that has no tags", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet2', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet2', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J3', + 'Ethernet1', + 'DC', + True, + 2, + None, + TagMissingException('DC', 'J3', 'Ethernet1') + ], + [ + "try get unrequired tag for interface that has no tags", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet2', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet2', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J3', + 'Ethernet1', + 'DC', + False, + 2, + None, + None + ], + [ + "get tag assigned correctly to interface with preloaded cache", + { + 'J1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'J2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet2', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet2', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + 'DC', + True, + 0, + Tag('DC', 'DC1'), + None + ], + [ + "get tag assigned to interface with too many values with preloaded cache", + { + 'J1': {'Ethernet1': {'DC': ['DC1', 'DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'J2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC', 'DC2'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet2', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet2', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + 'DC', + True, + 0, + None, + TagTooManyValuesException('DC', 'J1', ['DC1', 'DC2'], 'Ethernet1') + ], +] + + +@pytest.mark.parametrize('name, cacheTags, getAllResp, topoDevices, deviceId, ' + + 'interfaceId, label, required, expNumGetAlls, expectedTag, ' + + 'expectedError', + getSingleTagCases) +def test_getSingleTag(name, cacheTags, getAllResp, topoDevices, deviceId, + interfaceId, label, required, expNumGetAlls, expectedTag, + expectedError): + error = None + tag = None + ctx = mockCtx() + deviceMap = {} + for dev, interfaces in topoDevices.items(): + deviceMap[dev] = Device(deviceId=dev) + for interface in interfaces: + deviceMap[dev].addInterface(interface) + topology = Topology(deviceMap) + ctx.setTopology(topology) + ctx.client.SetGetAllResponse(getAllResp) + ctx.tags._setRelevantInterfaceTagAssigns(cacheTags) + intf = topology.getDevices([deviceId])[0].getInterface(interfaceId) + try: + tag = intf.getSingleTag(ctx, label, required=required) + except Exception as e: + error = e + if error or expectedError: + assert str(error) == str(expectedError) + else: + assert tag == expectedTag + assert ctx.client.numGetAlls == expNumGetAlls + + +getTagsCases = [ + # name + # cached tags + # tagv2 GetAll response + # devices in topology + # device to get + # interface to get + # label + # expected num GetAlls (includes mainline and workspace) + # expected Tags + # expected Error + [ + "get all interface tags for interface that has tags", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + None, + 2, + [Tag('DC', 'DC1'), Tag('DC-Pod', 'POD1'), Tag('NodeId', '1')], + None + ], + [ + "get specific label interface tags for interface that has tags", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + 'DC', + 2, + [Tag('DC', 'DC1')], + None + ], + [ + "get all interface tags for dev3 ethernet1 that has no tags", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J3', + 'Ethernet1', + None, + 2, + [], + None + ], + [ + "get specific label interface tags for dev3 ethernet1 that has no tags", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J3', + 'Ethernet1', + 'DC', + 2, + [], + None + ], + [ + "get all interface tags for interface that has tags with preloaded cache", + { + 'J1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'J2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + None, + 0, + [Tag('DC', 'DC1'), Tag('DC-Pod', 'POD1'), Tag('NodeId', '1')], + None + ], +] + + +@pytest.mark.parametrize('name, cacheTags, getAllResp, topoDevices, deviceId, ' + + 'interfaceId, label, expNumGetAlls, expectedTags, ' + + 'expectedError', + getTagsCases) +def test_getTags(name, cacheTags, getAllResp, topoDevices, deviceId, + interfaceId, label, expNumGetAlls, expectedTags, + expectedError): + error = None + intfTags = None + ctx = mockCtx() + deviceMap = {} + for dev, interfaces in topoDevices.items(): + deviceMap[dev] = Device(deviceId=dev) + for interface in interfaces: + deviceMap[dev].addInterface(interface) + topology = Topology(deviceMap) + ctx.setTopology(topology) + ctx.client.SetGetAllResponse(getAllResp) + ctx.tags._setRelevantInterfaceTagAssigns(cacheTags) + intf = topology.getDevices([deviceId])[0].getInterface(interfaceId) + try: + intfTags = intf.getTags(ctx, label=label) + except Exception as e: + error = e + if error or expectedError: + assert str(error) == str(expectedError) + else: + assert intfTags == expectedTags + assert ctx.client.numGetAlls == expNumGetAlls + + +assignTagsCases = [ + # name + # cached tags + # tagv2 GetAll response + # devices in topology + # device to use + # interface to use + # tag + # replace flag + # expected num GetAlls (includes mainline and workspace) + # expected num Sets (includes tag creation and assignment) + # expected Tags + # expected Error + [ + "assign new tag to interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('Role', 'Spine'), + False, + 2, + 2, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + Tag('Role', 'Spine'), + ], + None + ], + [ + "assign a new value of existing label to interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', 'DC2'), + False, + 2, + 2, + [ + Tag('DC', 'DC1'), + Tag('DC', 'DC2'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "replace value with new value of existing label for interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', 'DC2'), + True, + 2, + 3, + [ + Tag('DC', 'DC2'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "replace value with already assigned value of another interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('NodeId', '2'), + True, + 2, + 2, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '2'), + ], + None + ], + [ + "assign already assigned tag to same interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', 'DC1'), + True, + 2, + 0, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "assign tag with invalid label to interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('', 'Spine'), + False, + 2, + 0, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + TagOperationException('', 'Spine', 'assign', 'J1', 'Ethernet1') + ], + [ + "assign tag with invalid value to interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', ''), + True, + 2, + 0, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + TagOperationException('DC', '', 'assign', 'J1', 'Ethernet1') + ], + [ + "assign new tag to interface with preloaded cache", + { + 'J1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'J2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('Role', 'Spine'), + False, + 0, + 2, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + Tag('Role', 'Spine'), + ], + None + ], + [ + "replace value with new value of existing label for interface with preloaded cache", + { + 'J1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'J2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', 'DC2'), + True, + 0, + 3, + [ + Tag('DC', 'DC2'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], +] + + +@pytest.mark.parametrize('name, cacheTags, getAllResp, topoDevices, deviceId, ' + + 'interfaceId, tag, replace, expNumGetAlls, expNumSets, ' + + 'expectedTags, expectedError', assignTagsCases) +def test_assignTags(name, cacheTags, getAllResp, topoDevices, deviceId, + interfaceId, tag, replace, expNumGetAlls, expNumSets, + expectedTags, expectedError): + error = None + ctx = mockCtx() + deviceMap = {} + for dev, interfaces in topoDevices.items(): + deviceMap[dev] = Device(deviceId=dev) + for interface in interfaces: + deviceMap[dev].addInterface(interface) + topology = Topology(deviceMap) + ctx.setTopology(topology) + ctx.client.SetGetAllResponse(getAllResp) + ctx.tags._setRelevantInterfaceTagAssigns(cacheTags) + intf = topology.getDevices([deviceId])[0].getInterface(interfaceId) + try: + intf._assignTag(ctx, tag, replaceValue=replace) + except Exception as e: + error = e + tags = intf.getTags(ctx) + if error or expectedError: + assert str(error) == str(expectedError) + assert sorted(tags) == sorted(expectedTags) + assert ctx.client.numGetAlls == expNumGetAlls + assert ctx.client.numSets == expNumSets + + +unassignTagsCases = [ + # name + # cached tags + # tagv2 GetAll response + # devices in topology + # device to use + # interface to use + # tag + # expected num GetAlls (includes mainline and workspace) + # expected num Sets (includes tag creation and assignment) + # expected Tags + # expected Error + [ + "unassign tag from interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', 'DC1'), + 2, + 1, + [ + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "unassign one of two values for a label from interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC', 'DC2'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', 'DC2'), + 2, + 1, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "unassign all values for a label from interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC', 'DC2'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', ''), + 2, + 2, + [ + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "unassign a value that's not assigned for a label from interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', 'DC2'), + 2, + 0, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "unassign a tag that's not assigned from interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('Role', ''), + 2, + 0, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "unassign tag with invalid label from interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag(None, None), # type: ignore + 2, + 0, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + TagOperationException('', '', 'unassign', 'J1', 'Ethernet1') + ], + [ + "unassign tag with invalid label and valid value from interface", + { + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag(None, 'Spine'), # type: ignore + 2, + 0, + [ + Tag('DC', 'DC1'), + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + TagOperationException(None, 'Spine', 'unassign', 'J1', 'Ethernet1') # type: ignore + ], + [ + "unassign tag from interface with preloaded cache", + { + 'J1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'J2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', 'DC1'), + 0, + 1, + [ + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], + [ + "unassign all values for a label from interface with preloaded cache", + { + 'J1': {'Ethernet1': {'DC': ['DC1', 'DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'J2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + }, + convertListToStream([('J1', 'Ethernet1', 'DC', 'DC1'), + ('J1', 'Ethernet1', 'DC', 'DC2'), + ('J1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J1', 'Ethernet1', 'NodeId', '1'), + ('J2', 'Ethernet1', 'DC', 'DC1'), + ('J2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('J2', 'Ethernet1', 'NodeId', '2'), + ]), + {'J1': ['Ethernet1'], 'J2': ['Ethernet1'], 'J3': ['Ethernet1']}, + 'J1', + 'Ethernet1', + Tag('DC', ''), + 0, + 2, + [ + Tag('DC-Pod', 'POD1'), + Tag('NodeId', '1'), + ], + None + ], +] + + +@pytest.mark.parametrize('name, cacheTags, getAllResp, topoDevices, deviceId, ' + + 'interfaceId, tag, expNumGetAlls, expNumSets, expectedTags, ' + + 'expectedError', unassignTagsCases) +def test_unassignTags(name, cacheTags, getAllResp, topoDevices, deviceId, + interfaceId, tag, expNumGetAlls, expNumSets, expectedTags, + expectedError): + error = None + ctx = mockCtx() + deviceMap = {} + for dev, interfaces in topoDevices.items(): + deviceMap[dev] = Device(deviceId=dev) + for interface in interfaces: + deviceMap[dev].addInterface(interface) + topology = Topology(deviceMap) + ctx.setTopology(topology) + ctx.client.SetGetAllResponse(getAllResp) + ctx.tags._setRelevantInterfaceTagAssigns(cacheTags) + intf = topology.getDevices([deviceId])[0].getInterface(interfaceId) + try: + intf._unassignTag(ctx, tag) + except Exception as e: + error = e + tags = intf.getTags(ctx) + if error or expectedError: + assert str(error) == str(expectedError) + assert sorted(tags) == sorted(expectedTags) + assert ctx.client.numGetAlls == expNumGetAlls + assert ctx.client.numSets == expNumSets diff --git a/test/cvlib/tags/test_tags.py b/test/cvlib/tags/test_tags.py index 209bd935..19706db8 100644 --- a/test/cvlib/tags/test_tags.py +++ b/test/cvlib/tags/test_tags.py @@ -9,6 +9,7 @@ from cloudvision.cvlib import ( Context, Device, + Interface, Topology, Tags, Tag @@ -25,11 +26,18 @@ def convertListToStream(assignmentList): stream = [] for assign in assignmentList: - device, tag, value = assign item = TagAssignmentStreamResponse() - item.value.key.device_id.value = device - item.value.key.label.value = tag - item.value.key.value.value = value + if len(assign) == 3: + device, tag, value = assign + item.value.key.device_id.value = device + item.value.key.label.value = tag + item.value.key.value.value = value + elif len(assign) == 4: + device, interface, tag, value = assign + item.value.key.device_id.value = device + item.value.key.interface_id.value = interface + item.value.key.label.value = tag + item.value.key.value.value = value stream.append(item) return stream @@ -37,12 +45,20 @@ def convertListToStream(assignmentList): def convertListToConfigStream(assignmentConfigList): stream = [] for assign in assignmentConfigList: - device, tag, value, remove = assign item = TagAssignmentConfigStreamResponse() - item.value.key.device_id.value = device - item.value.key.label.value = tag - item.value.key.value.value = value - item.value.remove.value = remove + if len(assign) == 4: + device, tag, value, remove = assign + item.value.key.device_id.value = device + item.value.key.label.value = tag + item.value.key.value.value = value + item.value.remove.value = remove + elif len(assign) == 5: + device, interface, tag, value, remove = assign + item.value.key.device_id.value = device + item.value.key.interface_id.value = interface + item.value.key.label.value = tag + item.value.key.value.value = value + item.value.remove.value = remove stream.append(item) return stream @@ -791,3 +807,280 @@ def test_getDevicesByTag(name, cacheTags, getAllResp, topoDevices, tag, assert str(error) == str(expectedError) assert devices == expectedDevices assert ctx.client.numGetAlls == expNumGetAlls + + +getInterfacesByTagCases = [ + # name + # cached tags + # tagv2 GetAll response + # devices in topology + # tag + # topology flag + # num GetAll calls + # expected interfaces + # expected Error + [ + "get interfaces matching label with preloaded cache", + { + 'dev1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'dev2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + 'dev3': {'Ethernet1': {'DC': ['DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ('dev3', 'Ethernet1', 'DC', 'DC2'), + ('dev3', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev3', 'Ethernet1', 'NodeId', '1'), + ]), + {'dev1': ['Ethernet1'], 'dev2': ['Ethernet1'], 'dev3': ['Ethernet1']}, + Tag('DC', ''), + True, + 0, + [Interface('Ethernet1', Device('dev1')), + Interface('Ethernet1', Device('dev2')), + Interface('Ethernet1', Device('dev3'))], + None + ], + [ + "try get interfaces not matching label with preloaded cache", + { + 'dev1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'dev2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + 'dev3': {'Ethernet1': {'DC': ['DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ('dev3', 'Ethernet1', 'DC', 'DC2'), + ('dev3', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev3', 'Ethernet1', 'NodeId', '1'), + ]), + {'dev1': ['Ethernet1'], 'dev2': ['Ethernet1'], 'dev3': ['Ethernet1']}, + Tag('Role', ''), + True, + 0, + [], + None + ], + [ + "get interfaces matching label and value with preloaded cache", + { + 'dev1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'dev2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + 'dev3': {'Ethernet1': {'DC': ['DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ('dev3', 'Ethernet1', 'DC', 'DC2'), + ('dev3', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev3', 'Ethernet1', 'NodeId', '1'), + ]), + {'dev1': ['Ethernet1'], 'dev2': ['Ethernet1'], 'dev3': ['Ethernet1']}, + Tag('DC', 'DC1'), + True, + 0, + [Interface('Ethernet1', Device('dev1')), + Interface('Ethernet1', Device('dev2'))], + None + ], + [ + "get only interfaces of devices in topology matching label and value with preloaded cache", + { + 'dev1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'dev2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + 'dev3': {'Ethernet1': {'DC': ['DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ('dev3', 'Ethernet1', 'DC', 'DC2'), + ('dev3', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev3', 'Ethernet1', 'NodeId', '1'), + ]), + {'dev1': ['Ethernet1'], 'dev3': ['Ethernet1']}, + Tag('DC', 'DC1'), + True, + 0, + [Interface('Ethernet1', Device('dev1'))], + None + ], + [ + "get only interfaces in topology matching label and value with preloaded cache", + { + 'dev1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'dev2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + 'dev3': {'Ethernet1': {'DC': ['DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ('dev3', 'Ethernet1', 'DC', 'DC2'), + ('dev3', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev3', 'Ethernet1', 'NodeId', '1'), + ]), + {'dev1': ['Ethernet1'], 'dev2': ['Ethernet2'], 'dev3': ['Ethernet1']}, + Tag('DC', 'DC1'), + True, + 0, + [Interface('Ethernet1', Device('dev1'))], + None + ], + [ + "get interfaces of devices not in topology matching label and value with preloaded cache", + { + 'dev1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'dev2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + 'dev3': {'Ethernet1': {'DC': ['DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ('dev3', 'Ethernet1', 'DC', 'DC2'), + ('dev3', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev3', 'Ethernet1', 'NodeId', '1'), + ]), + {'dev1': ['Ethernet1'], 'dev3': ['Ethernet1']}, + Tag('DC', 'DC1'), + False, + 0, + [Interface('Ethernet1', Device('dev1')), + Interface('Ethernet1', Device('dev2'))], + None + ], + [ + "get interfaces not in topology matching label and value with preloaded cache", + { + 'dev1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'dev2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + 'dev3': {'Ethernet1': {'DC': ['DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ('dev3', 'Ethernet1', 'DC', 'DC2'), + ('dev3', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev3', 'Ethernet1', 'NodeId', '1'), + ]), + {'dev1': [], 'dev3': ['Ethernet1']}, + Tag('DC', 'DC1'), + False, + 0, + [Interface('Ethernet1', Device('dev1')), + Interface('Ethernet1', Device('dev2'))], + None + ], + [ + "try get interfaces with None label and value with preloaded cache", + { + 'dev1': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + 'dev2': {'Ethernet1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']}}, + 'dev3': {'Ethernet1': {'DC': ['DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']}}, + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ('dev3', 'Ethernet1', 'DC', 'DC2'), + ('dev3', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev3', 'Ethernet1', 'NodeId', '1'), + ]), + {'dev1': ['Ethernet1'], 'dev2': ['Ethernet1'], 'dev3': ['Ethernet1']}, + Tag('', 'DC1'), + True, + 0, + [], + None + ], + [ + "get interfaces matching label without preloaded cache", + { + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ]), + {'dev1': ['Ethernet1'], 'dev2': ['Ethernet1']}, + Tag('DC', ''), + True, + 2, + [Interface('Ethernet1', Device('dev1')), + Interface('Ethernet1', Device('dev2'))], + None + ], + [ + "try get interfaces not matching label without preloaded cache", + { + }, + convertListToStream([('dev1', 'Ethernet1', 'DC', 'DC1'), + ('dev1', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev1', 'Ethernet1', 'NodeId', '1'), + ('dev2', 'Ethernet1', 'DC', 'DC1'), + ('dev2', 'Ethernet1', 'DC-Pod', 'POD1'), + ('dev2', 'Ethernet1', 'NodeId', '2'), + ]), + {'dev1': ['Ethernet1'], 'dev2': ['Ethernet1']}, + Tag('Role', ''), + True, + 2, + [], + None + ], +] + + +@pytest.mark.parametrize('name, cacheTags, getAllResp, topoDevices, tag, ' + + 'topoFlag, expNumGetAlls, expectedInterfaces, ' + + 'expectedError', getInterfacesByTagCases) +def test_getInterfacesByTag(name, cacheTags, getAllResp, topoDevices, tag, + topoFlag, expNumGetAlls, expectedInterfaces, + expectedError): + error = None + tagInterfaces = [] + ctx = mockCtx() + deviceMap = {} + for dev, interfaces in topoDevices.items(): + deviceMap[dev] = Device(deviceId=dev) + for interface in interfaces: + deviceMap[dev].addInterface(interface) + topology = Topology(deviceMap) + ctx.setTopology(topology) + ctx.client.SetGetAllResponse(getAllResp) + ctx.tags._setRelevantInterfaceTagAssigns(cacheTags) + try: + tagInterfaces = ctx.getInterfacesByTag(tag, inTopology=topoFlag) + except Exception as e: + error = e + if error or expectedError: + assert str(error) == str(expectedError) + intfList = [(intf.name, intf._device.id) for intf in tagInterfaces or []] + expIntfList = [(intf.name, intf._device.id) for intf in expectedInterfaces or []] + assert intfList == expIntfList + assert ctx.client.numGetAlls == expNumGetAlls