diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c5e4c85..fb01854 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.1.0 + rev: v4.4.0 hooks: - id: check-added-large-files - id: check-case-conflict @@ -16,13 +16,13 @@ repos: - id: trailing-whitespace - repo: https://github.com/asottile/pyupgrade - rev: v2.31.1 + rev: v3.3.1 hooks: - id: pyupgrade args: ["--py37-plus"] - repo: https://github.com/psf/black - rev: 22.3.0 + rev: 23.1.0 hooks: - id: black args: @@ -30,23 +30,23 @@ repos: - --quiet - repo: https://github.com/pycqa/isort - rev: 5.10.1 + rev: 5.12.0 hooks: - id: isort - repo: https://github.com/PyCQA/flake8 - rev: 4.0.1 + rev: 6.0.0 hooks: - id: flake8 additional_dependencies: [flake8-bugbear] - repo: https://github.com/codespell-project/codespell - rev: v2.1.0 + rev: v2.2.2 hooks: - id: codespell - repo: https://github.com/pre-commit/pygrep-hooks - rev: v1.9.0 + rev: v1.10.0 hooks: - id: python-check-blanket-noqa - id: python-check-blanket-type-ignore diff --git a/README.md b/README.md index ff10b57..c69891f 100644 --- a/README.md +++ b/README.md @@ -46,14 +46,14 @@ Ports are configurable import asyncio import logging -from nibe.coil import Coil +from nibe.coil import CoilData from nibe.connection.nibegw import NibeGW from nibe.heatpump import HeatPump, Model logger = logging.getLogger("nibe").getChild(__name__) -def on_coil_update(coil: Coil): - logger.debug(f"{coil.name}: {coil.value}") +def on_coil_update(coil_data: CoilData): + logger.debug(coil_data) async def main(): heatpump = HeatPump(Model.F1255) @@ -80,14 +80,14 @@ With S series heatpumps import asyncio import logging -from nibe.coil import Coil +from nibe.coil import CoilData from nibe.connection.modbus import Modbus from nibe.heatpump import HeatPump, Model logger = logging.getLogger("nibe").getChild(__name__) -def on_coil_update(coil: Coil): - logger.debug(f"on_coil_update: {coil.name}: {coil.value}") +def on_coil_update(coil_data: CoilData): + logger.debug(f"on_coil_update: {coil_data}") async def main(): heatpump = HeatPump(Model.F1255) @@ -99,9 +99,9 @@ async def main(): connection = Modbus(heatpump=heatpump, url="tcp://192.168.1.2:502", slave_id=1) coil = heatpump.get_coil_by_name('bt50-room-temp-s1-40033') - await connection.read_coil(coil) + coil_data = await connection.read_coil(coil) - logger.debug(f"main: {coil.name}: {coil.value}") + logger.debug(f"main: {coil_data}") if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) @@ -118,14 +118,14 @@ With NIBE MODBUS 40 import asyncio import logging -from nibe.coil import Coil +from nibe.coil import CoilData from nibe.connection.modbus import Modbus from nibe.heatpump import HeatPump, Model logger = logging.getLogger("nibe").getChild(__name__) -def on_coil_update(coil: Coil): - logger.debug(f"on_coil_update: {coil.name}: {coil.value}") +def on_coil_update(coil_data: CoilData): + logger.debug(f"on_coil_update: {coil_data}") async def main(): heatpump = HeatPump(Model.F1255) @@ -137,9 +137,9 @@ async def main(): connection = Modbus(heatpump=heatpump, url="serial:///dev/ttyS0", slave_id=1, conn_options={"baudrate": 9600}) coil = heatpump.get_coil_by_name('bt50-room-temp-s1-40033') - await connection.read_coil(coil) + coil_data = await connection.read_coil(coil) - logger.debug(f"main: {coil.name}: {coil.value}") + logger.debug(f"main: {coil_data}") if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) diff --git a/nibe/__init__.py b/nibe/__init__.py index e4adfb8..8c0d5d5 100644 --- a/nibe/__init__.py +++ b/nibe/__init__.py @@ -1 +1 @@ -__version__ = "1.6.0" +__version__ = "2.0.0" diff --git a/nibe/coil.py b/nibe/coil.py index 0a21966..f92506f 100644 --- a/nibe/coil.py +++ b/nibe/coil.py @@ -1,6 +1,7 @@ +from dataclasses import dataclass from typing import Dict, Optional, Union -from nibe.exceptions import NoMappingException +from nibe.exceptions import NoMappingException, ValidationError def is_coil_boolean(coil): @@ -17,9 +18,10 @@ def is_coil_boolean(coil): class Coil: + """Represents a coil.""" + mappings: Optional[Dict[str, str]] reverse_mappings: Optional[Dict[str, str]] - _value: Union[int, float, str, None] def __init__( self, @@ -67,9 +69,8 @@ def __init__( if self.is_boolean and not mappings: self.set_mappings({"0": "OFF", "1": "ON"}) - self._value = None - def set_mappings(self, mappings): + """Set mappings for value translation.""" if mappings: self.mappings = {k: v.upper() for k, v in mappings.items()} self.reverse_mappings = {v.upper(): k for k, v in mappings.items()} @@ -77,42 +78,15 @@ def set_mappings(self, mappings): self.mappings = None self.reverse_mappings = None - @property - def value(self) -> Union[int, float, str, None]: - return self._value - - @value.setter - def value(self, value: Union[int, float, str, None]): - if value is None: - self._value = None - return - - if self.reverse_mappings: - assert isinstance( - value, str - ), f"Provided value '{value}' is invalid type (str is supported) for {self.name}" - - value = value.upper() - assert ( - value in self.reverse_mappings - ), f"Provided value '{value}' is not in {self.reverse_mappings.keys()} for {self.name}" - - self._value = value - return - - assert isinstance( - value, (int, float) - ), f"Provided value '{value}' is invalid type (int and float are supported) for {self.name}" - - self.check_value_bounds(value) - - self._value = value - @property def has_mappings(self): + """Return True if mappings are defined.""" return self.mappings is not None def get_mapping_for(self, value: int): + """Return mapping for value. + + :raises NoMappingException: When no mapping is found""" if not self.mappings: raise NoMappingException(f"No mappings defined for {self.name}") @@ -123,38 +97,116 @@ def get_mapping_for(self, value: int): f"Mapping not found for {self.name} coil for value: {value}" ) - def get_reverse_mapping_for(self, value: Union[int, float, str, None]): + def get_reverse_mapping_for(self, value: Union[int, float, str, None]) -> int: + """Return reverse mapping for value. + + :raises NoMappingException: When no mapping is found""" + if not isinstance(value, str): + raise ValidationError( + f"{self.name} coil value ({value}) is invalid type (str is expected)" + ) + if not self.reverse_mappings: - raise NoMappingException(f"No reverse mappings defined for {self.name}") + raise NoMappingException( + f"{self.name} coil has no reverse mappings defined" + ) try: - return self.reverse_mappings[str(value)] + value = value.upper() + return int(self.reverse_mappings[str(value)]) except KeyError: raise NoMappingException( - f"Reverse mapping not found for {self.name} coil for value: {value}" + f"{self.name} coil reverse mapping not found for value: {value}" ) - def check_value_bounds(self, value): - if self.min is not None: - assert ( - value >= self.min - ), f"{self.name} coil value ({value}) is smaller than min allowed ({self.min})" + def is_raw_value_valid(self, value: int) -> bool: + """Return True if provided raw value is valid.""" + if not isinstance(value, int): + return False - if self.max is not None: - assert ( - value <= self.max - ), f"{self.name} coil value ({value}) is larger than max allowed ({self.max})" + if self.raw_min is not None and value < self.raw_min: + return False - def check_raw_value_bounds(self, value): - if self.raw_min is not None: - assert ( - value >= self.raw_min - ), f"value ({value}) is smaller than min allowed ({self.raw_min})" + if self.raw_max is not None and value > self.raw_max: + return False - if self.raw_max is not None: - assert ( - value <= self.raw_max - ), f"value ({value}) is larger than max allowed ({self.raw_max})" + return True def __repr__(self): - return f"Coil {self.address}, name: {self.name}, title: {self.title}, value: {self.value}" + return f"Coil {self.address}, name: {self.name}, title: {self.title}" + + +@dataclass +class CoilData: + """Represents a coil data.""" + + coil: Coil + value: Union[int, float, str, None] = None + + def __repr__(self) -> str: + return f"Coil {self.coil.name}, value: {self.value}" + + @staticmethod + def from_mapping(coil: Coil, value: int) -> "CoilData": + """Create CoilData from raw value using mappings.""" + return CoilData(coil, coil.get_mapping_for(value)) + + @staticmethod + def from_raw_value(coil: Coil, value: int) -> "CoilData": + """Create CoilData from raw value.""" + assert coil.is_raw_value_valid( + value + ), f"Raw value {value} is out of range for coil {coil.name}" + + if coil.has_mappings: + return CoilData.from_mapping(coil, value) + + return CoilData(coil, value / coil.factor) + + @property + def raw_value(self) -> int: + """Return raw value for coil.""" + if self.coil.has_mappings: + return self.coil.get_reverse_mapping_for(self.value) + + assert isinstance( + self.value, (int, float) + ), f"Provided value '{self.value}' is invalid type (int or float is supported) for {self.coil.name}" + + raw_value = int(self.value * self.coil.factor) + assert self.coil.is_raw_value_valid( + raw_value + ), f"Value {self.value} is out of range for coil {self.coil.name}" + + return raw_value + + def validate(self) -> None: + """Validate coil data. + + :raises ValidationError: when validation fails""" + if self.value is None: + raise ValidationError(f"Value for {self.coil.name} is not set") + + if self.coil.has_mappings: + self.coil.get_reverse_mapping_for( + self.value + ) # can throw NoMappingException(ValidationException) or AssertionError + return + + if not isinstance(self.value, (int, float)): + raise ValidationError( + f"{self.coil.name} coil value ({self.value}) is invalid type (expected int or float)" + ) + + self._check_value_bounds() + + def _check_value_bounds(self): + if self.coil.min is not None and self.value < self.coil.min: + raise ValidationError( + f"{self.coil.name} coil value ({self.value}) is smaller than min allowed ({self.coil.min})" + ) + + if self.coil.max is not None and self.value > self.coil.max: + raise ValidationError( + f"{self.coil.name} coil value ({self.value}) is larger than max allowed ({self.coil.max})" + ) diff --git a/nibe/connection/__init__.py b/nibe/connection/__init__.py index 084e639..4d22d24 100644 --- a/nibe/connection/__init__.py +++ b/nibe/connection/__init__.py @@ -3,8 +3,8 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator, Iterable -from nibe.coil import Coil -from nibe.exceptions import CoilReadException, CoilReadExceptionGroup +from nibe.coil import Coil, CoilData +from nibe.exceptions import ReadExceptionGroup, ReadIOException from nibe.heatpump import HeatPump, ProductInfo, Series DEFAULT_TIMEOUT: float = 5 @@ -12,41 +12,66 @@ class Connection(ABC): + """Base class for all connection methods.""" + async def start(self): # noqa: B027 + """Start the connection.""" pass async def stop(self): # noqa: B027 + """Close the connection.""" pass @abstractmethod - async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: + async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> CoilData: + """Read a coil data from the heatpump. + + :raises ReadIOException: If failed to read coil data due to IO error (will retry). + :raises ReadException: If failed to read coil data due to other error (will not retry). + """ pass async def read_coils( self, coils: Iterable[Coil], timeout: float = DEFAULT_TIMEOUT - ) -> AsyncIterator[Coil]: + ) -> AsyncIterator[CoilData]: + """Read multiple coil data from the heatpump. + + :raises ReadExceptionGroup: If one or more coils failed to read.""" exceptions = [] for coil in coils: try: yield await self.read_coil(coil, timeout) - except CoilReadException as exception: + except ReadIOException as exception: exceptions.append(exception) if exceptions: - raise CoilReadExceptionGroup("Failed to read some or all coils", exceptions) + raise ReadExceptionGroup("Failed to read some or all coils", exceptions) @abstractmethod - async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: + async def write_coil( + self, coil_data: CoilData, timeout: float = DEFAULT_TIMEOUT + ) -> None: + """Write a coil data to the heatpump. + + :raises WriteIOException: If failed to write coil data due to IO error (will retry). + :raises WriteException: If failed to write coil data due to other error (will not retry). + """ pass async def read_product_info( self, timeout: float = READ_PRODUCT_INFO_TIMEOUT ) -> ProductInfo: + """Read product info from the heatpump. + + :raises ReadIOException: If failed to read product info in time.""" raise NotImplementedError( "read_product_info method is not implemented for this connection method" ) @abstractmethod async def verify_connectivity(self): + """Verify that we have functioning communication. + + :raises NibeException: If failed to verify connectivity.""" pass @@ -63,9 +88,9 @@ async def verify_connectivity_read_write_alarm( else: coil = heatpump.get_coil_by_name("alarm-reset-45171") - coil = await connection.read_coil(coil) + coil_data = await connection.read_coil(coil) value: str | int = 0 if coil.mappings: value = coil.mappings[str(value)] - coil.value = value - coil = await connection.write_coil(coil) + coil_data.value = value + await connection.write_coil(coil_data) diff --git a/nibe/connection/encoders.py b/nibe/connection/encoders.py index 61f518f..574c21b 100644 --- a/nibe/connection/encoders.py +++ b/nibe/connection/encoders.py @@ -12,8 +12,8 @@ Padded, ) -from nibe.coil import Coil -from nibe.exceptions import DecodeException, EncodeException +from nibe.coil import Coil, CoilData +from nibe.exceptions import DecodeException, EncodeException, ValidationError from nibe.parsers import WordSwapped parser_map = { @@ -35,30 +35,28 @@ class CoilDataEncoder: + """Encode and decode coil data.""" + def __init__(self, word_swap: bool = True) -> None: self._word_swap = word_swap - def encode(self, coil: Coil): - value = coil.value - try: - assert value is not None, "Unable to encode None value" - if coil.has_mappings: - return self._pad( - self._get_parser(coil), coil.get_reverse_mapping_for(value) - ) - - if coil.factor != 1: - value *= coil.factor + def encode(self, coil_data: CoilData) -> bytes: + """Encode coil data to bytes. - coil.check_raw_value_bounds(value) + :raises EncodeException: If encoding fails""" + try: + coil_data.validate() - return self._pad(self._get_parser(coil), value) - except (ConstructError, AssertionError) as e: + return self._pad(self._get_parser(coil_data.coil), coil_data.raw_value) + except (ConstructError, ValidationError) as e: raise EncodeException( - f"Failed to encode {coil.name} coil for value: {value}, exception: {e}" + f"Failed to encode {coil_data.coil.name} coil for value: {coil_data.value}, exception: {e}" ) - def decode(self, coil: Coil, raw: bytes): + def decode(self, coil: Coil, raw: bytes) -> CoilData: + """Decode coil data from bytes. + + :raises DecodeException: If decoding fails""" try: parser = self._get_parser(coil) assert parser.sizeof() <= len( @@ -66,18 +64,13 @@ def decode(self, coil: Coil, raw: bytes): ), f"Invalid raw data size: given {len(raw)}, expected at least {parser.sizeof()}" value = parser.parse(raw) if self._is_hitting_integer_limit(coil, value): - return None - coil.check_raw_value_bounds(value) - except AssertionError as e: + return CoilData(coil, None) + + return CoilData.from_raw_value(coil, value) + except (AssertionError, ConstructError, ValidationError) as e: raise DecodeException( f"Failed to decode {coil.name} coil from raw: {hexlify(raw).decode('utf-8')}, exception: {e}" - ) - if coil.factor != 1: - value /= coil.factor - if not coil.has_mappings: - return value - - return coil.get_mapping_for(value) + ) from e def _is_hitting_integer_limit(self, coil: Coil, int_value: int): if coil.size == "u8" and int_value == 0xFF: @@ -101,5 +94,5 @@ def _get_parser(self, coil: Coil) -> Construct: else: return parser_map_word_swaped[coil.size] - def _pad(self, parser: Construct, value) -> bytes: - return Padded(4, parser).build(int(value)) + def _pad(self, parser: Construct, value: int) -> bytes: + return Padded(4, parser).build(value) diff --git a/nibe/connection/modbus.py b/nibe/connection/modbus.py index 308bcb6..39dd267 100644 --- a/nibe/connection/modbus.py +++ b/nibe/connection/modbus.py @@ -4,17 +4,20 @@ from async_modbus import modbus_for_url import async_timeout +from tenacity import retry, retry_if_exception_type, stop_after_attempt from umodbus.exceptions import ModbusError -from nibe.coil import Coil +from nibe.coil import Coil, CoilData from nibe.connection import DEFAULT_TIMEOUT, Connection from nibe.connection.encoders import CoilDataEncoder from nibe.exceptions import ( - CoilReadException, - CoilReadTimeoutException, - CoilWriteException, - CoilWriteTimeoutException, ModbusUrlException, + ReadException, + ReadIOException, + ReadTimeoutException, + ValidationError, + WriteIOException, + WriteTimeoutException, ) from nibe.heatpump import HeatPump @@ -61,10 +64,32 @@ def split_chunks(data, max_len, chunks) -> List[int]: class Modbus(Connection): - def __init__(self, heatpump: HeatPump, url, slave_id, conn_options=None): + """Modbus connection.""" + + def __init__( + self, + heatpump: HeatPump, + url, + slave_id, + conn_options=None, + read_retries: int = 3, + write_retries: int = 3, + ): self._slave_id = slave_id self._heatpump = heatpump + self.read_coil = retry( + retry=retry_if_exception_type(ReadIOException), + stop=stop_after_attempt(read_retries), + reraise=True, + )(self.read_coil) + + self.write_coil = retry( + retry=retry_if_exception_type(WriteIOException), + stop=stop_after_attempt(write_retries), + reraise=True, + )(self.write_coil) + try: self._client = modbus_for_url(url, conn_options) except ValueError as exc: @@ -75,12 +100,11 @@ def __init__(self, heatpump: HeatPump, url, slave_id, conn_options=None): async def stop(self) -> None: await self._client.stream.close() - async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: + async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> CoilData: logger.debug("Sending read request") entity_type, entity_number, entity_count = split_modbus_data(coil) try: - async with async_timeout.timeout(timeout): if entity_type == 3: result = await self._client.read_input_registers( @@ -107,39 +131,41 @@ async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: quantity=entity_count, ) else: - raise CoilReadException(f"Unsupported entity type {entity_type}") + raise ReadException(f"Unsupported entity type {entity_type}") - coil.value = self.coil_encoder.decode(coil, encode_u16_list(result)) + coil_data = self.coil_encoder.decode(coil, encode_u16_list(result)) - logger.info(f"{coil.name}: {coil.value}") - self._heatpump.notify_coil_update(coil) + logger.info(coil_data) + self._heatpump.notify_coil_update(coil_data) except ModbusError as exc: - raise CoilReadException( + raise ReadIOException( f"Error '{str(exc)}' reading {coil.name} starting: {entity_number} count: {entity_count} from: {self._slave_id}" ) from exc except asyncio.TimeoutError: - raise CoilReadTimeoutException( + raise ReadTimeoutException( f"Timeout waiting for read response for {coil.name}" ) - return coil + return coil_data - async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: + async def write_coil( + self, coil_data: CoilData, timeout: float = DEFAULT_TIMEOUT + ) -> None: + coil = coil_data.coil assert coil.is_writable, f"{coil.name} is not writable" - assert coil.value is not None, f"{coil.name} value must be set" - - logger.debug("Sending write request") entity_type, entity_number, entity_count = split_modbus_data(coil) try: + coil_data.validate() + logger.debug("Sending write request") async with async_timeout.timeout(timeout): if entity_type == 4: result = await self._client.write_registers( slave_id=self._slave_id, starting_address=entity_number, values=decode_u16_list( - self.coil_encoder.encode(coil), entity_count + self.coil_encoder.encode(coil_data), entity_count ), ) elif entity_type == 0: @@ -147,27 +173,28 @@ async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil slave_id=self._slave_id, starting_address=entity_number, values=decode_u16_list( - self.coil_encoder.encode(coil), entity_count + self.coil_encoder.encode(coil_data), entity_count ), ) else: - raise CoilReadException(f"Unsupported entity type {entity_type}") + raise ReadIOException(f"Unsupported entity type {entity_type}") if not result: - raise CoilWriteException(f"Heatpump denied writing {coil.name}") + raise WriteIOException(f"Heatpump denied writing {coil.name}") else: logger.info(f"Write succeeded for {coil.name}") + except ValidationError as exc: + raise WriteIOException( + f"Error validating {coil.name} coil value: {str(exc)}" + ) from exc except ModbusError as exc: - raise CoilWriteException( + raise WriteIOException( f"Error '{str(exc)}' writing {coil.name} starting: {entity_number} count: {entity_count} to: {self._slave_id}" ) from exc except asyncio.TimeoutError: - raise CoilWriteTimeoutException( + raise WriteTimeoutException( f"Timeout waiting for write feedback for {coil.name}" ) - return coil - async def verify_connectivity(self): - """Verify that we have functioning communication.""" await verify_connectivity_read_write_alarm(self, self._heatpump) diff --git a/nibe/connection/nibegw.py b/nibe/connection/nibegw.py index 04b1739..33e072a 100644 --- a/nibe/connection/nibegw.py +++ b/nibe/connection/nibegw.py @@ -2,6 +2,7 @@ from asyncio import CancelledError, Future, InvalidStateError from binascii import hexlify from contextlib import suppress +from dataclasses import dataclass import errno from functools import reduce from io import BytesIO @@ -10,7 +11,7 @@ from operator import xor import socket import struct -from typing import Container, Dict, Optional, Union +from typing import Dict, Optional, Union from construct import ( Adapter, @@ -20,6 +21,7 @@ Checksum, ChecksumError, Const, + Container, Enum, EnumIntegerString, FixedSized, @@ -47,22 +49,24 @@ ) from tenacity import retry, retry_if_exception_type, stop_after_attempt -from nibe.coil import Coil +from nibe.coil import Coil, CoilData from nibe.connection import DEFAULT_TIMEOUT, READ_PRODUCT_INFO_TIMEOUT, Connection from nibe.connection.encoders import CoilDataEncoder from nibe.event_server import EventServer from nibe.exceptions import ( AddressInUseException, CoilNotFoundException, - CoilReadException, - CoilReadSendException, - CoilReadTimeoutException, - CoilWriteException, CoilWriteSendException, - CoilWriteTimeoutException, DecodeException, NibeException, ProductInfoReadTimeoutException, + ReadException, + ReadIOException, + ReadSendException, + ReadTimeoutException, + WriteException, + WriteIOException, + WriteTimeoutException, ) from nibe.heatpump import HeatPump, ProductInfo @@ -72,6 +76,8 @@ class ConnectionStatus(Enum): + """Connection status of the NibeGW connection.""" + UNKNOWN = "unknown" INITIALIZING = "initializing" LISTENING = "listening" @@ -82,10 +88,19 @@ def __str__(self): return self.value +@dataclass +class CoilAction: + coil: Coil + future: Future + + class NibeGW(asyncio.DatagramProtocol, Connection, EventServer): + """NibeGW connection.""" + CONNECTION_STATUS_EVENT = "connection_status" PRODUCT_INFO_EVENT = "product_info" _futures: Dict[str, Future] + _registered_reads: Dict[str, CoilAction] _status: ConnectionStatus def __init__( @@ -114,17 +129,18 @@ def __init__( self._send_lock = asyncio.Lock() self._futures = {} + self._registered_reads = {} self.coil_encoder = CoilDataEncoder(heatpump.word_swap) self.read_coil = retry( - retry=retry_if_exception_type(CoilReadException), + retry=retry_if_exception_type(ReadIOException), stop=stop_after_attempt(read_retries), reraise=True, )(self.read_coil) self.write_coil = retry( - retry=retry_if_exception_type(CoilWriteException), + retry=retry_if_exception_type(WriteIOException), stop=stop_after_attempt(write_retries), reraise=True, )(self.write_coil) @@ -167,10 +183,12 @@ async def start(self): await asyncio.get_event_loop().create_datagram_endpoint(lambda: self, sock=sock) def connection_made(self, transport): + """Callback when connection is made.""" self._set_status(ConnectionStatus.LISTENING) self._transport = transport def datagram_received(self, data: bytes, addr): + """Callback when data is received.""" logger.debug(f"Received {hexlify(data).decode('utf-8')} from {addr}") try: msg = Response.parse(data) @@ -185,22 +203,10 @@ def datagram_received(self, data: bytes, addr): cmd = msg.fields.value.cmd if cmd == "MODBUS_DATA_MSG": for row in msg.fields.value.data: - try: - self._on_raw_coil_value(row.coil_address, row.value) - except NibeException as e: - logger.error(str(e)) + self._on_raw_coil_value(row.coil_address, row.value) elif cmd == "MODBUS_READ_RESP": row = msg.fields.value.data - try: - self._on_raw_coil_value(row.coil_address, row.value) - with suppress(InvalidStateError, CancelledError, KeyError): - self._futures["read"].set_result(None) - except NibeException as e: - with suppress(InvalidStateError, CancelledError, KeyError): - self._futures["read"].set_exception( - CoilReadException(str(e), e) - ) - raise + self._on_raw_coil_value(row.coil_address, row.value) elif cmd == "MODBUS_WRITE_RESP": with suppress(InvalidStateError, CancelledError, KeyError): self._futures["write"].set_result(msg.fields.value.data.result) @@ -239,7 +245,7 @@ async def read_product_info( finally: del self._futures["product_info"] - async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: + async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> CoilData: async with self._send_lock: assert self._transport, "Transport is closed" data = Request.build( @@ -252,7 +258,7 @@ async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: ) ) - self._futures["read"] = asyncio.get_event_loop().create_future() + future = self._register_coil_read_request(coil) logger.debug( f"Sending {hexlify(data).decode('utf-8')} (read request) to {self._remote_ip}:{self._remote_write_port}" @@ -261,26 +267,67 @@ async def read_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: try: self._transport.sendto(data, (self._remote_ip, self._remote_read_port)) except socket.gaierror: - raise CoilReadSendException( - f"Unable to lookup hostname: {self._remote_ip}" - ) + raise ReadSendException(f"Unable to lookup hostname: {self._remote_ip}") logger.debug(f"Waiting for read response for {coil.name}") try: - await asyncio.wait_for(self._futures["read"], timeout) + return await asyncio.wait_for(future, timeout) except asyncio.TimeoutError: - raise CoilReadTimeoutException( + raise ReadTimeoutException( f"Timeout waiting for read response for {coil.name}" ) - finally: - del self._futures["read"] + except DecodeException as e: + raise ReadException( + f"Failed decoding response for {coil.name}: {e}" + ) from e + + def _register_coil_read_request(self, coil: Coil) -> Future: + read = self._registered_reads.get(str(coil.address)) + if read is not None and not read.future.done(): + return read.future - return coil + future = asyncio.get_event_loop().create_future() + self._registered_reads[str(coil.address)] = CoilAction(coil, future) + return future + + def _on_coil_read_success(self, coil_data): + logger.info(coil_data) + + read = self._registered_reads.get(str(coil_data.coil.address)) + if read is not None and not read.future.done(): + read.future.set_result(coil_data) + + self._heatpump.notify_coil_update(coil_data) + + def _on_coil_read_error( + self, coil_address, value: Union[bytes, float, int, str], exception: Exception + ): + if coil_address == 65535: # 0xffff + return + + read = self._registered_reads.get(str(coil_address)) + if read is not None and not read.future.done(): + read.future.set_exception(exception) + + if isinstance(exception, CoilNotFoundException): + logger.warning(f"Ignoring coil {coil_address} - coil definition not found") + elif isinstance(exception, DecodeException): + str_value = ( + hexlify(value).decode("utf-8") if isinstance(value, bytes) else value + ) + logger.warning( + f"Ignoring coil {coil_address} value {str_value} - failed to decode" + ) + elif isinstance(exception, NibeException): + logger.error(f"Failed handling read for {coil_address}: {exception}") - async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil: + async def write_coil( + self, coil_data: CoilData, timeout: float = DEFAULT_TIMEOUT + ) -> None: + coil = coil_data.coil assert coil.is_writable, f"{coil.name} is not writable" - assert coil.value is not None, f"{coil.name} value must be set" + assert coil_data.value is not None, f"{coil.name} value must be set" async with self._send_lock: assert self._transport, "Transport is closed" data = Request.build( @@ -290,7 +337,7 @@ async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil cmd="MODBUS_WRITE_REQ", data=dict( coil_address=coil.address, - value=self.coil_encoder.encode(coil), + value=self.coil_encoder.encode(coil_data), ), ) ) @@ -316,27 +363,28 @@ async def write_coil(self, coil: Coil, timeout: float = DEFAULT_TIMEOUT) -> Coil result = self._futures["write"].result() if not result: - raise CoilWriteException(f"Heatpump denied writing {coil.name}") + raise WriteException(f"Heatpump denied writing {coil.name}") else: logger.info(f"Write succeeded for {coil.name}") except asyncio.TimeoutError: - raise CoilWriteTimeoutException( + raise WriteTimeoutException( f"Timeout waiting for write feedback for {coil.name}" ) finally: del self._futures["write"] - return coil - def error_received(self, exc): + """Handle errors from the transport""" logger.error(exc) @property def status(self) -> ConnectionStatus: + """Get the current connection status""" return self._status @property - def remote_ip(self) -> str: + def remote_ip(self) -> Optional[str]: + """Get the remote IP address""" return self._remote_ip def _set_status(self, status: ConnectionStatus): @@ -382,29 +430,15 @@ def _on_rmu_data(self, value: Container): if coil_address := ADDRESS_TO_ROOM_TEMP_COIL.get(value.address): self._on_coil_value(coil_address, data.bt50_room_temp_sX) - def _on_raw_coil_value(self, coil_address: int, raw_value: bytes): + def _on_raw_coil_value(self, coil_address: int, raw_value: bytes) -> None: try: coil = self._heatpump.get_coil_by_address(coil_address) - coil.value = self.coil_encoder.decode(coil, raw_value) - - # coil.raw_value = raw_value - logger.info(f"{coil.name}: {coil.value}") - self._heatpump.notify_coil_update(coil) - except CoilNotFoundException: - if coil_address == 65535: # 0xffff - return - - logger.warning( - f"Ignoring coil {coil_address} value {hexlify(raw_value).decode('utf-8')} - coil definition not found" - ) - return - except DecodeException: - logger.warning( - f"Ignoring coil {coil_address} value {hexlify(raw_value).decode('utf-8')} - failed to decode" - ) - return + coil_data = self.coil_encoder.decode(coil, raw_value) + self._on_coil_read_success(coil_data) + except NibeException as e: + self._on_coil_read_error(coil_address, raw_value, e) - def _on_coil_value(self, coil_address: int, value: Union[float, int, str]): + def _on_coil_value(self, coil_address: int, value: Union[float, int, str]) -> None: try: coil = self._heatpump.get_coil_by_address(coil_address) @@ -417,31 +451,19 @@ def _on_coil_value(self, coil_address: int, value: Union[float, int, str]): if coil.has_mappings and isinstance(value, int): value = coil.get_mapping_for(value) - coil.value = value - logger.info(f"{coil.name}: {coil.value}") - self._heatpump.notify_coil_update(coil) - - except CoilNotFoundException: - if coil_address == 65535: # 0xffff - return - - logger.warning( - f"Ignoring coil {coil_address} value {value} - coil definition not found" - ) - return - except DecodeException: - logger.warning( - f"Ignoring coil {coil_address} value {value} - failed to decode value" - ) - return + coil_data = CoilData(coil, value) + logger.info(coil_data) + self._on_coil_read_success(coil_data) + except NibeException as e: + self._on_coil_read_error(coil_address, value, e) async def verify_connectivity(self): - """Verify that we have functioning communication.""" await verify_connectivity_read_write_alarm(self, self._heatpump) async def stop(self): - self._transport.close() - self._transport = None + if self._transport: + self._transport.close() + self._transport = None await asyncio.sleep(0) self._set_status(ConnectionStatus.DISCONNECTED) diff --git a/nibe/console_scripts/cli.py b/nibe/console_scripts/cli.py index 9cd69ed..5bb3eb8 100644 --- a/nibe/console_scripts/cli.py +++ b/nibe/console_scripts/cli.py @@ -10,7 +10,7 @@ import asyncclick as click from construct import Const, GreedyRange, Int8ul, RawCopy, Select, Struct, Terminated -from ..coil import Coil +from ..coil import CoilData from ..connection import Connection from ..connection.modbus import Modbus from ..connection.nibegw import NibeGW, Request, Response @@ -113,7 +113,6 @@ async def nibegw( async def modbus( ctx: click.Context, remote_ip: str, remote_port: str, model: str, slave_id: int ): - heatpump = HeatPump(Model[model]) await heatpump.initialize() connection = Modbus( @@ -133,8 +132,8 @@ def add_connect_command(command: click.Command): @click.command(help="Monitor data sent by pump out of band") @click.pass_obj async def monitor(obj: ConnectionContext): - def on_coil_update(coil: Coil): - click.echo(f"{coil.name}: {coil.value}") + def on_coil_update(coil_data: CoilData): + click.echo(coil_data) obj.heatpump.subscribe(HeatPump.COIL_UPDATE_EVENT, on_coil_update) @@ -172,11 +171,12 @@ async def read(obj: ConnectionContext, parameter: int, **kwargs): @click.argument("value", type=str) async def write(obj: ConnectionContext, parameter: int, value: str, **kwargs): coil = obj.heatpump.get_coil_by_address(parameter) - if coil.mappings: - coil.value = value - else: - coil.value = float(value) - click.echo(await obj.connection.write_coil(coil)) + if not coil.mappings: + value = float(value) + + coil_data = CoilData(coil, value) + await obj.connection.write_coil(coil_data) + click.echo(coil_data) add_connect_command(read) @@ -211,9 +211,7 @@ def parse_stream(stream: io.RawIOBase): @cli.command() @click.argument("file", type=click.File()) def parse_file(file: IO): - with io.BytesIO(bytes(read_bytes_socat(file))) as stream: - for packet in parse_stream(stream): click.echo(packet.fields.value) diff --git a/nibe/console_scripts/convert_csv.py b/nibe/console_scripts/convert_csv.py index f32c701..d70aec2 100644 --- a/nibe/console_scripts/convert_csv.py +++ b/nibe/console_scripts/convert_csv.py @@ -22,6 +22,8 @@ def update_dict(d: MutableMapping, u: Mapping, removeExplicitNulls: bool) -> Map else: d[k] = v + return d + class CSVConverter: data: pandas.DataFrame @@ -262,7 +264,7 @@ async def run(): logger.info(f"Converting {in_file} to {out_file}") try: CSVConverter(in_file, out_file, extensions).convert() - + await _validate(out_file) logger.info(f"Converted {in_file} to {out_file}") diff --git a/nibe/event_server.py b/nibe/event_server.py index 04af8e5..126e7ea 100644 --- a/nibe/event_server.py +++ b/nibe/event_server.py @@ -12,6 +12,7 @@ def __init__(self): self._listeners = defaultdict(list) def notify_event_listeners(self, event_name: str, *args, **kwargs): + """Notify all listeners of an event""" for listener in self._listeners[event_name]: try: listener(*args, **kwargs) @@ -19,4 +20,5 @@ def notify_event_listeners(self, event_name: str, *args, **kwargs): logger.exception(e) def subscribe(self, event_name: str, callback: Callable[..., None]): + """Subscribe to an event""" self._listeners[event_name].append(callback) diff --git a/nibe/exceptions.py b/nibe/exceptions.py index 14a717a..1450dd2 100644 --- a/nibe/exceptions.py +++ b/nibe/exceptions.py @@ -17,7 +17,11 @@ class DecodeException(NibeException): pass -class NoMappingException(DecodeException): +class ValidationError(NibeException): + pass + + +class NoMappingException(ValidationError): pass @@ -29,15 +33,15 @@ class WriteException(NibeException): pass -class CoilWriteException(WriteException): +class WriteIOException(WriteException): pass -class CoilWriteSendException(CoilWriteException): +class CoilWriteSendException(WriteIOException): pass -class CoilWriteTimeoutException(CoilWriteException): +class WriteTimeoutException(WriteIOException): pass @@ -45,29 +49,25 @@ class ReadException(NibeException): pass -class CoilReadException(ReadException): +class ReadIOException(ReadException): pass -class CoilReadExceptionGroup(ExceptionGroup, CoilReadException): +class ReadExceptionGroup(ExceptionGroup, ReadIOException): def __str__(self) -> str: messages = ", ".join(str(exception) for exception in self._exceptions) return f"{self.message} ({messages})" -class CoilReadSendException(CoilReadException): - pass - - -class CoilReadTimeoutException(CoilReadException): +class ReadSendException(ReadIOException): pass -class ProductInfoReadException(ReadException): +class ReadTimeoutException(ReadIOException): pass -class ProductInfoReadTimeoutException(ProductInfoReadException): +class ProductInfoReadTimeoutException(ReadIOException): pass diff --git a/nibe/heatpump.py b/nibe/heatpump.py index bbe05bc..9eb5953 100644 --- a/nibe/heatpump.py +++ b/nibe/heatpump.py @@ -5,9 +5,9 @@ import json import logging from os import PathLike -from typing import Dict, Union +from typing import Dict, Optional, Union -from nibe.coil import Coil +from nibe.coil import Coil, CoilData from nibe.event_server import EventServer from nibe.exceptions import CoilNotFoundException, ModelIdentificationFailed @@ -15,12 +15,16 @@ class Series(Enum): + """Series enum class""" + CUSTOM = auto() F = auto() S = auto() class Model(Enum): + """Model enum class""" + F1155 = "f1155_f1255", Series.F F1255 = "f1155_f1255", Series.F @@ -70,6 +74,7 @@ def __new__(cls, data_file: str, series: Series): return obj def get_coil_data(self): + """Get coil data for model""" if self == Model.CUSTOM: with open(self.data_file) as fh: return json.load(fh) @@ -79,15 +84,21 @@ def get_coil_data(self): @classmethod def keys(cls): + """Get all keys of the enum class""" return cls.__members__.keys() @dataclass class ProductInfo: + """Product info class""" + model: str firmware_version: int def identify_model(self) -> Model: + """Identify model from product info + + :raises ModelIdentificationFailed: When model cannot be identified""" for key in Model.keys(): if key in self.model.upper(): return getattr(Model, key) @@ -96,15 +107,17 @@ def identify_model(self) -> Model: class HeatPump(EventServer): + """Heat pump class""" + COIL_UPDATE_EVENT = "coil_update" _address_to_coil: Dict[str, Coil] _name_to_coil: Dict[str, Coil] word_swap: bool = True _product_info: Union[ProductInfo, None] = None - _model: Union[Model, None] = None + _model: Optional[Model] = None - def __init__(self, model: Model = None): + def __init__(self, model: Optional[Model] = None): super().__init__() self._address_to_coil = {} @@ -115,25 +128,30 @@ def __init__(self, model: Model = None): @property def model(self) -> Union[Model, None]: + """Returns the model of the heat pump""" return self._model @model.setter def model(self, model: Model): + """Sets the model of the heat pump""" assert isinstance(model, Model), "Passed argument is not of a Model type" self._model = model @property def series(self) -> Series: + """Returns the series of the heat pump""" assert self._model return self._model.series @property def product_info(self) -> Union[ProductInfo, None]: + """Returns the product info of the heat pump""" return self._product_info @product_info.setter def product_info(self, product_info: ProductInfo): + """Sets the product info of the heat pump""" assert isinstance( product_info, ProductInfo ), "Passed argument is not of a ProductInfo type" @@ -141,6 +159,7 @@ def product_info(self, product_info: ProductInfo): self._product_info = product_info async def _load_coils(self): + assert isinstance(self._model, Model), "Model is not set" data = await asyncio.get_running_loop().run_in_executor( None, self._model.get_coil_data ) @@ -158,6 +177,7 @@ def _make_coil(self, address: int, **kwargs): return Coil(address, **kwargs) async def initialize(self): + """Initialize the heat pump""" if not isinstance(self._model, Model) and isinstance( self._product_info, ProductInfo ): @@ -170,19 +190,29 @@ async def initialize(self): await self._load_coils() def get_coils(self) -> list[Coil]: + """Returns a list of all coils""" return list(self._address_to_coil.values()) def get_coil_by_address(self, address: Union[int, str]) -> Coil: + """Returns a coil by address + + :raises CoilNotFoundException: if coil is not found + """ try: return self._address_to_coil[str(address)] except KeyError: raise CoilNotFoundException(f"Coil with address {address} not found") def get_coil_by_name(self, name: str) -> Coil: + """Returns a coil by name + + :raises CoilNotFoundException: if coil is not found + """ try: return self._name_to_coil[str(name)] except KeyError: raise CoilNotFoundException(f"Coil with name '{name}' not found") - def notify_coil_update(self, coil: Coil): - self.notify_event_listeners(self.COIL_UPDATE_EVENT, coil) + def notify_coil_update(self, coil_data: CoilData): + """Notifies listeners about coil update""" + self.notify_event_listeners(self.COIL_UPDATE_EVENT, coil_data) diff --git a/tests/connection/test_init.py b/tests/connection/test_init.py index ecce843..bdf96f8 100644 --- a/tests/connection/test_init.py +++ b/tests/connection/test_init.py @@ -1,8 +1,8 @@ from pytest import raises -from nibe.coil import Coil +from nibe.coil import Coil, CoilData from nibe.connection import Connection -from nibe.exceptions import CoilReadException, CoilWriteException +from nibe.exceptions import ReadExceptionGroup, ReadIOException, WriteIOException async def test_read_coils(): @@ -11,22 +11,22 @@ async def test_read_coils(): coil3 = Coil(231, "test3", "test", "u8") class MyConnection(Connection): - async def read_coil(self, coil: Coil, timeout: float = ...) -> Coil: + async def read_coil(self, coil: Coil, timeout: float = ...) -> CoilData: if coil is coil2: - raise CoilReadException(f"{coil.address}") - return coil + raise ReadIOException(f"{coil.address}") + return CoilData(coil, 1) async def verify_connectivity(self): return True - async def write_coil(self, coil: Coil, timeout: float = ...) -> Coil: - raise CoilWriteException() + async def write_coil(self, coil_data: CoilData, timeout: float = ...) -> None: + raise WriteIOException() connection = MyConnection() - result = [] - with raises(CoilReadException) as excinfo: - async for coil in connection.read_coils([coil1, coil2, coil3]): - result.append(coil) + coils = [] + with raises(ReadExceptionGroup) as excinfo: + async for coil_data in connection.read_coils([coil1, coil2, coil3]): + coils.append(coil_data.coil) assert str(excinfo.value) == "Failed to read some or all coils (231)" - assert result == [coil1, coil3] + assert coils == [coil1, coil3] diff --git a/tests/connection/test_modbus.py b/tests/connection/test_modbus.py index 81b4bf1..594656a 100644 --- a/tests/connection/test_modbus.py +++ b/tests/connection/test_modbus.py @@ -4,7 +4,7 @@ from async_modbus import AsyncClient import pytest -from nibe.coil import Coil +from nibe.coil import Coil, CoilData from nibe.connection.modbus import Modbus from nibe.heatpump import HeatPump, Model @@ -50,8 +50,8 @@ async def test_read_holding_register_coil( ): coil = Coil(40001, "test", "test", size, 1) modbus_client.read_holding_registers.return_value = raw - coil = await connection.read_coil(coil) - assert coil.value == value + coil_data = await connection.read_coil(coil) + assert coil_data.value == value modbus_client.read_holding_registers.assert_called() @@ -74,8 +74,8 @@ async def test_write_holding_register( value: Union[int, float, str], ): coil = Coil(40002, "test", "test", size, 1, write=True) - coil.value = value - coil = await connection.write_coil(coil) + coil_data = CoilData(coil, value) + await connection.write_coil(coil_data) modbus_client.write_registers.assert_called_with( slave_id=0, starting_address=1, values=raw ) @@ -102,8 +102,8 @@ async def test_read_input_register_coil( ): coil = Coil(30001, "test", "test", size, 1) modbus_client.read_input_registers.return_value = raw - coil = await connection.read_coil(coil) - assert coil.value == value + coil_data = await connection.read_coil(coil) + assert coil_data.value == value modbus_client.read_input_registers.assert_called() @@ -124,8 +124,8 @@ async def test_read_discrete_input_coil( ): coil = Coil(10001, "test", "test", size, 1) modbus_client.read_discrete_inputs.return_value = raw - coil = await connection.read_coil(coil) - assert coil.value == value + coil_data = await connection.read_coil(coil) + assert coil_data.value == value modbus_client.read_discrete_inputs.assert_called() @@ -146,8 +146,8 @@ async def test_read_coil_coil( ): coil = Coil(1, "test", "test", size, 1) modbus_client.read_coils.return_value = raw - coil = await connection.read_coil(coil) - assert coil.value == value + coil_data = await connection.read_coil(coil) + assert coil_data.value == value modbus_client.read_coils.assert_called() @@ -166,8 +166,8 @@ async def test_write_coil_coil( value: Union[int, float, str], ): coil = Coil(2, "test", "test", size, 1, write=True) - coil.value = value - coil = await connection.write_coil(coil) + coil_data = CoilData(coil, value) + await connection.write_coil(coil_data) modbus_client.write_coils.assert_called_with( slave_id=0, starting_address=1, values=raw ) diff --git a/tests/connection/test_nibegw.py b/tests/connection/test_nibegw.py index f9f1203..fcd0822 100644 --- a/tests/connection/test_nibegw.py +++ b/tests/connection/test_nibegw.py @@ -1,12 +1,14 @@ import asyncio import binascii +import time from unittest import IsolatedAsyncioTestCase from unittest.mock import Mock import pytest +from nibe.coil import CoilData from nibe.connection.nibegw import ConnectionStatus, NibeGW -from nibe.exceptions import CoilReadTimeoutException +from nibe.exceptions import ReadException, ReadTimeoutException, WriteException from nibe.heatpump import HeatPump, Model, ProductInfo @@ -32,16 +34,9 @@ async def test_status(self): coil = self.heatpump.get_coil_by_address(43424) - async def send_receive(): - task = self.loop.create_task(self.nibegw.read_coil(coil)) - await asyncio.sleep(0) - self.nibegw.datagram_received( - binascii.unhexlify("5c00206a06a0a9f5120000a2"), ("127.0.0.1", 12345) - ) + self._enqueue_datagram(binascii.unhexlify("5c00206a06a0a9f5120000a2")) - return await task - - await send_receive() + await self.nibegw.read_coil(coil) assert self.nibegw.status == "connected" connection_status_handler_mock.assert_called_once_with( @@ -49,81 +44,81 @@ async def send_receive(): ) connection_status_handler_mock.reset_mock() - await send_receive() + self._enqueue_datagram(binascii.unhexlify("5c00206a06a0a9f5120000a2")) + await self.nibegw.read_coil(coil) connection_status_handler_mock.assert_not_called() + def _enqueue_datagram(self, data): + asyncio.get_event_loop().call_soon( + self.nibegw.datagram_received, data, ("127.0.0.1", 12345) + ) + async def test_read_s32_coil(self): coil = self.heatpump.get_coil_by_address(43424) - async def send_receive(): - task = self.loop.create_task(self.nibegw.read_coil(coil)) - await asyncio.sleep(0) - self.nibegw.datagram_received( - binascii.unhexlify("5c00206a06a0a9f5120000a2"), ("127.0.0.1", 12345) - ) + self._enqueue_datagram(binascii.unhexlify("5c00206a06a0a9f5120000a2")) - return await task + coil_data = await self.nibegw.read_coil(coil) + assert coil_data.value == 4853 - coil = await send_receive() - assert coil.value == 4853 - - self.transport.sendto.assert_called_with( + self.transport.sendto.assert_called_once_with( binascii.unhexlify("c06902a0a9a2"), ("127.0.0.1", 9999) ) - async def test_read_coil_decode_ignored(self): + async def test_read_coil_decode_failed(self): coil = self.heatpump.get_coil_by_address(43086) - coil.value = "HEAT" - async def send_receive(): - task = self.loop.create_task(self.nibegw.read_coil(coil)) - await asyncio.sleep(0) - self.nibegw.datagram_received( - binascii.unhexlify("5c00206a064ea8f51200004d"), ("127.0.0.1", 12345) - ) + self._enqueue_datagram(binascii.unhexlify("5c00206a064ea8f51200004d")) - return await task + start = time.time() + with pytest.raises(ReadException) as excinfo: + await self.nibegw.read_coil(coil, timeout=0.1) + assert "Decode failed" in str(excinfo.value) + assert 1 == self.transport.sendto.call_count + duration = time.time() - start + assert duration <= 0.1 - await send_receive() - assert "HEAT" == coil.value - - async def test_read_coil_timeout_exception(self): + async def test_read_coil_timeout(self): coil = self.heatpump.get_coil_by_address(43086) - with pytest.raises(CoilReadTimeoutException): - await self.nibegw.read_coil(coil, 0.1) + start = time.time() + with pytest.raises(ReadTimeoutException): + await self.nibegw.read_coil(coil, timeout=0.1) + duration = time.time() - start + assert ( + 0.3 <= duration <= 0.4 + ), "Timeout should be between 0.3 and 0.4 seconds. We do 3 retries" + assert 3 == self.transport.sendto.call_count, "Should do 3 retries" + self.transport.sendto.assert_called_with( + b"\xc0i\x02N\xa8M", ("127.0.0.1", 9999) + ) async def test_write_coil(self): coil = self.heatpump.get_coil_by_address(48132) - coil.value = "One time increase" - - async def send_receive(): - task = self.loop.create_task(self.nibegw.write_coil(coil)) - await asyncio.sleep(0) - self.nibegw.datagram_received( - binascii.unhexlify("5c00206c01014c"), ("127.0.0.1", 12345) - ) + coil_data = CoilData(coil, "One time increase") - return await task + self._enqueue_datagram(binascii.unhexlify("5c00206c01014c")) + await self.nibegw.write_coil(coil_data) - coil = await send_receive() - - self.transport.sendto.assert_called_with( + self.transport.sendto.assert_called_once_with( binascii.unhexlify("c06b0604bc0400000011"), ("127.0.0.1", 10000) ) - async def test_read_product_info(self): - async def read_product_info(): - task = self.loop.create_task(self.nibegw.read_product_info()) - await asyncio.sleep(0) - self.nibegw.datagram_received( - binascii.unhexlify("5c00206d0d0124e346313235352d313220529f"), - ("127.0.0.1", 12345), - ) + async def test_write_coil_failed(self): + coil = self.heatpump.get_coil_by_address(48132) + coil_data = CoilData(coil, "One time increase") - return await task + self._enqueue_datagram(binascii.unhexlify("5c00206c01004d")) + with pytest.raises(WriteException): + await self.nibegw.write_coil(coil_data) - product = await read_product_info() + assert 1 == self.transport.sendto.call_count, "Should only send once, no retry" + + async def test_read_product_info(self): + self._enqueue_datagram( + binascii.unhexlify("5c00206d0d0124e346313235352d313220529f") + ) + product = await self.nibegw.read_product_info() assert isinstance(product, ProductInfo) assert "F1255-12 R" == product.model @@ -138,7 +133,9 @@ async def test_read_multiple_with_u32(self): ("127.0.0.1", 12345), ) - for address in [45001, 43514]: # first and last in the payload - on_coil_update_mock.assert_any_call( - self.heatpump.get_coil_by_address(address) - ) + on_coil_update_mock.assert_any_call( + CoilData(self.heatpump.get_coil_by_address(45001), 0.0) + ) + on_coil_update_mock.assert_any_call( + CoilData(self.heatpump.get_coil_by_address(43514), 2.0) + ) diff --git a/tests/test_coil.py b/tests/test_coil.py index bc2ae84..b1f3f17 100644 --- a/tests/test_coil.py +++ b/tests/test_coil.py @@ -1,338 +1,485 @@ -from unittest import TestCase +from contextlib import nullcontext import pytest -from nibe.coil import Coil +from nibe.coil import Coil, CoilData from nibe.connection.nibegw import CoilDataEncoder -from nibe.exceptions import DecodeException, EncodeException +from nibe.exceptions import DecodeException, EncodeException, ValidationError from nibe.parsers import swapwords -class TestWordSwap(TestCase): - def test_swapwords(self): - assert swapwords(b"abcd") == b"cdab" - assert swapwords(b"ab") == b"ab" +def test_swapwords(): + assert swapwords(b"abcd") == b"cdab" + assert swapwords(b"ab") == b"ab" -class TestCoil(TestCase): - def test_create(self): - coil = Coil(123, "test_name", "test_title", "u8", unknown="some other") +def test_create(): + coil = Coil(123, "test_name", "test_title", "u8", unknown="some other") - assert coil.address == 123 - assert coil.name == "test_name" - assert coil.title == "test_title" - assert coil.other["unknown"] == "some other" - - -class TestCoilSigned8(TestCase): - def setUp(self) -> None: - self.coil = Coil(48739, "cool-offset-s1-48739", "Cool offset S1", "s8") - self.encoder = CoilDataEncoder() - - def test_decode(self): - assert self.encoder.decode(self.coil, b"\xfc\x00\x00\x00") == -4 - assert self.encoder.decode(self.coil, b"\xfc\x00") == -4 - assert self.encoder.decode(self.coil, b"\xfc") == -4 - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\x80") is None - - def test_encode(self): - self.coil.value = -4 - assert self.encoder.encode(self.coil) == b"\xfc\x00\x00\x00" - - with pytest.raises(EncodeException): - self.coil.value = 256 - self.encoder.encode(self.coil) - - def test_encode_unavailable(self): - self.coil.value = None - with pytest.raises(EncodeException): - self.encoder.encode(self.coil) - - -class TestCoilUnsigned8(TestCase): - def setUp(self) -> None: - self.coil = Coil(123, "test", "test", "u8") - self.encoder = CoilDataEncoder() - - def test_decode(self): - assert self.encoder.decode(self.coil, b"\x01\x00\x00\x00") == 1 - assert self.encoder.decode(self.coil, b"\x01\x00") == 1 - assert self.encoder.decode(self.coil, b"\x01") == 1 - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\xff") is None - assert self.encoder.decode(self.coil, b"\xff\xff") is None - - def test_encode(self): - self.coil.value = 1 - assert self.encoder.encode(self.coil) == b"\x01\x00\x00\x00" - self.coil.value = 255 - assert self.encoder.encode(self.coil) == b"\xff\x00\x00\x00" - - with pytest.raises(EncodeException): - self.coil.value = 256 - self.encoder.encode(self.coil) - - -class TestCoilUnsigned8WordSwap(TestCase): - def setUp(self) -> None: - self.coil = Coil(123, "test", "test", "u8", word_swap=False) - self.encoder = CoilDataEncoder(word_swap=False) - - def test_decode(self): - assert self.encoder.decode(self.coil, b"\x01\x00\x00\x00") == 1 - assert self.encoder.decode(self.coil, b"\x01\x00") == 1 - assert self.encoder.decode(self.coil, b"\x01") == 1 - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\xff") is None - assert self.encoder.decode(self.coil, b"\xff\xff") is None - - def test_encode(self): - self.coil.value = 1 - assert self.encoder.encode(self.coil) == b"\x01\x00\x00\x00" - self.coil.value = 255 - assert self.encoder.encode(self.coil) == b"\xff\x00\x00\x00" - - with pytest.raises(EncodeException): - self.coil.value = 256 - self.encoder.encode(self.coil) - - -class TestCoilSigned16(TestCase): - def setUp(self) -> None: - self.coil = Coil(123, "test", "test", "s16", factor=10, min=50, max=300) - self.encoder = CoilDataEncoder() - - def test_attributes(self): - assert self.coil.min == 5.0 - assert self.coil.max == 30.0 - - assert self.coil.raw_min == 50 - assert self.coil.raw_max == 300 - - assert not self.coil.is_boolean - assert not self.coil.is_writable - - def test_set_value_bounds(self): - self.coil.value = 5.0 - self.coil.value = 30 - - with pytest.raises(AssertionError): - self.coil.value = 4.9 - - with pytest.raises(AssertionError): - self.coil.value = 30.1 - - def test_decode(self): - assert self.encoder.decode(self.coil, b"\x97\x00") == 15.1 - - def test_decode_out_of_bounds(self): - with pytest.raises(DecodeException): - self.encoder.decode(self.coil, b"\x31\x00") - - with pytest.raises(DecodeException): - self.encoder.decode(self.coil, b"\x2d\x10") - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\x00\x80") is None - - def test_encode(self): - self.coil.value = 15.1 - assert self.encoder.encode(self.coil) == b"\x97\x00\x00\x00" - - def test_encode_out_of_bounds(self): - with pytest.raises(AssertionError): - self.coil.value = 4 - - with pytest.raises(AssertionError): - self.coil.value = 30.1 - - -class TestCoilUnsigned16(TestCase): - def setUp(self) -> None: - self.coil = Coil( - 123, - "compressor-frequency-actual-43136", - "Compressor Frequency, Actual", - "u16", - factor=10, - ) - self.encoder = CoilDataEncoder() - - def test_decode(self): - assert self.encoder.decode(self.coil, b"\x01\x00\x00\x00") == 0.1 - assert self.encoder.decode(self.coil, b"\x01\x00") == 0.1 - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\xff\xff") is None - - def test_encode(self): - self.coil.value = 0.1 - assert self.encoder.encode(self.coil) == b"\x01\x00\x00\x00" - self.coil.value = 25.5 - assert self.encoder.encode(self.coil) == b"\xff\x00\x00\x00" - - -class TestCoilUnsigned16WordSwap(TestCase): - def setUp(self) -> None: - self.coil = Coil( - 123, - "compressor-frequency-actual-43136", - "Compressor Frequency, Actual", - "u16", - factor=10, - word_swap=False, - ) - self.encoder = CoilDataEncoder(word_swap=False) - - def test_decode(self): - assert self.encoder.decode(self.coil, b"\x01\x00\x00\x00") == 0.1 - assert self.encoder.decode(self.coil, b"\x01\x00") == 0.1 - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\xff\xff") is None - - def test_encode(self): - self.coil.value = 0.1 - assert self.encoder.encode(self.coil) == b"\x01\x00\x00\x00" - self.coil.value = 25.5 - assert self.encoder.encode(self.coil) == b"\xff\x00\x00\x00" - - -class TestCoilSigned32(TestCase): - def setUp(self) -> None: - self.coil = Coil( - 43420, - "tot-op-time-compr-eb100-ep14-43420", - "Total compressorer operation time", - "s32", - ) - self.encoder = CoilDataEncoder() - - def test_decode(self): - assert self.encoder.decode(self.coil, b"2T\x00\x00") == 21554 - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\x00\x00\x00\x80") is None - - def test_encode(self): - self.coil.value = 21554 - assert self.encoder.encode(self.coil) == b"2T\x00\x00" - - -class TestCoilSigned32WordSwap(TestCase): - def setUp(self) -> None: - self.coil = Coil( - 43420, - "tot-op-time-compr-eb100-ep14-43420", - "Total compressorer operation time", - "s32", - word_swap=False, - ) - self.encoder = CoilDataEncoder(word_swap=False) - - def test_decode(self): - assert self.encoder.decode(self.coil, b"\x00\x00(\x06") == 1576 - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\x00\x80\x00\x00") is None - - def test_encode(self): - self.coil.value = 1576 - assert self.encoder.encode(self.coil) == b"\x00\x00(\x06" - - -class TestCoilWithMapping(TestCase): - def setUp(self) -> None: - self.coil = Coil( - 123, - "prio-43086", - "Prio", - "u8", - factor=1, - mappings={ - "10": "Off", - "20": "Hot Water", - "30": "Heat", - "40": "Pool", - "41": "Pool 2", - "50": "Transfer", - "60": "Cooling", - }, - ) - self.encoder = CoilDataEncoder() - - def test_set_valid_value(self): - self.coil.value = "off" - - assert self.coil.value == "OFF" - - def test_set_invalid_value(self): - with pytest.raises(AssertionError): - self.coil.value = "Beer" - - def test_decode_mapping(self): - assert self.encoder.decode(self.coil, b"\x0a") == "OFF" - - def test_decode_unavailable(self): - assert self.encoder.decode(self.coil, b"\xff\xff") is None - - def test_encode_mapping(self): - self.coil.value = "off" - assert self.encoder.encode(self.coil) == b"\x0a\x00\x00\x00" - - def test_decode_mapping_failure(self): - with pytest.raises(DecodeException): - assert self.encoder.decode(self.coil, b"\x00") - - def test_encode_mapping_failure(self): - with pytest.raises(AssertionError): - self.coil.value = "Unknown" - - -class TestBooleanCoilWithMapping(TestCase): - def setUp(self) -> None: - self.coil = Coil( - 43024, - "status-cooling-43024", - "Status Cooling", - "u8", - factor=1, - mappings={"0": "Off", "1": "On"}, - ) - self.encoder = CoilDataEncoder() - - def test_attributes(self): - assert self.coil.is_boolean - - def test_set_valid_value(self): - self.coil.value = "On" - assert "ON" == self.coil.value - - self.coil.value = "ofF" - assert "OFF" == self.coil.value - - -class TestBooleanCoilWithBounds(TestCase): - def setUp(self) -> None: - self.coil = Coil( - 47050, - "status-cooling-43024", - "Periodic HW", - "s8", - factor=1, - min=0, - max=1, - write=True, - ) - self.encoder = CoilDataEncoder() - - def test_attributes(self): - assert self.coil.is_boolean - - def test_set_valid_value(self): - self.coil.value = "ON" - self.coil.value = "OFF" + assert coil.address == 123 + assert coil.name == "test_name" + assert coil.title == "test_title" + assert coil.other["unknown"] == "some other" + + +@pytest.fixture +def encoder(): + return CoilDataEncoder() + + +@pytest.fixture +def encoder_word_swap(): + return CoilDataEncoder(word_swap=True) + + +# Signed 8-bit +@pytest.fixture +def coil_signed_s8(): + return Coil(48739, "cool-offset-s1-48739", "Cool offset S1", "s8") + + +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"\xfc\x00\x00\x00", -4), + (b"\xfc\x00", -4), + (b"\xfc", -4), + (b"\x80", None), + ], +) +def test_signed_s8_decode( + raw_value, value, encoder: CoilDataEncoder, coil_signed_s8: Coil +): + assert encoder.decode(coil_signed_s8, raw_value) == CoilData(coil_signed_s8, value) + + +def test_signed_s8_encode(encoder: CoilDataEncoder, coil_signed_s8: Coil): + coil_data = CoilData(coil_signed_s8, -4) + assert encoder.encode(coil_data) == b"\xfc\x00\x00\x00" + + +@pytest.mark.parametrize("value", [(256), (None)]) +def test_signed_s8_encode_exceptions( + value, encoder: CoilDataEncoder, coil_signed_s8: Coil +): + with pytest.raises(EncodeException): + coil_data = CoilData(coil_signed_s8, value) + encoder.encode(coil_data) + + +# Unsigned 8-bit +@pytest.fixture +def coil_unsigned_u8(): + return Coil(123, "test", "test", "u8") + + +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"\x01\x00\x00\x00", 1), + (b"\x01\x00", 1), + (b"\x01", 1), + (b"\xff", None), + (b"\xff\xff", None), + ], +) +def test_unsigned_s8_decode( + raw_value, value, encoder: CoilDataEncoder, coil_unsigned_u8: Coil +): + assert encoder.decode(coil_unsigned_u8, raw_value) == CoilData( + coil_unsigned_u8, value + ) + + +@pytest.mark.parametrize( + "value, raw_value", + [ + (1, b"\x01\x00\x00\x00"), + (255, b"\xff\x00\x00\x00"), + ], +) +def test_unsigned_s8_encode( + value, raw_value, encoder: CoilDataEncoder, coil_unsigned_u8: Coil +): + coil_data = CoilData(coil_unsigned_u8, value) + assert encoder.encode(coil_data) == raw_value + + +def test_unsigned_s8_encode_exception(encoder: CoilDataEncoder, coil_unsigned_u8: Coil): + with pytest.raises(EncodeException): + coil_data = CoilData(coil_unsigned_u8, 256) + encoder.encode(coil_data) + + +# Unsigned 8-bit with word swap +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"\x01\x00\x00\x00", 1), + (b"\x01\x00", 1), + (b"\x01", 1), + (b"\xff", None), + (b"\xff\xff", None), + ], +) +def test_unsigned_s8_decode_word_swap( + raw_value, value, encoder_word_swap: CoilDataEncoder, coil_unsigned_u8: Coil +): + assert encoder_word_swap.decode(coil_unsigned_u8, raw_value) == CoilData( + coil_unsigned_u8, value + ) + + +@pytest.mark.parametrize( + "value, raw_value", + [ + (1, b"\x01\x00\x00\x00"), + (255, b"\xff\x00\x00\x00"), + ], +) +def test_unsigned_s8_encode_word_swap( + value, raw_value, encoder_word_swap: CoilDataEncoder, coil_unsigned_u8: Coil +): + coil_data = CoilData(coil_unsigned_u8, value) + assert encoder_word_swap.encode(coil_data) == raw_value + + +def test_unsigned_s8_encode_exception_word_swap( + encoder_word_swap: CoilDataEncoder, coil_unsigned_u8: Coil +): + with pytest.raises(EncodeException): + coil_data = CoilData(coil_unsigned_u8, 256) + encoder_word_swap.encode(coil_data) + + +# Signed 16-bit +@pytest.fixture +def coil_signed_s16(): + return Coil(123, "test", "test", "s16", factor=10, min=50, max=300) + + +def test_signed_s16_attributes(coil_signed_s16: Coil): + assert coil_signed_s16.min == 5.0 + assert coil_signed_s16.max == 30.0 + + assert coil_signed_s16.raw_min == 50 + assert coil_signed_s16.raw_max == 300 + + assert not coil_signed_s16.is_boolean + assert not coil_signed_s16.is_writable + + +@pytest.mark.parametrize( + "value, expected_raises", + [ + (5.0, nullcontext()), + (30.0, nullcontext()), + (4.9, pytest.raises(ValidationError)), + (30.1, pytest.raises(ValidationError)), + ], +) +def test_signed_s16_is_valid(value, expected_raises, coil_signed_s16: Coil): + coil_data = CoilData(coil_signed_s16, value) + with expected_raises: + coil_data.validate() + + +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"\x97\x00", 15.1), + (b"\x00\x80", None), + ], +) +def test_signed_s16_decode( + raw_value, value, encoder: CoilDataEncoder, coil_signed_s16: Coil +): + assert encoder.decode(coil_signed_s16, raw_value) == CoilData( + coil_signed_s16, value + ) + + +@pytest.mark.parametrize( + "raw_value", + [ + (b"\x31\x00"), + (b"\x2d\x10"), + ], +) +def test_signed_s16_decode_exception( + raw_value, encoder: CoilDataEncoder, coil_signed_s16: Coil +): + with pytest.raises(DecodeException): + encoder.decode(coil_signed_s16, raw_value) + + +def test_signed_s16_encode(encoder: CoilDataEncoder, coil_signed_s16: Coil): + coil_data = CoilData(coil_signed_s16, 15.1) + assert encoder.encode(coil_data) == b"\x97\x00\x00\x00" + + +@pytest.mark.parametrize( + "value", + [ + (4), + (30.1), + ], +) +def test_signed_s16_encode_exception( + value, encoder: CoilDataEncoder, coil_signed_s16: Coil +): + with pytest.raises(EncodeException): + coil_data = CoilData(coil_signed_s16, value) + encoder.encode(coil_data) + + +# Unsigned 16-bit +@pytest.fixture +def coil_unsigned_u16(): + return Coil( + 123, + "compressor-frequency-actual-43136", + "Compressor Frequency, Actual", + "u16", + factor=10, + ) + + +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"\x01\x00\x00\x00", 0.1), + (b"\x01\x00", 0.1), + (b"\xff\xff", None), + ], +) +def test_unsigned_u16_decode( + raw_value, value, encoder: CoilDataEncoder, coil_unsigned_u16: Coil +): + assert encoder.decode(coil_unsigned_u16, raw_value) == CoilData( + coil_unsigned_u16, value + ) + + +@pytest.mark.parametrize( + "value, raw_value", + [ + (0.1, b"\x01\x00\x00\x00"), + (25.5, b"\xff\x00\x00\x00"), + ], +) +def test_unsigned_u16_encode( + value, raw_value, encoder: CoilDataEncoder, coil_unsigned_u16: Coil +): + coil_data = CoilData(coil_unsigned_u16, value) + assert encoder.encode(coil_data) == raw_value + + +# Unsigned 16-bit word swap +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"\x01\x00\x00\x00", 0.1), + (b"\x01\x00", 0.1), + (b"\xff\xff", None), + ], +) +def test_unsigned_u16_word_swap_decode( + raw_value, value, encoder_word_swap: CoilDataEncoder, coil_unsigned_u16: Coil +): + assert encoder_word_swap.decode(coil_unsigned_u16, raw_value) == CoilData( + coil_unsigned_u16, value + ) + + +@pytest.mark.parametrize( + "value, raw_value", + [ + (0.1, b"\x01\x00\x00\x00"), + (25.5, b"\xff\x00\x00\x00"), + ], +) +def test_unsigned_u16_word_swap_encode( + value, raw_value, encoder_word_swap: CoilDataEncoder, coil_unsigned_u16: Coil +): + coil_data = CoilData(coil_unsigned_u16, value) + assert encoder_word_swap.encode(coil_data) == raw_value + + +# Signed 32-bit +@pytest.fixture +def coil_signed_s32(): + return Coil( + 43420, + "tot-op-time-compr-eb100-ep14-43420", + "Total compressorer operation time", + "s32", + ) + + +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"2T\x00\x00", 21554), + (b"\x00\x00\x00\x80", None), + ], +) +def test_signed_s32_decode( + raw_value, value, encoder: CoilDataEncoder, coil_signed_s32: Coil +): + assert encoder.decode(coil_signed_s32, raw_value) == CoilData( + coil_signed_s32, value + ) + + +def test_signed_s32_encode(encoder: CoilDataEncoder, coil_signed_s32: Coil): + coil_data = CoilData(coil_signed_s32, 21554) + assert encoder.encode(coil_data) == b"2T\x00\x00" + + +# Signed 32-bit word swap +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"(\x06\x00\x00", 1576), + (b"\x00\x00\x00\x80", None), + ], +) +def test_signed_s32_word_swap_decode( + raw_value, value, encoder_word_swap: CoilDataEncoder, coil_signed_s32: Coil +): + assert encoder_word_swap.decode(coil_signed_s32, raw_value) == CoilData( + coil_signed_s32, value + ) + + +def test_signed_s32_word_swap_encode( + encoder_word_swap: CoilDataEncoder, coil_signed_s32: Coil +): + coil_data = CoilData(coil_signed_s32, 1576) + assert encoder_word_swap.encode(coil_data) == b"(\x06\x00\x00" + + +# Unsigned 8-bit with mapping +@pytest.fixture +def coil_unsigned_u8_mapping(): + return Coil( + 123, + "prio-43086", + "Prio", + "u8", + factor=1, + mappings={ + "10": "Off", + "20": "Hot Water", + "30": "Heat", + "40": "Pool", + "41": "Pool 2", + "50": "Transfer", + "60": "Cooling", + }, + ) + + +@pytest.mark.parametrize( + "raw_value, value", + [ + (b"\x0a", "OFF"), + (b"\xff\xff", None), + ], +) +def test_unsigned_u8_mapping_decode( + raw_value, value, encoder: CoilDataEncoder, coil_unsigned_u8_mapping: Coil +): + assert encoder.decode(coil_unsigned_u8_mapping, raw_value) == CoilData( + coil_unsigned_u8_mapping, value + ) + + +def test_unsigned_u8_mapping_decode_exception( + encoder: CoilDataEncoder, coil_unsigned_u8_mapping: Coil +): + with pytest.raises(DecodeException): + encoder.decode(coil_unsigned_u8_mapping, b"\x00") + + +@pytest.mark.parametrize( + "value, raw_value", + [ + ("OFF", b"\x0a\x00\x00\x00"), + ("off", b"\x0a\x00\x00\x00"), + ("Hot Water", b"\x14\x00\x00\x00"), + ], +) +def test_unsigned_u8_mapping_encode( + value, raw_value, encoder: CoilDataEncoder, coil_unsigned_u8_mapping: Coil +): + coil_data = CoilData(coil_unsigned_u8_mapping, value) + assert encoder.encode(coil_data) == raw_value + + +def test_unsigned_u8_mapping_encode_exception( + encoder: CoilDataEncoder, coil_unsigned_u8_mapping: Coil +): + coil_data = CoilData(coil_unsigned_u8_mapping, "Beer") + with pytest.raises(EncodeException): + encoder.encode(coil_data) + + +# Unsigned 8-bit boolean +@pytest.fixture +def coil_unsigned_u8_boolean(): + return Coil( + 43024, + "status-cooling-43024", + "Status Cooling", + "u8", + factor=1, + mappings={"0": "Off", "1": "On"}, + ) + + +def test_unsigned_u8_boolean(coil_unsigned_u8_boolean: Coil): + assert coil_unsigned_u8_boolean.is_boolean + + +def test_unsigned_u8_encode(encoder: CoilDataEncoder, coil_unsigned_u8_boolean: Coil): + coil_data = CoilData(coil_unsigned_u8_boolean, "On") + assert encoder.encode(coil_data) == b"\x01\x00\x00\x00" + + +# Unsigned 8-bit boolean with bounds +@pytest.fixture +def coil_unsigned_u8_boolean_with_bounds(): + return Coil( + 47050, + "status-cooling-43024", + "Periodic HW", + "s8", + factor=1, + min=0, + max=1, + ) + + +def test_unsigned_u8_boolean_with_bounds_is_boolean( + coil_unsigned_u8_boolean_with_bounds: Coil, +): + assert coil_unsigned_u8_boolean_with_bounds.is_boolean + + +@pytest.mark.parametrize( + "value, raw_value", + [ + ("On", b"\x01\x00\x00\x00"), + ("Off", b"\x00\x00\x00\x00"), + ], +) +def test_unsigned_u8_boolean_with_bounds_encode( + value, + raw_value, + encoder: CoilDataEncoder, + coil_unsigned_u8_boolean_with_bounds: Coil, +): + coil_data = CoilData(coil_unsigned_u8_boolean_with_bounds, value) + assert encoder.encode(coil_data) == raw_value diff --git a/tests/test_heatpump.py b/tests/test_heatpump.py index 46dc831..5441b57 100644 --- a/tests/test_heatpump.py +++ b/tests/test_heatpump.py @@ -3,6 +3,7 @@ import pytest +from nibe.coil import CoilData from nibe.connection.nibegw import CoilDataEncoder from nibe.exceptions import CoilNotFoundException, ModelIdentificationFailed from nibe.heatpump import HeatPump, Model, ProductInfo, Series @@ -47,23 +48,27 @@ def test_listener(self): mock.assert_not_called() - self.heat_pump.notify_coil_update(coil) + coil_data = CoilData(coil, 14) + self.heat_pump.notify_coil_update(coil_data) - mock.assert_called_with(coil) + mock.assert_called_with(coil_data) def test_listener_with_exception(self): mock = Mock(side_effect=Exception("Test exception that needs to be logged")) coil = self.heat_pump.get_coil_by_address(40004) self.heat_pump.subscribe(self.heat_pump.COIL_UPDATE_EVENT, mock) + coil_data = CoilData(coil, 14) self.heat_pump.notify_coil_update( - coil + coil_data ) # Error should be logged but not thrown out def test_word_swap_is_true(self): coil = self.heat_pump.get_coil_by_address(43420) assert ( - CoilDataEncoder(self.heat_pump.word_swap).decode(coil, b"(\x06\x00\x00") + CoilDataEncoder(self.heat_pump.word_swap) + .decode(coil, b"(\x06\x00\x00") + .value == 1576 ) @@ -77,7 +82,9 @@ async def asyncSetUp(self) -> None: def test_word_swap_is_false(self): coil = self.heat_pump.get_coil_by_address(43420) assert ( - CoilDataEncoder(self.heat_pump.word_swap).decode(coil, b"\x00\x00(\x06") + CoilDataEncoder(self.heat_pump.word_swap) + .decode(coil, b"\x00\x00(\x06") + .value == 1576 )