Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions flexmeasures/api/dev/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.services.annotations import prepare_annotations_for_chart
from flexmeasures.ui.utils.view_utils import set_session_variables
from flexmeasures.utils.validation_utils import (
get_timed_belief_min_v,
)


class SensorAPI(FlaskView):
Expand Down Expand Up @@ -90,6 +93,7 @@ def get_chart(self, id: int, sensor: Sensor, **kwargs):
"most_recent_beliefs_only": fields.Boolean(
required=False, load_default=True
),
"use_materialized_view": fields.Boolean(required=False, load_default=True),
},
location="query",
)
Expand All @@ -110,6 +114,7 @@ def get_chart_data(self, id: int, sensor: Sensor, **kwargs):
- "resolution" (see :ref:`resolutions`)
- "most_recent_beliefs_only" (if true, returns the most recent belief for each event; if false, returns each belief for each event; defaults to true)
"""
kwargs["timed_belief_min_v"] = get_timed_belief_min_v(db.session)
return sensor.search_beliefs(as_json=True, **kwargs)

@route("/<id>/chart_annotations", strict_slashes=False)
Expand Down
5 changes: 5 additions & 0 deletions flexmeasures/api/v3_0/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
from flexmeasures.utils.coding_utils import (
flatten_unique,
)
from flexmeasures.utils.validation_utils import (
get_timed_belief_min_v,
)
from flexmeasures.ui.utils.view_utils import clear_session, set_session_variables
from flexmeasures.auth.policy import check_access
from werkzeug.exceptions import Forbidden, Unauthorized
Expand Down Expand Up @@ -657,6 +660,7 @@ def get_chart(self, id: int, asset: GenericAsset, **kwargs):
"beliefs_after": AwareDateTimeField(format="iso", required=False),
"beliefs_before": AwareDateTimeField(format="iso", required=False),
"most_recent_beliefs_only": fields.Boolean(required=False),
"use_materialized_view": fields.Boolean(required=False, load_default=True),
},
location="query",
)
Expand All @@ -669,6 +673,7 @@ def get_chart_data(self, id: int, asset: GenericAsset, **kwargs):
Data for use in charts (in case you have the chart specs already).
"""
sensors = flatten_unique(asset.validate_sensors_to_show())
kwargs["timed_belief_min_v"] = get_timed_belief_min_v(db.session)
return asset.search_beliefs(sensors=sensors, as_json=True, **kwargs)

@route("/<id>/auditlog")
Expand Down
29 changes: 29 additions & 0 deletions flexmeasures/cli/db_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import click

from flexmeasures.cli.utils import MsgStyle
from flexmeasures.data import db


@click.group("db-ops")
Expand Down Expand Up @@ -95,4 +96,32 @@ def restore(file: str):
click.secho("db restore unsuccessful", **MsgStyle.ERROR)


@fm_db_ops.command("refresh-materialized-views")
@with_appcontext
@click.option("--concurrent", is_flag=True, default=False)
def refresh_materialized_views(concurrent: bool):
"""Refresh materialized views for better query performance."""
from sqlalchemy import text

refresh_type = "CONCURRENTLY" if concurrent else ""
import time

start_time = time.time()
click.echo(
f"Refreshing materialized views {'CONCURRENTLY' if concurrent else 'without concurrency'}..."
)
try:
db.session.execute(
text(f"REFRESH MATERIALIZED {refresh_type} VIEW timed_belief_min_v;")
)
db.session.commit()
elapsed_time = time.time() - start_time
click.echo(
f"✓ Materialized views refreshed successfully in {elapsed_time:.2f} seconds"
)
except Exception as e:
db.session.rollback()
click.echo(f"✗ Error refreshing materialized views: {e}")


app.cli.add_command(fm_db_ops)
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Add materialized view for belief optimization

Revision ID: c98798csds8c
Revises: b8f3cda5e023
Create Date: 2025-08-08 04:55:33.722545

