diff --git a/src/snowflake/connector/pandas_tools.py b/src/snowflake/connector/pandas_tools.py index 5c1626954..038ff2083 100644 --- a/src/snowflake/connector/pandas_tools.py +++ b/src/snowflake/connector/pandas_tools.py @@ -20,7 +20,6 @@ from snowflake.connector import ProgrammingError from snowflake.connector.options import pandas from snowflake.connector.telemetry import TelemetryData, TelemetryField -from snowflake.connector.util_text import random_string from ._utils import ( _PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS_STRING, @@ -103,11 +102,7 @@ def _create_temp_stage( overwrite: bool, use_scoped_temp_object: bool = False, ) -> str: - stage_name = ( - random_name_for_temp_object(TempObjectType.STAGE) - if use_scoped_temp_object - else random_string() - ) + stage_name = random_name_for_temp_object(TempObjectType.STAGE) stage_location = build_location_helper( database=database, schema=schema, @@ -174,11 +169,7 @@ def _create_temp_file_format( sql_use_logical_type: str, use_scoped_temp_object: bool = False, ) -> str: - file_format_name = ( - random_name_for_temp_object(TempObjectType.FILE_FORMAT) - if use_scoped_temp_object - else random_string() - ) + file_format_name = random_name_for_temp_object(TempObjectType.FILE_FORMAT) file_format_location = build_location_helper( database=database, schema=schema, @@ -383,6 +374,10 @@ def write_pandas( "Unsupported table type. Expected table types: temp/temporary, transient" ) + if table_type.lower() in ["temp", "temporary"]: + # Add scoped keyword when applicable. + table_type = get_temp_type_for_object(_use_scoped_temp_object).lower() + if chunk_size is None: chunk_size = len(df) @@ -438,22 +433,13 @@ def write_pandas( # Dump chunk into parquet file chunk.to_parquet(chunk_path, compression=compression, **kwargs) # Upload parquet file - upload_sql = ( - "PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ " - "'file://{path}' ? PARALLEL={parallel}" - ).format( - path=chunk_path.replace("\\", "\\\\").replace("'", "\\'"), - parallel=parallel, - ) - params = ("@" + stage_location,) - logger.debug(f"uploading files with '{upload_sql}', params: %s", params) - cursor.execute( - upload_sql, - _is_internal=True, - _force_qmark_paramstyle=True, - params=params, - num_statements=1, + path = chunk_path.replace("\\", "\\\\").replace("'", "\\'") + cursor._upload( + local_file_name=f"'file://{path}'", + stage_location="@" + stage_location, + options={"parallel": parallel, "source_compression": "auto_detect"}, ) + # Remove chunk file os.remove(chunk_path) @@ -516,7 +502,11 @@ def drop_object(name: str, object_type: str) -> None: target_table_location = build_location_helper( database, schema, - random_string() if (overwrite and auto_create_table) else table_name, + ( + random_name_for_temp_object(TempObjectType.TABLE) + if (overwrite and auto_create_table) + else table_name + ), quote_identifiers, ) diff --git a/test/integ/pandas/test_pandas_tools.py b/test/integ/pandas/test_pandas_tools.py index 8d69fd1a9..cb68a0f8c 100644 --- a/test/integ/pandas/test_pandas_tools.py +++ b/test/integ/pandas/test_pandas_tools.py @@ -6,6 +6,7 @@ from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, Callable, Generator from unittest import mock +from unittest.mock import MagicMock import numpy.random import pytest @@ -543,7 +544,10 @@ def mocked_execute(*args, **kwargs): with mock.patch( "snowflake.connector.cursor.SnowflakeCursor.execute", side_effect=mocked_execute, - ) as m_execute: + ) as m_execute, mock.patch( + "snowflake.connector.cursor.SnowflakeCursor._upload", + side_effect=MagicMock(), + ) as _: success, nchunks, nrows, _ = write_pandas( cnx, sf_connector_version_df.get(), @@ -591,7 +595,10 @@ def mocked_execute(*args, **kwargs): with mock.patch( "snowflake.connector.cursor.SnowflakeCursor.execute", side_effect=mocked_execute, - ) as m_execute: + ) as m_execute, mock.patch( + "snowflake.connector.cursor.SnowflakeCursor._upload", + side_effect=MagicMock(), + ) as _: success, nchunks, nrows, _ = write_pandas( cnx, sf_connector_version_df.get(), @@ -643,7 +650,10 @@ def mocked_execute(*args, **kwargs): with mock.patch( "snowflake.connector.cursor.SnowflakeCursor.execute", side_effect=mocked_execute, - ) as m_execute: + ) as m_execute, mock.patch( + "snowflake.connector.cursor.SnowflakeCursor._upload", + side_effect=MagicMock(), + ) as _: cnx._update_parameters({"PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS": True}) success, nchunks, nrows, _ = write_pandas( cnx, @@ -701,7 +711,10 @@ def mocked_execute(*args, **kwargs): with mock.patch( "snowflake.connector.cursor.SnowflakeCursor.execute", side_effect=mocked_execute, - ) as m_execute: + ) as m_execute, mock.patch( + "snowflake.connector.cursor.SnowflakeCursor._upload", + side_effect=MagicMock(), + ) as _: success, nchunks, nrows, _ = write_pandas( cnx, sf_connector_version_df.get(),