Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions documentation/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ New features
* Display KPIs for asset sensors with daily event resolution [see `PR #1608 <https://github.com/FlexMeasures/flexmeasures/pull/1608>`_, `PR #1634 <https://github.com/FlexMeasures/flexmeasures/pull/1634>`_ and `PR #1656 <https://github.com/FlexMeasures/flexmeasures/pull/1656>`_]
* Improved timestamp on sensor detail page to be more friendly [see `PR #1632 <https://www.github.com/FlexMeasures/flexmeasures/pull/1632>`_]
* Asset types support: new API endpoint (`GET /assets/types`), better docs and fix CLI command `flexmeasures show asset-types` [see `PR #1663 <https://github.com/FlexMeasures/flexmeasures/pull/1663>`_]
* Added Materialized View for latest beliefs per sensor to speed up data loading [see `PR #1671 <https://github.com/FlexMeasures/flexmeasures/pull/1671>`_]

Infrastructure / Support
----------------------
Expand Down
11 changes: 10 additions & 1 deletion documentation/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -671,4 +671,13 @@ FLEXMEASURES_API_SUNSET_LINK

Allow to override the default sunset link for your clients.

Default: ``None`` (defaults are set internally for each sunset API version, e.g. ``"https://flexmeasures.readthedocs.io/en/v0.13.0/api/v2_0.html"`` for v2.0)
Default: ``None`` (defaults are set internally for each sunset API version, e.g. ``"https://flexmeasures.readthedocs.io/en/v0.13.0/api/v2_0.html"`` for v2.0)

Performance optimizations
----------------------------

FLEXMEASURES_MVIEW_REFRESH_INTERVAL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This FLEXMEASURES_MVIEW_REFRESH_INTERVAL setting requires a short discussion, @Muhammad-Moiz626 @nhoening. I already scoped some options with @Ahmad-Wahid. I suggest to spin out that discussion on this comment thread.

There is currently no automation that runs the CLI command to refresh the mviews. Our go-to approach is to do this via a cronjob, for which we'd essentially have to move this setting in the cron job configuration. It doesn't then make sense to have that setting live in FlexMeasures, as FlexMeasures is not setting up cronjobs itself. I believe the config setting is, in this PR, only used to inform the user on the refresh rate in the UI. I think that is nice. If we want to keep that, then an alternative to this config setting may be to use the tooling we have that saves (in the db) the last time a specific CLI command was run successfully, and report that time in the UI instead.

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Interval in minutes to refresh the materialized db view which caches the most recent beliefs at a given point in time (for faster queries).

Default: None
3 changes: 3 additions & 0 deletions flexmeasures/api/dev/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.annotations import prepare_annotations_for_chart
from flexmeasures.ui.utils.view_utils import set_session_variables
from flexmeasures.data.config import most_recent_beliefs_mview


class SensorAPI(FlaskView):
Expand Down Expand Up @@ -90,6 +91,7 @@ def get_chart(self, id: int, sensor: Sensor, **kwargs):
"most_recent_beliefs_only": fields.Boolean(
required=False, load_default=True
),
"use_materialized_view": fields.Boolean(required=False, load_default=True),
"compress_json": fields.Boolean(required=False),
},
location="query",
Expand All @@ -111,6 +113,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs):
- "resolution" (see :ref:`resolutions`)
- "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true)
"""
kwargs["most_recent_beliefs_mview"] = most_recent_beliefs_mview
return sensor.search_beliefs(as_json=True, **kwargs)

@route("/<id>/chart_annotations", strict_slashes=False)
Expand Down
3 changes: 3 additions & 0 deletions flexmeasures/api/v3_0/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from flexmeasures.utils.coding_utils import (
flatten_unique,
)
from flexmeasures.data.config import most_recent_beliefs_mview
from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables
from flexmeasures.auth.policy import check_access
from werkzeug.exceptions import Forbidden, Unauthorized
Expand Down Expand Up @@ -695,6 +696,7 @@ def get_chart(self, id: int, asset: GenericAsset, **kwargs):
"beliefs_after": AwareDateTimeField(format="iso", required=False),
"beliefs_before": AwareDateTimeField(format="iso", required=False),
"most_recent_beliefs_only": fields.Boolean(required=False),
"use_materialized_view": fields.Boolean(required=False, load_default=True),
"compress_json": fields.Boolean(required=False),
},
location="query",
Expand All @@ -708,6 +710,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs):
Data for use in charts (in case you have the chart specs already).
"""
sensors = flatten_unique(asset.validate_sensors_to_show())
kwargs["most_recent_beliefs_mview"] = most_recent_beliefs_mview
return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs)