"""

from alembic import op

# revision identifiers
revision = "c98798csds8c"
down_revision = "b8f3cda5e023"
branch_labels = None
depends_on = None


def upgrade():
# Create the materialized view with proper alias
op.execute(
"""
CREATE MATERIALIZED VIEW timed_belief_min_v AS
SELECT *
FROM (
SELECT
timed_belief.sensor_id,
timed_belief.event_start,
timed_belief.source_id,
MIN(timed_belief.belief_horizon) AS most_recent_belief_horizon
FROM timed_belief
INNER JOIN data_source
ON data_source.id = timed_belief.source_id
GROUP BY
timed_belief.sensor_id,
timed_belief.event_start,
timed_belief.source_id
) AS belief_mins
GROUP BY
sensor_id,
event_start,
source_id,
most_recent_belief_horizon;
"""
)

# Create indexes
op.execute(
"""
CREATE INDEX idx_timed_belief_min_v_sensor_event
ON timed_belief_min_v(sensor_id, event_start);
"""
)

op.execute(
"""
CREATE INDEX idx_timed_belief_min_v_event_start
ON timed_belief_min_v(event_start);
"""
)


def downgrade():
op.execute("DROP MATERIALIZED VIEW IF EXISTS timed_belief_min_v CASCADE;")
6 changes: 5 additions & 1 deletion flexmeasures/data/models/generic_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from flask import current_app
from flask_security import current_user
import pandas as pd
from sqlalchemy import select
from sqlalchemy import select, Table
from sqlalchemy.engine import Row
from sqlalchemy.ext.hybrid import hybrid_method
from sqlalchemy.sql.expression import func, text
Expand Down Expand Up @@ -647,6 +647,8 @@ def search_beliefs(
most_recent_events_only: bool = False,
as_json: bool = False,
resolution: timedelta | None = None,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> BeliefsDataFrame | str:
"""Search all beliefs about events for all sensors of this asset

