Skip to content

Commit

Permalink
feature(KTP-1470): Added data balancing feature. (OpenSTEF#574)
Browse files Browse the repository at this point in the history
* feature(KTP-1470): Added data balancing feature.

Signed-off-by: Egor Dmitriev <[email protected]>

* feature(KTP-1470): Added tests for data balancing.

Signed-off-by: Egor Dmitriev <[email protected]>

* feature(KTP-1470): Code style fixes.

Signed-off-by: Egor Dmitriev <[email protected]>

* feature(KTP-1470): Improved tests for data balancing.

Signed-off-by: Egor Dmitriev <[email protected]>

---------

Signed-off-by: Egor Dmitriev <[email protected]>
  • Loading branch information
egordm authored Dec 2, 2024
1 parent df50619 commit 3873d1a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 2 deletions.
3 changes: 3 additions & 0 deletions openstef/data_classes/prediction_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class PredictionJobDataClass(BaseModel):
"""Minimum length (in rows) of the forecast input for making a regular forecast."""
flatliner_threshold_minutes: int = 1440
"""Number of minutes that the load has to be constant to detect a flatliner. """
data_balancing_ratio: Optional[float] = None
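"""Fraction of the training period to fetch when data balancing is enabled. The
same number of days is then also fetched starting from 1 year ago and added to
the training data. Data balancing is disabled when this is None."""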
depends_on: Optional[list[Union[int, str]]]
"""Link to another prediction job on which this prediction job might depend."""
sid: Optional[str]
Expand Down
33 changes: 32 additions & 1 deletion openstef/tasks/train_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd

from openstef.data_classes.prediction_job import PredictionJobDataClass
from openstef.enums import ModelType, PipelineType
from openstef.exceptions import (
Expand Down Expand Up @@ -114,10 +116,16 @@ def train_model_task(
return

# Define start and end of the training input data
training_period_days_to_fetch = (
TRAINING_PERIOD_DAYS
if pj.data_balancing_ratio is None
else int(pj.data_balancing_ratio * TRAINING_PERIOD_DAYS)
)

if datetime_end is None:
datetime_end = datetime.utcnow()
if datetime_start is None:
datetime_start = datetime_end - timedelta(days=TRAINING_PERIOD_DAYS)
datetime_start = datetime_end - timedelta(days=training_period_days_to_fetch)

# Get training input data from database
input_data = context.database.get_model_input(
Expand All @@ -127,6 +135,29 @@ def train_model_task(
datetime_end=datetime_end,
)

# If data balancing is enabled, fetch data from 1 year ago and combine it with the
# current data
if pj.data_balancing_ratio is not None:
# Because this window starts 1 year in the past, it can extend forward in time
# from its start (the "future" of a year ago) and still hold only historical data
balanced_datetime_start = datetime_end - timedelta(days=365)
balanced_datetime_end = balanced_datetime_start + timedelta(
days=training_period_days_to_fetch
)

balanced_input_data = context.database.get_model_input(
pid=pj["id"],
location=[pj["lat"], pj["lon"]],
datetime_start=balanced_datetime_start,
datetime_end=balanced_datetime_end,
)

input_data = pd.concat(
[
balanced_input_data,
input_data,
]
)

context.perf_meter.checkpoint("Retrieved timeseries input")

# Execute the model training pipeline
Expand Down
46 changes: 45 additions & 1 deletion test/unit/tasks/test_train_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#
# SPDX-License-Identifier: MPL-2.0
import copy
from datetime import datetime, UTC

from test.unit.utils.data import TestData
from unittest import TestCase
from unittest.mock import MagicMock, patch
Expand All @@ -11,7 +13,7 @@

from openstef.enums import PipelineType
from openstef.exceptions import InputDataOngoingZeroFlatlinerError
from openstef.tasks.train_model import main as task_main
from openstef.tasks.train_model import main as task_main, TRAINING_PERIOD_DAYS
from openstef.tasks.train_model import train_model_task


Expand All @@ -37,6 +39,8 @@ def test_create_train_model_task_happy_flow(self, train_model_pipeline_mock):

# Arrange
context = MagicMock()
test_data = TestData.load("reference_sets/307-train-data.csv")
context.database.get_model_input.return_value = test_data

# Act
train_model_task(self.pj, context)
Expand All @@ -46,6 +50,46 @@ def test_create_train_model_task_happy_flow(self, train_model_pipeline_mock):
self.assertEqual(
train_model_pipeline_mock.call_args_list[0][0][0]["id"], self.pj["id"]
)
self.assertEqual(
len(train_model_pipeline_mock.call_args_list[0][0][1]),
len(test_data),
)

@patch("openstef.tasks.train_model.train_model_pipeline")
def test_create_train_model_task_data_balancing(self, train_model_pipeline_mock):
    """Check that the train model task adds data from one year ago when balancing.

    The database is replaced by a fake that returns the reference data re-indexed
    to the requested window, so the rows passed to the (mocked) training pipeline
    can be inspected.
    """
    # Arrange
    context = MagicMock()
    test_data = TestData.load("reference_sets/307-train-data.csv")

    def get_model_input(*args, datetime_start=None, datetime_end=None, **kwargs):
        # Re-index the reference data so it starts at datetime_start, then keep
        # only the rows that fall inside the requested window.
        result = test_data.copy()
        result.index = pd.date_range(
            datetime_start, freq="15min", periods=len(result)
        )
        return result.loc[datetime_start:datetime_end]

    context.database.get_model_input = get_model_input
    pj_with_balancing = self.pj.copy(update={"data_balancing_ratio": 0.5})

    datetime_end = datetime.now(tz=UTC)
    # With a ratio of 0.5 the task fetches two windows of half the training
    # period each (assuming that half is a whole number of days), which together
    # span TRAINING_PERIOD_DAYS of 15-minute rows.
    # +2 because pandas label slicing includes both end points, which adds one
    # extra row per fetched window.
    expected_data_points = TRAINING_PERIOD_DAYS * 24 * 4 + 2

    # Act
    train_model_task(pj_with_balancing, context, datetime_end=datetime_end)

    # Assert
    self.assertEqual(train_model_pipeline_mock.call_count, 1)
    self.assertEqual(
        train_model_pipeline_mock.call_args_list[0][0][0]["id"], self.pj["id"]
    )
    input_data = train_model_pipeline_mock.call_args_list[0][0][1]
    # The end of the recent window and the start of the window from one year ago
    # must both be present in the combined input.
    self.assertIn(datetime_end, input_data.index)
    self.assertIn(datetime_end - pd.Timedelta(days=365), input_data.index)
    self.assertEqual(len(input_data), expected_data_points)

@patch(
"openstef.tasks.train_model.train_model_pipeline",
Expand Down

0 comments on commit 3873d1a

Please sign in to comment.