diff --git a/qubesadmin/device_protocol.py b/qubesadmin/device_protocol.py index 92754126..c72d8a65 100644 --- a/qubesadmin/device_protocol.py +++ b/qubesadmin/device_protocol.py @@ -641,29 +641,29 @@ class DeviceCategory(Enum): """ # pylint: disable=invalid-name - Other = "*******" + Other = ("*******", ) # also matches all devices, if used to block - Communication = ("u02****", "p07****") # eg. modems + # The following devices are used in GUI for blocks; take note when changing + + # modems, WiFi and Ethernet adapters + Network = ("u02****", "p0703**", "p02****", "ue0****") Input = ("u03****", "p09****") # HID etc. Keyboard = ("u03**01", "p0900**") Mouse = ("u03**02", "p0902**") Printer = ("u07****",) - Scanner = ("p0903**",) - Microphone = ("m******",) + Image_Input = ("p0903**", "u06****", "u0e****") # cameras and scanners + + Audio = ("p0403**", "p0401**", "p0408**", "u01****", "m******") # Multimedia = Audio, Video, Displays etc. - Multimedia = ( - "u06****", - "u10****", - "p03****", - "p04****", - ) - Audio = ("p0403**", "p0401**", "u01****") - Display = ("p0300**", "p0380**") - Video = ("p0400**", "u0e****") - Wireless = ("ue0****", "p0d****") - Bluetooth = ("ue00101", "p0d11**") + Multimedia = ("u10****", "p03****", "p04****") + Microphone = ("m******",) + USB_Storage = ("u08****", ) + Block_Storage = ("b******", ) Storage = ("b******", "u08****", "p01****") - Network = ("p02****",) + Bluetooth = ("ue00101", "p0d11**") + Smart_Card_Readers = ("u0b****", ) + + Display = ("p0300**", "p0380**") # PCI screens? Memory = ("p05****",) PCI_Bridge = ("p06****",) Docking_Station = ("p0a****",) @@ -677,7 +677,7 @@ def from_str(interface_encoding: str) -> "DeviceCategory": Returns `DeviceCategory` from data encoded in string. """ result = DeviceCategory.Other - if len(interface_encoding) != len(DeviceCategory.Other.value): + if len(interface_encoding) != len(DeviceCategory.Other.value[0]): return result best_score = 0 diff --git a/qubesadmin/tests/mock_app.py b/qubesadmin/tests/mock_app.py index 4438afb7..c9939e28 100644 --- a/qubesadmin/tests/mock_app.py +++ b/qubesadmin/tests/mock_app.py @@ -63,7 +63,7 @@ def main(): from unittest import mock from copy import deepcopy -from typing import List, Optional, Dict, Tuple +from typing import List, Optional, Dict, Tuple, Any import qubesadmin.events from qubesadmin.tests import QubesTest @@ -72,9 +72,11 @@ def main(): # Helper methods + def wrapper(method): """Very simple wrapper that prints arguments with which the wrapped method was called. Used to wrap qubesd calls to extend this library.""" + def _wrapped_method(*args, **kwargs): if kwargs: print(args, kwargs) @@ -82,6 +84,7 @@ def _wrapped_method(*args, **kwargs): print(args) return_val = method(*args, **kwargs) return return_val + return _wrapped_method @@ -89,6 +92,7 @@ class Property: """ Qubes property; holds information on property type and if it's the default. """ + def __init__(self, value: str, prop_type: str = str, default: bool = False): """ :param value: property value as string @@ -116,6 +120,7 @@ def default_string(self): "debug": Property("False", "bool", True), "default_dispvm": Property("default-dvm", "vm", False), "default_user": Property("user", "str", True), + "devices_denied": Property("", "str", True), "dns": Property("10.139.1.1 10.139.1.2", "str", True), "gateway": Property("", "str", True), "gateway6": Property("", "str", True), @@ -171,14 +176,14 @@ def default_string(self): } DEFAULT_DOM0_FEATURES = { - 'config-usbvm-name': None, - 'config.default.qubes-update-check': None, - 'gui-default-allow-fullscreen': '', - 'gui-default-allow-utf8-titles': '', - 'gui-default-secure-copy-sequence': 'Ctrl-c', - 'gui-default-secure-paste-sequence': 'Ctrl-v', - 'gui-default-trayicon-mode': '', - 'service.qubes-update-check': 1, + "config-usbvm-name": None, + "config.default.qubes-update-check": None, + "gui-default-allow-fullscreen": "", + "gui-default-allow-utf8-titles": "", + "gui-default-secure-copy-sequence": "Ctrl-c", + "gui-default-secure-paste-sequence": "Ctrl-v", + "gui-default-trayicon-mode": "", + "service.qubes-update-check": 1, } GLOBAL_PROPERTIES = { @@ -197,26 +202,58 @@ def default_string(self): } ALL_KNOWN_FEATURES = [ - 'updates-available', 'internal', "servicevm", 'appmenus-dispvm', 'os-eol', - 'supported-service.shutdown-idle', 'os', 'gui-allow-fullscreen', - 'gui-allow-utf8-titles', 'qubes-firewall', 'service.shutdown-idle', - 'supported-service.qubes-updates-proxy', 'service.clocksync', - 'supported-service.clocksync', 'skip-update', 'boot-mode.active', - 'boot-mode.appvm-default', 'boot-mode.name.default', - 'boot-mode.kernelopts.mode1', 'boot-mode.kernelopts.mode2', - 'boot-mode.name.mode1', 'boot-mode.name.mode2', - 'service.updates-proxy-setup' + "updates-available", + "internal", + "servicevm", + "appmenus-dispvm", + "os-eol", + "supported-service.shutdown-idle", + "os", + "gui-allow-fullscreen", + "gui-allow-utf8-titles", + "qubes-firewall", + "service.shutdown-idle", + "supported-service.qubes-updates-proxy", + "service.clocksync", + "supported-service.clocksync", + "skip-update", + "boot-mode.active", + "boot-mode.appvm-default", + "boot-mode.name.default", + "boot-mode.kernelopts.mode1", + "boot-mode.kernelopts.mode2", + "boot-mode.name.mode1", + "boot-mode.name.mode2", + "service.updates-proxy-setup", + "qubes-vm-update-update-if-stale", + "restart-after-update", + "qubes-vm-update-hide-skipped", + "template-name", + "last-updates-check", + "last-update", + "prohibit-start", + "qubes-vm-update-hide-updated", + "qubes-vm-update-restart-servicevms", + "qubes-vm-update-restart-system", + "qubes-vm-update-restart-other", + "qubes-vm-update-max-concurrency", ] -POSSIBLE_TAGS = ['whonix-updatevm', 'anon-gateway', 'anon-vm'] +POSSIBLE_TAGS = ["whonix-updatevm", "anon-gateway", "anon-vm"] class VolumeInfo: """Class that handles the qubesdb VolumeInfo string.""" + # TODO: enhancements: obsolete volumes - def __init__(self, qube_name: str, volume_type: str, - outdated: bool = False, usage_perc: Optional[float] = 0.0, - save_on_stop: bool = False): + def __init__( + self, + qube_name: str, + volume_type: str, + outdated: bool = False, + usage_perc: Optional[float] = 0.0, + save_on_stop: bool = False, + ): """ :param qube_name: name of the associated qube :param volume_type: one of the following: @@ -228,7 +265,7 @@ def __init__(self, qube_name: str, volume_type: str, self.qube_name = qube_name self.volume_type = volume_type self.outdated = outdated - self.size = 2 ** 30 + self.size = 2**30 self.usage = int(self.size * usage_perc) self.save_on_stop = save_on_stop @@ -237,37 +274,39 @@ def response_string(self): return b"0\x00" + str(self).encode() def __str__(self): - if self.volume_type == 'kernel': + if self.volume_type == "kernel": result = "pool=linux-kernel\n" - result += 'vid=1.1\n' - result += 'size=0\n' - result += 'usage=0\n' + result += "vid=1.1\n" + result += "size=0\n" + result += "usage=0\n" result += "rw=False\n" else: result = "pool=vm-pool\n" - result += f'vid=qubes_dom0/vm-{self.qube_name}-{self.volume_type}\n' - result += f'size={self.size}\n' - result += f'usage={self.usage}\n' + result += f"vid=qubes_dom0/vm-{self.qube_name}-{self.volume_type}\n" + result += f"size={self.size}\n" + result += f"usage={self.usage}\n" result += "rw=True\n" - if self.volume_type == 'root': + if self.volume_type == "root": result += "source=qubes_dom0/vm-template-root\n" else: result += "source=\n" - if self.volume_type == 'kernel': - result += 'path=/var/lib/qubes/vm-kernels/1.1/modules.img\n' + if self.volume_type == "kernel": + result += "path=/var/lib/qubes/vm-kernels/1.1/modules.img\n" else: - name = self.qube_name.replace('-', '--') - result += f'path=/dev/mapper/' \ - f'qubes_dom0-vm--{name}--{self.volume_type}\n' + name = self.qube_name.replace("-", "--") + result += ( + f"path=/dev/mapper/" + f"qubes_dom0-vm--{name}--{self.volume_type}\n" + ) - if self.volume_type == 'private': + if self.volume_type == "private": result += "save_on_stop=True\n" else: result += f"save_on_stop={self.save_on_stop}\n" - if self.volume_type == 'root': + if self.volume_type == "root": result += "snap_on_start=True\n" else: result += "snap_on_start=False\n" @@ -283,14 +322,20 @@ class MockQube: """Object that helps generate qube-related calls. Initializing the object already adds all relevant calls to the qapp object. If changes were made, run update_calls to notify the qapp object of them.""" - def __init__(self, name: str, qapp: QubesTest, - klass: str = 'AppVM', running: bool = False, - features: Optional[Dict] = None, - usage: Optional[float] = 0.0, - tags: Optional[List] = None, - firewall_rules: Optional[List[Dict[str, str]]] = None, - override_default_props: Optional[Dict] = None, - **kwargs): + + def __init__( + self, + name: str, + qapp: QubesTest, + klass: str = "AppVM", + running: bool = False, + features: Optional[Dict] = None, + usage: Optional[float] = 0.0, + tags: Optional[List] = None, + firewall_rules: Optional[List[Dict[str, str]]] = None, + override_default_props: Optional[Dict] = None, + **kwargs, + ): """ Creates a mock qube object and updates all relevant calls. :param name: qube name @@ -316,8 +361,9 @@ def __init__(self, name: str, qapp: QubesTest, self.running = running self.usage = usage self.features = features if features else {} - self.firewall_rules = firewall_rules if firewall_rules \ - else [{'action': 'accept'}] + self.firewall_rules = ( + firewall_rules if firewall_rules else [{"action": "accept"}] + ) self.tags = tags if tags else [] self._add_to_vm_list(name, klass) @@ -330,19 +376,21 @@ def __init__(self, name: str, qapp: QubesTest, self.properties[prop].default = is_default else: self.properties[prop].value = str(value) - self.properties[prop].default = \ - (str(value) == DEFAULT_VM_PROPERTIES.get(prop).default) + self.properties[prop].default = ( + str(value) == DEFAULT_VM_PROPERTIES.get(prop).default + ) - if self.klass == 'AdminVM': - self.properties["icon"].value = 'adminvm-black' + if self.klass == "AdminVM": + self.properties["icon"].value = "adminvm-black" else: - self.properties["icon"].value = self.klass.lower() + "-" + \ - self.properties["label"].value + self.properties["icon"].value = ( + self.klass.lower() + "-" + self.properties["label"].value + ) self.update_calls() def __setattr__(self, key, value): - if key != 'properties' and key in self.properties: + if key != "properties" and key in self.properties: self.properties[key].value = str(value) self.properties[key].default = False else: @@ -363,13 +411,13 @@ def set_property_default(self, prop, value): def _add_to_vm_list(self, name: str, klass: str): """Modify existing VM list call to add this qube. Should not be called twice.""" - state = 'Running' if self.running else 'Halted' + state = "Running" if self.running else "Halted" - vm_list_call = ('dom0', 'admin.vm.List', None, None) - vm_list = b'0\x00' + vm_list_call = ("dom0", "admin.vm.List", None, None) + vm_list = b"0\x00" if vm_list_call in self.qapp.expected_calls: vm_list = self.qapp.expected_calls[vm_list_call] - list_call = f'{name} class={klass} state={state}\n'.encode() + list_call = f"{name} class={klass} state={state}\n".encode() vm_list += list_call self.qapp.expected_calls[vm_list_call] = vm_list @@ -381,40 +429,42 @@ def update_calls(self): # create power state call if self.running: - response = b'0\x00power_state=Running' + response = b"0\x00power_state=Running" else: - response = b'0\x00power_state=Halted' - self.qapp.expected_calls[(self.name, 'admin.vm.CurrentState', None, - None)] = response + response = b"0\x00power_state=Halted" + self.qapp.expected_calls[ + (self.name, "admin.vm.CurrentState", None, None) + ] = response # create all propertyget calls for prop, value in self.properties.items(): - if ((prop == 'template') and \ - self.klass in ("TemplateVM", "StandaloneVM")) \ - or ((prop == 'appvm_default_bootmode') and \ - self.klass in ("AppVM",)): + if ( + (prop == "template") + and self.klass in ("TemplateVM", "StandaloneVM") + ) or ( + (prop == "appvm_default_bootmode") and self.klass in ("AppVM",) + ): self.qapp.expected_calls[ - (self.name, "admin.vm.property.Get", prop, None)] = \ - b'2\x00QubesNoSuchPropertyError\x00\x00No such property\x00' + (self.name, "admin.vm.property.Get", prop, None) + ] = b"2\x00QubesNoSuchPropertyError\x00\x00No such property\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.property.GetDefault", prop, None)] = \ - b'2\x00QubesNoSuchPropertyError\x00\x00No such property\x00' + (self.name, "admin.vm.property.GetDefault", prop, None) + ] = b"2\x00QubesNoSuchPropertyError\x00\x00No such property\x00" continue - properties_getall += \ - f"{prop} {value}\n".encode() + properties_getall += f"{prop} {value}\n".encode() self.qapp.expected_calls[ - (self.name, "admin.vm.property.Get", prop, None)] = \ - b"0\x00" + str(value).encode() + (self.name, "admin.vm.property.Get", prop, None) + ] = (b"0\x00" + str(value).encode()) if prop in DEFAULT_VM_PROPERTIES: default_value = DEFAULT_VM_PROPERTIES[prop] self.qapp.expected_calls[ - (self.name, "admin.vm.property.GetDefault", prop, None)] = \ - b"0\x00" + default_value.default_string().encode() + (self.name, "admin.vm.property.GetDefault", prop, None) + ] = (b"0\x00" + default_value.default_string().encode()) # the property.GetAll call is optional, but let's have it, why not self.qapp.expected_calls[ - (self.name, "admin.vm.property.GetAll", None, None)] = \ - properties_getall + (self.name, "admin.vm.property.GetAll", None, None) + ] = properties_getall # features: we add both feature.Get and feature.CheckWithTemplate, with # the same resulting value @@ -423,27 +473,45 @@ def update_calls(self): for feature_name, value in self.features.items(): if value is not None: self.qapp.expected_calls[ - (self.name, "admin.vm.feature.Get", feature_name, None)] = \ - b"0\x00" + str(value).encode() + (self.name, "admin.vm.feature.Get", feature_name, None) + ] = (b"0\x00" + str(value).encode()) self.qapp.expected_calls[ - (self.name, "admin.vm.feature.CheckWithTemplate", - feature_name, None)] = b"0\x00" + str(value).encode() + ( + self.name, + "admin.vm.feature.CheckWithTemplate", + feature_name, + None, + ) + ] = ( + b"0\x00" + str(value).encode() + ) else: self.qapp.expected_calls[ - (self.name, "admin.vm.feature.Get", feature_name, None)] = \ - b'2\x00QubesFeatureNotFoundError\x00\x00' + \ - str(feature_name).encode() + b'\x00' + (self.name, "admin.vm.feature.Get", feature_name, None) + ] = ( + b"2\x00QubesFeatureNotFoundError\x00\x00" + + str(feature_name).encode() + + b"\x00" + ) self.qapp.expected_calls[ - (self.name, "admin.vm.feature.CheckWithTemplate", - feature_name, None)] = \ - b'2\x00QubesFeatureNotFoundError\x00\x00' + \ - str(feature_name).encode() + b'\x00' + ( + self.name, + "admin.vm.feature.CheckWithTemplate", + feature_name, + None, + ) + ] = ( + b"2\x00QubesFeatureNotFoundError\x00\x00" + + str(feature_name).encode() + + b"\x00" + ) # list all features correctly self.qapp.expected_calls[ - (self.name, "admin.vm.feature.List", None, None)] = \ - ("0\x00" + "".join(f"{feature}\n" for feature - in self.features)).encode() + (self.name, "admin.vm.feature.List", None, None) + ] = ( + "0\x00" + "".join(f"{feature}\n" for feature in self.features) + ).encode() # setup all volumeInfo related calls self.setup_volume_calls() @@ -453,19 +521,21 @@ def update_calls(self): if self.tags and tag in self.tags: continue self.qapp.expected_calls[ - (self.name, "admin.vm.tag.Get", tag, None)] = b"0\x000" + (self.name, "admin.vm.tag.Get", tag, None) + ] = b"0\x000" if self.tags: self.qapp.expected_calls[ - (self.name, "admin.vm.tag.List", None, None)] = \ - b"0\x00" + ("".join(f"{tag}\n" for tag in self.tags)).encode() + (self.name, "admin.vm.tag.List", None, None) + ] = (b"0\x00" + ("".join(f"{tag}\n" for tag in self.tags)).encode()) for tag in self.tags: self.qapp.expected_calls[ - (self.name, "admin.vm.tag.Get", tag, None)] = b'0\0001' + (self.name, "admin.vm.tag.Get", tag, None) + ] = b"0\0001" else: self.qapp.expected_calls[ - (self.name, "admin.vm.tag.List", None, None)] = "0\x00".encode() - + (self.name, "admin.vm.tag.List", None, None) + ] = "0\x00".encode() self.setup_device_calls() self.setup_firewall_rules(self.firewall_rules) @@ -475,213 +545,307 @@ def update_calls(self): def setup_volume_calls(self): self.qapp.expected_calls[ - (self.name, 'admin.vm.volume.List', None, None)] = \ - b'0\x00root\nprivate\nvolatile\nkernel\n' + (self.name, "admin.vm.volume.List", None, None) + ] = b"0\x00root\nprivate\nvolatile\nkernel\n" self.qapp.expected_calls[ - (self.name, 'admin.vm.volume.Info', 'root', None)] = \ - VolumeInfo(self.name, "root", - usage_perc=self.usage, - save_on_stop=self.klass in [ - 'TemplateVM', 'StandaloneVM']).response_string() + (self.name, "admin.vm.volume.Info", "root", None) + ] = VolumeInfo( + self.name, + "root", + usage_perc=self.usage, + save_on_stop=self.klass in ["TemplateVM", "StandaloneVM"], + ).response_string() self.qapp.expected_calls[ - (self.name, 'admin.vm.volume.Info', 'private', None)] = \ - VolumeInfo(self.name, "private", - usage_perc=self.usage).response_string() + (self.name, "admin.vm.volume.Info", "private", None) + ] = VolumeInfo( + self.name, "private", usage_perc=self.usage + ).response_string() self.qapp.expected_calls[ - (self.name, 'admin.vm.volume.Info', 'volatile', None)] = \ - VolumeInfo(self.name, "volatile", - usage_perc=self.usage).response_string() + (self.name, "admin.vm.volume.Info", "volatile", None) + ] = VolumeInfo( + self.name, "volatile", usage_perc=self.usage + ).response_string() self.qapp.expected_calls[ - (self.name, 'admin.vm.volume.Info', 'kernel', None)] = \ - VolumeInfo(self.name, "kernel").response_string() + (self.name, "admin.vm.volume.Info", "kernel", None) + ] = VolumeInfo(self.name, "kernel").response_string() def setup_device_calls(self): # all devices self.qapp.expected_calls[ - (self.name, "admin.vm.device.pci.List", None, None)] = b"0\x00" + (self.name, "admin.vm.device.pci.List", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.block.List", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.block.List", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.usb.List", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.usb.List", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.mic.List", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.mic.List", None, None) + ] = b"0\x00" # available devices self.qapp.expected_calls[ - (self.name, "admin.vm.device.pci.Available", None, None)] = b"0\x00" + (self.name, "admin.vm.device.pci.Available", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.block.Available", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.block.Available", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.usb.Available", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.usb.Available", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.mic.Available", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.mic.Available", None, None) + ] = b"0\x00" # assigned devices self.qapp.expected_calls[ - (self.name, "admin.vm.device.pci.Assigned", None, None)] = b"0\x00" + (self.name, "admin.vm.device.pci.Assigned", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.block.Assigned", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.block.Assigned", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.usb.Assigned", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.usb.Assigned", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.mic.Assigned", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.mic.Assigned", None, None) + ] = b"0\x00" # currently attached devices self.qapp.expected_calls[ - (self.name, "admin.vm.device.pci.Attached", None, None)] = b"0\x00" + (self.name, "admin.vm.device.pci.Attached", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.block.Attached", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.block.Attached", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.usb.Attached", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.usb.Attached", None, None) + ] = b"0\x00" self.qapp.expected_calls[ - (self.name, "admin.vm.device.mic.Attached", None, None)] = \ - b"0\x00" + (self.name, "admin.vm.device.mic.Attached", None, None) + ] = b"0\x00" def setup_firewall_rules(self, rule_list: List[Dict[str, str]]): """setup firewall with provided rules: rules should be provided as list of dictionaries. Typical keys are action, proto, dsthost, dstports, icmptype, specialtarget, expire and comment""" - rules = b'0\x00' + rules = b"0\x00" for rule_dict in rule_list: - rules += (' '.join(f'{key}={value}' for key, value - in rule_dict.items()) + '\n').encode() + rules += ( + " ".join(f"{key}={value}" for key, value in rule_dict.items()) + + "\n" + ).encode() self.qapp.expected_calls[ - (self.name, "admin.vm.firewall.Get", None, None)] = rules + (self.name, "admin.vm.firewall.Get", None, None) + ] = rules class MockAdminVM(MockQube): def __init__(self, qapp): - super().__init__('dom0', qapp, klass='AdminVM', running=True, - features=DEFAULT_DOM0_FEATURES.copy(), - override_default_props=DEFAULT_DOM0_PROPERTIES.copy()) + super().__init__( + "dom0", + qapp, + klass="AdminVM", + running=True, + features=DEFAULT_DOM0_FEATURES.copy(), + override_default_props=DEFAULT_DOM0_PROPERTIES.copy(), + ) # make all properties that are unknown give an 'unknown property' error for property_name in DEFAULT_VM_PROPERTIES: if property_name not in self.properties: self.qapp.expected_calls[ - (self.name, "admin.vm.property.Get", - property_name, None)] = \ - b'2\x00QubesNoSuchPropertyError\x00\x00No such property\x00' + (self.name, "admin.vm.property.Get", property_name, None) + ] = b"2\x00QubesNoSuchPropertyError\x00\x00No such property\x00" continue +# pylint: disable=too-many-positional-arguments class MockDevice: """helper for adding a device to a qubes test instance""" - def __init__(self, qapp: QubesTest, dev_class: str, - description: str, dev_id: str, backend_vm: str, - attached: Optional[str] = None): + + def __init__( + self, + qapp: QubesTest, + dev_class: str, + device_id: str, + backend_vm: str, + port: str, + product: str, + vendor: str, + attached: Optional[str] = None, + assigned: Optional[List[Tuple[str, str, list[Any] | None]]] = None, + ): """ :param qapp: QubesTest object :param dev_class: block / mic / usb - :param description: device description - :param dev_id: dev id (such as sda, 2-1, mic) + :param device_id: device_id + :param port: port :param backend_vm: name of the vm providing this device - :param attached: name of the qube to which the device is attached, - if any + :param attached: name of the qube to which the device is + currently attached, + :param assigned: list of the qubes to which the device is currently + assigned, tupled with mode ('ask-to-attach' or 'auto-attach') """ self.qapp = qapp self.dev_class = dev_class - self.description = description - self.dev_id = dev_id + self.device_id = device_id + self.port = port self.backend_vm = backend_vm self.attached = attached + self.assigned = assigned + self.product = product + self.vendor = vendor + + self.interface = self.device_id.split(":")[-1] self.update_calls() def device_string(self): - if self.dev_class == 'block': - port, d_id = self.dev_id.split(":", 1) - return f"{self.dev_id} device_id='{d_id}' port_id='{port}' " \ - f"devclass='block' backend_domain='{self.backend_vm}' " \ - f"serial='root/test.img' manufacturer='{self.description}' " \ - "interfaces='b******'\n" - if self.dev_class == 'pci': - _, d_id = self.dev_id.split(":") - port = self.dev_id.replace(':', '_') - return f"{port} device_id='{d_id}' " \ - f"port_id='{port}' devclass='pci' " \ - f"product='{self.description}' vendor='{self.description}' " \ - f"interfaces='p0c0500' backend_domain='{self.backend_vm}'\n" - return f"{self.dev_id} description='{self.description}'\n" + return ( + f"{self.port} device_id='{self.device_id}' " + f"port_id='{self.port}' devclass='{self.dev_class}' " + f"product='{self.product}' vendor='{self.vendor}' " + f"interfaces='{self.interface}' " + f"backend_domain='{self.backend_vm}'\n" + ) def attachment_string(self): - if ":" in self.dev_id: - port, _ = self.dev_id.split(":") - elif self.dev_class == 'mic': - port = "mic" - else: - port = "00" - string = (f"{self.backend_vm}+{self.dev_id} " - f"port_id='{port}' devclass='{self.dev_class}' " - f"backend_domain='{self.backend_vm}' mode='required' " - f"frontend_domain='{self.attached}'") + string = ( + f"{self.backend_vm}+{self.port} " + f"port_id='{self.port}' devclass='{self.dev_class}' " + f"backend_domain='{self.backend_vm}' mode='manual' " + f"frontend_domain='{self.attached}'" + ) + string += "\n" + return string + + def assignment_string(self, vm, mode, opts): + string = ( + f"{self.backend_vm}+{self.port} device_id='" + f"{self.device_id}' " + f"port_id='{self.port}' devclass='{self.dev_class}' " + f"backend_domain='{self.backend_vm}' mode='{mode}' " + f"frontend_domain='{vm}'" + ) + if opts: + for opt in opts: + string += f" _{opt}='True'" string += "\n" return string def update_calls(self): # modify call current_response = self.qapp.expected_calls[ - (self.backend_vm, f"admin.vm.device.{self.dev_class}.Available", - None, None)] + ( + self.backend_vm, + f"admin.vm.device.{self.dev_class}.Available", + None, + None, + ) + ] self.qapp.expected_calls[ - (self.backend_vm, f"admin.vm.device.{self.dev_class}.Available", - None, None)] = current_response + self.device_string().encode() + ( + self.backend_vm, + f"admin.vm.device.{self.dev_class}.Available", + None, + None, + ) + ] = ( + current_response + self.device_string().encode() + ) if self.attached: current_response = self.qapp.expected_calls[ - (self.attached, f"admin.vm.device.{self.dev_class}.Assigned", - None, None)] + ( + self.attached, + f"admin.vm.device.{self.dev_class}.Attached", + None, + None, + ) + ] self.qapp.expected_calls[ - (self.attached, f"admin.vm.device.{self.dev_class}.Assigned", - None, None)] = current_response + \ - self.attachment_string().encode() + ( + self.attached, + f"admin.vm.device.{self.dev_class}.Attached", + None, + None, + ) + ] = ( + current_response + self.attachment_string().encode() + ) + + if self.assigned: + for vm, mode, opts in self.assigned: + current_response = self.qapp.expected_calls[ + ( + vm, + f"admin.vm.device.{self.dev_class}.Assigned", + None, + None, + ) + ] + self.qapp.expected_calls[ + ( + vm, + f"admin.vm.device.{self.dev_class}.Assigned", + None, + None, + ) + ] = ( + current_response + + self.assignment_string(vm, mode, opts).encode() + ) class QubesTestWrapper(QubesTest): def __init__(self): super().__init__() - self._local_name = 'dom0' # pylint: disable=protected-access + self._local_name = "dom0" # pylint: disable=protected-access self._devices = [] - self.app.qubesd_connection_type = 'qrexec' - - self._qubes: Dict[str, MockQube] = {'dom0': MockAdminVM(self)} - - labels = ['red', 'orange', 'yellow', 'green', 'gray', 'blue', - 'purple', 'black'] + self.app.qubesd_connection_type = "qrexec" + + self._qubes: Dict[str, MockQube] = {"dom0": MockAdminVM(self)} + + labels = [ + "red", + "orange", + "yellow", + "green", + "gray", + "blue", + "purple", + "black", + ] # setup labels - self.expected_calls[('dom0', 'admin.label.List', None, None)] = \ - b'0\x00' + '\n'.join(labels).encode() + b'\n' + self.expected_calls[("dom0", "admin.label.List", None, None)] = ( + b"0\x00" + "\n".join(labels).encode() + b"\n" + ) for i, label in enumerate(labels): - self.expected_calls[('dom0', 'admin.label.Index', label, None)] =\ - b'0\x00' + str(i).encode() + self.expected_calls[("dom0", "admin.label.Index", label, None)] = ( + b"0\x00" + str(i).encode() + ) # setup pools: - pools = ['linux-kernel', 'lvm', 'file', 'vm-pool'] - self.expected_calls[('dom0', 'admin.pool.List', None, None)] = \ - b'0\x00' + '\n'.join(pools).encode() + b'\n' - self.expected_calls[('dom0', 'admin.pool.volume.List', - 'linux-kernel', None)] = \ - b'0\x001.1\nmisc\n4.2\n' + pools = ["linux-kernel", "lvm", "file", "vm-pool"] + self.expected_calls[("dom0", "admin.pool.List", None, None)] = ( + b"0\x00" + "\n".join(pools).encode() + b"\n" + ) + self.expected_calls[ + ("dom0", "admin.pool.volume.List", "linux-kernel", None) + ] = b"0\x001.1\nmisc\n4.2\n" for pool in pools: - self.app.expected_calls[('dom0', 'admin.pool.UsageDetails', pool, - None)] = \ - b'0\x00data_size=1099511627776\n' \ - b'data_usage=102400\n' \ - b'metadata_size=1024\n' \ - b'metadata_usage=50\n' + self.app.expected_calls[ + ("dom0", "admin.pool.UsageDetails", pool, None) + ] = ( + b"0\x00data_size=1099511627776\n" + b"data_usage=102400\n" + b"metadata_size=1024\n" + b"metadata_usage=50\n" + ) self.populate_feature_calls() @@ -706,14 +870,21 @@ def populate_feature_calls(self): for feature in known_features: calls = [ (qube.name, "admin.vm.feature.Get", feature, None), - (qube.name, 'admin.vm.feature.CheckWithTemplate', - feature, None)] + ( + qube.name, + "admin.vm.feature.CheckWithTemplate", + feature, + None, + ), + ] for call in calls: if call in self.expected_calls: continue - self.expected_calls[call] = \ - b'2\x00QubesFeatureNotFoundError\x00\x00' + \ - str(feature).encode() + b'\x00' + self.expected_calls[call] = ( + b"2\x00QubesFeatureNotFoundError\x00\x00" + + str(feature).encode() + + b"\x00" + ) def set_global_property(self, property_name: str, value: Optional[str]): self._global_properties[property_name].value = value @@ -722,14 +893,13 @@ def update_global_properties(self): properties_getall = b"0\x00" for property_name, value in self._global_properties.items(): - self.app.expected_calls[('dom0', "admin.property.Get", - property_name, None)] = \ - b"0\x00" + str(value).encode() - properties_getall += \ - f"{property_name} {value}\n".encode() + self.app.expected_calls[ + ("dom0", "admin.property.Get", property_name, None) + ] = (b"0\x00" + str(value).encode()) + properties_getall += f"{property_name} {value}\n".encode() self.app.expected_calls[ - ('dom0', "admin.property.GetAll", None, None)] = \ - properties_getall + ("dom0", "admin.property.GetAll", None, None) + ] = properties_getall class MockQubes(QubesTestWrapper): @@ -739,28 +909,36 @@ class MockQubes(QubesTestWrapper): Available pools: linux-kernel, lvm and file. Available linux kernels: 1.1, misc and 4.2 """ + def __init__(self): super().__init__() - self._qubes['sys-net'] = MockQube( - name="sys-net", qapp=self, - running=True, template='fedora-36', - provides_network=True, - features={ - 'service.qubes-updates-proxy': '1', 'servicevm': '1', - 'supported-service.qubes-updates-proxy': '1'}) - self._qubes['fedora-36'] = MockQube( - name="fedora-36", qapp=self, - klass='TemplateVM', netvm='', + self._qubes["sys-net"] = MockQube( + name="sys-net", + qapp=self, + running=True, + template="fedora-36", + provides_network=True, features={ - 'service.updates-proxy-setup': 1}) + "service.qubes-updates-proxy": "1", + "servicevm": "1", + "supported-service.qubes-updates-proxy": "1", + }, + ) + self._qubes["fedora-36"] = MockQube( + name="fedora-36", + qapp=self, + klass="TemplateVM", + netvm="", + features={"service.updates-proxy-setup": 1}, + ) self._global_properties = GLOBAL_PROPERTIES.copy() - self.set_global_property('clockvm', "sys-net") - self.set_global_property('default_dispvm', "fedora-36") - self.set_global_property('default_netvm', "sys-net") - self.set_global_property('default_template', "fedora-36") - self.set_global_property('updatevm', "sys-net") + self.set_global_property("clockvm", "sys-net") + self.set_global_property("default_dispvm", "fedora-36") + self.set_global_property("default_netvm", "sys-net") + self.set_global_property("default_template", "fedora-36") + self.set_global_property("updatevm", "sys-net") self.update_global_properties() self.update_vm_calls() @@ -770,68 +948,160 @@ class MockQubesComplete(MockQubes): """ A complex Qubes setup, with multiple qubes. """ + def __init__(self): super().__init__() - self._qubes['sys-firewall'] = MockQube( - name="sys-firewall", qapp=self, netvm="sys-net", - provides_network=True, features={'servicevm': '1', - 'qubes-firewall': 1}) - - self._qubes['sys-usb'] = MockQube( - name="sys-usb", qapp=self, running=True, - features={'supported-service.qubes-u2f-proxy': '1', - 'servicevm': '1'}) - - self._qubes['fedora-35'] = MockQube( - name="fedora-35", qapp=self, klass='TemplateVM', netvm='', + self._qubes["sys-firewall"] = MockQube( + name="sys-firewall", + qapp=self, + netvm="sys-net", + provides_network=True, + features={"servicevm": "1", "qubes-firewall": 1}, + ) + + self._qubes["sys-usb"] = MockQube( + name="sys-usb", + qapp=self, + running=True, + features={ + "supported-service.qubes-u2f-proxy": "1", + "servicevm": "1", + }, + ) + + self._qubes["fedora-35"] = MockQube( + name="fedora-35", + qapp=self, + klass="TemplateVM", + netvm="", installed_by_rpm=True, - features={'supported-service.qubes-u2f-proxy': '1', - 'service.qubes-update-check': '1', - 'updates-available': 1, - 'service.updates-proxy-setup': 1}) - - self._qubes['default-dvm'] = MockQube( - name="default-dvm", qapp=self, klass='DispVM', - template_for_dispvms='True', template='fedora-36', - features={'appmenus-dispvm': '1'}) - - self._qubes['test-vm'] = MockQube( - name="test-vm", qapp=self, - features={'service.qubes-u2f-proxy': '1', - 'supported-service.qubes-u2f-proxy': '1'}) + features={ + "supported-service.qubes-u2f-proxy": "1", + "service.qubes-update-check": "1", + "updates-available": 1, + "service.updates-proxy-setup": 1, + }, + ) + + self._qubes["default-dvm"] = MockQube( + name="default-dvm", + qapp=self, + klass="DispVM", + template_for_dispvms="True", + template="fedora-36", + features={"appmenus-dispvm": "1"}, + ) + + self._qubes["test-vm"] = MockQube( + name="test-vm", + qapp=self, + features={ + "service.qubes-u2f-proxy": "1", + "supported-service.qubes-u2f-proxy": "1", + }, + ) - self._qubes['test-blue'] = MockQube( - name="test-blue", running=True, qapp=self, label="blue") + self._qubes["test-blue"] = MockQube( + name="test-blue", running=True, qapp=self, label="blue" + ) - self._qubes['test-red'] = MockQube( - name="test-red", qapp=self, label="red") + self._qubes["test-red"] = MockQube( + name="test-red", qapp=self, label="red" + ) - self._qubes['test-old'] = MockQube( - name="test-old", qapp=self, label="orange", template='fedora-35') - self._qubes['test-old'].set_property_default('netvm', 'sys-firewall') + self._qubes["test-old"] = MockQube( + name="test-old", qapp=self, label="orange", template="fedora-35" + ) + self._qubes["test-old"].set_property_default("netvm", "sys-firewall") - self._qubes['test-standalone'] = MockQube( - name="test-standalone", qapp=self, klass="StandaloneVM", - label="green") + self._qubes["test-standalone"] = MockQube( + name="test-standalone", + qapp=self, + klass="StandaloneVM", + label="green", + ) - self._qubes['vault'] = MockQube( - name="vault", qapp=self, netvm="") + self._qubes["vault"] = MockQube(name="vault", qapp=self, netvm="") # # this system has some reasonable defaults - self._qubes['dom0'].default_dispvm = "default-dvm" + self._qubes["dom0"].default_dispvm = "default-dvm" # also add a bunch of devices self._devices = [ - MockDevice(self, 'mic', 'Internal Microphone', 'mic', 'dom0', - attached='test-blue'), + MockDevice( + self, + dev_class="mic", + device_id="dom0:mic::m000000", + backend_vm="dom0", + port="mic", + product="Internal Mic", + vendor="ACME", + attached="test-blue", + ), # the usb stick appears as multiple devices, as they are wont to - MockDevice(self, 'usb', 'My USB Drive', '2-1', 'sys-usb'), - MockDevice(self, 'block', ' ()', 'sda::0', 'sys-usb'), - MockDevice(self, 'block', '(USB DISK)', 'sda1::1', 'sys-usb'), - MockDevice(self, 'usb', 'Internal Camera', '2-10', 'sys-usb'), - MockDevice(self, 'pci', 'Host bridge', '00:00.0', 'dom0'), - MockDevice(self, 'pci', 'PCI Bridge', '00:02.2', 'dom0'), - MockDevice(self, 'pci', 'USB Controller', '00:03.2', 'dom0'), + MockDevice( + self, + dev_class="usb", + device_id="1d6b:0104:CAFEBABE:u030101u300000", + backend_vm="sys-usb", + port="2-1", + product="My USB Drive", + vendor="SCP Foundation", + ), + MockDevice( + self, + dev_class="block", + device_id="1d6b:0104:CAFEBABE:b123456", + backend_vm="sys-usb", + port="sda", + product="(USB)", + vendor="SCP Foundation", + ), + MockDevice( + self, + dev_class="block", + device_id="1d6b:0104:CAFEBABE:b123456", + backend_vm="sys-usb", + port="sda", + product="()", + vendor="SCP Foundation", + ), + MockDevice( + self, + dev_class="usb", + device_id="04f2:b684:01.00.00:u0e0101u0e0201", + backend_vm="sys-usb", + port="2-7", + product="Internal Camera", + vendor="Saruman Industries", + ), + MockDevice( + self, + dev_class="pci", + device_id="0x8086:0x4621::p060000", + backend_vm="dom0", + port="00_00.0", + product="PCI Bridge", + vendor="Unnamed Industries", + ), + MockDevice( + self, + dev_class="pci", + device_id="0x8086:0x51e9::p0c8000", + backend_vm="dom0", + port="00_02.0", + product="I2C Controller", + vendor="Unnamed Industries", + ), + MockDevice( + self, + dev_class="pci", + device_id="0x8086:0x461e::p0c0330", + backend_vm="dom0", + port="00_03.0", + product="I2C Controller", + vendor="Unnamed Industries", + ), ] self.update_vm_calls() @@ -839,32 +1109,50 @@ def __init__(self): class MockQubesWhonix(MockQubesComplete): """Complete Qubes system, with additional whonix qubes.""" + def __init__(self): super().__init__() - self._qubes['sys-whonix'] = MockQube( - name="sys-whonix", qapp=self, template="whonix-gw-15", - features={'service.qubes-updates-proxy': '1', - 'supported-service.qubes-updates-proxy': '1'}, - tags=['anon-gateway']) - - self._qubes['anon-whonix'] = MockQube( - name="anon-whonix", qapp=self, template="whonix-gw-15", - tags=['anon-gateway']) - - self._qubes['whonix-gw-15'] = MockQube( - name="whonix-gw-15", qapp=self, klass='TemplateVM', netvm='', - tags=['whonix-updatevm']) - - self._qubes['whonix-gw-14'] = MockQube( - name="whonix-gw-14", qapp=self, klass='TemplateVM', netvm='', - tags=['whonix-updatevm']) + self._qubes["sys-whonix"] = MockQube( + name="sys-whonix", + qapp=self, + template="whonix-gw-15", + features={ + "service.qubes-updates-proxy": "1", + "supported-service.qubes-updates-proxy": "1", + }, + tags=["anon-gateway"], + ) + + self._qubes["anon-whonix"] = MockQube( + name="anon-whonix", + qapp=self, + template="whonix-gw-15", + tags=["anon-gateway"], + ) + + self._qubes["whonix-gw-15"] = MockQube( + name="whonix-gw-15", + qapp=self, + klass="TemplateVM", + netvm="", + tags=["whonix-updatevm"], + ) + + self._qubes["whonix-gw-14"] = MockQube( + name="whonix-gw-14", + qapp=self, + klass="TemplateVM", + netvm="", + tags=["whonix-updatevm"], + ) self.update_vm_calls() # Mock Stats Dispatcher object + async def noop_coro(*_args): """A very simple do-nothing coroutine, used to mock qubes events.""" while True: @@ -873,7 +1161,8 @@ async def noop_coro(*_args): class MockDispatcher(qubesadmin.events.EventsDispatcher): """Create a mock EventsDispatcher object that does not actually dispatch - events""" + events""" + def __init__(self, qapp, **kwargs): super().__init__(qapp, **kwargs) self._listen_for_events = noop_coro @@ -881,22 +1170,24 @@ def __init__(self, qapp, **kwargs): class MockEvent: """Helper class for event handling.""" + def __init__( - self, - event_subject: str, - event_name: str, - additional_keys: Optional[List[Tuple[str, str]]] = None): + self, + event_subject: str, + event_name: str, + additional_keys: Optional[List[Tuple[str, str]]] = None, + ): """ event subject - the name of the object that fired the event event name - the name of the event additional keys: list of str, str tuples representing any additional event keys and values """ - self.event_string = '1\0' + event_subject + '\0' + event_name + '\0' + self.event_string = "1\0" + event_subject + "\0" + event_name + "\0" if additional_keys: for key, value in additional_keys: - self.event_string += key + '\0' + value + '\0' - self.event_string += '\0' + self.event_string += key + "\0" + value + "\0" + self.event_string += "\0" self.event_encoded = self.event_string.encode() def get_event_line(self): @@ -906,13 +1197,14 @@ def get_event_line(self): class MockAsyncDispatcher(qubesadmin.events.EventsDispatcher): """Working mock events dispatcher. Events must be initialized with initialize_events.""" + def __init__(self, qapp, **kwargs): super().__init__(qapp, **kwargs) self.mock_events = mock.AsyncMock() self._get_events_reader = self.mock_events self.events = [] - connection_established_event = MockEvent('', 'connection_established') + connection_established_event = MockEvent("", "connection_established") self.events.append(connection_established_event.get_event_line()) self.mock_events.side_effect = MockEventsReader(self.events)