diff --git a/perfkitbenchmarker/data/edw/snowflake_aws/metadata/individual_query_plan.sql.j2 b/perfkitbenchmarker/data/edw/snowflake_aws/metadata/individual_query_plan.sql.j2 new file mode 100644 index 0000000000..5151aa6f7f --- /dev/null +++ b/perfkitbenchmarker/data/edw/snowflake_aws/metadata/individual_query_plan.sql.j2 @@ -0,0 +1 @@ +SELECT '{{query_id}}' AS QUERY_ID, SYSTEM$EXPLAIN_PLAN_JSON('{{query_id}}') AS QUERY_PLAN; diff --git a/perfkitbenchmarker/data/edw/snowflake_aws/metadata/individual_query_stats.sql.j2 b/perfkitbenchmarker/data/edw/snowflake_aws/metadata/individual_query_stats.sql.j2 new file mode 100644 index 0000000000..18944b938e --- /dev/null +++ b/perfkitbenchmarker/data/edw/snowflake_aws/metadata/individual_query_stats.sql.j2 @@ -0,0 +1 @@ +SELECT * FROM TABLE(GET_QUERY_OPERATOR_STATS('{{query_id}}')); diff --git a/perfkitbenchmarker/data/edw/snowflake_aws/metadata/time_bound_query_history.sql.j2 b/perfkitbenchmarker/data/edw/snowflake_aws/metadata/time_bound_query_history.sql.j2 new file mode 100644 index 0000000000..511493b568 --- /dev/null +++ b/perfkitbenchmarker/data/edw/snowflake_aws/metadata/time_bound_query_history.sql.j2 @@ -0,0 +1,13 @@ +SELECT + * +FROM + TABLE( + INFORMATION_SCHEMA.QUERY_HISTORY_BY_WAREHOUSE( + END_TIME_RANGE_START => TO_TIMESTAMP_LTZ({{start_timestamp}}), + END_TIME_RANGE_END => TO_TIMESTAMP_LTZ({{end_timestamp}}), + RESULT_LIMIT => 10000, + WAREHOUSE_NAME => '{{warehouse}}' + ) + ) +ORDER BY + start_time; diff --git a/perfkitbenchmarker/providers/snowflake/snowflake.py b/perfkitbenchmarker/providers/snowflake/snowflake.py index 98a0c7ee2e..24cf646bf3 100644 --- a/perfkitbenchmarker/providers/snowflake/snowflake.py +++ b/perfkitbenchmarker/providers/snowflake/snowflake.py @@ -22,7 +22,7 @@ import json import logging import os -from typing import Any, Union +from typing import Any, Union, override from absl import flags from perfkitbenchmarker import data from perfkitbenchmarker import edw_service @@ -319,6 
+319,39 @@ def GetSnowflakeClientInterface( class Snowflake(edw_service.EdwService): """Object representing a Snowflake Data Warehouse Instance.""" + SEARCH_QUERY_TEMPLATE_LOCATION = 'edw/snowflake_aws/search_index' + + CREATE_INDEX_QUERY_TEMPLATE = ( + f'{SEARCH_QUERY_TEMPLATE_LOCATION}/create_index_query.sql.j2' + ) + DELETE_INDEX_QUERY_TEMPLATE = ( + f'{SEARCH_QUERY_TEMPLATE_LOCATION}/delete_index_query.sql.j2' + ) + GET_INDEX_STATUS_QUERY_TEMPLATE = ( + f'{SEARCH_QUERY_TEMPLATE_LOCATION}/index_status.sql.j2' + ) + INITIALIZE_SEARCH_TABLE_QUERY_TEMPLATE = ( + f'{SEARCH_QUERY_TEMPLATE_LOCATION}/table_init.sql.j2' + ) + LOAD_SEARCH_DATA_QUERY_TEMPLATE = ( + f'{SEARCH_QUERY_TEMPLATE_LOCATION}/ingestion_query.sql.j2' + ) + INDEX_SEARCH_QUERY_TEMPLATE = ( + f'{SEARCH_QUERY_TEMPLATE_LOCATION}/search_query.sql.j2' + ) + GET_ROW_COUNT_QUERY_TEMPLATE = ( + f'{SEARCH_QUERY_TEMPLATE_LOCATION}/get_row_count.sql.j2' + ) + TIME_BOUND_QUERY_HISTORY_TEMPLATE = ( + 'edw/snowflake_aws/metadata/time_bound_query_history.sql.j2' + ) + INDIVIDUAL_QUERY_PLAN_TEMPLATE = ( + 'edw/snowflake_aws/metadata/individual_query_plan.sql.j2' + ) + INDIVIDUAL_QUERY_STATS_TEMPLATE = ( + 'edw/snowflake_aws/metadata/individual_query_stats.sql.j2' + ) + CLOUD: str = None SERVICE_TYPE = None @@ -515,30 +548,6 @@ def GetMetadata(self): basic_data.update(self.client_interface.GetMetadata()) return basic_data - SEARCH_QUERY_TEMPLATE_LOCATION = 'edw/snowflake_aws/search_index' - - CREATE_INDEX_QUERY_TEMPLATE = ( - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/create_index_query.sql.j2' - ) - DELETE_INDEX_QUERY_TEMPLATE = ( - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/delete_index_query.sql.j2' - ) - GET_INDEX_STATUS_QUERY_TEMPLATE = ( - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/index_status.sql.j2' - ) - INITIALIZE_SEARCH_TABLE_QUERY_TEMPLATE = ( - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/table_init.sql.j2' - ) - LOAD_SEARCH_DATA_QUERY_TEMPLATE = ( - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/ingestion_query.sql.j2' - ) - 
def _RunMetadataQuery(
    self, query_template: str, query_name: str, context: dict[str, Any]
) -> dict[str, Any]:
  """Renders a metadata query template on the client VM and executes it.

  Args:
    query_template: Path of the Jinja2 SQL template; resolved through
      data.ResourcePath before rendering.
    query_name: File name the rendered query is written to on the client VM.
      The same name is passed to ExecuteQuery.
    context: Template variables used to render the query.

  Returns:
    The 'query_results' entry of the details returned by ExecuteQuery.
    Callers in this class hand it to ColsToRows and index it by column name,
    so it is presumably a column-name -> values mapping (confirm against the
    client interface).

  Raises:
    KeyError: If ExecuteQuery's details carry no 'query_results' entry.
  """
  client_vm = self.client_interface.client_vm
  client_vm.RenderTemplate(
      data.ResourcePath(query_template),
      query_name,
      context,
  )
  # Log the rendered query text.
  client_vm.RemoteCommand(f'cat {query_name}')
  _, output = self.client_interface.ExecuteQuery(
      query_name, print_results=True
  )
  return output['query_results']


def _GetIndividualQueryMetadata(self, query_id: str) -> list[dict[str, Any]]:
  """Collects the plan and the operator statistics of a single query.

  Runs two metadata queries (plan template and stats template), both
  rendered with the given query id.

  Args:
    query_id: Snowflake query id substituted into both templates and used to
      build the rendered query file names.

  Returns:
    A list of metric dicts ('metric', 'value', 'unit', 'metadata'):
      * 'edw_sf_query_plan': columns of the first plan row, each key prefixed
        with 'sf_'. Omitted when the plan query returns no rows.
      * 'edw_sf_query_stats': all stats rows serialized as one JSON string
        under 'sf_query_stats'.
  """
  context = {'query_id': query_id}
  query_plan_rows = self.ColsToRows(
      self._RunMetadataQuery(
          self.INDIVIDUAL_QUERY_PLAN_TEMPLATE,
          f'individual_query_plan_{query_id}.sql',
          context,
      )
  )
  query_stats_rows = self.ColsToRows(
      self._RunMetadataQuery(
          self.INDIVIDUAL_QUERY_STATS_TEMPLATE,
          f'individual_query_stats_{query_id}.sql',
          context,
      )
  )

  results = []
  if query_plan_rows:
    results.append({
        'metric': 'edw_sf_query_plan',
        'value': 1,
        'unit': 'metadata',
        'metadata': {
            f'sf_{key}': value for key, value in query_plan_rows[0].items()
        },
    })
  else:
    # A missing plan must not abort metadata collection for the other queries.
    logging.warning('No query plan returned for Snowflake query %s', query_id)
  results.append({
      'metric': 'edw_sf_query_stats',
      'value': 1,
      'unit': 'metadata',
      'metadata': {
          # default=str keeps non-JSON-native values serializable.
          'sf_query_stats': json.dumps(query_stats_rows, default=str)
      },
  })
  return results


@override
def GetTimeBoundAuxiliaryMetrics(
    self, start_timestamp: float, end_timestamp: float
) -> list[dict[str, Any]]:
  """Returns the auxiliary metrics for the given run.

  Renders and runs the time-bound query-history template for self.warehouse,
  then fetches per-query metadata for every query id it reports. Each
  reported query id triggers two further metadata queries.

  Args:
    start_timestamp: Lower bound of the window; rendered into the template as
      'start_timestamp' and used in the rendered query's file name.
    end_timestamp: Upper bound of the window; rendered into the template as
      'end_timestamp'.

  Returns:
    A list of metric dicts. First one 'sf_query_metadata' entry per history
    row (row columns prefixed with 'sf_' as metadata), followed by the
    entries produced by _GetIndividualQueryMetadata for each query id, in
    the order of the history result. Empty when the history query returns
    nothing.
  """
  col_res = self._RunMetadataQuery(
      self.TIME_BOUND_QUERY_HISTORY_TEMPLATE,
      f'metadata_query_{start_timestamp}.sql',
      {
          'start_timestamp': start_timestamp,
          'end_timestamp': end_timestamp,
          'warehouse': self.warehouse,
      },
  )
  history_results = [
      {
          'metric': 'sf_query_metadata',
          'value': 1,
          'unit': 'metadata',
          'metadata': {f'sf_{key}': value for key, value in row.items()},
      }
      for row in self.ColsToRows(col_res)
  ]
  # An empty window may yield a result without a QUERY_ID column; treat that
  # as "no queries" rather than failing with KeyError.
  for qid in col_res.get('QUERY_ID', []):
    history_results.extend(self._GetIndividualQueryMetadata(qid))
  return history_results