Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ dist
.tox
coverage.xml
.coverage*
*_cache
*_cache
venv/
108 changes: 108 additions & 0 deletions tests/polar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,33 @@
# For detail about GNU see <http://www.gnu.org/licenses/>.
import math
import os
import tempfile
import unittest

import weatherrouting


def create_temp_file(content: str, test_instance=None) -> str:
temp_file = tempfile.NamedTemporaryFile(
mode="w", delete=False, suffix=".pol", encoding="utf-8"
)
temp_file.write(content)
temp_file.close()
if test_instance:
test_instance.addCleanup(os.remove, temp_file.name)
return temp_file.name


class TestPolar(unittest.TestCase):
def setUp(self):
self.polar_obj = weatherrouting.Polar(
os.path.join(os.path.dirname(__file__), "data/bavaria38.pol")
)
self.valid_file_path = os.path.join(
os.path.dirname(__file__), "data/bavaria38.pol"
)
with open(self.valid_file_path, "r", encoding="utf-8") as f:
self.valid_polar_content_lines = f.readlines()

def test_to_string(self):
f = open(os.path.join(os.path.dirname(__file__), "data/bavaria38.pol"), "r")
Expand Down Expand Up @@ -65,3 +82,94 @@ def test_reaching(self):
self.assertAlmostEqual(
self.polar_obj.get_reaching(6.1)[1], 1.3962634015954636, delta=0.001
)

# --- Tests for Polar.validate_file ---
def test_validate_valid_file(self):
# This should not raise an error for a known valid file
try:
weatherrouting.Polar.validate_file(self.valid_file_path)
except weatherrouting.PolarError as e:
self.fail(
f"validate_polar_file raised PolarError unexpectedly for a valid file: {e}"
)

