minor modification to cost model parse query
wuziniu committed Nov 14, 2023
1 parent bf7d26c commit fc7d293
Showing 3 changed files with 143 additions and 3 deletions.
@@ -419,7 +419,7 @@ def augment_no_workers(p, top_no_workers=0):
q.sql,
column_id_mapping,
table_id_mapping,
- is_explain_only=True,
+ is_explain_only=False,
use_true_card=use_true_card,
db_conn=db_conn,
cursor=cursor,
@@ -179,7 +179,7 @@ def run_query_collect_statistics(
explain_only=False,
timeout_sec=None,
clear_cache=True,
- plain_run=True,
+ plain_run=False,
):
results = None
runtimes = None
142 changes: 141 additions & 1 deletion workloads/cross_db_benchmark/benchmark_tools/redshift/parse_query.py
@@ -193,7 +193,7 @@ def augment_no_workers(p, top_no_workers=0):
q.sql,
column_id_mapping,
table_id_mapping,
- is_explain_only=True,
+ is_explain_only=False,
use_true_card=use_true_card,
db_conn=db_conn,
cursor=cursor,
@@ -256,3 +256,143 @@ def augment_no_workers(p, top_no_workers=0):
json.dump(run_stats, outfile)

return parsed_runs, stats


def get_parsed_plan_single_table(
    explain_plan, database_stats, column_id_mapping, runtime
):
    # Hard-coded for the IMDB telemetry workload.
    # TODO: generalize to arbitrary single-table queries.
    parsed_query = dict()
    parsed_query["plan_parameters"] = {
        "op_name": "embedding",
        "num_tables": 1,
        "num_joins": 0,
    }
    parsed_query["output_columns"] = [
        {"aggregation": "COUNT", "columns": [i]} for i in range(len(column_id_mapping))
    ]
    parsed_query["num_tables"] = 1
    parsed_query["aurora_omit_join_cond"] = []
    parsed_query["plan_runtime"] = runtime
    # Zero-filled dummy join node so single-table queries share the same
    # featurization as multi-table plans.
    parsed_query["join_nodes"] = [
        {
            "plan_parameters": {
                "op_name": "join",
                "act_card": 0,
                "est_card": 0,
                "act_children_card": 0.0,
                "est_children_card": 0.0,
                "est_width": 0,
            },
            "filter_columns": {
                "columns": [0, 0],
                "operator": "=",
                "literal_feature": 0,
            },
            "tables": [0, 0],
            "table_alias": [None, None],
        }
    ]
    parsed_query["scan_nodes"] = dict()
    # A single-table plan has exactly one scan node.
    scan_info = explain_plan.children[0]["plan_parameters"]
    plan_parameters = dict()
    plan_parameters["op_name"] = "scan"
    plan_parameters["est_card"] = scan_info["est_card"]
    # EXPLAIN yields only estimates, so reuse them as actuals.
    plan_parameters["act_card"] = scan_info["est_card"]
    plan_parameters["est_width"] = scan_info["est_width"]
    plan_parameters["est_children_card"] = database_stats["table_stats"][0]["reltuples"]
    plan_parameters["act_children_card"] = database_stats["table_stats"][0]["reltuples"]
    filter_columns = scan_info["filter_columns"]
    output_columns = [
        {"aggregation": "None", "columns": [i]} for i in range(len(column_id_mapping))
    ]
    parsed_query["scan_nodes"][0] = {
        "table": 0,
        "plan_parameters": plan_parameters,
        "filter_columns": filter_columns,
        "output_columns": output_columns,
    }
    return parsed_query
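
For orientation, here is the shape of the record this helper emits, sketched for a hypothetical plan over a table with two mapped columns (values are placeholders, not statistics from a real plan):

example_parsed_query = {
    "plan_parameters": {"op_name": "embedding", "num_tables": 1, "num_joins": 0},
    "output_columns": [
        {"aggregation": "COUNT", "columns": [0]},
        {"aggregation": "COUNT", "columns": [1]},
    ],
    "num_tables": 1,
    "aurora_omit_join_cond": [],
    "plan_runtime": 1234.5,  # milliseconds
    "join_nodes": [...],     # the single zero-filled dummy join node shown above
    "scan_nodes": {0: ...},  # the one real scan node, keyed by table id 0
}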


def parse_queries_redshift_single_table(
    run_stats,
    exclude_runtime_first_run=True,
    only_runtime_first_run=False,
    max_runtime=300000,
):
    column_id_mapping = dict()
    table_id_mapping = dict()

    partial_column_name_mapping = collections.defaultdict(set)

    database_stats = run_stats["database_stats"]
    parsed_queries = []
    sql_queries = []
    skipped = []

    # enrich column stats with table sizes
    table_sizes = dict()
    for table_stat in database_stats["table_stats"]:
        table_sizes[table_stat["relname"]] = table_stat["reltuples"]

    for i, column_stat in enumerate(database_stats["column_stats"]):
        table = column_stat["tablename"]
        column = column_stat["attname"]
        column_stat["table_size"] = table_sizes[table]
        column_id_mapping[(table, column)] = i
        partial_column_name_mapping[column].add(table)

    # similarly for table statistics
    for i, table_stat in enumerate(database_stats["table_stats"]):
        table = table_stat["relname"]
        table_id_mapping[table] = i
    for query_no, q in enumerate(tqdm(run_stats["query_list"])):
        is_timeout = False
        if hasattr(q, "error") and q.error:
            skipped.append(query_no)
            continue
        # flag timed-out queries; their runtime is imputed below
        if hasattr(q, "timeout") and q.timeout:
            is_timeout = True

        sql_queries.append(q.sql)
        if is_timeout:
            # impute a runtime somewhere above the timeout threshold
            runtime = max_runtime * np.random.uniform(1, 3)
        elif hasattr(q, "runtimes") and len(q.runtimes) > 1:
            # More than one repetition was run. By default, discard the
            # first run to exclude Redshift's compilation overhead.
            if only_runtime_first_run:
                runtime = q.runtimes[0] * 1000
            elif exclude_runtime_first_run:
                runtime = statistics.mean(q.runtimes[1:]) * 1000
            else:
                runtime = statistics.mean(q.runtimes) * 1000
        else:
            runtime = q.runtime * 1000

        explain_plan, _, _ = parse_plan(q.analyze_plans, analyze=False)
        explain_plan.parse_lines_recursively()
        explain_plan.parse_columns_bottom_up(
            column_id_mapping,
            partial_column_name_mapping,
            table_id_mapping,
            alias_dict=dict(),
        )
        parsed_query = get_parsed_plan_single_table(
            explain_plan, database_stats, column_id_mapping, runtime
        )
        parsed_query["query_index"] = query_no
        parsed_queries.append(parsed_query)

    parsed_runs = dict(
        parsed_plans=[],
        parsed_queries=parsed_queries,
        sql_queries=sql_queries,
        database_stats=database_stats,
        run_kwargs=None,
        skipped=skipped,
    )
    return parsed_runs
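
For context, a minimal usage sketch (assumptions: the file name is hypothetical, and query entries are wrapped for attribute access since the parser reads q.error, q.runtimes, q.sql, and so on):

import json
from types import SimpleNamespace

# Hypothetical path to the statistics written by run_query_collect_statistics.
with open("imdb_telemetry_run_stats.json") as f:
    run_stats = json.load(f)

# Assumed on-disk format: query entries are plain dicts, so wrap them to
# support the attribute-style access used in parse_queries_redshift_single_table.
run_stats["query_list"] = [SimpleNamespace(**q) for q in run_stats["query_list"]]

parsed_runs = parse_queries_redshift_single_table(run_stats)
print(f"parsed {len(parsed_runs['parsed_queries'])} queries, "
      f"skipped {len(parsed_runs['skipped'])}")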
