Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT '{{query_id}}' AS QUERY_ID, SYSTEM$EXPLAIN_PLAN_JSON('{{query_id}}') AS QUERY_PLAN;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT * FROM TABLE(GET_QUERY_OPERATOR_STATS('{{query_id}}'));
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
SELECT
*
FROM
TABLE(
INFORMATION_SCHEMA.QUERY_HISTORY_BY_WAREHOUSE(
END_TIME_RANGE_START => TO_TIMESTAMP_LTZ({{start_timestamp}}),
END_TIME_RANGE_END => TO_TIMESTAMP_LTZ({{end_timestamp}}),
RESULT_LIMIT => 10000,
WAREHOUSE_NAME => '{{warehouse}}'
)
)
ORDER BY
start_time;
144 changes: 119 additions & 25 deletions perfkitbenchmarker/providers/snowflake/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import json
import logging
import os
from typing import Any, Union
from typing import Any, Union, override
from absl import flags
from perfkitbenchmarker import data
from perfkitbenchmarker import edw_service
Expand Down Expand Up @@ -319,6 +319,39 @@ def GetSnowflakeClientInterface(
class Snowflake(edw_service.EdwService):
"""Object representing a Snowflake Data Warehouse Instance."""

SEARCH_QUERY_TEMPLATE_LOCATION = 'edw/snowflake_aws/search_index'

CREATE_INDEX_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/create_index_query.sql.j2'
)
DELETE_INDEX_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/delete_index_query.sql.j2'
)
GET_INDEX_STATUS_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/index_status.sql.j2'
)
INITIALIZE_SEARCH_TABLE_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/table_init.sql.j2'
)
LOAD_SEARCH_DATA_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/ingestion_query.sql.j2'
)
INDEX_SEARCH_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/search_query.sql.j2'
)
GET_ROW_COUNT_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/get_row_count.sql.j2'
)
TIME_BOUND_QUERY_HISTORY_TEMPLATE = (
'edw/snowflake_aws/metadata/time_bound_query_history.sql.j2'
)
INDIVIDUAL_QUERY_PLAN_TEMPLATE = (
'edw/snowflake_aws/metadata/individual_query_plan.sql.j2'
)
INDIVIDUAL_QUERY_STATS_TEMPLATE = (
'edw/snowflake_aws/metadata/individual_query_stats.sql.j2'
)

CLOUD: str = None
SERVICE_TYPE = None

Expand Down Expand Up @@ -515,30 +548,6 @@ def GetMetadata(self):
basic_data.update(self.client_interface.GetMetadata())
return basic_data

SEARCH_QUERY_TEMPLATE_LOCATION = 'edw/snowflake_aws/search_index'

CREATE_INDEX_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/create_index_query.sql.j2'
)
DELETE_INDEX_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/delete_index_query.sql.j2'
)
GET_INDEX_STATUS_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/index_status.sql.j2'
)
INITIALIZE_SEARCH_TABLE_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/table_init.sql.j2'
)
LOAD_SEARCH_DATA_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/ingestion_query.sql.j2'
)
INDEX_SEARCH_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/search_query.sql.j2'
)
GET_ROW_COUNT_QUERY_TEMPLATE = (
f'{SEARCH_QUERY_TEMPLATE_LOCATION}/get_row_count.sql.j2'
)

def CreateSearchIndex(
self, table_path: str, index_name: str
) -> tuple[float, dict[str, Any]]:
Expand Down Expand Up @@ -667,3 +676,88 @@ def SetWarehouse(self, warehouse: str):
)
self.warehouse = warehouse
self.client_interface.warehouse = warehouse

def _RunMetadataQuery(
self, query_template: str, query_name: str, context: dict[str, Any]
) -> dict[str, Any]:
self.client_interface.client_vm.RenderTemplate(
data.ResourcePath(query_template),
query_name,
context,
)
# log the query text
self.client_interface.client_vm.RemoteCommand(f'cat {query_name}')
_, output = self.client_interface.ExecuteQuery(
query_name, print_results=True
)
col_res = output['query_results']
return col_res

def _GetIndividualQueryMetadata(self, query_id: str) -> list[dict[str, Any]]:
query_plan_file_name = f'individual_query_plan_{query_id}.sql'
query_stats_file_name = f'individual_query_stats_{query_id}.sql'
context = {
'query_id': query_id,
}
query_plan_rows = self.ColsToRows(
self._RunMetadataQuery(
self.INDIVIDUAL_QUERY_PLAN_TEMPLATE,
query_plan_file_name,
context,
)
)
query_stats_rows = self.ColsToRows(
self._RunMetadataQuery(
self.INDIVIDUAL_QUERY_STATS_TEMPLATE,
query_stats_file_name,
context,
)
)
results = [
{
'metric': 'edw_sf_query_plan',
'value': 1,
'unit': 'metadata',
'metadata': {
f'sf_{key}': value for key, value in query_plan_rows[0].items()
},
},
{
'metric': 'edw_sf_query_stats',
'value': 1,
'unit': 'metadata',
'metadata': {
'sf_query_stats': json.dumps(query_stats_rows, default=str)
},
},
]
return results

@override
def GetTimeBoundAuxiliaryMetrics(
self, start_timestamp: float, end_timestamp: float
) -> list[dict[str, Any]]:
"""Returns the auxiliary metrics for the given run."""
query_file_name = f'metadata_query_{start_timestamp}.sql'
context = {
'start_timestamp': start_timestamp,
'end_timestamp': end_timestamp,
'warehouse': self.warehouse,
}
col_res = self._RunMetadataQuery(
self.TIME_BOUND_QUERY_HISTORY_TEMPLATE,
query_file_name,
context,
)
row_res = self.ColsToRows(col_res)
history_results = []
for row in row_res:
history_results.append({
'metric': 'sf_query_metadata',
'value': 1,
'unit': 'metadata',
'metadata': {f'sf_{key}': value for key, value in row.items()},
})
for qid in col_res['QUERY_ID']:
history_results.extend(self._GetIndividualQueryMetadata(qid))
return history_results