Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion documentation/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -671,4 +671,13 @@ FLEXMEASURES_API_SUNSET_LINK

Allow to override the default sunset link for your clients.

Default: ``None`` (defaults are set internally for each sunset API version, e.g. ``"https://flexmeasures.readthedocs.io/en/v0.13.0/api/v2_0.html"`` for v2.0)
Default: ``None`` (defaults are set internally for each sunset API version, e.g. ``"https://flexmeasures.readthedocs.io/en/v0.13.0/api/v2_0.html"`` for v2.0)

Perforamance optimizations
----------------------------

FLEXMEASURES_MVIEW_UPDATE_INTERVAL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Interval in minutes to refresh the materialized views in the background.

Default: ``0`` (minutes)
5 changes: 5 additions & 0 deletions flexmeasures/api/dev/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.annotations import prepare_annotations_for_chart
from flexmeasures.ui.utils.view_utils import set_session_variables
from flexmeasures.utils.validation_utils import (
get_timed_belief_min_v,
)


class SensorAPI(FlaskView):
Expand Down Expand Up @@ -90,6 +93,7 @@ def get_chart(self, id: int, sensor: Sensor, **kwargs):
"most_recent_beliefs_only": fields.Boolean(
required=False, load_default=True
),
"use_materialized_view": fields.Boolean(required=False, load_default=True),
"compress_json": fields.Boolean(required=False),
},
location="query",
Expand All @@ -111,6 +115,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs):
- "resolution" (see :ref:`resolutions`)
- "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true)
"""
kwargs["timed_belief_min_v"] = get_timed_belief_min_v()
return sensor.search_beliefs(as_json=True, **kwargs)

@route("/<id>/chart_annotations", strict_slashes=False)
Expand Down
5 changes: 5 additions & 0 deletions flexmeasures/api/v3_0/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
from flexmeasures.utils.coding_utils import (
flatten_unique,
)
from flexmeasures.utils.validation_utils import (
get_timed_belief_min_v,
)
from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables
from flexmeasures.auth.policy import check_access
from werkzeug.exceptions import Forbidden, Unauthorized
Expand Down Expand Up @@ -695,6 +698,7 @@ def get_chart(self, id: int, asset: GenericAsset, **kwargs):
"beliefs_after": AwareDateTimeField(format="iso", required=False),
"beliefs_before": AwareDateTimeField(format="iso", required=False),
"most_recent_beliefs_only": fields.Boolean(required=False),
"use_materialized_view": fields.Boolean(required=False, load_default=True),
"compress_json": fields.Boolean(required=False),
},
location="query",
Expand All @@ -708,6 +712,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs):
Data for use in charts (in case you have the chart specs already).
"""
sensors = flatten_unique(asset.validate_sensors_to_show())
kwargs["timed_belief_min_v"] = get_timed_belief_min_v()
return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs)

@route("/<id>/auditlog")
Expand Down
37 changes: 37 additions & 0 deletions flexmeasures/cli/db_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import click

from flexmeasures.cli.utils import MsgStyle
from flexmeasures.data import db


@click.group("db-ops")
Expand Down Expand Up @@ -95,4 +96,40 @@ def restore(file: str):
click.secho("db restore unsuccessful", **MsgStyle.ERROR)


@fm_db_ops.command("refresh-materialized-views")
@with_appcontext
@click.option("--concurrent", is_flag=True, default=False)
def refresh_materialized_views(concurrent: bool):
"""
Refresh the materialized views for getting the most recent data.
By default, this locks the materialized view for the duration of the refresh.
Use the --concurrent option to avoid locking, at the cost of higher resource usage and
the requirement that a unique index exists on the materialized view.
"""
from sqlalchemy import text

refresh_type = "CONCURRENTLY" if concurrent else ""
import time

start_time = time.time()
click.secho(
f"Refreshing materialized views {'CONCURRENTLY' if concurrent else 'without concurrency'}...",
**MsgStyle.INFO,
)
try:
db.session.execute(
text(f"REFRESH MATERIALIZED VIEW {refresh_type} timed_belief_min_v;")
)
db.session.commit()
elapsed_time = time.time() - start_time
click.secho(
f"✓ Materialized views refreshed successfully in {elapsed_time:.2f} seconds",
**MsgStyle.SUCCESS,
)
except Exception as e:
db.session.rollback()
click.secho(f"✗ Error refreshing materialized views: {e}", **MsgStyle.ERROR)
raise click.Abort()