def test_validate_empty_file(self):
temp_file_path = create_temp_file("", self)
with self.assertRaisesRegex(weatherrouting.PolarError, "EMPTY_FILE"):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_wind_numeric(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("30", "a")
temp_file_path = create_temp_file(corrupt_content, self)
with self.assertRaisesRegex(
weatherrouting.PolarError, "WIND_SPEED_NOT_NUMERIC"
):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_wind_incresing(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("50", "100")
temp_file_path = create_temp_file(corrupt_content, self)
with self.assertRaisesRegex(
weatherrouting.PolarError, "WIND_SPEEDS_NOT_INCREASING"
):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_empty_line(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("\n", "\n\n")
temp_file_path = create_temp_file(corrupt_content, self)
with self.assertRaisesRegex(weatherrouting.PolarError, "EMPTY_LINE"):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_column_count_mismatch(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("60", "60 70")
temp_file_path = create_temp_file(corrupt_content, self)
with self.assertRaisesRegex(weatherrouting.PolarError, "COLUMN_COUNT_MISMATCH"):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_twa_out_of_range(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("100", "181")
temp_file_path = create_temp_file(corrupt_content, self)
with self.assertRaisesRegex(weatherrouting.PolarError, "TWA_OUT_OF_RANGE"):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_twa_not_numeric(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("100", "a")
temp_file_path = create_temp_file(corrupt_content, self)
with self.assertRaisesRegex(weatherrouting.PolarError, "TWA_NOT_NUMERIC"):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_empty_value(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("7.7", "-")
temp_file_path = create_temp_file(corrupt_content, self)
with self.assertRaisesRegex(weatherrouting.PolarError, "EMPTY_VALUE"):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_negative_speed(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("7.7", "-1")
temp_file_path = create_temp_file(corrupt_content)
with self.assertRaisesRegex(weatherrouting.PolarError, "NEGATIVE_SPEED"):
weatherrouting.Polar.validate_file(temp_file_path)

def test_validate_speed_not_numeric(self):
with open(self.valid_file_path, "r", encoding="utf-8") as f:
valid_content = f.read()
corrupt_content = valid_content.replace("7.7", "a")
temp_file_path = create_temp_file(corrupt_content, self)
with self.assertRaisesRegex(weatherrouting.PolarError, "SPEED_NOT_NUMERIC"):
weatherrouting.Polar.validate_file(temp_file_path)
2 changes: 1 addition & 1 deletion weatherrouting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

# For detail about GNU see <http://www.gnu.org/licenses/>.
from .grib import Grib # noqa: F401
from .polar import Polar # noqa: F401
from .polar import Polar, PolarError # noqa: F401
from .routers import * # noqa: F401, F403
from .routing import Routing, list_routing_algorithms # noqa: F401
from .utils import * # noqa: F401, F403
101 changes: 101 additions & 0 deletions weatherrouting/polar.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

# For detail about GNU see <http://www.gnu.org/licenses/>.
import math
import re
from io import TextIOWrapper
from typing import Dict, Optional, Tuple


class PolarError(Exception):
pass


class Polar:
def __init__(self, polar_path: str, f: Optional[TextIOWrapper] = None):
"""
Expand All @@ -29,6 +34,7 @@ def __init__(self, polar_path: str, f: Optional[TextIOWrapper] = None):
f : File
File object for passing an opened file
"""
self.validate_file(polar_path)

self.tws = []
self.twa = []
Expand Down Expand Up @@ -189,3 +195,98 @@ def get_twa_routage(self, tws: float, twa: float) -> float:
if twa > twadown:
twa = twadown
return twa

# ---- Start validate function ----
@staticmethod
def validate_file(filepath):
"""Validates the structure and content of a polar file.

Returns True if valid, raises PolarError with specific message if invalid.
"""
with open(filepath, "r") as f:
content = f.read()
lines = content.strip().split("\n")

# Check for empty file
if len(lines) == 1 and not lines[0] or not lines[0]:
raise PolarError("EMPTY_FILE")

# Process header (wind speeds)
Polar._validate_header(lines[0])

# Check data rows
header_parts = re.split(r"\s+", lines[0].strip())
expected_columns = len(header_parts)

for line in lines[1:]:
Polar._validate_data_row(line, expected_columns)

return True

@staticmethod
def _validate_header(header_line):
"""Validates the header line containing wind speeds."""
header_parts = re.split(r"\s+", header_line.strip())

# Try to parse wind speeds (should be numeric)
try:
tws = [float(ws) for ws in header_parts[1:]]
except ValueError:
raise PolarError("WIND_SPEED_NOT_NUMERIC")

# Check for increasing wind speeds
if not all(tws[i] <= tws[i + 1] for i in range(len(tws) - 1)):
raise PolarError("WIND_SPEEDS_NOT_INCREASING")

return True

@staticmethod
def _validate_data_row(line, expected_columns):
"""Validates a single data row in the polar file."""
parts = re.split(r"\s+", line.strip())

# Skip empty lines
if not parts or (len(parts) == 1 and not parts[0]):
raise PolarError("EMPTY_LINE")

# Check number of columns
if len(parts) != expected_columns:
raise PolarError("COLUMN_COUNT_MISMATCH")

# Validate TWA
Polar._validate_twa(parts[0])

# Validate boat speeds
for speed in parts[1:]:
Polar._validate_boat_speed(speed)

return True

@staticmethod
def _validate_twa(twa_str):
"""Validates a TWA (True Wind Angle) value."""
try:
twa = float(twa_str)
if twa < 0 or twa > 180:
raise PolarError("TWA_OUT_OF_RANGE")
except ValueError:
raise PolarError("TWA_NOT_NUMERIC")

return True

@staticmethod
def _validate_boat_speed(speed_str):
"""Validates a boat speed value."""
if speed_str in ["", "-", "NaN", "NULL"]:
raise PolarError("EMPTY_VALUE")

try:
boat_speed = float(speed_str)
if boat_speed < 0:
raise PolarError("NEGATIVE_SPEED")
except ValueError:
raise PolarError("SPEED_NOT_NUMERIC")

return True

# ---- End validate function ----