From 70167a10c8f62037b325cee1f59bdbc9d76a99e1 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Mon, 20 Nov 2023 13:38:46 -0500 Subject: [PATCH] Gracefully support planning windows longer than system lifetime (#376) * Gracefully support planning windows longer than system lifetime * Use an hour long planning window by default * Fix config * Update other planning window references * Centralize some logic * Migrate all datetime.now() usages to a common utility to avoid timezone issues * Fix --- config/planner.yml | 7 ++-- src/brad/admin/run_planner.py | 14 ++++--- src/brad/daemon/aurora_metrics.py | 7 ++-- src/brad/daemon/cloudwatch.py | 5 ++- src/brad/daemon/daemon.py | 27 +++++++++--- src/brad/daemon/front_end_metrics.py | 7 ++-- src/brad/daemon/perf_insights.py | 5 ++- src/brad/daemon/redshift_metrics.py | 7 ++-- src/brad/daemon/system_event_logger.py | 5 +-- src/brad/front_end/front_end.py | 9 ++-- src/brad/front_end/session.py | 4 +- src/brad/planner/debug_logger.py | 6 +-- src/brad/planner/metrics.py | 41 ++++++++++++------- src/brad/planner/neighborhood/logger.py | 4 +- src/brad/planner/neighborhood/neighborhood.py | 5 +-- src/brad/planner/triggers/elapsed_time.py | 5 ++- src/brad/planner/triggers/provider.py | 10 ++++- src/brad/planner/triggers/recent_change.py | 18 +++++--- src/brad/planner/triggers/trigger.py | 8 ++-- src/brad/planner/triggers/variable_costs.py | 18 ++++---- src/brad/planner/workload/provider.py | 10 ++++- src/brad/utils/time_periods.py | 16 ++++++++ .../workload_logging/epoch_file_handler.py | 7 +--- .../IMDB_extended/run_repeating_analytics.py | 2 +- 24 files changed, 159 insertions(+), 88 deletions(-) diff --git a/config/planner.yml b/config/planner.yml index 90fcc6c9..fd0194d2 100644 --- a/config/planner.yml +++ b/config/planner.yml @@ -5,8 +5,8 @@ strategy: query_based_beam planning_window: weeks: 0 days: 0 - hours: 0 - minutes: 5 + hours: 1 + minutes: 0 reinterpret_second_as: 1 @@ -53,7 +53,8 @@ triggers: ceiling_s: 0.030 sustained_epochs: 3 - recent_change: {} + recent_change: + delay_epochs: 5 ### ### Beam planning constants. diff --git a/src/brad/admin/run_planner.py b/src/brad/admin/run_planner.py index a6ff11a6..e96994fd 100644 --- a/src/brad/admin/run_planner.py +++ b/src/brad/admin/run_planner.py @@ -1,9 +1,8 @@ import asyncio import logging import pathlib -import pytz from typing import Dict, Optional -from datetime import timedelta, datetime +from datetime import timedelta from brad.asset_manager import AssetManager from brad.blueprint import Blueprint @@ -37,6 +36,7 @@ from brad.blueprint.manager import BlueprintManager from brad.front_end.engine_connections import EngineConnections from brad.utils.table_sizer import TableSizer +from brad.utils.time_periods import universal_now logger = logging.getLogger(__name__) @@ -214,14 +214,18 @@ async def run_planner_impl(args) -> None: monitor = Monitor(config, blueprint_mgr) monitor.set_up_metrics_sources() if args.use_fixed_metrics is not None: - now = datetime.now().astimezone(pytz.utc) metrics_provider: MetricsProvider = FixedMetricsProvider( Metrics(**parse_metrics(args.use_fixed_metrics)), - now, + universal_now(), ) else: metrics_provider = WindowedMetricsFromMonitor( - monitor, blueprint_mgr, config, planner_config + monitor, + blueprint_mgr, + config, + planner_config, + # N.B. This means the metrics window will be essentially empty. + universal_now(), ) if config.routing_policy == RoutingPolicy.ForestTableSelectivity: diff --git a/src/brad/daemon/aurora_metrics.py b/src/brad/daemon/aurora_metrics.py index f61be332..fda361a6 100644 --- a/src/brad/daemon/aurora_metrics.py +++ b/src/brad/daemon/aurora_metrics.py @@ -1,8 +1,7 @@ import asyncio import pandas as pd import json -import pytz -from datetime import timedelta, datetime +from datetime import timedelta from typing import List, Optional, Tuple from importlib.resources import files, as_file @@ -15,7 +14,7 @@ from brad.blueprint.manager import BlueprintManager from brad.config.engine import Engine from brad.config.file import ConfigFile -from brad.utils.time_periods import impute_old_missing_metrics +from brad.utils.time_periods import impute_old_missing_metrics, universal_now class AuroraMetrics(MetricsSourceWithForecasting): @@ -88,7 +87,7 @@ async def fetch_latest(self) -> None: ) # See the comment in `redshift_metrics.py`. - now = datetime.now().astimezone(pytz.utc) + now = universal_now() cutoff_ts = now - self.METRICS_DELAY new_metrics = impute_old_missing_metrics(new_metrics, cutoff_ts, value=0.0) new_metrics = new_metrics.dropna() diff --git a/src/brad/daemon/cloudwatch.py b/src/brad/daemon/cloudwatch.py index 0ce82f7b..965fac04 100644 --- a/src/brad/daemon/cloudwatch.py +++ b/src/brad/daemon/cloudwatch.py @@ -4,11 +4,12 @@ import pandas as pd import logging from typing import List, Optional, Tuple -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from .metrics_def import MetricDef from brad.config.engine import Engine from brad.config.file import ConfigFile +from brad.utils.time_periods import universal_now logger = logging.getLogger(__name__) @@ -84,7 +85,7 @@ def fetch_metrics( available later. """ - now = datetime.now(tz=timezone.utc) + now = universal_now() end_time = now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % period # Retrieve more than 1 epoch, for robustness; If we retrieve once per diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index 2d9b7eb5..980b5f4f 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -1,12 +1,10 @@ import asyncio import logging import queue -import pytz import os import multiprocessing as mp import numpy as np from typing import Optional, List, Set -from datetime import datetime from brad.asset_manager import AssetManager from brad.blueprint import Blueprint @@ -56,7 +54,7 @@ from brad.planner.workload.provider import LoggedWorkloadProvider from brad.routing.policy import RoutingPolicy from brad.row_list import RowList -from brad.utils.time_periods import period_start +from brad.utils.time_periods import period_start, universal_now logger = logging.getLogger(__name__) @@ -112,6 +110,8 @@ def __init__( # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task self._internal_command_tasks: Set[asyncio.Task] = set() + self._startup_timestamp = universal_now() + async def run_forever(self) -> None: """ Starts running the daemon. @@ -193,17 +193,25 @@ async def _run_setup(self) -> None: data_access_provider = _NoopDataAccessProvider() comparator_provider = PerformanceCeilingComparatorProvider(30.0, 0.030) + # Update just to get the most recent startup time. + self._startup_timestamp = universal_now() + providers = BlueprintProviders( workload_provider=LoggedWorkloadProvider( self._config, self._planner_config, self._blueprint_mgr, self._schema_name, + self._startup_timestamp, ), analytics_latency_scorer=latency_scorer, comparator_provider=comparator_provider, metrics_provider=WindowedMetricsFromMonitor( - self._monitor, self._blueprint_mgr, self._config, self._planner_config + self._monitor, + self._blueprint_mgr, + self._config, + self._planner_config, + self._startup_timestamp, ), data_access_provider=data_access_provider, estimator_provider=self._estimator_provider, @@ -213,6 +221,7 @@ async def _run_setup(self) -> None: self._monitor, data_access_provider, self._estimator_provider, + self._startup_timestamp, ), ) self._planner = BlueprintPlannerFactory.create( @@ -524,7 +533,7 @@ async def _handle_internal_command(self, command: str) -> RowList: return to_return elif command.startswith("BRAD_INSPECT_WORKLOAD"): - now = datetime.now().astimezone(pytz.utc) + now = universal_now() epoch_length = self._config.epoch_length planning_window = self._planner_config.planning_window() window_end = period_start(now, self._config.epoch_length) + epoch_length @@ -539,6 +548,14 @@ async def _handle_internal_command(self, command: str) -> RowList: window_multiplier = 1 window_start = window_end - planning_window * window_multiplier + if window_start < self._startup_timestamp: + window_start = period_start( + self._startup_timestamp, self._config.epoch_length + ) + logger.info( + "Adjusting lookback window to start at system startup: %s", + self._startup_timestamp.strftime("%Y-%m-%d %H:%M:%S,%f"), + ) w = ( WorkloadBuilder() .add_queries_from_s3_logs(self._config, window_start, window_end) diff --git a/src/brad/daemon/front_end_metrics.py b/src/brad/daemon/front_end_metrics.py index c014f763..9f426048 100644 --- a/src/brad/daemon/front_end_metrics.py +++ b/src/brad/daemon/front_end_metrics.py @@ -5,7 +5,7 @@ import pytz import copy from typing import Dict, List, Optional -from datetime import datetime, timezone +from datetime import datetime from ddsketch import DDSketch from .metrics_source import MetricsSourceWithForecasting @@ -15,6 +15,7 @@ from brad.daemon.metrics_logger import MetricsLogger from brad.utils.streaming_metric import StreamingMetric, StreamingNumericMetric from brad.utils import log_verbose +from brad.utils.time_periods import universal_now logger = logging.getLogger(__name__) @@ -70,7 +71,7 @@ def __init__( ) async def fetch_latest(self) -> None: - now = datetime.now(tz=timezone.utc) + now = universal_now() num_epochs = 5 end_time = ( now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % self._epoch_length @@ -191,7 +192,7 @@ def _metrics_logger(self) -> Optional[MetricsLogger]: return self._logger def handle_metric_report(self, report: MetricsReport) -> None: - now = datetime.now(tz=timezone.utc) + now = universal_now() fe_index = report.fe_index # Each front end server reports these metrics. diff --git a/src/brad/daemon/perf_insights.py b/src/brad/daemon/perf_insights.py index c655b049..9f3356f1 100644 --- a/src/brad/daemon/perf_insights.py +++ b/src/brad/daemon/perf_insights.py @@ -4,10 +4,11 @@ import pandas as pd from botocore.exceptions import ClientError from typing import List, Optional -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from .metrics_def import MetricDef from brad.config.file import ConfigFile +from brad.utils.time_periods import universal_now logger = logging.getLogger(__name__) @@ -52,7 +53,7 @@ def fetch_metrics( self, metrics_list: List[MetricDef], period: timedelta, num_prev_points: int ) -> pd.DataFrame: # Retrieve datapoints - now = datetime.now(tz=timezone.utc) + now = universal_now() end_time = now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % period # Retrieve more than 1 epoch, for robustness; If we retrieve once per diff --git a/src/brad/daemon/redshift_metrics.py b/src/brad/daemon/redshift_metrics.py index 85a46cce..585d3811 100644 --- a/src/brad/daemon/redshift_metrics.py +++ b/src/brad/daemon/redshift_metrics.py @@ -1,8 +1,7 @@ import asyncio import pandas as pd import json -import pytz -from datetime import timedelta, datetime +from datetime import timedelta from typing import List, Optional from importlib.resources import files, as_file @@ -13,7 +12,7 @@ from .cloudwatch import CloudWatchClient from brad.config.engine import Engine from brad.config.file import ConfigFile -from brad.utils.time_periods import impute_old_missing_metrics +from brad.utils.time_periods import impute_old_missing_metrics, universal_now class RedshiftMetrics(MetricsSourceWithForecasting): @@ -61,7 +60,7 @@ async def fetch_latest(self) -> None: # This approach ensures that clients of this object have reliable access # to metrics (i.e., a set of metrics for a period will only appear in # the DataFrame once we are confident they are all available). - now = datetime.now().astimezone(pytz.utc) + now = universal_now() cutoff_ts = now - self.METRICS_DELAY new_metrics = impute_old_missing_metrics(new_metrics, cutoff_ts, value=0.0) new_metrics = new_metrics.dropna() diff --git a/src/brad/daemon/system_event_logger.py b/src/brad/daemon/system_event_logger.py index 6f703bb7..050b967a 100644 --- a/src/brad/daemon/system_event_logger.py +++ b/src/brad/daemon/system_event_logger.py @@ -1,11 +1,10 @@ import csv import pathlib -import pytz -from datetime import datetime from typing import Optional from brad.config.file import ConfigFile from brad.config.system_event import SystemEvent +from brad.utils.time_periods import universal_now class SystemEventLogger: @@ -33,7 +32,7 @@ def log(self, event: SystemEvent, extra_details: Optional[str] = None) -> None: self._csv_writer.writerow(self._headers) self._logged_header = True - now = datetime.now().replace(tzinfo=pytz.utc) + now = universal_now() self._csv_writer.writerow( [ now.strftime("%Y-%m-%d %H:%M:%S"), diff --git a/src/brad/front_end/front_end.py b/src/brad/front_end/front_end.py index 71e04c3b..e740a47a 100644 --- a/src/brad/front_end/front_end.py +++ b/src/brad/front_end/front_end.py @@ -6,7 +6,7 @@ import os import multiprocessing as mp from typing import AsyncIterable, Optional, Dict, Any -from datetime import datetime, timezone, timedelta +from datetime import timedelta from ddsketch import DDSketch import grpc @@ -47,6 +47,7 @@ from brad.utils.mailbox import Mailbox from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff from brad.utils.run_time_reservoir import RunTimeReservoir +from brad.utils.time_periods import universal_now from brad.workload_logging.epoch_file_handler import EpochFileHandler logger = logging.getLogger(__name__) @@ -334,7 +335,7 @@ async def _run_query_impl( if transactional_query: connection = session.engines.get_connection(engine_to_use) cursor = connection.cursor_sync() - start = datetime.now(tz=timezone.utc) + start = universal_now() if query_rep.is_transaction_start(): session.set_txn_start_timestamp(start) # Using execute_sync() is lower overhead than the async @@ -344,9 +345,9 @@ async def _run_query_impl( else: connection = session.engines.get_reader_connection(engine_to_use) cursor = connection.cursor_sync() - start = datetime.now(tz=timezone.utc) + start = universal_now() await cursor.execute(query_rep.raw_query) - end = datetime.now(tz=timezone.utc) + end = universal_now() except ( pyodbc.ProgrammingError, pyodbc.Error, diff --git a/src/brad/front_end/session.py b/src/brad/front_end/session.py index 19597e07..d726ac5c 100644 --- a/src/brad/front_end/session.py +++ b/src/brad/front_end/session.py @@ -1,6 +1,5 @@ import asyncio import logging -import pytz from datetime import datetime from typing import Dict, Tuple, Optional @@ -12,6 +11,7 @@ from brad.planner.estimator import Estimator from brad.routing.policy import RoutingPolicy from brad.data_stats.postgres_estimator import PostgresEstimator +from brad.utils.time_periods import universal_now logger = logging.getLogger(__name__) @@ -33,7 +33,7 @@ def __init__( self._engines = engines self._in_txn = False self._closed = False - self._txn_start_timestamp = datetime.now(tz=pytz.utc) + self._txn_start_timestamp = universal_now() self._estimator = estimator @property diff --git a/src/brad/planner/debug_logger.py b/src/brad/planner/debug_logger.py index 0ae7682a..b76431cb 100644 --- a/src/brad/planner/debug_logger.py +++ b/src/brad/planner/debug_logger.py @@ -1,11 +1,10 @@ import csv -import datetime import pathlib -import pytz import pickle from typing import Optional, Dict, List, Any from brad.config.file import ConfigFile +from brad.utils.time_periods import universal_now class BlueprintPlanningDebugLogger: @@ -68,6 +67,5 @@ def log_object_if_requested( def _get_timestamp_str() -> str: - timestamp = datetime.datetime.now() - timestamp = timestamp.astimezone(pytz.utc) # UTC for consistency. + timestamp = universal_now() return timestamp.strftime("%Y-%m-%d_%H-%M-%S") diff --git a/src/brad/planner/metrics.py b/src/brad/planner/metrics.py index 92b0159d..5b3bbffa 100644 --- a/src/brad/planner/metrics.py +++ b/src/brad/planner/metrics.py @@ -1,6 +1,5 @@ import logging import math -import pytz import pandas as pd import numpy as np from datetime import datetime @@ -12,6 +11,7 @@ from brad.config.metrics import FrontEndMetric from brad.config.planner import PlannerConfig from brad.daemon.monitor import Monitor +from brad.utils.time_periods import elapsed_time, universal_now logger = logging.getLogger(__name__) @@ -74,30 +74,41 @@ def __init__( blueprint_mgr: BlueprintManager, config: ConfigFile, planner_config: PlannerConfig, + system_startup_timestamp: datetime, ) -> None: self._monitor = monitor self._blueprint_mgr = blueprint_mgr self._config = config self._planner_config = planner_config + self._system_startup_timestamp = system_startup_timestamp def get_metrics(self) -> Tuple[Metrics, datetime]: - epochs_per_planning_window = math.ceil( - self._planner_config.planning_window().total_seconds() - / self._config.epoch_length.total_seconds() - ) + running_time = elapsed_time(self._system_startup_timestamp) + planning_window = self._planner_config.planning_window() + epoch_length = self._config.epoch_length + if running_time > planning_window: + epochs_to_extract = math.ceil( + planning_window.total_seconds() / epoch_length.total_seconds() + ) + else: + epochs_to_extract = math.ceil( + running_time.total_seconds() / epoch_length.total_seconds() + ) ( redshift, aurora_writer, aurora_reader, front_end, ) = _extract_metrics_from_monitor( - self._monitor, self._blueprint_mgr, epochs_per_planning_window + self._monitor, self._blueprint_mgr, epochs_to_extract ) if redshift.empty and aurora_writer.empty and front_end.empty: logger.warning("All metrics are empty.") - now = datetime.now().astimezone(pytz.utc) - return (Metrics(1.0, 1.0, 1.0, 100.0, 100.0, 1.0, 1.0, 1.0, 0.0, 0.0), now) + return ( + Metrics(1.0, 1.0, 1.0, 100.0, 100.0, 1.0, 1.0, 1.0, 0.0, 0.0), + universal_now(), + ) assert not front_end.empty, "Front end metrics are empty." @@ -131,12 +142,12 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: str(most_recent_common), ) - aggregate_epochs = min(len(common_timestamps), epochs_per_planning_window) - if aggregate_epochs < epochs_per_planning_window: + aggregate_epochs = min(len(common_timestamps), epochs_to_extract) + if aggregate_epochs < epochs_to_extract: logger.warning( - "Aggregating metrics across %d epochs. The planning window has %d epochs.", + "Aggregating metrics across %d epochs. Wanted to extract %d epochs.", aggregate_epochs, - epochs_per_planning_window, + epochs_to_extract, ) agg_cfg = self._planner_config.metrics_agg() @@ -327,8 +338,10 @@ def get_metrics(self) -> Tuple[Metrics, datetime]: if redshift.empty and aurora_writer.empty and front_end.empty: logger.warning("All metrics are empty.") - now = datetime.now().astimezone(pytz.utc) - return (Metrics(1.0, 1.0, 1.0, 100.0, 100.0, 1.0, 1.0, 1.0, 0.0, 0.0), now) + return ( + Metrics(1.0, 1.0, 1.0, 100.0, 100.0, 1.0, 1.0, 1.0, 0.0, 0.0), + universal_now(), + ) assert not front_end.empty, "Front end metrics are empty." diff --git a/src/brad/planner/neighborhood/logger.py b/src/brad/planner/neighborhood/logger.py index e8459789..89fa5ff6 100644 --- a/src/brad/planner/neighborhood/logger.py +++ b/src/brad/planner/neighborhood/logger.py @@ -1,8 +1,8 @@ import csv -import datetime from brad.blueprint import Blueprint from brad.planner.neighborhood.score import Score +from brad.utils.time_periods import universal_now class BlueprintPlanningLogger: @@ -11,7 +11,7 @@ class BlueprintPlanningLogger: """ def __init__(self) -> None: - curr_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + curr_time = universal_now().strftime("%Y-%m-%d_%H-%M-%S") out_file_name = f"brad_planning_{curr_time}.csv" self._out_file = open(out_file_name, "w", encoding="UTF-8") self._first_log = True diff --git a/src/brad/planner/neighborhood/neighborhood.py b/src/brad/planner/neighborhood/neighborhood.py index 518abb9d..b353c06a 100644 --- a/src/brad/planner/neighborhood/neighborhood.py +++ b/src/brad/planner/neighborhood/neighborhood.py @@ -1,10 +1,8 @@ import asyncio import logging -import pytz import pandas as pd from io import TextIOWrapper from typing import Dict, List, Optional, Tuple -from datetime import datetime from brad.blueprint.blueprint import Blueprint from brad.config.engine import Engine @@ -32,6 +30,7 @@ from brad.routing.rule_based import RuleBased from brad.front_end.engine_connections import EngineConnections from brad.utils.table_sizer import TableSizer +from brad.utils.time_periods import universal_now # from brad.planner.neighborhood.scaling_scorer import ALL_METRICS @@ -97,7 +96,7 @@ async def _run_replan_impl( current_workload, next_workload, ) = await self._providers.workload_provider.get_workloads( - datetime.now().astimezone(pytz.utc), window_multiplier + universal_now(), window_multiplier ) workload_filters = [ AuroraTransactions(next_workload), diff --git a/src/brad/planner/triggers/elapsed_time.py b/src/brad/planner/triggers/elapsed_time.py index aff2a6f5..51729500 100644 --- a/src/brad/planner/triggers/elapsed_time.py +++ b/src/brad/planner/triggers/elapsed_time.py @@ -1,9 +1,10 @@ import logging -from datetime import timedelta, datetime +from datetime import timedelta from typing import Optional from brad.blueprint import Blueprint from brad.planner.scoring.score import Score +from brad.utils.time_periods import universal_now from .trigger import Trigger @@ -17,7 +18,7 @@ def __init__(self, period: timedelta, epoch_length: timedelta) -> None: self._reset_trigger_next() async def should_replan(self) -> bool: - now = datetime.now() + now = universal_now() if now >= self._trigger_next: self._reset_trigger_next() logger.info( diff --git a/src/brad/planner/triggers/provider.py b/src/brad/planner/triggers/provider.py index 64179c48..ca50fb4b 100644 --- a/src/brad/planner/triggers/provider.py +++ b/src/brad/planner/triggers/provider.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import List from brad.config.file import ConfigFile @@ -37,12 +38,14 @@ def __init__( monitor: Monitor, data_access_provider: DataAccessProvider, estimator_provider: EstimatorProvider, + startup_timestamp: datetime, ) -> None: self._config = config self._planner_config = planner_config self._monitor = monitor self._data_access_provider = data_access_provider self._estimator_provider = estimator_provider + self._startup_timestamp = startup_timestamp def get_triggers(self) -> List[Trigger]: trigger_config = self._planner_config.trigger_configs() @@ -90,6 +93,7 @@ def get_triggers(self) -> List[Trigger]: self._estimator_provider, var_costs["threshold"], self._config.epoch_length, + self._startup_timestamp, ) ) @@ -118,7 +122,11 @@ def get_triggers(self) -> List[Trigger]: recent_change = trigger_config["recent_change"] if "disabled" not in recent_change: trigger_list.append( - RecentChange(self._planner_config, self._config.epoch_length) + RecentChange( + self._planner_config, + self._config.epoch_length, + recent_change["delay_epochs"], + ) ) return trigger_list diff --git a/src/brad/planner/triggers/recent_change.py b/src/brad/planner/triggers/recent_change.py index f6d51a8e..d1040c17 100644 --- a/src/brad/planner/triggers/recent_change.py +++ b/src/brad/planner/triggers/recent_change.py @@ -1,5 +1,4 @@ import logging -import pytz from datetime import timedelta, datetime from typing import Optional @@ -9,6 +8,7 @@ from brad.daemon.aurora_metrics import AuroraMetrics from brad.daemon.redshift_metrics import RedshiftMetrics from brad.planner.scoring.score import Score +from brad.utils.time_periods import universal_now from .trigger import Trigger @@ -20,7 +20,12 @@ class RecentChange(Trigger): This triggers a replan if there was a recent provisioning change. """ - def __init__(self, planner_config: PlannerConfig, epoch_length: timedelta) -> None: + def __init__( + self, + planner_config: PlannerConfig, + epoch_length: timedelta, + delay_epochs: int, + ) -> None: super().__init__(epoch_length) self._planner_config = planner_config self._is_first_change = True @@ -30,15 +35,16 @@ def __init__(self, planner_config: PlannerConfig, epoch_length: timedelta) -> No self._metrics_delay = max( AuroraMetrics.METRICS_DELAY, RedshiftMetrics.METRICS_DELAY ) + self._delay_epochs = delay_epochs async def should_replan(self) -> bool: if self._last_provisioning_change is None: return False - window = self._planner_config.planning_window() - now = datetime.now(tz=pytz.utc) + delay_window = self._delay_epochs * self._epoch_length + now = universal_now() - if now > self._last_provisioning_change + window + self._metrics_delay: + if now > self._last_provisioning_change + delay_window + self._metrics_delay: self._last_provisioning_change = None logger.info("Triggering replan because of a recent provisioning change.") return True @@ -69,7 +75,7 @@ def update_blueprint(self, blueprint: Blueprint, score: Optional[Score]) -> None aurora_diff = diff.aurora_diff() redshift_diff = diff.redshift_diff() if aurora_diff is not None or redshift_diff is not None: - self._last_provisioning_change = datetime.now(tz=pytz.utc) + self._last_provisioning_change = universal_now() logger.info( "RecentChangeTrigger: Will trigger one planning window after %s", self._last_provisioning_change.strftime("%Y-%m-%d_%H-%M-%S"), diff --git a/src/brad/planner/triggers/trigger.py b/src/brad/planner/triggers/trigger.py index d00ad500..dfeca7ed 100644 --- a/src/brad/planner/triggers/trigger.py +++ b/src/brad/planner/triggers/trigger.py @@ -1,8 +1,8 @@ -import pytz from typing import Optional -from datetime import datetime, timedelta +from datetime import timedelta from brad.blueprint import Blueprint from brad.planner.scoring.score import Score +from brad.utils.time_periods import universal_now class Trigger: @@ -40,8 +40,8 @@ def on_replan(self, trigger: Optional["Trigger"]) -> None: """ def _reset_cutoff(self) -> None: - self._cutoff = datetime.now(tz=pytz.utc) + self._cutoff = universal_now() def _passed_n_epochs_since_cutoff(self, n: int) -> bool: - elapsed = datetime.now(tz=pytz.utc) - self._cutoff + elapsed = universal_now() - self._cutoff return elapsed >= n * self._epoch_length diff --git a/src/brad/planner/triggers/variable_costs.py b/src/brad/planner/triggers/variable_costs.py index d2e61c9c..ee3b7b04 100644 --- a/src/brad/planner/triggers/variable_costs.py +++ b/src/brad/planner/triggers/variable_costs.py @@ -1,6 +1,5 @@ import logging import math -import pytz import pandas as pd from datetime import datetime, timedelta from typing import List, Tuple @@ -21,6 +20,7 @@ compute_athena_scanned_bytes, ) from brad.routing.router import Router +from brad.utils.time_periods import elapsed_time, universal_now logger = logging.getLogger(__name__) @@ -35,6 +35,7 @@ def __init__( estimator_provider: EstimatorProvider, threshold_frac: float, epoch_length: timedelta, + startup_timestamp: datetime, ) -> None: """ This will trigger a replan if the current variable costs (currently, @@ -51,6 +52,7 @@ def __init__( self._data_access_provider = data_access_provider self._estimator_provider = estimator_provider self._change_ratio = 1.0 + threshold_frac + self._startup_timestamp = startup_timestamp async def should_replan(self) -> bool: if self._current_blueprint is None or self._current_score is None: @@ -98,13 +100,13 @@ async def _estimate_current_scan_hourly_cost(self) -> Tuple[float, float]: return 0.0, 0.0 # Extract the queries seen in the last window. - window_end = datetime.now() - window_end = window_end.astimezone(pytz.utc) - window_start = ( - window_end - - self._planner_config.planning_window() - - self._config.epoch_length - ) + window_end = universal_now() + planning_window = self._planner_config.planning_window() + running_time = elapsed_time(self._startup_timestamp) + if running_time > planning_window: + window_start = window_end - planning_window - self._config.epoch_length + else: + window_start = self._startup_timestamp logger.debug("Variable costs range: %s -- %s", window_start, window_end) workload = ( WorkloadBuilder() diff --git a/src/brad/planner/workload/provider.py b/src/brad/planner/workload/provider.py index 64404bb1..6e569f86 100644 --- a/src/brad/planner/workload/provider.py +++ b/src/brad/planner/workload/provider.py @@ -6,10 +6,10 @@ from brad.config.engine import Engine from brad.config.file import ConfigFile from brad.config.planner import PlannerConfig +from brad.front_end.engine_connections import EngineConnections from brad.planner.workload import Workload from brad.planner.workload.builder import WorkloadBuilder from brad.utils.table_sizer import TableSizer -from brad.front_end.engine_connections import EngineConnections logger = logging.getLogger(__name__) @@ -71,12 +71,14 @@ def __init__( planner_config: PlannerConfig, blueprint_mgr: BlueprintManager, schema_name: str, + system_startup_time: datetime, ) -> None: self._config = config self._planner_config = planner_config self._workload: Optional[Workload] = None self._blueprint_mgr = blueprint_mgr self._schema_name = schema_name + self._system_startup_time = system_startup_time async def get_workloads( self, @@ -86,6 +88,12 @@ async def get_workloads( ) -> Tuple[Workload, Workload]: window_length = self._planner_config.planning_window() * window_multiplier window_start = window_end - window_length + if window_start < self._system_startup_time: + logger.info( + "Adjusting lookback window to start at system startup: %s", + self._system_startup_time.strftime("%Y-%m-%d %H:%M:%S,%f"), + ) + window_start = self._system_startup_time logger.debug( "Retrieving workload in range %s -- %s. Length: %s", window_start, diff --git a/src/brad/utils/time_periods.py b/src/brad/utils/time_periods.py index 602cc467..ce8fc9fc 100644 --- a/src/brad/utils/time_periods.py +++ b/src/brad/utils/time_periods.py @@ -1,3 +1,4 @@ +import pytz import pandas as pd from typing import Iterator from datetime import datetime, timedelta @@ -64,3 +65,18 @@ def time_point_intersect(start: datetime, end: datetime, timepoint: datetime) -> NOTE: The left endpoint is inclusive. The right endpoint is exclusive. """ return timepoint >= start and timepoint < end + + +def elapsed_time(startup_timestamp: datetime) -> timedelta: + """ + Returns the time elapsed since the provided system startup timestamp. + """ + return universal_now() - startup_timestamp + + +def universal_now() -> datetime: + """ + Returns a timestamp that represents the current date and time in a + standardized timezone. + """ + return datetime.now(tz=pytz.utc) diff --git a/src/brad/workload_logging/epoch_file_handler.py b/src/brad/workload_logging/epoch_file_handler.py index 90cd8256..78c02a8d 100644 --- a/src/brad/workload_logging/epoch_file_handler.py +++ b/src/brad/workload_logging/epoch_file_handler.py @@ -4,13 +4,12 @@ import logging import os import pathlib -import pytz from collections import deque from datetime import datetime, timedelta, timezone from typing import Optional, Tuple, Deque from .common import TIMESTAMP_PREFIX_FORMAT -from brad.utils.time_periods import period_start +from brad.utils.time_periods import period_start, universal_now logger = logging.getLogger(__name__) @@ -74,9 +73,7 @@ async def refresh(self) -> None: Meant to be called periodically (once an "epoch") to do any cleanup tasks (e.g., closing the current epoch, uploading files). """ - epoch_start = period_start( - datetime.now().astimezone(pytz.utc), self._epoch_length - ) + epoch_start = period_start(universal_now(), self._epoch_length) self._close_epoch_and_advance_if_needed(epoch_start) await self._do_uploads() diff --git a/workloads/IMDB_extended/run_repeating_analytics.py b/workloads/IMDB_extended/run_repeating_analytics.py index eac1e5f7..46eaf14e 100644 --- a/workloads/IMDB_extended/run_repeating_analytics.py +++ b/workloads/IMDB_extended/run_repeating_analytics.py @@ -775,7 +775,7 @@ def signal_handler(_signal, _frame): p.join() for idx, p in enumerate(processes): - print("Runner {idx} exit code:", p.exitcode, flush=True, file=sys.stderr) + print(f"Runner {idx} exit code:", p.exitcode, flush=True, file=sys.stderr) print("Done repeating analytics!", flush=True, file=sys.stderr)