app.cli.add_command(fm_db_ops)
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Add materialized view for belief optimization
Revision ID: c98798csds8c
Revises: b8f3cda5e023
Create Date: 2025-08-08 04:55:33.722545
"""

from alembic import op

# revision identifiers
revision = "c98798csds8c"
down_revision = "b8f3cda5e023"
branch_labels = None
depends_on = None


def upgrade():
# Create the materialized view with proper alias
op.execute(
"""
CREATE MATERIALIZED VIEW timed_belief_min_v AS
SELECT *
FROM (
SELECT
timed_belief.sensor_id,
timed_belief.event_start,
timed_belief.source_id,
MIN(timed_belief.belief_horizon) AS most_recent_belief_horizon
FROM timed_belief
INNER JOIN data_source
ON data_source.id = timed_belief.source_id
GROUP BY
timed_belief.sensor_id,
timed_belief.event_start,
timed_belief.source_id
) AS belief_mins
GROUP BY
sensor_id,
event_start,
source_id,
most_recent_belief_horizon;
"""
)

# Create indexes
op.execute(
"""
CREATE INDEX idx_timed_belief_min_v_sensor_event
ON timed_belief_min_v(sensor_id, event_start);
"""
)

op.execute(
"""
CREATE INDEX idx_timed_belief_min_v_event_start
ON timed_belief_min_v(event_start);
"""
)

# Create a unique index to allow concurrent refreshes
op.execute(
"""
CREATE UNIQUE INDEX idx_timed_belief_min_v_unique
ON timed_belief_min_v(sensor_id, event_start, source_id);
"""
)


def downgrade():
op.execute("DROP MATERIALIZED VIEW IF EXISTS timed_belief_min_v CASCADE;")
6 changes: 5 additions & 1 deletion flexmeasures/data/models/generic_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from flask import current_app
from flask_security import current_user
import pandas as pd
from sqlalchemy import select
from sqlalchemy import select, Table
from sqlalchemy.engine import Row
from sqlalchemy.ext.hybrid import hybrid_method
from sqlalchemy.sql.expression import func, text
Expand Down Expand Up @@ -648,6 +648,8 @@ def search_beliefs( # noqa C901
as_json: bool = False,
compress_json: bool = False,
resolution: timedelta | None = None,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> BeliefsDataFrame | str:
"""Search all beliefs about events for all sensors of this asset

