Skip to content

Commit

Permalink
Add utilities to support the specialized scenario (#383)
Browse files Browse the repository at this point in the history
* Add utility to add tables to an existing blueprint

* Add the watchdog check

* Check in the watchdog

* Lint/type fixes

* Fixes to the alter schema script

* More fixes
  • Loading branch information
geoffxy authored Nov 22, 2023
1 parent 75ea75b commit 5e4292b
Show file tree
Hide file tree
Showing 9 changed files with 717 additions and 1 deletion.
441 changes: 441 additions & 0 deletions config/schemas/imdb_specialized.yml

Large diffs are not rendered by default.

172 changes: 172 additions & 0 deletions src/brad/admin/alter_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import asyncio
import logging

from brad.asset_manager import AssetManager
from brad.blueprint import Blueprint
from brad.blueprint.user import UserProvidedBlueprint
from brad.blueprint.sql_gen.table import TableSqlGenerator
from brad.blueprint.manager import BlueprintManager
from brad.config.engine import Engine
from brad.config.file import ConfigFile
from brad.front_end.engine_connections import EngineConnections
from brad.planner.data import bootstrap_blueprint

logger = logging.getLogger(__name__)


def register_admin_action(subparser) -> None:
    """Register the `alter_schema` admin command and its CLI arguments.

    The command's handler is wired up via `admin_action` so that
    `brad.exec.admin.main` can dispatch to it after parsing.
    """
    parser = subparser.add_parser(
        "alter_schema", help="Alters an existing schema on BRAD."
    )
    # Each entry is (flag, keyword arguments for `add_argument`), registered
    # in order so the generated --help output is stable.
    argument_specs = [
        (
            "--config-file",
            dict(type=str, required=True, help="Path to BRAD's configuration file."),
        ),
        (
            "--schema-name",
            dict(type=str, required=True, help="The name of the schema."),
        ),
        (
            "--new-schema-file",
            dict(type=str, help="Path to the database schema to bootstrap."),
        ),
        (
            "--skip-persisting-blueprint",
            dict(
                action="store_true",
                help=(
                    "Set this flag to avoid persisting the blueprint. "
                    "Only meant to be used if you know what you are doing!"
                ),
            ),
        ),
        (
            "--engines",
            dict(nargs="+", default=["aurora", "redshift", "athena"]),
        ),
        (
            "--take-action",
            dict(action="store_true"),
        ),
    ]
    for flag, options in argument_specs:
        parser.add_argument(flag, **options)
    parser.set_defaults(admin_action=alter_schema)


async def alter_schema_impl(args) -> None:
    """Apply a primitive schema alteration to an existing BRAD deployment.

    Loads the currently persisted blueprint, compares it against the
    user-provided schema file, and creates any tables that are present in the
    new schema but missing from the current blueprint. Unless
    `--skip-persisting-blueprint` is set, the merged blueprint is then
    force-persisted. No tables are ever dropped or modified — only added.

    Args:
        args: Parsed CLI arguments from `register_admin_action` (expects
            `config_file`, `schema_name`, `new_schema_file`, `engines`,
            `take_action`, and `skip_persisting_blueprint`).
    """
    # 1. Load the config.
    config = ConfigFile.load(args.config_file)
    assets = AssetManager(config)
    blueprint_mgr = BlueprintManager(config, assets, args.schema_name)
    await blueprint_mgr.load()
    current_blueprint = blueprint_mgr.get_blueprint()

    # 2. Load and validate the user-provided schema.
    user = UserProvidedBlueprint.load_from_yaml_file(args.new_schema_file)
    user.validate()

    # 3. Get the bootstrapped blueprint.
    altered_blueprint = bootstrap_blueprint(user)

    # This alter schema is primitive for now (only to support experiments). It
    # only adds tables that are missing from the current blueprint.

    # 4. Connect to the engines. Only the engines named via --engines are
    # contacted; autocommit is disabled so DDL can be committed explicitly
    # in step 9.
    engines_filter = {Engine.from_str(engine_str) for engine_str in args.engines}
    cxns = EngineConnections.connect_sync(
        config,
        blueprint_mgr.get_directory(),
        schema_name=args.schema_name,
        autocommit=False,
        specific_engines=engines_filter,
    )

    # 5. Figure out which tables are new. These will be created.
    existing_tables = {table.name for table in current_blueprint.tables()}
    tables_to_create = {
        table.name
        for table in altered_blueprint.tables()
        if table.name not in existing_tables
    }

    logger.info("Will create the following tables: %s", str(tables_to_create))
    if not args.take_action:
        # Dry-run by default: nothing is changed unless --take-action is set.
        logger.info("Set --take-action to make the schema changes.")
        return

    # 6. Install the required extensions.
    # The `vector` extension is needed on Aurora (pgvector); committed
    # immediately so the table creation below can rely on it.
    if Engine.Aurora in engines_filter:
        aurora = cxns.get_connection(Engine.Aurora)
        cursor = aurora.cursor_sync()
        cursor.execute_sync("CREATE EXTENSION IF NOT EXISTS vector")
        cursor.commit_sync()

    # 7. Set up the new tables.
    sql_gen = TableSqlGenerator(config, altered_blueprint)

    for table in altered_blueprint.tables():
        if table.name not in tables_to_create:
            continue

        table_locations = altered_blueprint.get_table_locations(table.name)
        for location in table_locations:
            # The blueprint may place a table on an engine the operator did
            # not request via --engines; skip (and log) rather than fail.
            if location not in engines_filter:
                logger.info(
                    "Skipping creating '%s' on %s because the engine was not "
                    "specified using --engines.",
                    table.name,
                    location,
                )
                continue
            logger.info(
                "Creating table '%s' on %s...",
                table.name,
                location,
            )
            queries, db_type = sql_gen.generate_create_table_sql(table, location)
            conn = cxns.get_connection(db_type)
            cursor = conn.cursor_sync()
            for q in queries:
                logger.debug("Running on %s: %s", str(db_type), q)
                cursor.execute_sync(q)

    # 8. Update the extraction progress table.
    # Seeds extraction-progress bookkeeping rows for each new table; the
    # generator only emits statements for Aurora-resident base tables.
    if Engine.Aurora in engines_filter:
        for table_name in tables_to_create:
            queries, db_type = sql_gen.generate_extraction_progress_init(table_name)
            conn = cxns.get_connection(db_type)
            cursor = conn.cursor_sync()
            for q in queries:
                logger.debug("Running on %s: %s", str(db_type), q)
                cursor.execute_sync(q)

    # 9. Commit the changes.
    # N.B. Athena does not support the notion of committing a transaction.
    if Engine.Aurora in engines_filter:
        cxns.get_connection(Engine.Aurora).cursor_sync().commit_sync()
    if Engine.Redshift in engines_filter:
        cxns.get_connection(Engine.Redshift).cursor_sync().commit_sync()

    # 10. Persist the data blueprint.
    # Copies of the current tables/locations are mutated so the in-memory
    # `current_blueprint` is left untouched.
    if not args.skip_persisting_blueprint:
        merged_tables = current_blueprint.tables().copy()
        merged_table_locations = current_blueprint.table_locations().copy()

        # Append the new table metadata to the blueprint.
        for table_name in tables_to_create:
            merged_tables.append(altered_blueprint.get_table(table_name))
            merged_table_locations[table_name] = altered_blueprint.get_table_locations(
                table_name
            )

        # Provisioning and routing policy are carried over unchanged from the
        # current blueprint; only the table set/placement is merged in.
        merged_blueprint = Blueprint(
            current_blueprint.schema_name(),
            merged_tables,
            merged_table_locations,
            current_blueprint.aurora_provisioning(),
            current_blueprint.redshift_provisioning(),
            current_blueprint.get_routing_policy(),
        )
        blueprint_mgr.force_new_blueprint_sync(merged_blueprint, score=None)

    logger.info("Done!")


# This method is called by `brad.exec.admin.main`.
def alter_schema(args) -> None:
    """Synchronous entry point: run the async implementation to completion."""
    coroutine = alter_schema_impl(args)
    asyncio.run(coroutine)
15 changes: 15 additions & 0 deletions src/brad/blueprint/sql_gen/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,21 @@ def generate_extraction_progress_set_up_table_sql(self) -> Tuple[List[str], Engi

return (queries, Engine.Aurora)

def generate_extraction_progress_init(
self, table_name: str
) -> Tuple[List[str], Engine]:
queries = []
initialize_template = (
"INSERT INTO "
+ AURORA_EXTRACT_PROGRESS_TABLE_NAME
+ " (table_name, next_extract_seq, next_shadow_extract_seq) VALUES ('{table_name}', 0, 0)"
)
base_table_names = self._blueprint.base_table_names()
table_locations = self._blueprint.get_table_locations(table_name)
if Engine.Aurora in table_locations and table_name in base_table_names:
queries.append(initialize_template.format(table_name=table_name))
return (queries, Engine.Aurora)


def generate_create_index_sql(
table: Table, indexes: List[Tuple[Column, ...]]
Expand Down
17 changes: 16 additions & 1 deletion src/brad/blueprint/user.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import yaml
import pathlib
from typing import List, Set
from typing import List, Set, Dict

from brad.config.engine import Engine
from .provisioning import Provisioning
from .table import Column, Table

Expand Down Expand Up @@ -34,6 +35,7 @@ def load_from_yaml_str(cls, yaml_str: str) -> "UserProvidedBlueprint":
@classmethod
def _load_from_raw_yaml(cls, raw_yaml) -> "UserProvidedBlueprint":
tables: List[Table] = []
bootstrap_locations: Dict[str, Set[Engine]] = {}
for raw_table in raw_yaml["tables"]:
name = raw_table["table_name"]
columns = list(
Expand Down Expand Up @@ -65,6 +67,13 @@ def _load_from_raw_yaml(cls, raw_yaml) -> "UserProvidedBlueprint":
Table(name, columns, table_deps, transform, secondary_indexed_columns)
)

if "bootstrap_locations" in raw_table:
engine_set = {
Engine.from_str(engine)
for engine in raw_table["bootstrap_locations"]
}
bootstrap_locations[name] = engine_set

if "provisioning" in raw_yaml:
aurora = raw_yaml["provisioning"]["aurora"]
redshift = raw_yaml["provisioning"]["redshift"]
Expand All @@ -86,6 +95,7 @@ def _load_from_raw_yaml(cls, raw_yaml) -> "UserProvidedBlueprint":
tables,
aurora_provisioning,
redshift_provisioning,
bootstrap_locations,
)

def __init__(
Expand All @@ -94,11 +104,13 @@ def __init__(
tables: List[Table],
aurora_provisioning: Provisioning,
redshift_provisioning: Provisioning,
bootstrap_locations: Dict[str, Set[Engine]],
):
self._schema_name = schema_name
self._tables = tables
self._aurora_provisioning = aurora_provisioning
self._redshift_provisioning = redshift_provisioning
self._bootstrap_locations = bootstrap_locations

@property
def schema_name(self) -> str:
Expand All @@ -114,6 +126,9 @@ def aurora_provisioning(self) -> Provisioning:
def redshift_provisioning(self) -> Provisioning:
return self._redshift_provisioning

def bootstrap_locations(self) -> Dict[str, Set[Engine]]:
return self._bootstrap_locations

def validate(self) -> None:
"""
Checks the user-declared tables and ensures that there are (i) no
Expand Down
3 changes: 3 additions & 0 deletions src/brad/config/system_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ class SystemEvent(enum.Enum):
PostTransitionCompleted = "post_transition_completed"

AuroraPrimaryFailover = "aurora_primary_failover"

# If this event occurs, we must redo the experiment.
WatchdogFired = "watchdog_fired"
48 changes: 48 additions & 0 deletions src/brad/daemon/blueprint_watchdog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Optional

from brad.config.engine import Engine
from brad.blueprint import Blueprint
from brad.config.system_event import SystemEvent
from brad.daemon.system_event_logger import SystemEventLogger


class BlueprintWatchdog:
    """
    Used to prevent selecting blueprints that would cause issues. If the
    watchdog fires, we must redo the experiment (this is meant as a backstop).
    """

    def __init__(self, event_logger: Optional[SystemEventLogger]) -> None:
        # Logging is optional; when absent, rejections are silent.
        self._event_logger = event_logger

    def reject_blueprint(self, blueprint: Blueprint) -> bool:
        """Return True if `blueprint` violates a known-bad placement rule."""
        # Short-circuits: the embeddings check only runs if the telemetry
        # check did not already fire (same as checking them in sequence).
        return self._telemetry_on_aurora(blueprint) or self._embeddings_left_aurora(
            blueprint
        )

    def _fire(self, detail: str) -> None:
        # Record the rejection reason when a logger was provided.
        if self._event_logger is not None:
            self._event_logger.log(SystemEvent.WatchdogFired, detail)

    def _telemetry_on_aurora(self, blueprint: Blueprint) -> bool:
        # Telemetry table should not go onto Aurora.
        try:
            locations = blueprint.get_table_locations("telemetry")
        except ValueError:
            # Indicates the table is not used in this schema - no problem.
            return False
        if Engine.Aurora not in locations:
            return False
        self._fire(f"telemetry_placed_on_aurora: {str(locations)}")
        return True

    def _embeddings_left_aurora(self, blueprint: Blueprint) -> bool:
        # Embedding table should not leave Aurora.
        try:
            locations = blueprint.get_table_locations("embeddings")
        except ValueError:
            # Indicates the table is not used in this schema - no problem.
            return False
        if locations == [Engine.Aurora]:
            return False
        self._fire(f"embedding_left_aurora: {str(locations)}")
        return True
8 changes: 8 additions & 0 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from brad.daemon.monitor import Monitor
from brad.daemon.system_event_logger import SystemEventLogger
from brad.daemon.transition_orchestrator import TransitionOrchestrator
from brad.daemon.blueprint_watchdog import BlueprintWatchdog
from brad.data_stats.estimator import Estimator
from brad.data_stats.postgres_estimator import PostgresEstimator
from brad.data_sync.execution.executor import DataSyncExecutor
Expand Down Expand Up @@ -109,6 +110,7 @@ def __init__(
self._transition_task: Optional[asyncio.Task[None]] = None

self._system_event_logger = SystemEventLogger.create_if_requested(self._config)
self._watchdog = BlueprintWatchdog(self._system_event_logger)

# This is used to hold references to internal command tasks we create.
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
Expand Down Expand Up @@ -395,6 +397,12 @@ async def _handle_new_blueprint(
self._system_event_logger.log(SystemEvent.NewBlueprintSkipped)
return

if self._watchdog.reject_blueprint(blueprint):
logger.warning(
"Blueprint watchdog fired! Must re-run this blueprint planning pass."
)
return

if PERSIST_BLUEPRINT_VAR in os.environ:
logger.info(
"Force-persisting the new blueprint. Run a manual transition and "
Expand Down
2 changes: 2 additions & 0 deletions src/brad/exec/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import brad.admin.restore_blueprint as restore_blueprint
import brad.admin.replay_planner as replay_planner
import brad.admin.clean_dataset as clean_dataset
import brad.admin.alter_schema as alter_schema

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,6 +42,7 @@ def register_command(subparsers) -> None:
restore_blueprint.register_admin_action(admin_subparsers)
replay_planner.register_admin_action(admin_subparsers)
clean_dataset.register_admin_action(admin_subparsers)
alter_schema.register_admin_action(admin_subparsers)
parser.set_defaults(func=main)


Expand Down
12 changes: 12 additions & 0 deletions src/brad/planner/data.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Dict, List

from brad.blueprint import Blueprint
Expand All @@ -7,6 +8,8 @@
from brad.routing.abstract_policy import FullRoutingPolicy
from brad.routing.round_robin import RoundRobin

logger = logging.getLogger(__name__)


def bootstrap_blueprint(user: UserProvidedBlueprint) -> Blueprint:
"""
Expand Down Expand Up @@ -85,6 +88,15 @@ def process_table(table: Table, expect_standalone_base_table: bool):
# Sanity check: Each table should be present on at least one engine.
assert all(map(lambda locs: len(locs) > 0, table_locations.values()))

# Overwrite the placements where requested.
bootstrap_locations = user.bootstrap_locations()
for table_name in tables_by_name.keys():
if table_name not in bootstrap_locations:
continue
new_locations = list(bootstrap_locations[table_name])
logger.info("Setting the locations of %s to %s", table_name, str(new_locations))
table_locations[table_name] = new_locations

# We pass through the provisioning hints provided by the user.
return Blueprint(
user.schema_name,
Expand Down

0 comments on commit 5e4292b

Please sign in to comment.