@route("/<id>/auditlog")
Expand Down
37 changes: 37 additions & 0 deletions flexmeasures/cli/db_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import click

from flexmeasures.cli.utils import MsgStyle
from flexmeasures.data import db


@click.group("db-ops")
Expand Down Expand Up @@ -95,4 +96,40 @@ def restore(file: str):
click.secho("db restore unsuccessful", **MsgStyle.ERROR)


@fm_db_ops.command("refresh-materialized-views")
@with_appcontext
@click.option("--concurrent", is_flag=True, default=False)
def refresh_materialized_views(concurrent: bool):
"""
Refresh the materialized views for getting the most recent data.
By default, this locks the materialized view for the duration of the refresh.
Use the --concurrent option to avoid locking, at the cost of higher resource usage and
the requirement that a unique index exists on the materialized view.
"""
from sqlalchemy import text

refresh_type = "CONCURRENTLY" if concurrent else ""
import time

start_time = time.time()
click.secho(
f"Refreshing materialized views {'CONCURRENTLY' if concurrent else 'without concurrency'}...",
**MsgStyle.INFO,
)
try:
db.session.execute(
text(f"REFRESH MATERIALIZED VIEW {refresh_type} most_recent_beliefs_mview;")
)
db.session.commit()
elapsed_time = time.time() - start_time
click.secho(
f"✓ Materialized views refreshed successfully in {elapsed_time:.2f} seconds",
**MsgStyle.SUCCESS,
)
except Exception as e:
db.session.rollback()
click.secho(f"✗ Error refreshing materialized views: {e}", **MsgStyle.ERROR)
raise click.Abort()


app.cli.add_command(fm_db_ops)
15 changes: 14 additions & 1 deletion flexmeasures/data/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
Base = None # type: ignore
session_options = None
most_recent_beliefs_mview = None


def init_db():
Expand All @@ -32,7 +33,7 @@ def init_db():
def configure_db_for(app: Flask):
"""Call this to configure the database and the tools we use on it for the Flask app.
This should only be called once in the app's lifetime."""
global db, Base
global db, Base, most_recent_beliefs_mview

with app.app_context():
db.init_app(app)
Expand All @@ -51,6 +52,18 @@ def configure_db_for(app: Flask):
forecasting,
) # noqa: F401

import timely_beliefs.utils as tb_utils

try:
most_recent_beliefs_mview = tb_utils.get_most_recent_beliefs_mview(
db.session
)
except Exception:
app.logger.warning(
"Could not determine most_recent_beliefs_mview. Do you have timely-beliefs installed and is the latest version?"
" Beliefs will be retrieved from the actual table instead of the materialized view.",
)

# This would create db structure based on models, but you should use `flask db upgrade` for that.
# Base.metadata.create_all(bind=db.engine)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Add materialized view for belief optimization

Revision ID: c98798csds8c
Revises: b8f3cda5e023
Create Date: 2025-08-08 04:55:33.722545