Expand Down Expand Up @@ -688,6 +690,8 @@ def search_beliefs( # noqa C901
most_recent_events_only=most_recent_events_only,
one_deterministic_belief_per_event_per_source=True,
resolution=resolution,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if as_json and not compress_json:
from flexmeasures.data.services.time_series import simplify_index
Expand Down
10 changes: 9 additions & 1 deletion flexmeasures/data/models/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from flask import current_app

import pandas as pd
from sqlalchemy import select
from sqlalchemy import select, Table
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.schema import UniqueConstraint
Expand Down Expand Up @@ -356,6 +356,8 @@ def search_beliefs( # noqa: C901
as_json: bool = False,
compress_json: bool = False,
resolution: str | timedelta | None = None,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> tb.BeliefsDataFrame | str:
"""Search all beliefs about events for this sensor.

Expand Down Expand Up @@ -401,6 +403,8 @@ def search_beliefs( # noqa: C901
one_deterministic_belief_per_event=one_deterministic_belief_per_event,
one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source,
resolution=resolution,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if as_json and not compress_json:
df = bdf.reset_index()
Expand Down Expand Up @@ -804,6 +808,8 @@ def search(
one_deterministic_belief_per_event_per_source: bool = False,
resolution: str | timedelta = None,
sum_multiple: bool = True,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> tb.BeliefsDataFrame | dict[str, tb.BeliefsDataFrame]:
"""Search all beliefs about events for the given sensors.

Expand Down Expand Up @@ -887,6 +893,8 @@ def search(
**most_recent_filters,
custom_filter_criteria=source_criteria,
custom_join_targets=custom_join_targets,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if use_latest_version_per_event:
bdf = keep_latest_version(
Expand Down
2 changes: 2 additions & 0 deletions flexmeasures/data/schemas/tests/test_input_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def test_input_schema():
# These arguments are not mapped to a field at all (state a reason)
excluded_arg_names = [
"as_json", # used in Sensor.search_beliefs but not in TimedBelief.search
"timed_belief_min_v", # used in Sensor.search_beliefs but not in TimedBelief.search
"use_materialized_view", # used in Sensor.search_beliefs as well as in TimedBelief.search
"compress_json", # used in Sensor.search_beliefs but not in TimedBelief.search
]

Expand Down
96 changes: 94 additions & 2 deletions flexmeasures/ui/templates/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,13 @@
{% endfor %}
</ul>
</li>
{% if active_subpage == "asset_graph" %}
<div class="form-check form-switch" style="margin-left: auto;">
<input class="form-check-input" type="checkbox" id="useMaterializedView" checked title="Reload this graph with latest data instead of using recent cache (updated every {{ mv_refresh_interval }} minutes). This will take longer to load.">
<label class="form-check-label" for="useMaterializedView">Use materialized view</label>
</div>
{% endif %}
<div class="form-check form-switch" style="margin-left: 10px;">
<input class="form-check-input" type="checkbox" {% if breadcrumb_info['current_asset_view'] == session.get("default_asset_view") %} checked {% endif %} id="defaultAssetView" onchange="setDefaultAssetView(this, '{{ breadcrumb_info["current_asset_view"] }}')">
<label class="form-check-label" for="defaultAssetView">Set as my default asset view</label>
</div>
Expand Down Expand Up @@ -945,14 +951,14 @@
* @param {String} queryEndDate The end date as a string in ISO format, to be used in the query.
* @return {Promise} A promise that resolves with the data.
*/
function fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate) {
function fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, useMaterializedView=true) {
if (previousResult && previousResult.start.getTime() === startDate.getTime() && previousResult.end.getTime() === endDate.getTime()){
return Promise.resolve(previousResult.data);
} else {
{% if active_subpage == "asset_graph" and has_kpis %}
getAssetKPIs();
{% endif %}
return fetch(dataPath + '/chart_data?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&compress_json=true', {
return fetch(dataPath + '/chart_data?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&compress_json=true' + '&use_materialized_view=' + useMaterializedView, {
method: "GET",
headers: { "Content-Type": "application/json" },
signal: signal,
Expand All @@ -962,6 +968,92 @@
}
}

const useMaterializedViewCheckbox = document.getElementById("useMaterializedView");
useMaterializedViewCheckbox.addEventListener("change", function() {
toggleMaterializedView();
});

// Toggle materialized view usage for timely beliefs
function toggleMaterializedView() {
// Get the checked status of the checkbox
const isChecked = useMaterializedViewCheckbox.checked;
const pickerStartDate = picker.getStartDate().toJSDate();
const queryStartDate= encodeURIComponent(toIsoStringWithOffset(pickerStartDate));
var queryEndDate = picker.getEndDate();
queryEndDate.setDate(queryEndDate.getDate() + 1);
queryEndDate = encodeURIComponent(toIsoStringWithOffset(queryEndDate.toJSDate()));
// Since we are going to make a call anyway so we'll send these values as null
const startDate = null;
const endDate = null;
previousResult = null;

$("#spinner").show();
if (isChecked) {
console.log("Checked.")
// fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, true);
Promise.all([
fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, true),
/**
// Fetch annotations
fetch(dataPath + '/chart_annotations?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, {
method: "GET",
headers: {"Content-Type": "application/json"},
signal: signal,
})
.then(function(response) { return response.json(); }),
*/

// Embed chart
embedAndLoad(chartSpecsPath + 'event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&', elementId, datasetName, previousResult, startDate, endDate),
]).then(function(result) {
$("#spinner").hide();
vegaView.change(datasetName, vega.changeset().remove(vega.truthy).insert(result[0])).resize().run();
previousResult = {
start: startDate,
end: endDate,
data: result[0]
};
checkSourceMasking(previousResult.data);
playBackDataLoadedForKnownDateRange = false;
/**
vegaView.change(datasetName + '_annotations', vega.changeset().remove(vega.truthy).insert(result[1])).resize().run();
*/
}).catch(console.error);
}
else {
// fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, false);
Promise.all([
fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, false),
/**
// Fetch annotations
fetch(dataPath + '/chart_annotations?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, {
method: "GET",
headers: {"Content-Type": "application/json"},
signal: signal,
})
.then(function(response) { return response.json(); }),
*/

// Embed chart
embedAndLoad(chartSpecsPath + 'event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&', elementId, datasetName, previousResult, startDate, endDate),
]).then(function(result) {
$("#spinner").hide();
vegaView.change(datasetName, vega.changeset().remove(vega.truthy).insert(result[0])).resize().run();
previousResult = {
start: startDate,
end: endDate,
data: result[0]
};
checkSourceMasking(previousResult.data);
playBackDataLoadedForKnownDateRange = false;
/**
vegaView.change(datasetName + '_annotations', vega.changeset().remove(vega.truthy).insert(result[1])).resize().run();
*/
}).catch(console.error);
}
$("#spinner").hide();
}

{% if active_subpage == "asset_graph" and has_kpis %}
function getAssetKPIs() {
const pickerStartDate = picker.getStartDate().toJSDate();
Expand Down
Loading
Loading