diff --git a/src/brad/planner/beam/query_based.py b/src/brad/planner/beam/query_based.py index 1a59c243..1790654e 100644 --- a/src/brad/planner/beam/query_based.py +++ b/src/brad/planner/beam/query_based.py @@ -137,24 +137,21 @@ async def _run_replan_impl( ctx.compute_engine_latency_norm_factor() beam_size = self._planner_config.beam_size() - engines = [Engine.Aurora, Engine.Redshift, Engine.Athena] first_query_idx = query_indices[0] + first_query = analytical_queries[first_query_idx] current_top_k: List[BlueprintCandidate] = [] - # Not a fundamental limitation, but it simplifies the implementation - # below if this condition is true. - assert beam_size >= len(engines) - # 4. Initialize the top-k set (beam). - for routing_engine in engines: + for routing_engine in Engine.from_bitmap( + planning_router.run_functionality_routing(first_query) + ): candidate = BlueprintCandidate.based_on( self._current_blueprint, self._comparator ) candidate.add_transactional_tables(ctx) - query = analytical_queries[first_query_idx] candidate.add_query( first_query_idx, - query, + first_query, routing_engine, next_workload.get_predicted_analytical_latency( first_query_idx, routing_engine @@ -187,10 +184,16 @@ async def _run_replan_impl( next_top_k: List[BlueprintCandidate] = [] query = analytical_queries[query_idx] + # Only a subset of the engines may support this query if it uses + # "special functionality". + engine_candidates = Engine.from_bitmap( + planning_router.run_functionality_routing(query) + ) + # For each candidate in the current top k, expand it by one # query in the workload. for curr_candidate in current_top_k: - for routing_engine in engines: + for routing_engine in engine_candidates: next_candidate = curr_candidate.clone() next_candidate.add_query( query_idx, diff --git a/src/brad/planner/beam/table_based.py b/src/brad/planner/beam/table_based.py index d0ba2360..5a2903af 100644 --- a/src/brad/planner/beam/table_based.py +++ b/src/brad/planner/beam/table_based.py @@ -102,6 +102,7 @@ async def _run_replan_impl( ) # 2. Cluster queries by tables and sort by gains (sum). + # TODO: Need to consider functionality when creating clusters. clusters = self._preprocess_workload_queries(next_workload) # Sanity check. We cannot run planning without at least one query in the diff --git a/src/brad/routing/router.py b/src/brad/routing/router.py index 7f3b579b..f70e3687 100644 --- a/src/brad/routing/router.py +++ b/src/brad/routing/router.py @@ -93,7 +93,7 @@ async def engine_for( place_support = self._run_location_routing(query, self._table_placement_bitmap) # Engine functionality constraints. - func_support = self._run_functionality_routing(query) + func_support = self.run_functionality_routing(query) # Get supported engines. valid_locations = place_support & func_support @@ -145,7 +145,7 @@ def engine_for_sync( # Ideally we re-implement a sync version. return asyncio.run(self.engine_for(query, session)) - def _run_functionality_routing(self, query: QueryRep) -> int: + def run_functionality_routing(self, query: QueryRep) -> int: """ Based on the functinalities required by the query (e.g. geospatial), compute the set of engines that are able to serve this query.