"""

from alembic import op

# revision identifiers
revision = "c98798csds8c"
down_revision = "b8f3cda5e023"
branch_labels = None
depends_on = None


def upgrade():
# Create the materialized view with proper alias
op.execute(
"""
CREATE MATERIALIZED VIEW most_recent_beliefs_mview AS
SELECT *
FROM (
SELECT
timed_belief.sensor_id,
timed_belief.event_start,
timed_belief.source_id,
MIN(timed_belief.belief_horizon) AS most_recent_belief_horizon
FROM timed_belief
INNER JOIN data_source
ON data_source.id = timed_belief.source_id
GROUP BY
timed_belief.sensor_id,
timed_belief.event_start,
timed_belief.source_id
) AS belief_mins
GROUP BY
sensor_id,
event_start,
source_id,
most_recent_belief_horizon;
"""
)

# Create indexes
op.execute(
"""
CREATE INDEX idx_most_recent_beliefs_mview_sensor_event
ON most_recent_beliefs_mview(sensor_id, event_start);
"""
)

op.execute(
"""
CREATE INDEX idx_most_recent_beliefs_mview_event_start
ON most_recent_beliefs_mview(event_start);
"""
)

# Create a unique index to allow concurrent refreshes
op.execute(
"""
CREATE UNIQUE INDEX idx_most_recent_beliefs_mview_unique
ON most_recent_beliefs_mview(sensor_id, event_start, source_id);
"""
)


def downgrade():
op.execute("DROP MATERIALIZED VIEW IF EXISTS most_recent_beliefs_mview CASCADE;")
6 changes: 5 additions & 1 deletion flexmeasures/data/models/generic_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from flask import current_app
from flask_security import current_user
import pandas as pd
from sqlalchemy import select
from sqlalchemy import select, Table
from sqlalchemy.engine import Row
from sqlalchemy.ext.hybrid import hybrid_method
from sqlalchemy.sql.expression import func, text
Expand Down Expand Up @@ -648,6 +648,8 @@ def search_beliefs( # noqa C901
as_json: bool = False,
compress_json: bool = False,
resolution: timedelta | None = None,
use_materialized_view: bool = True,
most_recent_beliefs_mview: Table | None = None,
) -> BeliefsDataFrame | str:
"""Search all beliefs about events for all sensors of this asset

Expand Down Expand Up @@ -688,6 +690,8 @@ def search_beliefs( # noqa C901
most_recent_events_only=most_recent_events_only,
one_deterministic_belief_per_event_per_source=True,
resolution=resolution,
use_materialized_view=use_materialized_view,
most_recent_beliefs_mview=most_recent_beliefs_mview,
)
if as_json and not compress_json:
from flexmeasures.data.services.time_series import simplify_index
Expand Down
13 changes: 12 additions & 1 deletion flexmeasures/data/models/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from flask import current_app

import pandas as pd
from sqlalchemy import select
from sqlalchemy import (
select,
Table,
)
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.schema import UniqueConstraint
Expand Down Expand Up @@ -356,6 +359,8 @@ def search_beliefs( # noqa: C901
as_json: bool = False,
compress_json: bool = False,
resolution: str | timedelta | None = None,
use_materialized_view: bool = True,
most_recent_beliefs_mview: Table | None = None,
) -> tb.BeliefsDataFrame | str:
"""Search all beliefs about events for this sensor.

Expand Down Expand Up @@ -401,6 +406,8 @@ def search_beliefs( # noqa: C901
one_deterministic_belief_per_event=one_deterministic_belief_per_event,
one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source,
resolution=resolution,
use_materialized_view=use_materialized_view,
most_recent_beliefs_mview=most_recent_beliefs_mview,
)
if as_json and not compress_json:
df = bdf.reset_index()
Expand Down Expand Up @@ -804,6 +811,8 @@ def search(
one_deterministic_belief_per_event_per_source: bool = False,
resolution: str | timedelta = None,
sum_multiple: bool = True,
use_materialized_view: bool = True,
most_recent_beliefs_mview: Table | None = None,
) -> tb.BeliefsDataFrame | dict[str, tb.BeliefsDataFrame]:
"""Search all beliefs about events for the given sensors.

Expand Down Expand Up @@ -887,6 +896,8 @@ def search(
**most_recent_filters,
custom_filter_criteria=source_criteria,
custom_join_targets=custom_join_targets,
use_materialized_view=use_materialized_view,
most_recent_beliefs_mview=most_recent_beliefs_mview,
)
if use_latest_version_per_event:
bdf = keep_latest_version(
Expand Down
2 changes: 2 additions & 0 deletions flexmeasures/data/schemas/tests/test_input_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def test_input_schema():
# These arguments are not mapped to a field at all (state a reason)
excluded_arg_names = [
"as_json", # used in Sensor.search_beliefs but not in TimedBelief.search
"most_recent_beliefs_mview", # used in Sensor.search_beliefs but not in TimedBelief.search
"use_materialized_view", # used in Sensor.search_beliefs as well as in TimedBelief.search
"compress_json", # used in Sensor.search_beliefs but not in TimedBelief.search
]

Expand Down
Loading
Loading