diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py
index 3394e5c1..6f065ecb 100644
--- a/src/brad/daemon/daemon.py
+++ b/src/brad/daemon/daemon.py
@@ -131,7 +131,10 @@ def __init__(
if self._start_ui and UiManager.is_supported():
self._ui_mgr: Optional[UiManager] = UiManager.create(
- self._config, self._monitor, self._blueprint_mgr
+ self._config,
+ self._monitor,
+ self._blueprint_mgr,
+ self._system_event_logger,
)
else:
self._ui_mgr = None
diff --git a/src/brad/daemon/system_event_logger.py b/src/brad/daemon/system_event_logger.py
index 050b967a..378645d5 100644
--- a/src/brad/daemon/system_event_logger.py
+++ b/src/brad/daemon/system_event_logger.py
@@ -1,11 +1,15 @@
import csv
import pathlib
-from typing import Optional
+from datetime import datetime
+from typing import Optional, Deque, Tuple, List
+from collections import deque
from brad.config.file import ConfigFile
from brad.config.system_event import SystemEvent
from brad.utils.time_periods import universal_now
+SystemEventRecord = Tuple[datetime, SystemEvent, str]
+
class SystemEventLogger:
"""
@@ -26,6 +30,8 @@ def __init__(self, log_path: pathlib.Path) -> None:
self._file = open(self._log_path, "a", encoding="UTF-8")
self._csv_writer = csv.writer(self._file)
self._logged_header = False
+ self._memlog: Deque[Tuple[datetime, SystemEvent, str]] = deque()
+ self._memlog_maxlen = 100
def log(self, event: SystemEvent, extra_details: Optional[str] = None) -> None:
if not self._logged_header:
@@ -33,11 +39,27 @@ def log(self, event: SystemEvent, extra_details: Optional[str] = None) -> None:
self._logged_header = True
now = universal_now()
+ row = (
+ now,
+ event,
+ extra_details if extra_details is not None else "",
+ )
+
+ if len(self._memlog) == self._memlog_maxlen:
+ self._memlog.popleft()
+ self._memlog.append(row)
+
self._csv_writer.writerow(
[
- now.strftime("%Y-%m-%d %H:%M:%S"),
- event.value,
- extra_details if extra_details is not None else "",
+ row[0].strftime("%Y-%m-%d %H:%M:%S"),
+ row[1].value,
+ row[2],
]
)
self._file.flush()
+
+ def current_memlog(self) -> List[SystemEventRecord]:
+ """
+ Used for retrieving the system event log.
+ """
+ return list(self._memlog)
diff --git a/src/brad/ui/manager.py b/src/brad/ui/manager.py
index 6c96e8ee..72f7d58f 100644
--- a/src/brad/ui/manager.py
+++ b/src/brad/ui/manager.py
@@ -1,8 +1,9 @@
-from typing import Any
+from typing import Any, Optional
from brad.config.file import ConfigFile
from brad.daemon.monitor import Monitor
from brad.blueprint.manager import BlueprintManager
+from brad.daemon.system_event_logger import SystemEventLogger
class UiManager:
@@ -27,11 +28,15 @@ def is_supported() -> bool:
@classmethod
def create(
- cls, config: ConfigFile, monitor: Monitor, blueprint_mgr: BlueprintManager
+ cls,
+ config: ConfigFile,
+ monitor: Monitor,
+ blueprint_mgr: BlueprintManager,
+ system_event_logger: Optional[SystemEventLogger],
) -> "UiManager":
from brad.ui.manager_impl import UiManagerImpl
- return cls(UiManagerImpl(config, monitor, blueprint_mgr))
+ return cls(UiManagerImpl(config, monitor, blueprint_mgr, system_event_logger))
# We hide away the implementation details to allow external code to import
# `UiManager` without worrying about import errors (e.g., because the
diff --git a/src/brad/ui/manager_impl.py b/src/brad/ui/manager_impl.py
index e160a505..9ac5a7d3 100644
--- a/src/brad/ui/manager_impl.py
+++ b/src/brad/ui/manager_impl.py
@@ -4,26 +4,40 @@
import importlib.resources as pkg_resources
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
-from typing import Optional
+from typing import Optional, List
import brad.ui.static as brad_app
from brad.blueprint.manager import BlueprintManager
from brad.config.file import ConfigFile
from brad.daemon.monitor import Monitor
from brad.ui.uvicorn_server import PatchedUvicornServer
-from brad.ui.models import MetricsData, TimestampedMetrics, DisplayableBlueprint
+from brad.ui.models import (
+ MetricsData,
+ TimestampedMetrics,
+ DisplayableBlueprint,
+ SystemState,
+ DisplayableVirtualEngine,
+ VirtualInfrastructure,
+ DisplayableTable,
+)
from brad.daemon.front_end_metrics import FrontEndMetric
+from brad.daemon.system_event_logger import SystemEventLogger, SystemEventRecord
logger = logging.getLogger(__name__)
class UiManagerImpl:
def __init__(
- self, config: ConfigFile, monitor: Monitor, blueprint_mgr: BlueprintManager
+ self,
+ config: ConfigFile,
+ monitor: Monitor,
+ blueprint_mgr: BlueprintManager,
+ system_event_logger: Optional[SystemEventLogger],
) -> None:
self.config = config
self.monitor = monitor
self.blueprint_mgr = blueprint_mgr
+ self.system_event_logger = system_event_logger
async def serve_forever(self) -> None:
global manager # pylint: disable=global-statement
@@ -72,10 +86,62 @@ def get_metrics(num_values: int = 3) -> MetricsData:
@app.get("/api/1/system_state")
-def get_system_state() -> DisplayableBlueprint:
+def get_system_state() -> SystemState:
assert manager is not None
blueprint = manager.blueprint_mgr.get_blueprint()
- return DisplayableBlueprint.from_blueprint(blueprint)
+ dbp = DisplayableBlueprint.from_blueprint(blueprint)
+
+ # TODO: Hardcoded virtualized infrasturcture and writers.
+ txn_tables = ["theatres", "showings", "ticket_orders", "movie_info", "aka_title"]
+ txn_only = ["theatres", "showings", "ticket_orders"]
+ vdbe1 = DisplayableVirtualEngine(
+ index=1,
+ freshness="Serializable",
+ dialect="PostgreSQL SQL",
+ peak_latency_s=0.030,
+ tables=[
+ DisplayableTable(name=name, is_writer=True, mapped_to=["Aurora"])
+ for name in [
+ "theatres",
+ "showings",
+ "ticket_orders",
+ "movie_info",
+ "aka_title",
+ ]
+ ],
+ )
+ vdbe1.tables.sort(key=lambda t: t.name)
+ vdbe2 = DisplayableVirtualEngine(
+ index=2,
+ freshness="≤ 10 minutes stale",
+ dialect="PostgreSQL SQL",
+ peak_latency_s=30.0,
+ tables=[
+ DisplayableTable(name=table.name, is_writer=False, mapped_to=[])
+ for table in blueprint.tables()
+ if table.name not in txn_only
+ ],
+ )
+ vdbe2.tables.sort(key=lambda t: t.name)
+ for engine in dbp.engines:
+ if engine.name != "Aurora":
+ continue
+ for t in engine.tables:
+ if t.name in txn_tables:
+ t.is_writer = True
+ virtual_infra = VirtualInfrastructure(engines=[vdbe1, vdbe2])
+
+ return SystemState(virtual_infra=virtual_infra, blueprint=dbp)
+
+
+@app.get("/api/1/system_events")
+def get_system_events() -> List[SystemEventRecord]:
+ assert manager is not None
+ return (
+ manager.system_event_logger.current_memlog()
+ if manager.system_event_logger is not None
+ else []
+ )
# Serve the static pages.
diff --git a/src/brad/ui/models.py b/src/brad/ui/models.py
index bd80f7e6..d5293ac8 100644
--- a/src/brad/ui/models.py
+++ b/src/brad/ui/models.py
@@ -14,10 +14,16 @@ class MetricsData(BaseModel):
named_metrics: Dict[str, TimestampedMetrics]
+class DisplayableTable(BaseModel):
+ name: str
+ is_writer: bool = False
+ mapped_to: List[str] = []
+
+
class DisplayablePhysicalEngine(BaseModel):
name: str
provisioning: Optional[str]
- tables: List[str]
+ tables: List[DisplayableTable]
class DisplayableBlueprint(BaseModel):
@@ -29,11 +35,12 @@ def from_blueprint(cls, blueprint: Blueprint) -> "DisplayableBlueprint":
aurora = blueprint.aurora_provisioning()
if aurora.num_nodes() > 0:
aurora_tables = [
- table.name
+ # TODO: Hardcoded Aurora writer. This will change down the road.
+ DisplayableTable(name=table.name, is_writer=False)
for table, locations in blueprint.tables_with_locations()
if Engine.Aurora in locations
]
- aurora_tables.sort()
+ aurora_tables.sort(key=lambda t: t.name)
engines.append(
DisplayablePhysicalEngine(
name="Aurora",
@@ -45,11 +52,12 @@ def from_blueprint(cls, blueprint: Blueprint) -> "DisplayableBlueprint":
redshift = blueprint.redshift_provisioning()
if redshift.num_nodes() > 0:
redshift_tables = [
- table.name
+ # TODO: Hardcoded Redshift writer. This will change down the road.
+ DisplayableTable(name=table.name, is_writer=False)
for table, locations in blueprint.tables_with_locations()
if Engine.Redshift in locations
]
- redshift_tables.sort()
+ redshift_tables.sort(key=lambda t: t.name)
engines.append(
DisplayablePhysicalEngine(
name="Redshift",
@@ -59,11 +67,12 @@ def from_blueprint(cls, blueprint: Blueprint) -> "DisplayableBlueprint":
)
athena_tables = [
- table.name
+ # TODO: Hardcoded Athena writer. This will change down the road.
+ DisplayableTable(name=table.name, is_writer=False)
for table, locations in blueprint.tables_with_locations()
if Engine.Athena in locations
]
- athena_tables.sort()
+ athena_tables.sort(key=lambda t: t.name)
if len(athena_tables) > 0:
engines.append(
DisplayablePhysicalEngine(
@@ -72,3 +81,20 @@ def from_blueprint(cls, blueprint: Blueprint) -> "DisplayableBlueprint":
)
return cls(engines=engines)
+
+
+class DisplayableVirtualEngine(BaseModel):
+ index: int
+ freshness: str
+ dialect: str
+ peak_latency_s: Optional[float] = None
+ tables: List[DisplayableTable] = []
+
+
+class VirtualInfrastructure(BaseModel):
+ engines: List[DisplayableVirtualEngine]
+
+
+class SystemState(BaseModel):
+ virtual_infra: VirtualInfrastructure
+ blueprint: DisplayableBlueprint
diff --git a/ui/src/App.css b/ui/src/App.css
index 7fcfdc62..ddd05d04 100644
--- a/ui/src/App.css
+++ b/ui/src/App.css
@@ -27,7 +27,7 @@ body {
}
.body-container {
- max-width: 2000px;
+ max-width: 2100px;
margin-top: 120px;
flex-grow: 1;
@@ -37,7 +37,7 @@ body {
}
.column {
- flex-basis: calc(50% - 15px);
+ flex-basis: 0;
padding: 0 20px;
display: flex;
diff --git a/ui/src/App.jsx b/ui/src/App.jsx
index ddcc4ec3..aafd011c 100644
--- a/ui/src/App.jsx
+++ b/ui/src/App.jsx
@@ -1,121 +1,25 @@
-import axios from "axios";
import { useState, useEffect } from "react";
import Header from "./components/Header";
import VirtualInfraView from "./components/VirtualInfraView";
import BlueprintView from "./components/BlueprintView";
import PerfView from "./components/PerfView";
+import { fetchSystemState } from "./api";
import "./App.css";
const REFRESH_INTERVAL_MS = 30 * 1000;
-const API_PREFIX = "/api/1";
-
-function parseMetrics({ named_metrics }) {
- const result = {};
- Object.entries(named_metrics).forEach(([metricName, metricValues]) => {
- const parsedTs = metricValues.timestamps.map(
- (timestamp) => new Date(timestamp),
- );
- result[metricName] = {
- timestamps: parsedTs,
- values: metricValues.values,
- };
- });
- return result;
-}
-
-function mergeMetrics(existingMetrics, fetchedMetrics) {
- // General merge function, generated by ChatGPT.
- const existingTimestamps = existingMetrics.timestamps;
- const existingValues = existingMetrics.values;
- const newTimestamps = fetchedMetrics.timestamps;
- const newValues = fetchedMetrics.values;
-
- const mergedTimestamps = [];
- const mergedValues = [];
-
- let existingIndex = 0;
- let newIndex = 0;
-
- while (
- existingIndex < existingTimestamps.length &&
- newIndex < newTimestamps.length
- ) {
- const existingTimestamp = existingTimestamps[existingIndex];
- const newTimestamp = newTimestamps[newIndex];
-
- if (existingTimestamp < newTimestamp) {
- mergedTimestamps.push(existingTimestamp);
- mergedValues.push(existingValues[existingIndex]);
- existingIndex++;
- } else if (existingTimestamp > newTimestamp) {
- mergedTimestamps.push(newTimestamp);
- mergedValues.push(newValues[newIndex]);
- newIndex++;
- } else {
- // Timestamps are equal. Ignore the fetched metric (it _should_ be equal).
- mergedTimestamps.push(existingTimestamp);
- mergedValues.push(existingValues[existingIndex]);
- existingIndex++;
- newIndex++;
- }
- }
-
- // Add remaining timestamps and values from the existing object
- while (existingIndex < existingTimestamps.length) {
- mergedTimestamps.push(existingTimestamps[existingIndex]);
- mergedValues.push(existingValues[existingIndex]);
- existingIndex++;
- }
-
- // Add remaining timestamps and values from the new object
- while (newIndex < newTimestamps.length) {
- mergedTimestamps.push(newTimestamps[newIndex]);
- mergedValues.push(newValues[newIndex]);
- newIndex++;
- }
-
- return {
- timestamps: mergedTimestamps,
- values: mergedValues,
- };
-}
-
-function mergeAllMetrics(currentMetrics, fetchedMetrics) {
- let addedNew = false;
- const mergedResults = {};
- // TODO: Handle missing keys on either side. This will remove metrics if
- // `fetchedMetrics` does not have the relevant metric.
- Object.entries(fetchedMetrics).forEach(([metricName, metricValues]) => {
- let current = null;
- if (currentMetrics.hasOwnProperty(metricName)) {
- current = currentMetrics[metricName];
- } else {
- current = {
- timestamps: [],
- values: [],
- };
- }
- const fetched = metricValues;
- const merged = mergeMetrics(current, fetched);
- mergedResults[metricName] = merged;
- if (current.timestamps.length < merged.timestamps.length) {
- addedNew = true;
- }
- });
- return [mergedResults, addedNew];
-}
function App() {
- const [metricsData, setMetricsData] = useState({});
- const [systemState, setSystemState] = useState({});
+ const [systemState, setSystemState] = useState({
+ blueprint: null,
+ virtual_infra: null,
+ });
// Fetch updated system state periodically.
useEffect(() => {
let timeoutId = null;
const refreshData = async () => {
- const resultState = await axios.get(`${API_PREFIX}/system_state`);
- const newSystemState = resultState.data;
+ const newSystemState = await fetchSystemState();
// TODO: Not the best way to check for equality.
if (JSON.stringify(systemState) !== JSON.stringify(newSystemState)) {
setSystemState(newSystemState);
@@ -133,50 +37,18 @@ function App() {
};
}, [systemState]);
- // Fetch updated metrics.
- useEffect(() => {
- let timeoutId = null;
- const refreshData = async () => {
- const resultMetrics = await axios.get(`${API_PREFIX}/metrics`);
- const rawMetrics = resultMetrics.data;
- const fetchedMetrics = parseMetrics(rawMetrics);
- const [mergedMetrics, addedNewMetrics] = mergeAllMetrics(
- metricsData,
- fetchedMetrics,
- );
- if (addedNewMetrics) {
- setMetricsData(mergedMetrics);
- }
- timeoutId = setTimeout(refreshData, REFRESH_INTERVAL_MS);
- };
-
- // Run first fetch immediately.
- timeoutId = setTimeout(refreshData, 0);
- return () => {
- if (timeoutId === null) {
- return;
- }
- clearTimeout(timeoutId);
- };
- }, [metricsData]);
-
return (
<>