Conversion validation (#184)
I want to verify that the JSON files are correctly converted from the CSV files
inside pull requests.
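
A minimal sketch of how this check could be driven from a pull-request job, mirroring the main() changes further down. The module path and the "verify" mode string are taken from this diff; the surrounding CI wiring is an assumption.

import asyncio
import sys

from nibe.console_scripts.convert_csv import run

try:
    # Re-convert every bundled CSV and compare the result with the committed JSON files.
    asyncio.run(run("verify"))
except AssertionError as ex:
    # run() asserts that at least one file was processed and that none failed.
    print(ex, file=sys.stderr)
    sys.exit(1)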
yozik04 authored Oct 18, 2024
2 parents fcaac2f + e214a7c commit 8d5c387
Showing 1 changed file with 123 additions and 78 deletions.
201 changes: 123 additions & 78 deletions nibe/console_scripts/convert_csv.py
@@ -6,44 +6,100 @@
import json
import logging
import re
from typing import Literal
from typing import Optional

import pandas
import pandas as pd
from slugify import slugify

from nibe.heatpump import HeatPump, Model

logger = logging.getLogger("nibe")

re_mapping = re.compile(
r"(?P<key>(?<!\w)\d+|I)\s*=\s*(?P<value>(?:[\w +.-]+[\w]\b[+]?(?! *=)))",
re.IGNORECASE,
)

def update_dict(d: MutableMapping, u: Mapping, removeExplicitNulls: bool) -> Mapping:

def _extract_mappings(info: str) -> Optional[Mapping]:
if pd.isna(info):
return None

if "Binary encoded" in info:
return None

mappings = {}
matches = re_mapping.finditer(info)

for match in matches:
key = match.group("key")
value = match.group("value")

if key == "I":
key = "1"

mappings[key] = value

if not mappings:
return None

return _sort_mappings(mappings)


def _sort_mappings(mappings) -> Mapping:
return {str(k): mappings[str(k)] for k in sorted(map(int, mappings.keys()))}


def _sort_mappings_in_output(dict_):
for key, value in dict_.items():
if "mappings" in value:
dict_[key]["mappings"] = _sort_mappings(value["mappings"])
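
For illustration, here is what _extract_mappings is expected to return for typical info strings; the strings below are made up for this sketch, not taken from a real CSV.

from nibe.console_scripts.convert_csv import _extract_mappings

assert _extract_mappings("10=Off 20=Hot water 30=Heat") == {
    "10": "Off",
    "20": "Hot water",
    "30": "Heat",
}
# "I" is treated as a misread "1" and keys come back sorted numerically.
assert _extract_mappings("I=On 0=Off") == {"0": "Off", "1": "On"}
# Bit-field descriptions are skipped entirely.
assert _extract_mappings("Binary encoded register") is None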


def _update_dict(d: MutableMapping, u: Mapping, removeExplicitNulls: bool) -> Mapping:
for k, v in u.items():
if v is None and removeExplicitNulls:
try:
d.pop(k)
except (IndexError, KeyError):
pass
elif isinstance(v, Mapping):
update_dict(d.setdefault(k, {}), v, removeExplicitNulls)
_update_dict(d.setdefault(k, {}), v, removeExplicitNulls)
else:
d[k] = v

return d
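
A small illustration of the merge semantics (the register id and field values are invented): nested mappings are merged recursively, and a None in the extension data removes the key when removeExplicitNulls is set.

from nibe.console_scripts.convert_csv import _update_dict

base = {"40004": {"title": "BT1 Outdoor temperature", "unit": "°C", "factor": 10}}
extra = {"40004": {"unit": None, "write": True}}

_update_dict(base, extra, removeExplicitNulls=True)
assert base == {
    "40004": {"title": "BT1 Outdoor temperature", "factor": 10, "write": True}
}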


class ValidationFailed(Exception):
"""Raised when validation fails."""

pass


class CSVConverter:
data: pandas.DataFrame
"""Converts CSV file to JSON file."""

data: pd.DataFrame

def __init__(self, in_file, out_file, extensions):
self.in_file = in_file
self.out_file = out_file
self.extensions = extensions

def convert(self, mode: Literal["export", "verify"] = "export"):
def convert(self):
"""Converts CSV file to JSON file."""
self._process()

self._export_to_file()

def verify(self):
"""Verifies that the JSON file matches the CSV file after conversion."""
self._process()

self._verify_export()
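
A minimal usage sketch (the file names and the empty extensions dict are placeholders): one converter instance handles one CSV/JSON pair and either regenerates the JSON or checks the copy already on disk.

from nibe.console_scripts.convert_csv import CSVConverter, ValidationFailed

converter = CSVConverter("f2040.csv", "f2040.json", extensions={})
converter.convert()  # write f2040.json from the CSV

try:
    converter.verify()  # re-run the conversion and compare against f2040.json
except ValidationFailed as ex:
    print(ex)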

def _process(self):
self._read_csv()

self._unifi_column_names()
@@ -68,58 +124,36 @@ def convert(self, mode: Literal["export", "verify"] = "export"):

self._ensure_no_duplicate_ids()

self._export_to_file()

if mode == "export":
self._export_to_file()
elif mode == "validate":
self._verify_export()

def _make_mapping_parameter(self):
if "info" not in self.data:
return

re_mapping = re.compile(
r"(?P<value>\d+|I)\s*=\s*(?P<key>(?:[\w +.-]+[\w]\b[+]?(?! *=)))",
re.IGNORECASE,
)
mappings = (
self.data["info"]
.where(~self.data["info"].str.contains("encoded"))
.str.extractall(re_mapping)
)
mappings["value"] = mappings["value"].str.replace("I", "1").astype("str")
mappings = mappings.reset_index("match", drop=True)
self.data["mappings"] = pandas.Series(
{
str(k): self._make_mapping_series(g)
for k, g in mappings.groupby("value", level=0)
}
).where(self._is_mapping_allowed)

def _is_mapping_allowed(self, s):
return self.data["factor"] == 1
# Create a mask to identify rows where mapping is allowed
allowed_mask = self.data["factor"] == 1

def _make_mapping_series(self, g):
return g.set_index("value", drop=True)["key"].drop_duplicates()
# Apply the function to each cell in self.data["info"] column where mapping is allowed
self.data.loc[allowed_mask, "mappings"] = self.data.loc[
allowed_mask, "info"
].apply(_extract_mappings)
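
A standalone sketch of the mask-and-apply pattern above, with two invented rows: only the row whose factor is 1 gets a mappings dict, the other keeps NaN.

import pandas as pd

from nibe.console_scripts.convert_csv import _extract_mappings

df = pd.DataFrame(
    {"factor": [1, 10], "info": ["10=Off 20=Heat", "Outdoor temperature"]},
    index=[47041, 40004],
)
allowed_mask = df["factor"] == 1
df.loc[allowed_mask, "mappings"] = df.loc[allowed_mask, "info"].apply(_extract_mappings)
# df["mappings"]: {"10": "Off", "20": "Heat"} for 47041, NaN for 40004.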

def _unset_equal_min_max_default_values(self):
valid_min_max = self.data["min"] != self.data["max"]
self.data["min"] = self.data["min"].where(valid_min_max)
self.data["max"] = self.data["max"].where(valid_min_max)
self.data["default"] = self.data["default"].where(valid_min_max)
for column in ["min", "max", "default"]:
self.data[column] = self.data[column].where(valid_min_max)

def _fix_data_types(self):
self.data["unit"] = self.data["unit"].astype("string")
self.data["title"] = self.data["title"].astype("string")
string_columns = ["unit", "title", "size", "name"]
for column in string_columns:
self.data[column] = self.data[column].astype("string")

if "info" in self.data:
self.data["info"] = self.data["info"].astype("string")
self.data["size"] = self.data["size"].astype("string")
self.data["name"] = self.data["name"].astype("string")

self.data["factor"] = self.data["factor"].astype("int")
self.data["min"] = self.data["min"].astype("float")
self.data["max"] = self.data["max"].astype("float")
self.data["default"] = self.data["default"].astype("float")

float_columns = ["min", "max", "default"]
for column in float_columns:
self.data[column] = self.data[column].astype("float")

def _fix_data_size_column(self):
mapping = {
@@ -150,14 +184,14 @@ def _fix_data_size_column(self):

def _fix_data_unit_column(self):
self.data["unit"] = (
self.data["unit"].replace(r"^\s*$", pandas.NA, regex=True).str.strip()
self.data["unit"].replace(r"^\s*$", pd.NA, regex=True).str.strip()
)

def _fix_data_soft_hyphens(self):
self.data["title"] = self.data["title"].str.replace("\xad", "")

def _make_name_using_slugify(self):
ids = pandas.Series(self.data.index, index=self.data.index)
ids = pd.Series(self.data.index, index=self.data.index)
self.data["name"] = self.data["title"].combine(
ids, lambda title, id_: slugify(f"{title}-{id_}")
)
@@ -167,7 +201,7 @@ def _replace_mode_with_boolean_write_parameter(self):
self.data["mode"] = self.data["mode"].str.strip().astype("string")

self.data["write"] = self.data["mode"].map(
lambda x: True if x == "R/W" else pandas.NA
lambda x: True if x == "R/W" else pd.NA
)
del self.data["mode"]

@@ -189,7 +223,7 @@ def _read_csv(self):
modbus_manager = f.readline().startswith("ModbusManager")

if modbus_manager:
self.data = pandas.read_csv(
self.data = pd.read_csv(
self.in_file,
sep=";",
skiprows=4,
@@ -198,7 +232,7 @@ def _read_csv(self):
skipinitialspace=True,
)
else:
self.data = pandas.read_csv(
self.data = pd.read_csv(
self.in_file,
sep="\t",
skiprows=0,
@@ -248,33 +282,30 @@ def calculate_number(row):

self.data = self.data.set_index("id")

def _convert_series_to_dict(self, o):
if isinstance(o, pandas.Series):
return o.sort_index(key=lambda i: i.astype(int)).to_dict()

raise TypeError(f"Object of type {type(o)} is not JSON serializable")

def _ensure_no_duplicate_ids(self):
if self.data.index.has_duplicates:
logger.error(
f"Duplicate IDs found in {self.in_file}:\n{self.data[self.data.index.duplicated()]}"
)
raise ValueError("Duplicate IDs found")

def _make_dict(self):
return {index: row.dropna().to_dict() for index, row in self.data.iterrows()}
def _make_dict(self) -> dict:
out = {index: row.dropna().to_dict() for index, row in self.data.iterrows()}

_update_dict(out, self.extensions, True)
_sort_mappings_in_output(out)

return out
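
For illustration, how a single row turns into a JSON entry (values invented): dropna() removes empty cells, so optional fields such as unit or mappings only appear when present.

import pandas as pd

row = pd.Series({"title": "Heat curve", "unit": pd.NA, "factor": 1, "min": 0.0, "max": 15.0})
print(row.dropna().to_dict())  # {'title': 'Heat curve', 'factor': 1, 'min': 0.0, 'max': 15.0}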

def _export_to_file(self):
o = self._make_dict()
update_dict(o, self.extensions, True)
out = self._make_dict()

with open(self.out_file, "w", encoding="utf-8") as fh:
json.dump(o, fh, indent=2, default=self._convert_series_to_dict)
json.dump(out, fh, indent=2)
fh.write("\n")

def _verify_export(self):
o = self._make_dict()
update_dict(o, self.extensions, True)

try:
with open(self.out_file, encoding="utf-8") as fh:
@@ -285,21 +316,20 @@ def _verify_export(self):
raise ValidationFailed(f"File {self.out_file} not found")

if o != file_contents:
expected = json.dumps(o, indent=4, sort_keys=True)
actual = json.dumps(file_contents, indent=4, sort_keys=True)
expected = json.dumps(o, indent=2, sort_keys=True)
actual = json.dumps(file_contents, indent=2, sort_keys=True)
diff = difflib.unified_diff(
expected.splitlines(),
actual.splitlines(),
fromfile="expected",
tofile="actual",
lineterm="",
)
diff_text = "\n".join(diff)
raise ValidationFailed(
f"File {self.out_file} does not match the expected content\nDiff:\n{diff_text}"
)
raise ValidationFailed(f"File {self.out_file} does not match:\n{diff_text}")


async def _validate_initialization(out_file):
async def _verify_heat_pump_initialization(out_file):
model = Model.CUSTOM
model.data_file = out_file
hp = HeatPump(model)
@@ -311,7 +341,7 @@ async def run(mode):
all_extensions = json.load(fp)

processed_files = []
convert_failed = []
processing_failed = []

for in_file in files("nibe.data").glob("*.csv"):
out_file = in_file.with_suffix(".json")
@@ -320,29 +350,39 @@ async def run(mode):
for extra in all_extensions:
if out_file.name not in extra["files"]:
continue
update_dict(extensions, extra["data"], False)
_update_dict(extensions, extra["data"], False)

logger.info(f"Processing {in_file} to {out_file}")
try:
CSVConverter(in_file, out_file, extensions).convert(mode=mode)
converter = CSVConverter(in_file, out_file, extensions)
if mode == "verify":
converter.verify()
elif mode == "export":
converter.convert()
else:
raise ValueError(f"Invalid mode: {mode}")

await _validate_initialization(out_file)
await _verify_heat_pump_initialization(out_file)

if mode == "verify":
logger.info(f"Verified {out_file}")
else:
logger.info(f"Converted {in_file} to {out_file}")
except ValidationFailed as ex:
processing_failed.append(in_file)
logger.error("Validation failed for %s: %s", in_file, ex)
except Exception as ex:
convert_failed.append(in_file)
processing_failed.append(in_file)
logger.exception("Failed to process %s: %s", in_file, ex)
finally:
processed_files.append(in_file)

if convert_failed:
logger.error("Failed to process the following files: %s", convert_failed)
raise ValueError("Failed to process all files")
assert len(processed_files) > 0, "No files were processed"
assert len(processing_failed) == 0, f"Failed to process files: {processing_failed}"

logger.info("Processed files: %s", list(map(lambda x: x.name, processed_files)))
logger.info(
"Successfully processed files: %s", list(map(lambda x: x.name, processed_files))
)


def main():
@@ -353,7 +393,12 @@ def main():
mode = "verify" if args.verify else "export"

logging.basicConfig(level=logging.INFO)
asyncio.run(run(mode))

try:
asyncio.run(run(mode))
except AssertionError as ex:
logger.error(ex)
exit(1)


if __name__ == "__main__":
