From 2722162789414da3743f4a08b2f7a97bb7beb528 Mon Sep 17 00:00:00 2001 From: Zexin Yao Date: Mon, 28 Apr 2025 11:01:08 -0700 Subject: [PATCH 1/5] SNOW-2057867 refactor and fixes to make pandas write work for Python sprocs - use _upload to implement write_pandas - correctly use scoped keyword and temp naming pattern for write_pandas - fix a minor bug where file_transfer_agent returns meta.dst_file_name as file name in download (dst_file_name may contain sub-directories, whereas name will be the base file name without any sub-directory prefixes. We should keep it consistent with upload, where we use meta.name as file name ### Tests - existing test_pandas_tools.py to make sure the change does not break existing write_pandas logic --- .../connector/file_transfer_agent.py | 2 +- src/snowflake/connector/pandas_tools.py | 31 +++++++++---------- test/integ/pandas/test_pandas_tools.py | 21 ++++++++++--- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/snowflake/connector/file_transfer_agent.py b/src/snowflake/connector/file_transfer_agent.py index b3cdc5f9c..f1db12a60 100644 --- a/src/snowflake/connector/file_transfer_agent.py +++ b/src/snowflake/connector/file_transfer_agent.py @@ -818,7 +818,7 @@ def result(self) -> dict[str, Any]: rowset.append( [ - meta.dst_file_name, + meta.name, dst_file_size, meta.result_status, error_details, diff --git a/src/snowflake/connector/pandas_tools.py b/src/snowflake/connector/pandas_tools.py index 5c1626954..45c942882 100644 --- a/src/snowflake/connector/pandas_tools.py +++ b/src/snowflake/connector/pandas_tools.py @@ -383,6 +383,10 @@ def write_pandas( "Unsupported table type. Expected table types: temp/temporary, transient" ) + if table_type.lower() in ["temp", "temporary"]: + # Add scoped keyword when applicable. + table_type = get_temp_type_for_object(_use_scoped_temp_object).lower() + if chunk_size is None: chunk_size = len(df) @@ -438,22 +442,13 @@ def write_pandas( # Dump chunk into parquet file chunk.to_parquet(chunk_path, compression=compression, **kwargs) # Upload parquet file - upload_sql = ( - "PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ " - "'file://{path}' ? PARALLEL={parallel}" - ).format( - path=chunk_path.replace("\\", "\\\\").replace("'", "\\'"), - parallel=parallel, - ) - params = ("@" + stage_location,) - logger.debug(f"uploading files with '{upload_sql}', params: %s", params) - cursor.execute( - upload_sql, - _is_internal=True, - _force_qmark_paramstyle=True, - params=params, - num_statements=1, + path = chunk_path.replace("\\", "\\\\").replace("'", "\\'") + cursor._upload( + local_file_name="file://" + path, + stage_location="@" + stage_location, + options={"parallel": parallel}, ) + # Remove chunk file os.remove(chunk_path) @@ -516,7 +511,11 @@ def drop_object(name: str, object_type: str) -> None: target_table_location = build_location_helper( database, schema, - random_string() if (overwrite and auto_create_table) else table_name, + ( + random_name_for_temp_object(TempObjectType.TABLE) + if (overwrite and auto_create_table) + else table_name + ), quote_identifiers, ) diff --git a/test/integ/pandas/test_pandas_tools.py b/test/integ/pandas/test_pandas_tools.py index 8d69fd1a9..cb68a0f8c 100644 --- a/test/integ/pandas/test_pandas_tools.py +++ b/test/integ/pandas/test_pandas_tools.py @@ -6,6 +6,7 @@ from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, Callable, Generator from unittest import mock +from unittest.mock import MagicMock import numpy.random import pytest @@ -543,7 +544,10 @@ def mocked_execute(*args, **kwargs): with mock.patch( "snowflake.connector.cursor.SnowflakeCursor.execute", side_effect=mocked_execute, - ) as m_execute: + ) as m_execute, mock.patch( + "snowflake.connector.cursor.SnowflakeCursor._upload", + side_effect=MagicMock(), + ) as _: success, nchunks, nrows, _ = write_pandas( cnx, sf_connector_version_df.get(), @@ -591,7 +595,10 @@ def mocked_execute(*args, **kwargs): with mock.patch( "snowflake.connector.cursor.SnowflakeCursor.execute", side_effect=mocked_execute, - ) as m_execute: + ) as m_execute, mock.patch( + "snowflake.connector.cursor.SnowflakeCursor._upload", + side_effect=MagicMock(), + ) as _: success, nchunks, nrows, _ = write_pandas( cnx, sf_connector_version_df.get(), @@ -643,7 +650,10 @@ def mocked_execute(*args, **kwargs): with mock.patch( "snowflake.connector.cursor.SnowflakeCursor.execute", side_effect=mocked_execute, - ) as m_execute: + ) as m_execute, mock.patch( + "snowflake.connector.cursor.SnowflakeCursor._upload", + side_effect=MagicMock(), + ) as _: cnx._update_parameters({"PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS": True}) success, nchunks, nrows, _ = write_pandas( cnx, @@ -701,7 +711,10 @@ def mocked_execute(*args, **kwargs): with mock.patch( "snowflake.connector.cursor.SnowflakeCursor.execute", side_effect=mocked_execute, - ) as m_execute: + ) as m_execute, mock.patch( + "snowflake.connector.cursor.SnowflakeCursor._upload", + side_effect=MagicMock(), + ) as _: success, nchunks, nrows, _ = write_pandas( cnx, sf_connector_version_df.get(), From 2af775b98d9c4f75baafa9dbf0f9e0de43d5dcba Mon Sep 17 00:00:00 2001 From: Zexin Yao Date: Mon, 28 Apr 2025 12:59:12 -0700 Subject: [PATCH 2/5] revert the change to file_transfer_agent.py, because we want to follow a behavior change process for that --- src/snowflake/connector/file_transfer_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/connector/file_transfer_agent.py b/src/snowflake/connector/file_transfer_agent.py index f1db12a60..b3cdc5f9c 100644 --- a/src/snowflake/connector/file_transfer_agent.py +++ b/src/snowflake/connector/file_transfer_agent.py @@ -818,7 +818,7 @@ def result(self) -> dict[str, Any]: rowset.append( [ - meta.name, + meta.dst_file_name, dst_file_size, meta.result_status, error_details, From ed8f6bdcb3854f3d4b85938a74950044a7d69d68 Mon Sep 17 00:00:00 2001 From: Zexin Yao Date: Tue, 29 Apr 2025 16:38:20 -0700 Subject: [PATCH 3/5] use single quotes to surround the local path, to keep it consistent with old code --- src/snowflake/connector/pandas_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/connector/pandas_tools.py b/src/snowflake/connector/pandas_tools.py index 45c942882..2fe2079fe 100644 --- a/src/snowflake/connector/pandas_tools.py +++ b/src/snowflake/connector/pandas_tools.py @@ -444,7 +444,7 @@ def write_pandas( # Upload parquet file path = chunk_path.replace("\\", "\\\\").replace("'", "\\'") cursor._upload( - local_file_name="file://" + path, + local_file_name=f"'file://{path}'", stage_location="@" + stage_location, options={"parallel": parallel}, ) From 30ce232728ec6b6e56084a7400ba92a17dc4abcd Mon Sep 17 00:00:00 2001 From: Zexin Yao Date: Wed, 30 Apr 2025 09:37:03 -0700 Subject: [PATCH 4/5] put "source_compression": "auto_detect" in the options - this has no effect for regular / non-sproc use cases because it is the default option - sproc require it to be explicitly present, so we need it here (we will have a future server side change to make it optional as well for sproc) --- src/snowflake/connector/pandas_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/connector/pandas_tools.py b/src/snowflake/connector/pandas_tools.py index 2fe2079fe..0da02c11c 100644 --- a/src/snowflake/connector/pandas_tools.py +++ b/src/snowflake/connector/pandas_tools.py @@ -446,7 +446,7 @@ def write_pandas( cursor._upload( local_file_name=f"'file://{path}'", stage_location="@" + stage_location, - options={"parallel": parallel}, + options={"parallel": parallel, "source_compression": "auto_detect"}, ) # Remove chunk file From e841eb4d0daaeba096737643eff22834a93aa746 Mon Sep 17 00:00:00 2001 From: Zexin Yao Date: Wed, 30 Apr 2025 09:42:12 -0700 Subject: [PATCH 5/5] fix a pandas_tools bug, where we don't use snowpark naming pattern for regular temp object - the naming pattern applies to both regular temp and scoped temp, but here we only use that naming pattern for scoped temp, this is incorrect - it works fine for non-sproc (for now), but it will break for sproc - this change of naming pattern has no effect on customers because these are creating intermediate results, which is not consumed directly by customers --- src/snowflake/connector/pandas_tools.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/snowflake/connector/pandas_tools.py b/src/snowflake/connector/pandas_tools.py index 0da02c11c..038ff2083 100644 --- a/src/snowflake/connector/pandas_tools.py +++ b/src/snowflake/connector/pandas_tools.py @@ -20,7 +20,6 @@ from snowflake.connector import ProgrammingError from snowflake.connector.options import pandas from snowflake.connector.telemetry import TelemetryData, TelemetryField -from snowflake.connector.util_text import random_string from ._utils import ( _PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS_STRING, @@ -103,11 +102,7 @@ def _create_temp_stage( overwrite: bool, use_scoped_temp_object: bool = False, ) -> str: - stage_name = ( - random_name_for_temp_object(TempObjectType.STAGE) - if use_scoped_temp_object - else random_string() - ) + stage_name = random_name_for_temp_object(TempObjectType.STAGE) stage_location = build_location_helper( database=database, schema=schema, @@ -174,11 +169,7 @@ def _create_temp_file_format( sql_use_logical_type: str, use_scoped_temp_object: bool = False, ) -> str: - file_format_name = ( - random_name_for_temp_object(TempObjectType.FILE_FORMAT) - if use_scoped_temp_object - else random_string() - ) + file_format_name = random_name_for_temp_object(TempObjectType.FILE_FORMAT) file_format_location = build_location_helper( database=database, schema=schema,