Expand Down Expand Up @@ -686,6 +688,8 @@ def search_beliefs(
most_recent_events_only=most_recent_events_only,
one_deterministic_belief_per_event_per_source=True,
resolution=resolution,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if as_json:
from flexmeasures.data.services.time_series import simplify_index
Expand Down
10 changes: 9 additions & 1 deletion flexmeasures/data/models/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from flask import current_app

import pandas as pd
from sqlalchemy import select
from sqlalchemy import select, Table
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.schema import UniqueConstraint
Expand Down Expand Up @@ -354,6 +354,8 @@ def search_beliefs(
one_deterministic_belief_per_event_per_source: bool = False,
as_json: bool = False,
resolution: str | timedelta | None = None,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> tb.BeliefsDataFrame | str:
"""Search all beliefs about events for this sensor.

Expand Down Expand Up @@ -398,6 +400,8 @@ def search_beliefs(
one_deterministic_belief_per_event=one_deterministic_belief_per_event,
one_deterministic_belief_per_event_per_source=one_deterministic_belief_per_event_per_source,
resolution=resolution,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if as_json:
df = bdf.reset_index()
Expand Down Expand Up @@ -715,6 +719,8 @@ def search(
one_deterministic_belief_per_event_per_source: bool = False,
resolution: str | timedelta = None,
sum_multiple: bool = True,
use_materialized_view: bool = True,
timed_belief_min_v: Table | None = None,
) -> tb.BeliefsDataFrame | dict[str, tb.BeliefsDataFrame]:
"""Search all beliefs about events for the given sensors.

Expand Down Expand Up @@ -798,6 +804,8 @@ def search(
**most_recent_filters,
custom_filter_criteria=source_criteria,
custom_join_targets=custom_join_targets,
use_materialized_view=use_materialized_view,
timed_belief_min_v=timed_belief_min_v,
)
if use_latest_version_per_event:
bdf = keep_latest_version(
Expand Down
2 changes: 2 additions & 0 deletions flexmeasures/data/schemas/tests/test_input_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def test_input_schema():
# These arguments are not mapped to a field at all (state a reason)
excluded_arg_names = [
"as_json", # used in Sensor.search_beliefs but not in TimedBelief.search
"timed_belief_min_v", # used in Sensor.search_beliefs but not in TimedBelief.search
"use_materialized_view", # used in Sensor.search_beliefs but not in
]

arg_names_without_associated_fields = [
Expand Down
96 changes: 94 additions & 2 deletions flexmeasures/ui/templates/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,13 @@
{% endfor %}
</ul>
</li>
{% if active_subpage == "asset_graph" %}
<div class="form-check form-switch" style="margin-left: auto;">
<input class="form-check-input" type="checkbox" id="useMaterializedView" checked title="Reload this graph with latest data instead of using recent cache (updated every {{ mv_refresh_interval }} minutes). This will take longer to load.">
<label class="form-check-label" for="useMaterializedView">Use materialized view</label>
</div>
{% endif %}
<div class="form-check form-switch" style="margin-left: 10px;">
<input class="form-check-input" type="checkbox" {% if breadcrumb_info['current_asset_view'] == session.get("default_asset_view") %} checked {% endif %} id="defaultAssetView" onchange="setDefaultAssetView(this, '{{ breadcrumb_info["current_asset_view"] }}')">
<label class="form-check-label" for="defaultAssetView">Set as my default asset view</label>
</div>
Expand Down Expand Up @@ -893,20 +899,106 @@
* @param {String} queryEndDate The end date as a string in ISO format, to be used in the query.
* @return {Promise} A promise that resolves with the data.
*/
function fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate) {
function fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, useMaterializedView=true) {
if (previousResult && previousResult.start.getTime() === startDate.getTime() && previousResult.end.getTime() === endDate.getTime()){
return Promise.resolve(previousResult.data);
} else {
{% if active_subpage == "asset_graph" and has_kpis %}
getAssetKPIs();
{% endif %}
return fetch(dataPath + '/chart_data?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, {
return fetch(dataPath + '/chart_data?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&use_materialized_view=' + useMaterializedView, {
method: "GET",
headers: { "Content-Type": "application/json" },
signal: signal,
}).then(response => response.json());
}
}

const useMaterializedViewCheckbox = document.getElementById("useMaterializedView");
useMaterializedViewCheckbox.addEventListener("change", function() {
toggleMaterializedView();
});

// Toggle materialized view usage for timely beliefs
function toggleMaterializedView() {
// Get the checked status of the checkbox
const isChecked = useMaterializedViewCheckbox.checked;
const pickerStartDate = picker.getStartDate().toJSDate();
const queryStartDate= encodeURIComponent(toIsoStringWithOffset(pickerStartDate));
var queryEndDate = picker.getEndDate();
queryEndDate.setDate(queryEndDate.getDate() + 1);
queryEndDate = encodeURIComponent(toIsoStringWithOffset(queryEndDate.toJSDate()));
// Since we are going to make a call anyway so we'll send these values as null
const startDate = null;
const endDate = null;
previousResult = null;

$("#spinner").show();
if (isChecked) {
console.log("Checked.")
// fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, true);
Promise.all([
fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, true),
/**
// Fetch annotations
fetch(dataPath + '/chart_annotations?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, {
method: "GET",
headers: {"Content-Type": "application/json"},
signal: signal,
})
.then(function(response) { return response.json(); }),
*/

// Embed chart
embedAndLoad(chartSpecsPath + 'event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&', elementId, datasetName, previousResult, startDate, endDate),
]).then(function(result) {
$("#spinner").hide();
vegaView.change(datasetName, vega.changeset().remove(vega.truthy).insert(result[0])).resize().run();
previousResult = {
start: startDate,
end: endDate,
data: result[0]
};
checkSourceMasking(previousResult.data);
playBackDataLoadedForKnownDateRange = false;
/**
vegaView.change(datasetName + '_annotations', vega.changeset().remove(vega.truthy).insert(result[1])).resize().run();
*/
}).catch(console.error);
}
else {
// fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, false);
Promise.all([
fetchGraphDataAndKPIs(previousResult, startDate, endDate, queryStartDate, queryEndDate, false),
/**
// Fetch annotations
fetch(dataPath + '/chart_annotations?event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate, {
method: "GET",
headers: {"Content-Type": "application/json"},
signal: signal,
})
.then(function(response) { return response.json(); }),
*/

// Embed chart
embedAndLoad(chartSpecsPath + 'event_starts_after=' + queryStartDate + '&event_ends_before=' + queryEndDate + '&', elementId, datasetName, previousResult, startDate, endDate),
]).then(function(result) {
$("#spinner").hide();
vegaView.change(datasetName, vega.changeset().remove(vega.truthy).insert(result[0])).resize().run();
previousResult = {
start: startDate,
end: endDate,
data: result[0]
};
checkSourceMasking(previousResult.data);
playBackDataLoadedForKnownDateRange = false;
/**
vegaView.change(datasetName + '_annotations', vega.changeset().remove(vega.truthy).insert(result[1])).resize().run();
*/
}).catch(console.error);
}
$("#spinner").hide();
}

{% if active_subpage == "asset_graph" and has_kpis %}
function getAssetKPIs() {
Expand Down
4 changes: 4 additions & 0 deletions flexmeasures/ui/views/assets/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,16 @@ def graphs(self, id: str, start_time=None, end_time=None):
asset_form = AssetForm()
asset_form.with_options()
asset_form.process(obj=asset)
mv_refresh_interval = current_app.config.get(
"FLEXMEASURES_MVIEW_UPDATE_INTERVAL", None
)

return render_flexmeasures_template(
"assets/asset_graph.html",
asset=asset,
has_kpis=has_kpis,
asset_kpis=asset_kpis,
mv_refresh_interval=mv_refresh_interval,
current_page="Graphs",
)

Expand Down
1 change: 1 addition & 0 deletions flexmeasures/utils/config_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Config(object):
DEBUG: bool = False
LOGGING_LEVEL: int = logging.WARNING
SECRET_KEY: str | None = None
FLEXMEASURES_MVIEW_UPDATE_INTERVAL: int | None = None

FLEXMEASURES_ENV_DEFAULT = "production"

Expand Down
Loading
Loading