From fc7d2931674578c1d358c7ad3d21c90135db81cf Mon Sep 17 00:00:00 2001
From: wuziniu
Date: Tue, 14 Nov 2023 10:31:01 -0500
Subject: [PATCH] minor modification to cost model parse query

---
 .../benchmark_tools/athena/parse_query.py     |   2 +-
 .../redshift/database_connection.py           |   2 +-
 .../benchmark_tools/redshift/parse_query.py   | 142 +++++++++++++++++-
 3 files changed, 143 insertions(+), 3 deletions(-)

diff --git a/workloads/cross_db_benchmark/benchmark_tools/athena/parse_query.py b/workloads/cross_db_benchmark/benchmark_tools/athena/parse_query.py
index 380894b5..cdc6c987 100644
--- a/workloads/cross_db_benchmark/benchmark_tools/athena/parse_query.py
+++ b/workloads/cross_db_benchmark/benchmark_tools/athena/parse_query.py
@@ -419,7 +419,7 @@ def augment_no_workers(p, top_no_workers=0):
                 q.sql,
                 column_id_mapping,
                 table_id_mapping,
-                is_explain_only=True,
+                is_explain_only=False,
                 use_true_card=use_true_card,
                 db_conn=db_conn,
                 cursor=cursor,
diff --git a/workloads/cross_db_benchmark/benchmark_tools/redshift/database_connection.py b/workloads/cross_db_benchmark/benchmark_tools/redshift/database_connection.py
index b45714c8..270879a0 100644
--- a/workloads/cross_db_benchmark/benchmark_tools/redshift/database_connection.py
+++ b/workloads/cross_db_benchmark/benchmark_tools/redshift/database_connection.py
@@ -179,7 +179,7 @@ def run_query_collect_statistics(
     explain_only=False,
     timeout_sec=None,
     clear_cache=True,
-    plain_run=True,
+    plain_run=False,
 ):
     results = None
     runtimes = None
diff --git a/workloads/cross_db_benchmark/benchmark_tools/redshift/parse_query.py b/workloads/cross_db_benchmark/benchmark_tools/redshift/parse_query.py
index d672dfb6..79ac2515 100644
--- a/workloads/cross_db_benchmark/benchmark_tools/redshift/parse_query.py
+++ b/workloads/cross_db_benchmark/benchmark_tools/redshift/parse_query.py
@@ -193,7 +193,7 @@ def augment_no_workers(p, top_no_workers=0):
                 q.sql,
                 column_id_mapping,
                 table_id_mapping,
-                is_explain_only=True,
+                is_explain_only=False,
                 use_true_card=use_true_card,
                 db_conn=db_conn,
                 cursor=cursor,
@@ -256,3 +256,143 @@ def augment_no_workers(p, top_no_workers=0):
         json.dump(run_stats, outfile)
 
     return parsed_runs, stats
