Gracefully support planning windows longer than system lifetime (#376)
* Gracefully support planning windows longer than system lifetime

* Use an hour long planning window by default

* Fix config

* Update other planning window references

* Centralize some logic

* Migrate all datetime.now() usages to a common utility to avoid timezone issues

* Fix
geoffxy authored Nov 20, 2023
1 parent d2add61 commit 70167a1
Showing 24 changed files with 159 additions and 88 deletions.
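The common utility referenced throughout these diffs is universal_now() from brad/utils/time_periods.py. The changes to that file are not among the diffs shown below, so the following is only a minimal sketch of what the helper plausibly does, inferred from the timezone-aware UTC timestamps it replaces at each call site:

    from datetime import datetime, timezone

    def universal_now() -> datetime:
        # Hypothetical sketch: always return a timezone-aware UTC timestamp so
        # callers never mix naive local times with aware UTC times.
        return datetime.now(tz=timezone.utc)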
7 changes: 4 additions & 3 deletions config/planner.yml
@@ -5,8 +5,8 @@ strategy: query_based_beam
 planning_window:
   weeks: 0
   days: 0
-  hours: 0
-  minutes: 5
+  hours: 1
+  minutes: 0
 
 reinterpret_second_as: 1
@@ -53,7 +53,8 @@ triggers:
    ceiling_s: 0.030
    sustained_epochs: 3
 
-  recent_change: {}
+  recent_change:
+    delay_epochs: 5
 
###
### Beam planning constants.
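The planning_window block above is presumably collapsed into a single timedelta by the planner config (planner_config.planning_window() is called that way in daemon.py further down). A minimal sketch of that conversion, assuming the field names map directly onto timedelta arguments:

    from datetime import timedelta

    def parse_planning_window(raw: dict) -> timedelta:
        # Hypothetical helper: the real parsing lives in the planner config class.
        return timedelta(
            weeks=raw.get("weeks", 0),
            days=raw.get("days", 0),
            hours=raw.get("hours", 0),
            minutes=raw.get("minutes", 0),
        )

    # With the new defaults above: timedelta(hours=1) instead of timedelta(minutes=5).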
14 changes: 9 additions & 5 deletions src/brad/admin/run_planner.py
@@ -1,9 +1,8 @@
 import asyncio
 import logging
 import pathlib
-import pytz
 from typing import Dict, Optional
-from datetime import timedelta, datetime
+from datetime import timedelta
 
 from brad.asset_manager import AssetManager
 from brad.blueprint import Blueprint
@@ -37,6 +36,7 @@
 from brad.blueprint.manager import BlueprintManager
 from brad.front_end.engine_connections import EngineConnections
 from brad.utils.table_sizer import TableSizer
+from brad.utils.time_periods import universal_now
 
 logger = logging.getLogger(__name__)
 
@@ -214,14 +214,18 @@ async def run_planner_impl(args) -> None:
     monitor = Monitor(config, blueprint_mgr)
     monitor.set_up_metrics_sources()
     if args.use_fixed_metrics is not None:
-        now = datetime.now().astimezone(pytz.utc)
         metrics_provider: MetricsProvider = FixedMetricsProvider(
             Metrics(**parse_metrics(args.use_fixed_metrics)),
-            now,
+            universal_now(),
         )
     else:
         metrics_provider = WindowedMetricsFromMonitor(
-            monitor, blueprint_mgr, config, planner_config
+            monitor,
+            blueprint_mgr,
+            config,
+            planner_config,
+            # N.B. This means the metrics window will be essentially empty.
+            universal_now(),
         )
 
     if config.routing_policy == RoutingPolicy.ForestTableSelectivity:
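WindowedMetricsFromMonitor now receives an extra timestamp argument (universal_now() here, the daemon's startup time in daemon.py below). The constructor signature is taken from these call sites; how the provider uses the timestamp to keep the metrics window inside the system's lifetime is an assumption sketched here:

    from datetime import datetime

    class WindowedMetricsFromMonitor:
        def __init__(self, monitor, blueprint_mgr, config, planner_config,
                     system_startup_timestamp: datetime) -> None:
            self._monitor = monitor
            self._planner_config = planner_config
            self._system_startup_timestamp = system_startup_timestamp

        def _window_start(self, now: datetime) -> datetime:
            # Clamp the lookback window so it never extends before system startup;
            # when constructed with universal_now() (as above), the window is
            # essentially empty.
            return max(now - self._planner_config.planning_window(),
                       self._system_startup_timestamp)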
7 changes: 3 additions & 4 deletions src/brad/daemon/aurora_metrics.py
@@ -1,8 +1,7 @@
 import asyncio
 import pandas as pd
 import json
-import pytz
-from datetime import timedelta, datetime
+from datetime import timedelta
 from typing import List, Optional, Tuple
 from importlib.resources import files, as_file
 
@@ -15,7 +14,7 @@
 from brad.blueprint.manager import BlueprintManager
 from brad.config.engine import Engine
 from brad.config.file import ConfigFile
-from brad.utils.time_periods import impute_old_missing_metrics
+from brad.utils.time_periods import impute_old_missing_metrics, universal_now
 
 
 class AuroraMetrics(MetricsSourceWithForecasting):
@@ -88,7 +87,7 @@ async def fetch_latest(self) -> None:
        )
 
        # See the comment in `redshift_metrics.py`.
-        now = datetime.now().astimezone(pytz.utc)
+        now = universal_now()
        cutoff_ts = now - self.METRICS_DELAY
        new_metrics = impute_old_missing_metrics(new_metrics, cutoff_ts, value=0.0)
        new_metrics = new_metrics.dropna()
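impute_old_missing_metrics() is imported alongside universal_now() but its body is not part of this diff. Based on its name, this call site, and the comment (in redshift_metrics.py) about metrics only appearing once complete, a rough sketch of the assumed behavior:

    import pandas as pd

    def impute_old_missing_metrics(df: pd.DataFrame, cutoff_ts, value: float = 0.0) -> pd.DataFrame:
        # Hypothetical sketch: rows older than `cutoff_ts` are assumed final, so any
        # still-missing values are filled with `value`; newer rows keep their NaNs and
        # are removed by the subsequent dropna() until the metrics actually arrive.
        df = df.copy()
        old_rows = df.index <= cutoff_ts
        df.loc[old_rows] = df.loc[old_rows].fillna(value)
        return df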
5 changes: 3 additions & 2 deletions src/brad/daemon/cloudwatch.py
@@ -4,11 +4,12 @@
 import pandas as pd
 import logging
 from typing import List, Optional, Tuple
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta
 
 from .metrics_def import MetricDef
 from brad.config.engine import Engine
 from brad.config.file import ConfigFile
+from brad.utils.time_periods import universal_now
 
 logger = logging.getLogger(__name__)
 
@@ -84,7 +85,7 @@ def fetch_metrics(
        available later.
        """
 
-        now = datetime.now(tz=timezone.utc)
+        now = universal_now()
        end_time = now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % period
 
        # Retrieve more than 1 epoch, for robustness; If we retrieve once per
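The end_time expression here (and in front_end_metrics.py and perf_insights.py below) floors now to the most recent period boundary, measuring the offset from datetime.min so that boundaries are stable across calls. A small worked example with an illustrative timestamp and a 5-minute period:

    from datetime import datetime, timedelta, timezone
    import pytz

    period = timedelta(minutes=5)
    now = datetime(2023, 11, 20, 12, 7, 42, tzinfo=timezone.utc)

    offset = (now - datetime.min.replace(tzinfo=pytz.UTC)) % period  # 2 minutes 42 seconds
    end_time = now - offset
    print(end_time)  # 2023-11-20 12:05:00+00:00 -- the last completed 5-minute boundary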
27 changes: 22 additions & 5 deletions src/brad/daemon/daemon.py
@@ -1,12 +1,10 @@
 import asyncio
 import logging
 import queue
-import pytz
 import os
 import multiprocessing as mp
 import numpy as np
 from typing import Optional, List, Set
-from datetime import datetime
 
 from brad.asset_manager import AssetManager
 from brad.blueprint import Blueprint
@@ -56,7 +54,7 @@
 from brad.planner.workload.provider import LoggedWorkloadProvider
 from brad.routing.policy import RoutingPolicy
 from brad.row_list import RowList
-from brad.utils.time_periods import period_start
+from brad.utils.time_periods import period_start, universal_now
 
 logger = logging.getLogger(__name__)
 
@@ -112,6 +110,8 @@ def __init__(
        # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
        self._internal_command_tasks: Set[asyncio.Task] = set()
 
+        self._startup_timestamp = universal_now()
+
    async def run_forever(self) -> None:
        """
        Starts running the daemon.
@@ -193,17 +193,25 @@ async def _run_setup(self) -> None:
            data_access_provider = _NoopDataAccessProvider()
            comparator_provider = PerformanceCeilingComparatorProvider(30.0, 0.030)
 
+        # Update just to get the most recent startup time.
+        self._startup_timestamp = universal_now()
+
        providers = BlueprintProviders(
            workload_provider=LoggedWorkloadProvider(
                self._config,
                self._planner_config,
                self._blueprint_mgr,
                self._schema_name,
+                self._startup_timestamp,
            ),
            analytics_latency_scorer=latency_scorer,
            comparator_provider=comparator_provider,
            metrics_provider=WindowedMetricsFromMonitor(
-                self._monitor, self._blueprint_mgr, self._config, self._planner_config
+                self._monitor,
+                self._blueprint_mgr,
+                self._config,
+                self._planner_config,
+                self._startup_timestamp,
            ),
            data_access_provider=data_access_provider,
            estimator_provider=self._estimator_provider,
@@ -213,6 +221,7 @@ async def _run_setup(self) -> None:
                self._monitor,
                data_access_provider,
                self._estimator_provider,
+                self._startup_timestamp,
            ),
        )
        self._planner = BlueprintPlannerFactory.create(
@@ -524,7 +533,7 @@ async def _handle_internal_command(self, command: str) -> RowList:
            return to_return
 
        elif command.startswith("BRAD_INSPECT_WORKLOAD"):
-            now = datetime.now().astimezone(pytz.utc)
+            now = universal_now()
            epoch_length = self._config.epoch_length
            planning_window = self._planner_config.planning_window()
            window_end = period_start(now, self._config.epoch_length) + epoch_length
@@ -539,6 +548,14 @@ async def _handle_internal_command(self, command: str) -> RowList:
                window_multiplier = 1
 
            window_start = window_end - planning_window * window_multiplier
+            if window_start < self._startup_timestamp:
+                window_start = period_start(
+                    self._startup_timestamp, self._config.epoch_length
+                )
+                logger.info(
+                    "Adjusting lookback window to start at system startup: %s",
+                    self._startup_timestamp.strftime("%Y-%m-%d %H:%M:%S,%f"),
+                )
            w = (
                WorkloadBuilder()
                .add_queries_from_s3_logs(self._config, window_start, window_end)
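period_start() is also defined in brad/utils/time_periods.py and is not shown in this diff; the clamping above relies on it to snap the adjusted window start onto an epoch boundary. A minimal sketch under the assumption that it floors a timestamp to the start of its containing period, consistent with the end_time arithmetic in cloudwatch.py:

    from datetime import datetime, timedelta

    def period_start(ts: datetime, period: timedelta) -> datetime:
        # Hypothetical sketch: floor `ts` to the most recent period boundary,
        # measured from datetime.min in the same timezone.
        return ts - (ts - datetime.min.replace(tzinfo=ts.tzinfo)) % period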
7 changes: 4 additions & 3 deletions src/brad/daemon/front_end_metrics.py
@@ -5,7 +5,7 @@
 import pytz
 import copy
 from typing import Dict, List, Optional
-from datetime import datetime, timezone
+from datetime import datetime
 from ddsketch import DDSketch
 
 from .metrics_source import MetricsSourceWithForecasting
@@ -15,6 +15,7 @@
 from brad.daemon.metrics_logger import MetricsLogger
 from brad.utils.streaming_metric import StreamingMetric, StreamingNumericMetric
 from brad.utils import log_verbose
+from brad.utils.time_periods import universal_now
 
 logger = logging.getLogger(__name__)
 
@@ -70,7 +71,7 @@ def __init__(
        )
 
    async def fetch_latest(self) -> None:
-        now = datetime.now(tz=timezone.utc)
+        now = universal_now()
        num_epochs = 5
        end_time = (
            now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % self._epoch_length
@@ -191,7 +192,7 @@ def _metrics_logger(self) -> Optional[MetricsLogger]:
        return self._logger
 
    def handle_metric_report(self, report: MetricsReport) -> None:
-        now = datetime.now(tz=timezone.utc)
+        now = universal_now()
        fe_index = report.fe_index
 
        # Each front end server reports these metrics.
5 changes: 3 additions & 2 deletions src/brad/daemon/perf_insights.py
@@ -4,10 +4,11 @@
 import pandas as pd
 from botocore.exceptions import ClientError
 from typing import List, Optional
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta
 
 from .metrics_def import MetricDef
 from brad.config.file import ConfigFile
+from brad.utils.time_periods import universal_now
 
 logger = logging.getLogger(__name__)
 
@@ -52,7 +53,7 @@ def fetch_metrics(
        self, metrics_list: List[MetricDef], period: timedelta, num_prev_points: int
    ) -> pd.DataFrame:
        # Retrieve datapoints
-        now = datetime.now(tz=timezone.utc)
+        now = universal_now()
        end_time = now - (now - datetime.min.replace(tzinfo=pytz.UTC)) % period
 
        # Retrieve more than 1 epoch, for robustness; If we retrieve once per
7 changes: 3 additions & 4 deletions src/brad/daemon/redshift_metrics.py
@@ -1,8 +1,7 @@
 import asyncio
 import pandas as pd
 import json
-import pytz
-from datetime import timedelta, datetime
+from datetime import timedelta
 from typing import List, Optional
 from importlib.resources import files, as_file
 
@@ -13,7 +12,7 @@
 from .cloudwatch import CloudWatchClient
 from brad.config.engine import Engine
 from brad.config.file import ConfigFile
-from brad.utils.time_periods import impute_old_missing_metrics
+from brad.utils.time_periods import impute_old_missing_metrics, universal_now
 
 
 class RedshiftMetrics(MetricsSourceWithForecasting):
@@ -61,7 +60,7 @@ async def fetch_latest(self) -> None:
        # This approach ensures that clients of this object have reliable access
        # to metrics (i.e., a set of metrics for a period will only appear in
        # the DataFrame once we are confident they are all available).
-        now = datetime.now().astimezone(pytz.utc)
+        now = universal_now()
        cutoff_ts = now - self.METRICS_DELAY
        new_metrics = impute_old_missing_metrics(new_metrics, cutoff_ts, value=0.0)
        new_metrics = new_metrics.dropna()
5 changes: 2 additions & 3 deletions src/brad/daemon/system_event_logger.py
@@ -1,11 +1,10 @@
 import csv
 import pathlib
-import pytz
-from datetime import datetime
 from typing import Optional
 
 from brad.config.file import ConfigFile
 from brad.config.system_event import SystemEvent
+from brad.utils.time_periods import universal_now
 
 
 class SystemEventLogger:
@@ -33,7 +32,7 @@ def log(self, event: SystemEvent, extra_details: Optional[str] = None) -> None:
            self._csv_writer.writerow(self._headers)
            self._logged_header = True
 
-        now = datetime.now().replace(tzinfo=pytz.utc)
+        now = universal_now()
        self._csv_writer.writerow(
            [
                now.strftime("%Y-%m-%d %H:%M:%S"),
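The removed line here illustrates the timezone issue named in the commit message: datetime.now() returns a naive local time, and .replace(tzinfo=pytz.utc) merely labels it as UTC without converting it, so the logged timestamp is off by the host's UTC offset. An illustrative comparison (US/Eastern is just an example timezone):

    from datetime import datetime
    import pytz

    eastern = pytz.timezone("US/Eastern")
    local = eastern.localize(datetime(2023, 11, 20, 9, 0, 0))  # 09:00 EST == 14:00 UTC

    print(local.astimezone(pytz.utc))     # 2023-11-20 14:00:00+00:00 -- converted correctly
    print(local.replace(tzinfo=pytz.utc)) # 2023-11-20 09:00:00+00:00 -- same wall time, mislabeled as UTC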
9 changes: 5 additions & 4 deletions src/brad/front_end/front_end.py
@@ -6,7 +6,7 @@
 import os
 import multiprocessing as mp
 from typing import AsyncIterable, Optional, Dict, Any
-from datetime import datetime, timezone, timedelta
+from datetime import timedelta
 from ddsketch import DDSketch
 
 import grpc
@@ -47,6 +47,7 @@
 from brad.utils.mailbox import Mailbox
 from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff
 from brad.utils.run_time_reservoir import RunTimeReservoir
+from brad.utils.time_periods import universal_now
 from brad.workload_logging.epoch_file_handler import EpochFileHandler
 
 logger = logging.getLogger(__name__)
@@ -334,7 +335,7 @@ async def _run_query_impl(
            if transactional_query:
                connection = session.engines.get_connection(engine_to_use)
                cursor = connection.cursor_sync()
-                start = datetime.now(tz=timezone.utc)
+                start = universal_now()
                if query_rep.is_transaction_start():
                    session.set_txn_start_timestamp(start)
                # Using execute_sync() is lower overhead than the async
@@ -344,9 +345,9 @@
            else:
                connection = session.engines.get_reader_connection(engine_to_use)
                cursor = connection.cursor_sync()
-                start = datetime.now(tz=timezone.utc)
+                start = universal_now()
                await cursor.execute(query_rep.raw_query)
-            end = datetime.now(tz=timezone.utc)
+            end = universal_now()
        except (
            pyodbc.ProgrammingError,
            pyodbc.Error,
4 changes: 2 additions & 2 deletions src/brad/front_end/session.py
@@ -1,6 +1,5 @@
 import asyncio
 import logging
-import pytz
 from datetime import datetime
 from typing import Dict, Tuple, Optional
 
@@ -12,6 +11,7 @@
 from brad.planner.estimator import Estimator
 from brad.routing.policy import RoutingPolicy
 from brad.data_stats.postgres_estimator import PostgresEstimator
+from brad.utils.time_periods import universal_now
 
 logger = logging.getLogger(__name__)
 
@@ -33,7 +33,7 @@ def __init__(
        self._engines = engines
        self._in_txn = False
        self._closed = False
-        self._txn_start_timestamp = datetime.now(tz=pytz.utc)
+        self._txn_start_timestamp = universal_now()
        self._estimator = estimator
 
    @property
6 changes: 2 additions & 4 deletions src/brad/planner/debug_logger.py
@@ -1,11 +1,10 @@
 import csv
 import datetime
 import pathlib
-import pytz
 import pickle
 from typing import Optional, Dict, List, Any
 
 from brad.config.file import ConfigFile
+from brad.utils.time_periods import universal_now
 
 
 class BlueprintPlanningDebugLogger:
@@ -68,6 +67,5 @@ def log_object_if_requested(
 
 
 def _get_timestamp_str() -> str:
-    timestamp = datetime.datetime.now()
-    timestamp = timestamp.astimezone(pytz.utc)  # UTC for consistency.
+    timestamp = universal_now()
     return timestamp.strftime("%Y-%m-%d_%H-%M-%S")