Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions flexmeasures/api/dev/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.annotations import prepare_annotations_for_chart
from flexmeasures.ui.utils.view_utils import set_session_variables
from flexmeasures.utils.validation_utils import (
validate_timed_belief_min_v,
)


class SensorAPI(FlaskView):
Expand Down Expand Up @@ -90,6 +93,7 @@ def get_chart(self, id: int, sensor: Sensor, **kwargs):
"most_recent_beliefs_only": fields.Boolean(
required=False, load_default=True
),
"use_materialized_view": fields.Boolean(required=False, load_default=True),
},
location="query",
)
Expand All @@ -110,6 +114,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs):
- "resolution" (see :ref:`resolutions`)
- "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true)
"""
kwargs["timed_belief_min_v"] = validate_timed_belief_min_v(db.session)
return sensor.search_beliefs(as_json=True, **kwargs)

@route("/<id>/chart_annotations", strict_slashes=False)
Expand Down
5 changes: 5 additions & 0 deletions flexmeasures/api/v3_0/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
from flexmeasures.utils.coding_utils import (
flatten_unique,
)
from flexmeasures.utils.validation_utils import (
validate_timed_belief_min_v,
)
from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables
from flexmeasures.auth.policy import check_access
from werkzeug.exceptions import Forbidden, Unauthorized
Expand Down Expand Up @@ -657,6 +660,7 @@ def get_chart(self, id: int, asset: GenericAsset, **kwargs):
"beliefs_after": AwareDateTimeField(format="iso", required=False),
"beliefs_before": AwareDateTimeField(format="iso", required=False),
"most_recent_beliefs_only": fields.Boolean(required=False),
"use_materialized_view": fields.Boolean(required=False, load_default=True),
},
location="query",
)
Expand All @@ -669,6 +673,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs):
Data for use in charts (in case you have the chart specs already).
"""
sensors = flatten_unique(asset.validate_sensors_to_show())
kwargs["timed_belief_min_v"] = validate_timed_belief_min_v(db.session)
return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs)

@route("/<id>/auditlog")
Expand Down
29 changes: 29 additions & 0 deletions flexmeasures/cli/db_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import click

from flexmeasures.cli.utils import MsgStyle
from flexmeasures.data import db


@click.group("db-ops")
Expand Down Expand Up @@ -95,4 +96,32 @@ def restore(file: str):
click.secho("db restore unsuccessful", **MsgStyle.ERROR)


@fm_db_ops.command("refresh-materialized-views")
@with_appcontext
@click.option("--concurrent", is_flag=True, default=False)
def refresh_materialized_views(concurrent: bool):
"""Refresh materialized views for better query performance."""
from sqlalchemy import text

refresh_type = "CONCURRENTLY" if concurrent else ""
import time

start_time = time.time()
click.echo(
f"Refreshing materialized views {'CONCURRENTLY' if concurrent else 'without concurrency'}..."
)
try:
db.session.execute(
text(f"REFRESH MATERIALIZED {refresh_type} VIEW timed_belief_min_v;")
)
db.session.commit()
elapsed_time = time.time() - start_time
click.echo(
f"✓ Materialized views refreshed successfully in {elapsed_time:.2f} seconds"
)
except Exception as e:
db.session.rollback()
click.echo(f"✗ Error refreshing materialized views: {e}")


app.cli.add_command(fm_db_ops)
6 changes: 5 additions & 1 deletion flexmeasures/data/models/generic_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from flask import current_app
from flask_security import current_user
import pandas as pd
from sqlalchemy import select
from sqlalchemy import select, Table
from sqlalchemy.engine import Row
from sqlalchemy.ext.hybrid import hybrid_method
from sqlalchemy.sql.expression import func, text
Expand Down Expand Up @@ -647,6 +647,8 @@ def search_beliefs(
most_recent_events_only: bool = False,
as_json: bool = False,
resolution: timedelta | None = None,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> BeliefsDataFrame | str:
"""Search all beliefs about events for all sensors of this asset

Expand Down Expand Up @@ -686,6 +688,8 @@ def search_beliefs(
most_recent_events_only=most_recent_events_only,
one_deterministic_belief_per_event_per_source=True,
resolution=resolution,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if as_json:
from flexmeasures.data.services.time_series import simplify_index
Expand Down
10 changes: 9 additions & 1 deletion flexmeasures/data/models/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from flask import current_app

import pandas as pd
from sqlalchemy import select
from sqlalchemy import select, Table
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.schema import UniqueConstraint
Expand Down Expand Up @@ -354,6 +354,8 @@ def search_beliefs(
one_deterministic_belief_per_event_per_source: bool = False,
as_json: bool = False,
resolution: str | timedelta | None = None,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> tb.BeliefsDataFrame | str:
"""Search all beliefs about events for this sensor.

Expand Down Expand Up @@ -398,6 +400,8 @@ def search_beliefs(
one_deterministic_belief_per_event=one_deterministic_belief_per_event,
one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source,
resolution=resolution,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if as_json:
df = bdf.reset_index()
Expand Down Expand Up @@ -715,6 +719,8 @@ def search(
one_deterministic_belief_per_event_per_source: bool = False,
resolution: str | timedelta = None,
sum_multiple: bool = True,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> tb.BeliefsDataFrame | dict[str, tb.BeliefsDataFrame]:
"""Search all beliefs about events for the given sensors.

Expand Down Expand Up @@ -798,6 +804,8 @@ def search(
**most_recent_filters,
custom_filter_criteria=source_criteria,
custom_join_targets=custom_join_targets,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if use_latest_version_per_event:
bdf = keep_latest_version(
Expand Down
2 changes: 2 additions & 0 deletions flexmeasures/data/schemas/tests/test_input_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def test_input_schema():
# These arguments are not mapped to a field at all (state a reason)
excluded_arg_names = [
"as_json", # used in Sensor.search_beliefs but not in TimedBelief.search
"timed_belief_min_v", # used in Sensor.search_beliefs but not in TimedBelief.search
"use_materialized_view", # used in Sensor.search_beliefs but not in
]

arg_names_without_associated_fields = [
Expand Down
35 changes: 35 additions & 0 deletions flexmeasures/utils/validation_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import re
from sqlalchemy import MetaData, Table, Column, Integer, DateTime, Interval
from sqlalchemy.sql import text


def validate_color_hex(value):
Expand Down Expand Up @@ -51,3 +53,36 @@ def validate_url(value):
)

return value


def validate_timed_belief_min_v(session) -> Table | None:
"""Define the structure of the timed_belief_min_v materialized view if it exists."""
# Check if materialized view exists
result = session.execute(
text(
"""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'timed_belief_min_v'
AND table_schema = 'public'
);
"""
)
)

exists = result.scalar()
if not exists:
return None
print("Using timed_belief_min_v materialized view for optimized queries.")

# Only create the table definition if it exists
metadata = MetaData()
timed_belief_min_v = Table(
"timed_belief_min_v",
metadata,
Column("sensor_id", Integer),
Column("event_start", DateTime),
Column("source_id", Integer),
Column("most_recent_belief_horizon", Interval),
)
return timed_belief_min_v
Loading