+
+
+def get_parsed_plan_single_table(
+    explain_plan, database_stats, column_id_mapping, runtime
+):
+    # This is hard-coded for the IMDB telemetry workload.
+    # TODO: make this work generally for single-table queries.
+    parsed_query = dict()
+    parsed_query["plan_parameters"] = {
+        "op_name": "embedding",
+        "num_tables": 1,
+        "num_joins": 0,
+    }
+    parsed_query["output_columns"] = [
+        {"aggregation": "COUNT", "columns": [i]} for i in range(len(column_id_mapping))
+    ]
+    parsed_query["num_tables"] = 1
+    parsed_query["aurora_omit_join_cond"] = []
+    parsed_query["plan_runtime"] = runtime
+    # Placeholder join node: single-table queries have no joins.
+    parsed_query["join_nodes"] = [
+        {
+            "plan_parameters": {
+                "op_name": "join",
+                "act_card": 0,
+                "est_card": 0,
+                "act_children_card": 0.0,
+                "est_children_card": 0.0,
+                "est_width": 0,
+            },
+            "filter_columns": {
+                "columns": [0, 0],
+                "operator": "=",
+                "literal_feature": 0,
+            },
+            "tables": [0, 0],
+            "table_alias": [None, None],
+        }
+    ]
+    parsed_query["scan_nodes"] = dict()
+    scan_info = explain_plan.children[0]["plan_parameters"]
+    plan_parameters = dict()
+    plan_parameters["op_name"] = "scan"
+    plan_parameters["est_card"] = scan_info["est_card"]
+    # Only estimates are available from EXPLAIN, so act_card mirrors est_card.
+    plan_parameters["act_card"] = scan_info["est_card"]
+    plan_parameters["est_width"] = scan_info["est_width"]
+    plan_parameters["est_children_card"] = database_stats["table_stats"][0]["reltuples"]
+    plan_parameters["act_children_card"] = database_stats["table_stats"][0]["reltuples"]
+    filter_columns = scan_info["filter_columns"]
+    output_columns = [
+        {"aggregation": "None", "columns": [i]} for i in range(len(column_id_mapping))
+    ]
+    parsed_query["scan_nodes"][0] = {
+        "table": 0,
+        "plan_parameters": plan_parameters,
+        "filter_columns": filter_columns,
+        "output_columns": output_columns,
+    }
+    return parsed_query
+
+
+def parse_queries_redshift_single_table(
+    run_stats,
+    exclude_runtime_first_run=True,
+    only_runtime_first_run=False,
+    max_runtime=300000,
+):
+    column_id_mapping = dict()
+    table_id_mapping = dict()
+
+    partial_column_name_mapping = collections.defaultdict(set)
+
+    database_stats = run_stats["database_stats"]
+    parsed_queries = []
+    sql_queries = []
+    skipped = []
+
+    # enrich column stats with table sizes
+    table_sizes = dict()
+    for table_stat in database_stats["table_stats"]:
+        table_sizes[table_stat["relname"]] = table_stat["reltuples"]
+
+    for i, column_stat in enumerate(database_stats["column_stats"]):
+        table = column_stat["tablename"]
+        column = column_stat["attname"]
+        column_stat["table_size"] = table_sizes[table]
+        column_id_mapping[(table, column)] = i
+        partial_column_name_mapping[column].add(table)
+
+    # similarly for table statistics
+    for i, table_stat in enumerate(database_stats["table_stats"]):
+        table = table_stat["relname"]
+        table_id_mapping[table] = i
+
+    for query_no, q in enumerate(tqdm(run_stats["query_list"])):
+        is_timeout = False
+        if hasattr(q, "error") and q.error:
+            skipped.append(query_no)
+            continue
+        # flag timed-out queries; their runtime is inflated below
+        if hasattr(q, "timeout") and q.timeout:
+            is_timeout = True
+
+        sql_queries.append(q.sql)
+        if is_timeout:
+            runtime = max_runtime * np.random.uniform(1, 3)
+        elif hasattr(q, "runtimes") and len(q.runtimes) > 1:
+            # More than one repetition: by default, discard the first run
+            # to exclude Redshift's query-compilation overhead.
+            if only_runtime_first_run:
+                runtime = q.runtimes[0] * 1000
+            elif exclude_runtime_first_run:
+                runtime = statistics.mean(q.runtimes[1:]) * 1000
+            else:
+                runtime = statistics.mean(q.runtimes) * 1000
+        else:
+            runtime = q.runtime * 1000
+
+        explain_plan, _, _ = parse_plan(q.analyze_plans, analyze=False)
+        explain_plan.parse_lines_recursively()
+        explain_plan.parse_columns_bottom_up(
+            column_id_mapping,
+            partial_column_name_mapping,
+            table_id_mapping,
+            alias_dict=dict(),
+        )
+        parsed_query = get_parsed_plan_single_table(
+            explain_plan, database_stats, column_id_mapping, runtime
+        )
+        parsed_query["query_index"] = query_no
+        parsed_queries.append(parsed_query)
+
+    parsed_runs = dict(
+        parsed_plans=[],
+        parsed_queries=parsed_queries,
+        sql_queries=sql_queries,
+        database_stats=database_stats,
+        run_kwargs=None,
+        skipped=skipped,
+    )
+    return parsed_runs