Extract per node Redshift CPU utilization #419

Merged: 4 commits, Dec 27, 2023
81 changes: 69 additions & 12 deletions src/brad/daemon/cloudwatch.py
@@ -3,7 +3,7 @@
import numpy as np
import pandas as pd
import logging
from typing import List, Optional, Tuple
from typing import List, Optional, Dict, Tuple
from datetime import datetime, timedelta

from .metrics_def import MetricDef
@@ -13,6 +13,9 @@

logger = logging.getLogger(__name__)

# We only collect metrics for up to 16 Redshift nodes.
MAX_REDSHIFT_NODES = 16


class CloudWatchClient:
def __init__(
@@ -24,6 +27,7 @@ def __init__(
) -> None:
self._engine = engine
self._dimensions = []
self._is_for_redshift = False

assert (
cluster_identifier is not None or instance_identifier is not None
@@ -59,6 +63,7 @@ def __init__(
"Value": cluster_identifier,
}
)
self._is_for_redshift = True

if config is not None:
self._client = boto3.client(
@@ -75,7 +80,7 @@ def metric_names(metric_defs: List[MetricDef]) -> List[str]:

def fetch_metrics(
self,
metrics_list: List[Tuple[str, str]],
metrics_list: List[MetricDef],
period: timedelta,
num_prev_points: int,
) -> pd.DataFrame:
@@ -98,17 +103,30 @@ def fetch_metrics(

def fetch_batch(metrics_list):
queries = []
for metric, stat in metrics_list:
for metric, stat, dimension_info in metrics_list:
dimensions = self._dimensions.copy()

# CloudWatch expects this ID to start with a lowercase
# character.
if dimension_info is None:
id_to_use = f"a{metric}_{stat}"
else:
id_to_use = f"a{metric}_{stat}_{dimension_info['InternalValue']}"
dimensions.append(
{
"Name": dimension_info["CloudwatchName"],
"Value": dimension_info["CloudwatchValue"],
}
)

queries.append(
{
# CloudWatch expects this ID to start with a lowercase
# character.
"Id": f"a{metric}_{stat}",
"Id": id_to_use,
"MetricStat": {
"Metric": {
"Namespace": self._namespace,
"MetricName": metric,
"Dimensions": self._dimensions.copy(),
"Dimensions": dimensions,
},
"Period": int(period.total_seconds()),
"Stat": stat,
@@ -128,18 +146,25 @@ def fetch_batch(metrics_list):
resp_dict = {}
for metric_data in response["MetricDataResults"]:
metric_id = metric_data["Id"][1:]
metric_timestamps = metric_data["Timestamps"]
metric_timestamps = pd.to_datetime(
metric_data["Timestamps"], utc=True, unit="ns"
)
metric_values = metric_data["Values"]
resp_dict[metric_id] = pd.Series(
metric_values, index=metric_timestamps, dtype=np.float64
)

df = pd.DataFrame(resp_dict)
df = df.sort_index()
df.index = pd.to_datetime(df.index, utc=True, unit="ns")

for metric_def in metrics_list:
metric_name = "{}_{}".format(*metric_def)
metric, stat, dimension_info = metric_def
if dimension_info is None:
metric_name = "{}_{}".format(*metric_def)
else:
metric_name = "{}_{}_{}".format(
metric, stat, dimension_info["InternalValue"]
)
if metric_name in df.columns:
continue
# Missing metric value.
@@ -150,8 +175,40 @@ def fetch_batch(metrics_list):

results = []
batch_size = 10
for i in range(0, len(metrics_list), batch_size):
df = fetch_batch(metrics_list[i : i + batch_size])
metrics_list_internal: List[Tuple[str, str, Optional[Dict[str, str]]]] = []
for metric, stat in metrics_list:
metrics_list_internal.append((metric, stat, None))

# We fetch additional per-node metrics when working with Redshift.
if self._is_for_redshift and metric == "CPUUtilization":
for dimension in _REDSHIFT_NODE_DIMENSIONS:
metrics_list_internal.append((metric, stat, dimension))

for i in range(0, len(metrics_list_internal), batch_size):
df = fetch_batch(metrics_list_internal[i : i + batch_size])
results.append(df)

return pd.concat(results, axis=1)


# Ideally these should be configurable by the client's user. To avoid changing
# our abstraction in the short term, we use this now and will refactor later.
_REDSHIFT_NODE_DIMENSIONS = [
{
"CloudwatchName": "NodeID",
"CloudwatchValue": "Leader",
"InternalValue": "Leader",
},
{
"CloudwatchName": "NodeID",
"CloudwatchValue": "Shared",
"InternalValue": "Shared",
},
] + [
{
"CloudwatchName": "NodeID",
"CloudwatchValue": f"Compute-{node_num}",
"InternalValue": f"Compute{node_num}",
}
for node_num in range(MAX_REDSHIFT_NODES)
]
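
For illustration only (not part of the diff), the following standalone sketch mirrors how the per-node dimensions above translate into CloudWatch query IDs and result column names. The CPUUtilization metric and the Average statistic are assumed purely for the example; the real values come from the configured MetricDefs.

# Illustrative sketch, not part of the change: shows the query ID and
# column naming scheme used by fetch_metrics for per-node dimensions.
MAX_REDSHIFT_NODES = 16

node_dimensions = [("Leader", "Leader"), ("Shared", "Shared")] + [
    (f"Compute-{n}", f"Compute{n}") for n in range(MAX_REDSHIFT_NODES)
]

metric, stat = "CPUUtilization", "Average"  # assumed values for the example
for cloudwatch_value, internal_value in node_dimensions:
    # CloudWatch query IDs must start with a lowercase character, hence the
    # leading "a"; the resulting DataFrame column drops that prefix.
    query_id = f"a{metric}_{stat}_{internal_value}"
    column_name = f"{metric}_{stat}_{internal_value}"
    print(f"{cloudwatch_value}: {query_id} -> {column_name}")
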
48 changes: 46 additions & 2 deletions src/brad/daemon/redshift_metrics.py
@@ -2,15 +2,16 @@
import pandas as pd
import json
from datetime import timedelta
from typing import List, Optional
from typing import List, Optional, Tuple
from importlib.resources import files, as_file

import brad.daemon as daemon
from .metrics_def import MetricDef
from .metrics_source import MetricsSourceWithForecasting
from .metrics_logger import MetricsLogger
from .cloudwatch import CloudWatchClient
from .cloudwatch import CloudWatchClient, MAX_REDSHIFT_NODES
from brad.blueprint.manager import BlueprintManager
from brad.blueprint.provisioning import Provisioning
from brad.config.engine import Engine
from brad.config.file import ConfigFile
from brad.utils.time_periods import impute_old_missing_metrics, universal_now
@@ -77,6 +78,24 @@ async def fetch_latest(self) -> None:
now = universal_now()
cutoff_ts = now - self.METRICS_DELAY
new_metrics = impute_old_missing_metrics(new_metrics, cutoff_ts, value=0.0)

# Some dimension metrics are not relevant for the current blueprint.
# Note - one potential issue is zeroing out delayed metrics associated
# with an old blueprint. As long as we have a sufficient delay before
# allowing another blueprint transition, we should be OK.
blueprint = self._blueprint_mgr.get_blueprint()
_, redshift_dimensions_to_discard = relevant_redshift_node_dimensions(
blueprint.redshift_provisioning()
)
to_discard = []
for metric, stat in self._metric_defs:
if metric != "CPUUtilization":
continue
for dimension in redshift_dimensions_to_discard:
to_discard.append(f"{metric}_{stat}_{dimension}")
new_metrics[to_discard] = new_metrics[to_discard].fillna(0.0)

# Discard any remaining rows that contain NaNs.
new_metrics = new_metrics.dropna()

self._values = self._get_updated_metrics(new_metrics)
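
As an aside (not part of the diff), the fillna/dropna combination above can be illustrated with a small self-contained example; the column names and values below are made up for the sketch. Columns for per-node dimensions the current blueprint cannot produce are zero-filled, while rows that are still missing an expected metric, for instance because CloudWatch has not reported it yet, are dropped.

# Illustrative sketch, not part of the change.
import numpy as np
import pandas as pd

# Hypothetical metrics for a 2-node cluster: Compute2 can never report a
# value under this blueprint, while Compute1's latest value is merely delayed.
new_metrics = pd.DataFrame(
    {
        "CPUUtilization_Average_Leader": [10.0, 12.0],
        "CPUUtilization_Average_Compute0": [50.0, 55.0],
        "CPUUtilization_Average_Compute1": [48.0, np.nan],
        "CPUUtilization_Average_Compute2": [np.nan, np.nan],
    }
)

# Dimensions that are irrelevant for the current blueprint are zero-filled...
to_discard = ["CPUUtilization_Average_Compute2"]
new_metrics[to_discard] = new_metrics[to_discard].fillna(0.0)

# ...and any row still missing an expected metric (here the second row,
# because of Compute1) is dropped entirely.
new_metrics = new_metrics.dropna()
print(new_metrics)
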
@@ -112,3 +131,28 @@ def _fetch_cw_metrics(self, num_prev_points: int) -> pd.DataFrame:
period=self._config.epoch_length,
num_prev_points=num_prev_points,
)


def relevant_redshift_node_dimensions(
redshift: Provisioning,
) -> Tuple[List[str], List[str]]:
"""
Returns the metrics to expect and to discard for the current blueprint.
"""
dim_to_expect = []
dim_to_discard = []
num_nodes = redshift.num_nodes()
if num_nodes == 1:
dim_to_expect.append("Shared")
dim_to_discard.append("Leader")
for idx in range(MAX_REDSHIFT_NODES):
dim_to_discard.append(f"Compute{idx}")
else:
dim_to_expect.append("Leader")
dim_to_discard.append("Shared")
for idx in range(MAX_REDSHIFT_NODES):
if idx < num_nodes:
dim_to_expect.append(f"Compute{idx}")
else:
dim_to_discard.append(f"Compute{idx}")
return dim_to_expect, dim_to_discard
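
A usage sketch (again, not part of the diff): relevant_redshift_node_dimensions only needs an object exposing num_nodes(), so a minimal stand-in for Provisioning is used here to keep the example self-contained.

# Illustrative sketch, not part of the change.
class FakeProvisioning:
    """Minimal stand-in for brad.blueprint.provisioning.Provisioning."""

    def __init__(self, num_nodes: int) -> None:
        self._num_nodes = num_nodes

    def num_nodes(self) -> int:
        return self._num_nodes


# Single-node cluster: only the "Shared" node reports CPU utilization.
expect, discard = relevant_redshift_node_dimensions(FakeProvisioning(1))
assert expect == ["Shared"]

# Two-node cluster: the leader plus Compute0/Compute1 are expected, while
# "Shared" and the unused Compute slots are discarded.
expect, discard = relevant_redshift_node_dimensions(FakeProvisioning(2))
assert expect == ["Leader", "Compute0", "Compute1"]
assert "Shared" in discard and "Compute2" in discard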