diff --git a/config/schemas/imdb_specialized.yml b/config/schemas/imdb_specialized.yml new file mode 100644 index 00000000..81d3ece7 --- /dev/null +++ b/config/schemas/imdb_specialized.yml @@ -0,0 +1,441 @@ +schema_name: imdb_specialized + +tables: +- table_name: homes + columns: + - name: id + data_type: SERIAL + primary_key: true + - name: location_x + data_type: DECIMAL(10) + - name: location_y + data_type: DECIMAL(10) + +- table_name: embeddings + columns: + - name: id + data_type: SERIAL + primary_key: true + - name: movie_id + data_type: BIGINT + - name: embedding + data_type: vector(1536) + bootstrap_locations: + - aurora + +- table_name: telemetry + columns: + - name: id + data_type: SERIAL + primary_key: true + - name: ip + data_type: TEXT + - name: timestamp + data_type: TIMESTAMP + - name: movie_id + data_type: BIGINT + - name: event_id + data_type: INT + bootstrap_locations: + - athena + - redshift + +- table_name: theatres + columns: + - name: id + data_type: SERIAL + primary_key: true + - name: name + data_type: TEXT + - name: location_x + data_type: DECIMAL(10) + - name: location_y + data_type: DECIMAL(10) + indexes: + - name + +- table_name: showings + columns: + - name: id + data_type: SERIAL + primary_key: true + - name: theatre_id + data_type: BIGINT + - name: movie_id + data_type: BIGINT + - name: date_time + data_type: TIMESTAMP + - name: total_capacity + data_type: INT + - name: seats_left + data_type: INT + indexes: + - theatre_id + - movie_id + - theatre_id, date_time + +- table_name: ticket_orders + columns: + - name: id + data_type: SERIAL + primary_key: true + - name: showing_id + data_type: BIGINT + - name: quantity + data_type: INT + - name: contact_name + data_type: TEXT + - name: location_x + data_type: DECIMAL(10) + - name: location_y + data_type: DECIMAL(10) + indexes: + - showing_id + +# The tables below are from the original IMDB dataset. + +- table_name: aka_name + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: person_id + - data_type: character varying + name: name + - data_type: character varying(3) + name: imdb_index + - data_type: character varying(11) + name: name_pcode_cf + - data_type: character varying(11) + name: name_pcode_nf + - data_type: character varying(11) + name: surname_pcode + - data_type: character varying(65) + name: md5sum + indexes: + - person_id + # NOTE: Primary key columns are automatically indexed. These indexes are + # secondary indexes. Note that all indexes are ordered. To create composite + # key indexes, use a comma when listing columns (e.g., "person_id, name"). 
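+
+# NOTE: The `bootstrap_locations` key used above (on the `embeddings` and
+# `telemetry` tables) pins a table's initial placement to the listed engines;
+# `bootstrap_blueprint` (src/brad/planner/data.py) overrides its default
+# placement decision for any table that sets this key.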
+ +- table_name: aka_title + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: movie_id + - data_type: character varying + name: title + - data_type: character varying(4) + name: imdb_index + - data_type: BIGINT + name: kind_id + - data_type: BIGINT + name: production_year + - data_type: character varying(5) + name: phonetic_code + - data_type: BIGINT + name: episode_of_id + - data_type: BIGINT + name: season_nr + - data_type: BIGINT + name: episode_nr + - data_type: character varying(72) + name: note + - data_type: character varying(32) + name: md5sum + indexes: + - movie_id + - kind_id + +- table_name: cast_info + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: person_id + - data_type: BIGINT + name: movie_id + - data_type: BIGINT + name: person_role_id + - data_type: character varying + name: note + - data_type: BIGINT + name: nr_order + - data_type: BIGINT + name: role_id + indexes: + - person_id + - movie_id + - person_role_id + +- table_name: char_name + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying + name: name + - data_type: character varying(2) + name: imdb_index + - data_type: BIGINT + name: imdb_id + - data_type: character varying(5) + name: name_pcode_nf + - data_type: character varying(5) + name: surname_pcode + - data_type: character varying(32) + name: md5sum + indexes: + - imdb_id + +- table_name: comp_cast_type + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying(32) + name: kind + +- table_name: company_name + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying + name: name + - data_type: character varying(6) + name: country_code + - data_type: BIGINT + name: imdb_id + - data_type: character varying(5) + name: name_pcode_nf + - data_type: character varying(5) + name: name_pcode_sf + - data_type: character varying(32) + name: md5sum + indexes: + - imdb_id + +- table_name: company_type + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying(32) + name: kind + +- table_name: complete_cast + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: movie_id + - data_type: BIGINT + name: subject_id + - data_type: BIGINT + name: status_id + indexes: + - movie_id + - subject_id + - status_id + +- table_name: info_type + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying(32) + name: info + +- table_name: keyword + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying + name: keyword + - data_type: character varying(5) + name: phonetic_code + +- table_name: kind_type + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying(15) + name: kind + +- table_name: link_type + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying(32) + name: link + +- table_name: movie_companies + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: movie_id + - data_type: BIGINT + name: company_id + - data_type: BIGINT + name: company_type_id + - data_type: character varying + name: note + indexes: + - movie_id + - company_id + - company_type_id + +- table_name: movie_info_idx + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: movie_id + - data_type: BIGINT + name: 
info_type_id + - data_type: character varying + name: info + - data_type: character varying(1) + name: note + indexes: + - movie_id + - info_type_id + +- table_name: movie_keyword + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: movie_id + - data_type: BIGINT + name: keyword_id + indexes: + - movie_id + - keyword_id + +- table_name: movie_link + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: movie_id + - data_type: BIGINT + name: linked_movie_id + - data_type: BIGINT + name: link_type_id + indexes: + - movie_id + - linked_movie_id + - link_type_id + +- table_name: name + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying + name: name + - data_type: character varying(9) + name: imdb_index + - data_type: BIGINT + name: imdb_id + - data_type: character varying(1) + name: gender + - data_type: character varying(5) + name: name_pcode_cf + - data_type: character varying(5) + name: name_pcode_nf + - data_type: character varying(5) + name: surname_pcode + - data_type: character varying(32) + name: md5sum + indexes: + - imdb_id + +- table_name: role_type + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying(32) + name: role + +- table_name: title + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: character varying + name: title + - data_type: character varying(5) + name: imdb_index + - data_type: BIGINT + name: kind_id + - data_type: BIGINT + name: production_year + - data_type: BIGINT + name: imdb_id + - data_type: character varying(5) + name: phonetic_code + - data_type: BIGINT + name: episode_of_id + - data_type: BIGINT + name: season_nr + - data_type: BIGINT + name: episode_nr + - data_type: character varying(49) + name: series_years + - data_type: character varying(32) + name: md5sum + indexes: + - kind_id + - imdb_id + - episode_of_id + +- table_name: movie_info + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: movie_id + - data_type: BIGINT + name: info_type_id + - data_type: character varying + name: info + - data_type: character varying + name: note + indexes: + - movie_id + - info_type_id + +- table_name: person_info + columns: + - data_type: SERIAL + name: id + primary_key: true + - data_type: BIGINT + name: person_id + - data_type: BIGINT + name: info_type_id + - data_type: character varying + name: info + - data_type: character varying + name: note + indexes: + - person_id + - info_type_id diff --git a/src/brad/admin/alter_schema.py b/src/brad/admin/alter_schema.py new file mode 100644 index 00000000..8119ca71 --- /dev/null +++ b/src/brad/admin/alter_schema.py @@ -0,0 +1,172 @@ +import asyncio +import logging + +from brad.asset_manager import AssetManager +from brad.blueprint import Blueprint +from brad.blueprint.user import UserProvidedBlueprint +from brad.blueprint.sql_gen.table import TableSqlGenerator +from brad.blueprint.manager import BlueprintManager +from brad.config.engine import Engine +from brad.config.file import ConfigFile +from brad.front_end.engine_connections import EngineConnections +from brad.planner.data import bootstrap_blueprint + +logger = logging.getLogger(__name__) + + +def register_admin_action(subparser) -> None: + parser = subparser.add_parser( + "alter_schema", help="Alters an existing schema on BRAD." 
+    )
+    parser.add_argument(
+        "--config-file",
+        type=str,
+        required=True,
+        help="Path to BRAD's configuration file.",
+    )
+    parser.add_argument(
+        "--schema-name",
+        type=str,
+        required=True,
+        help="The name of the schema.",
+    )
+    parser.add_argument(
+        "--new-schema-file",
+        type=str,
+        required=True,
+        help="Path to the schema file describing the altered schema.",
+    )
+    parser.add_argument(
+        "--skip-persisting-blueprint",
+        action="store_true",
+        help="Set this flag to avoid persisting the blueprint. "
+        "Only meant to be used if you know what you are doing!",
+    )
+    parser.add_argument(
+        "--engines",
+        nargs="+",
+        default=["aurora", "redshift", "athena"],
+        help="The engines on which to apply the schema changes.",
+    )
+    parser.add_argument(
+        "--take-action",
+        action="store_true",
+        help="Set this flag to actually make the schema changes.",
+    )
+    parser.set_defaults(admin_action=alter_schema)
+
+
+async def alter_schema_impl(args):
+    # 1. Load the config.
+    config = ConfigFile.load(args.config_file)
+    assets = AssetManager(config)
+    blueprint_mgr = BlueprintManager(config, assets, args.schema_name)
+    await blueprint_mgr.load()
+    current_blueprint = blueprint_mgr.get_blueprint()
+
+    # 2. Load and validate the user-provided schema.
+    user = UserProvidedBlueprint.load_from_yaml_file(args.new_schema_file)
+    user.validate()
+
+    # 3. Get the bootstrapped blueprint.
+    altered_blueprint = bootstrap_blueprint(user)
+
+    # This schema alteration is primitive for now (it exists only to support
+    # experiments): it only adds tables that are missing from the current
+    # blueprint.
+
+    # 4. Connect to the engines.
+    engines_filter = {Engine.from_str(engine_str) for engine_str in args.engines}
+    cxns = EngineConnections.connect_sync(
+        config,
+        blueprint_mgr.get_directory(),
+        schema_name=args.schema_name,
+        autocommit=False,
+        specific_engines=engines_filter,
+    )
+
+    # 5. Figure out which tables are new. These will be created.
+    existing_tables = {table.name for table in current_blueprint.tables()}
+    tables_to_create = {
+        table.name
+        for table in altered_blueprint.tables()
+        if table.name not in existing_tables
+    }
+
+    logger.info("Will create the following tables: %s", str(tables_to_create))
+    if not args.take_action:
+        logger.info("Set --take-action to make the schema changes.")
+        return
+
+    # 6. Install the required extensions.
+    if Engine.Aurora in engines_filter:
+        aurora = cxns.get_connection(Engine.Aurora)
+        cursor = aurora.cursor_sync()
+        cursor.execute_sync("CREATE EXTENSION IF NOT EXISTS vector")
+        cursor.commit_sync()
+
+    # 7. Set up the new tables.
+    sql_gen = TableSqlGenerator(config, altered_blueprint)
+
+    for table in altered_blueprint.tables():
+        if table.name not in tables_to_create:
+            continue
+
+        table_locations = altered_blueprint.get_table_locations(table.name)
+        for location in table_locations:
+            if location not in engines_filter:
+                logger.info(
+                    "Skipping creating '%s' on %s because the engine was not "
+                    "specified using --engines.",
+                    table.name,
+                    location,
+                )
+                continue
+            logger.info(
+                "Creating table '%s' on %s...",
+                table.name,
+                location,
+            )
+            queries, db_type = sql_gen.generate_create_table_sql(table, location)
+            conn = cxns.get_connection(db_type)
+            cursor = conn.cursor_sync()
+            for q in queries:
+                logger.debug("Running on %s: %s", str(db_type), q)
+                cursor.execute_sync(q)
+
+    # 8. Update the extraction progress table.
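+    # (The extraction progress table records, for each Aurora-resident base
+    # table, the next extraction sequence numbers. The helper below registers
+    # each newly created table so extraction starts from sequence 0; see
+    # generate_extraction_progress_init in sql_gen/table.py.)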
+ if Engine.Aurora in engines_filter: + for table_name in tables_to_create: + queries, db_type = sql_gen.generate_extraction_progress_init(table_name) + conn = cxns.get_connection(db_type) + cursor = conn.cursor_sync() + for q in queries: + logger.debug("Running on %s: %s", str(db_type), q) + cursor.execute_sync(q) + + # 9. Commit the changes. + # N.B. Athena does not support the notion of committing a transaction. + if Engine.Aurora in engines_filter: + cxns.get_connection(Engine.Aurora).cursor_sync().commit_sync() + if Engine.Redshift in engines_filter: + cxns.get_connection(Engine.Redshift).cursor_sync().commit_sync() + + # 10. Persist the data blueprint. + if not args.skip_persisting_blueprint: + merged_tables = current_blueprint.tables().copy() + merged_table_locations = current_blueprint.table_locations().copy() + + # Append the new table metadata to the blueprint. + for table_name in tables_to_create: + merged_tables.append(altered_blueprint.get_table(table_name)) + merged_table_locations[table_name] = altered_blueprint.get_table_locations( + table_name + ) + + merged_blueprint = Blueprint( + current_blueprint.schema_name(), + merged_tables, + merged_table_locations, + current_blueprint.aurora_provisioning(), + current_blueprint.redshift_provisioning(), + current_blueprint.get_routing_policy(), + ) + blueprint_mgr.force_new_blueprint_sync(merged_blueprint, score=None) + + logger.info("Done!") + + +# This method is called by `brad.exec.admin.main`. +def alter_schema(args): + asyncio.run(alter_schema_impl(args)) diff --git a/src/brad/blueprint/sql_gen/table.py b/src/brad/blueprint/sql_gen/table.py index 665db33f..e465e0c8 100644 --- a/src/brad/blueprint/sql_gen/table.py +++ b/src/brad/blueprint/sql_gen/table.py @@ -217,6 +217,21 @@ def generate_extraction_progress_set_up_table_sql(self) -> Tuple[List[str], Engi return (queries, Engine.Aurora) + def generate_extraction_progress_init( + self, table_name: str + ) -> Tuple[List[str], Engine]: + queries = [] + initialize_template = ( + "INSERT INTO " + + AURORA_EXTRACT_PROGRESS_TABLE_NAME + + " (table_name, next_extract_seq, next_shadow_extract_seq) VALUES ('{table_name}', 0, 0)" + ) + base_table_names = self._blueprint.base_table_names() + table_locations = self._blueprint.get_table_locations(table_name) + if Engine.Aurora in table_locations and table_name in base_table_names: + queries.append(initialize_template.format(table_name=table_name)) + return (queries, Engine.Aurora) + def generate_create_index_sql( table: Table, indexes: List[Tuple[Column, ...]] diff --git a/src/brad/blueprint/user.py b/src/brad/blueprint/user.py index f787d479..2708437b 100644 --- a/src/brad/blueprint/user.py +++ b/src/brad/blueprint/user.py @@ -1,7 +1,8 @@ import yaml import pathlib -from typing import List, Set +from typing import List, Set, Dict +from brad.config.engine import Engine from .provisioning import Provisioning from .table import Column, Table @@ -34,6 +35,7 @@ def load_from_yaml_str(cls, yaml_str: str) -> "UserProvidedBlueprint": @classmethod def _load_from_raw_yaml(cls, raw_yaml) -> "UserProvidedBlueprint": tables: List[Table] = [] + bootstrap_locations: Dict[str, Set[Engine]] = {} for raw_table in raw_yaml["tables"]: name = raw_table["table_name"] columns = list( @@ -65,6 +67,13 @@ def _load_from_raw_yaml(cls, raw_yaml) -> "UserProvidedBlueprint": Table(name, columns, table_deps, transform, secondary_indexed_columns) ) + if "bootstrap_locations" in raw_table: + engine_set = { + Engine.from_str(engine) + for engine in 
raw_table["bootstrap_locations"] + } + bootstrap_locations[name] = engine_set + if "provisioning" in raw_yaml: aurora = raw_yaml["provisioning"]["aurora"] redshift = raw_yaml["provisioning"]["redshift"] @@ -86,6 +95,7 @@ def _load_from_raw_yaml(cls, raw_yaml) -> "UserProvidedBlueprint": tables, aurora_provisioning, redshift_provisioning, + bootstrap_locations, ) def __init__( @@ -94,11 +104,13 @@ def __init__( tables: List[Table], aurora_provisioning: Provisioning, redshift_provisioning: Provisioning, + bootstrap_locations: Dict[str, Set[Engine]], ): self._schema_name = schema_name self._tables = tables self._aurora_provisioning = aurora_provisioning self._redshift_provisioning = redshift_provisioning + self._bootstrap_locations = bootstrap_locations @property def schema_name(self) -> str: @@ -114,6 +126,9 @@ def aurora_provisioning(self) -> Provisioning: def redshift_provisioning(self) -> Provisioning: return self._redshift_provisioning + def bootstrap_locations(self) -> Dict[str, Set[Engine]]: + return self._bootstrap_locations + def validate(self) -> None: """ Checks the user-declared tables and ensures that there are (i) no diff --git a/src/brad/config/system_event.py b/src/brad/config/system_event.py index 31cdb9d7..169a2808 100644 --- a/src/brad/config/system_event.py +++ b/src/brad/config/system_event.py @@ -27,3 +27,6 @@ class SystemEvent(enum.Enum): PostTransitionCompleted = "post_transition_completed" AuroraPrimaryFailover = "aurora_primary_failover" + + # If this event occurs, we must redo the experiment. + WatchdogFired = "watchdog_fired" diff --git a/src/brad/daemon/blueprint_watchdog.py b/src/brad/daemon/blueprint_watchdog.py new file mode 100644 index 00000000..ca965831 --- /dev/null +++ b/src/brad/daemon/blueprint_watchdog.py @@ -0,0 +1,48 @@ +from typing import Optional + +from brad.config.engine import Engine +from brad.blueprint import Blueprint +from brad.config.system_event import SystemEvent +from brad.daemon.system_event_logger import SystemEventLogger + + +class BlueprintWatchdog: + """ + Used to prevent selecting blueprints that would cause issues. If the + watchdog fires, we must redo the experiment (this is meant as a backstop). + """ + + def __init__(self, event_logger: Optional[SystemEventLogger]) -> None: + self._event_logger = event_logger + + def reject_blueprint(self, blueprint: Blueprint) -> bool: + # Telemetry table should not go onto Aurora. + try: + telemetry_locations = blueprint.get_table_locations("telemetry") + if Engine.Aurora in telemetry_locations: + if self._event_logger is not None: + self._event_logger.log( + SystemEvent.WatchdogFired, + f"telemetry_placed_on_aurora: {str(telemetry_locations)}", + ) + return True + except ValueError: + # Indicates the table is not used in this schema - no problem. + pass + + # Embedding table should not leave Aurora. + try: + embedding_locations = blueprint.get_table_locations("embeddings") + if embedding_locations != [Engine.Aurora]: + if self._event_logger is not None: + self._event_logger.log( + SystemEvent.WatchdogFired, + f"embedding_left_aurora: {str(embedding_locations)}", + ) + return True + except ValueError: + # Indicates the table is not used in this schema - no problem. + pass + + # All ok. 
+ return False diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index f4db4405..63f99297 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -27,6 +27,7 @@ from brad.daemon.monitor import Monitor from brad.daemon.system_event_logger import SystemEventLogger from brad.daemon.transition_orchestrator import TransitionOrchestrator +from brad.daemon.blueprint_watchdog import BlueprintWatchdog from brad.data_stats.estimator import Estimator from brad.data_stats.postgres_estimator import PostgresEstimator from brad.data_sync.execution.executor import DataSyncExecutor @@ -109,6 +110,7 @@ def __init__( self._transition_task: Optional[asyncio.Task[None]] = None self._system_event_logger = SystemEventLogger.create_if_requested(self._config) + self._watchdog = BlueprintWatchdog(self._system_event_logger) # This is used to hold references to internal command tasks we create. # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task @@ -395,6 +397,12 @@ async def _handle_new_blueprint( self._system_event_logger.log(SystemEvent.NewBlueprintSkipped) return + if self._watchdog.reject_blueprint(blueprint): + logger.warning( + "Blueprint watchdog fired! Must re-run this blueprint planning pass." + ) + return + if PERSIST_BLUEPRINT_VAR in os.environ: logger.info( "Force-persisting the new blueprint. Run a manual transition and " diff --git a/src/brad/exec/admin.py b/src/brad/exec/admin.py index 0850a957..13a970ed 100644 --- a/src/brad/exec/admin.py +++ b/src/brad/exec/admin.py @@ -14,6 +14,7 @@ import brad.admin.restore_blueprint as restore_blueprint import brad.admin.replay_planner as replay_planner import brad.admin.clean_dataset as clean_dataset +import brad.admin.alter_schema as alter_schema logger = logging.getLogger(__name__) @@ -41,6 +42,7 @@ def register_command(subparsers) -> None: restore_blueprint.register_admin_action(admin_subparsers) replay_planner.register_admin_action(admin_subparsers) clean_dataset.register_admin_action(admin_subparsers) + alter_schema.register_admin_action(admin_subparsers) parser.set_defaults(func=main) diff --git a/src/brad/planner/data.py b/src/brad/planner/data.py index 966e4c9f..80b0a510 100644 --- a/src/brad/planner/data.py +++ b/src/brad/planner/data.py @@ -1,3 +1,4 @@ +import logging from typing import Dict, List from brad.blueprint import Blueprint @@ -7,6 +8,8 @@ from brad.routing.abstract_policy import FullRoutingPolicy from brad.routing.round_robin import RoundRobin +logger = logging.getLogger(__name__) + def bootstrap_blueprint(user: UserProvidedBlueprint) -> Blueprint: """ @@ -85,6 +88,15 @@ def process_table(table: Table, expect_standalone_base_table: bool): # Sanity check: Each table should be present on at least one engine. assert all(map(lambda locs: len(locs) > 0, table_locations.values())) + # Overwrite the placements where requested. + bootstrap_locations = user.bootstrap_locations() + for table_name in tables_by_name.keys(): + if table_name not in bootstrap_locations: + continue + new_locations = list(bootstrap_locations[table_name]) + logger.info("Setting the locations of %s to %s", table_name, str(new_locations)) + table_locations[table_name] = new_locations + # We pass through the provisioning hints provided by the user. return Blueprint( user.schema_name,
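
Reviewer note: below is a minimal, self-contained sketch (not part of the diff) of how the new `bootstrap_locations` hint flows through blueprint bootstrapping. It uses only entry points that appear in this change (`UserProvidedBlueprint.load_from_yaml_str`, `validate`, `bootstrap_blueprint`, `Blueprint.get_table_locations`); the inline one-table schema is hypothetical and simply mirrors the column/key layout of `config/schemas/imdb_specialized.yml`.

from brad.blueprint.user import UserProvidedBlueprint
from brad.planner.data import bootstrap_blueprint

# Hypothetical one-table schema that pins `telemetry` to Athena and Redshift,
# mirroring the layout used in config/schemas/imdb_specialized.yml.
SCHEMA_YAML = """
schema_name: bootstrap_demo
tables:
- table_name: telemetry
  columns:
  - name: id
    data_type: SERIAL
    primary_key: true
  - name: event_id
    data_type: INT
  bootstrap_locations:
  - athena
  - redshift
"""

user = UserProvidedBlueprint.load_from_yaml_str(SCHEMA_YAML)
user.validate()
blueprint = bootstrap_blueprint(user)

# Should print the pinned engines (Athena/Redshift) rather than the placement
# bootstrap_blueprint would otherwise pick for a standalone base table.
print(blueprint.get_table_locations("telemetry"))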