diff --git a/DESCRIPTION.md b/DESCRIPTION.md index eac6d7643..0ae980e5b 100644 --- a/DESCRIPTION.md +++ b/DESCRIPTION.md @@ -23,6 +23,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne - Allow the connector to inherit a UUID4 generated upstream, provided in statement parameters (field: `requestId`), rather than automatically generate a UUID4 to use for the HTTP Request ID. - Fix expired S3 credentials update and increment retry when expired credentials are found. - Added `client_fetch_threads` experimental parameter to better utilize threads for fetching query results. + - Add `snowflake_type` parameter to execute method of a cursor. When set to `OBJECT` or `ARRAY` it allows binding of these semi-structured types. - v3.14.0(March 03, 2025) - Bumped pyOpenSSL dependency upper boundary from <25.0.0 to <26.0.0. diff --git a/src/snowflake/connector/connection.py b/src/snowflake/connector/connection.py index c1bcd7753..1e39981a1 100644 --- a/src/snowflake/connector/connection.py +++ b/src/snowflake/connector/connection.py @@ -3,6 +3,7 @@ import atexit import collections.abc +import json import logging import os import pathlib @@ -28,6 +29,8 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from snowflake.connector.errorcode import ER_NOT_SUPPORT_DATA_TYPE + from . 
import errors, proxy from ._query_context_cache import QueryContextCache from .auth import ( @@ -1778,36 +1781,112 @@ def _get_snowflake_type_and_binding( self.converter.to_snowflake_bindings(snowflake_type, v), ) + @staticmethod + def _server_side_binding_supported(snowflake_type: str): + if snowflake_type == "VARIANT": + logger.warning("Server side binding for VARIANT type is not supported.") + return snowflake_type in ("OBJECT", "ARRAY") + + def _process_server_side_semi_structured_bindings( + self, cursor: SnowflakeCursor | None, params: Sequence, snowflake_type: str + ) -> dict[str, dict[str, str]]: + processed_params = {} + + if snowflake_type == "OBJECT": + for idx, v in enumerate(params): + if isinstance(v, dict): + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": json.dumps(v), + } + elif isinstance(v, str) or v is None: + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": v, + } + else: + Error.errorhandler_wrapper( + self, + cursor, + ProgrammingError, + { + "msg": f"Attempted to insert value {v} as {snowflake_type} but it's of an unsupported type: {type(v)}.", + "errno": ER_NOT_SUPPORT_DATA_TYPE, + }, + ) + elif snowflake_type == "ARRAY": + for idx, v in enumerate(params): + if isinstance(v, (tuple, list)): + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": json.dumps(v), + } + elif isinstance(v, str) or v is None: + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": v, + } + else: + Error.errorhandler_wrapper( + self, + cursor, + ProgrammingError, + { + "msg": f"Attempted to insert value {v} as {snowflake_type} but it's of an unsupported type: {type(v)}.", + "errno": ER_NOT_SUPPORT_DATA_TYPE, + }, + ) + + return processed_params + # TODO we could probably rework this to not make dicts like this: {'1': 'value', '2': '13'} def _process_params_qmarks( self, params: Sequence | None, cursor: 
SnowflakeCursor | None = None,
+        snowflake_type: str | None = None,
     ) -> dict[str, dict[str, str]] | None:
         if not params:
             return None
         processed_params = {}
-        get_type_and_binding = partial(self._get_snowflake_type_and_binding, cursor)
-
-        for idx, v in enumerate(params):
-            if isinstance(v, list):
-                snowflake_type = self.converter.snowflake_type(v)
-                all_param_data = list(map(get_type_and_binding, v))
-                first_type = all_param_data[0].type
-                # if all elements have the same snowflake type, update snowflake_type
-                if all(param_data.type == first_type for param_data in all_param_data):
-                    snowflake_type = first_type
-                processed_params[str(idx + 1)] = {
-                    "type": snowflake_type,
-                    "value": [param_data.binding for param_data in all_param_data],
-                }
-            else:
-                snowflake_type, snowflake_binding = get_type_and_binding(v)
-                processed_params[str(idx + 1)] = {
-                    "type": snowflake_type,
-                    "value": snowflake_binding,
-                }
+        if self._server_side_binding_supported(snowflake_type):
+            processed_params = self._process_server_side_semi_structured_bindings(
+                cursor, params, snowflake_type
+            )
+
+        else:
+            get_type_and_binding = partial(self._get_snowflake_type_and_binding, cursor)
+
+            for idx, v in enumerate(params):
+                if isinstance(v, list):
+                    inferred_snowflake_type = self.converter.snowflake_type(v)
+                    all_param_data = list(map(get_type_and_binding, v))
+                    first_type = all_param_data[0].type
+                    # if all elements have the same snowflake type, update snowflake_type
+                    if all(
+                        param_data.type == first_type for param_data in all_param_data
+                    ):
+                        inferred_snowflake_type = first_type
+                    if snowflake_type and inferred_snowflake_type != snowflake_type:
+                        logger.warning(
+                            f"Inferred snowflake type: {inferred_snowflake_type} is different than provided: {snowflake_type}. Proceeding with inferred one. 
Omit this parameter to avoid this warning",
+                    )
+                target_type = inferred_snowflake_type
+                processed_params[str(idx + 1)] = {
+                    "type": target_type,
+                    "value": [param_data.binding for param_data in all_param_data],
+                }
+            else:
+                target_type, snowflake_binding = get_type_and_binding(v)
+                processed_params[str(idx + 1)] = {
+                    "type": target_type,
+                    "value": snowflake_binding,
+                }
         if logger.getEffectiveLevel() <= logging.DEBUG:
             for k, v in processed_params.items():
                 logger.debug("idx: %s, type: %s", k, v.get("type"))
diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py
index ae94d4747..e87fa12d5 100644
--- a/src/snowflake/connector/cursor.py
+++ b/src/snowflake/connector/cursor.py
@@ -839,6 +839,7 @@ def execute(
         file_stream: IO[bytes] | None = None,
         num_statements: int | None = None,
         _dataframe_ast: str | None = None,
+        snowflake_type: str | None = None,
     ) -> Self | None: ...
 
     @overload
@@ -869,6 +870,7 @@ def execute(
         file_stream: IO[bytes] | None = None,
         num_statements: int | None = None,
         _dataframe_ast: str | None = None,
+        snowflake_type: str | None = None,
     ) -> dict[str, Any] | None: ...
 
     def execute(
@@ -899,6 +901,7 @@ def execute(
         num_statements: int | None = None,
         _force_qmark_paramstyle: bool = False,
         _dataframe_ast: str | None = None,
+        snowflake_type: str | None = None,
     ) -> Self | dict[str, Any] | None:
         """Executes a command/query.
 
@@ -935,6 +938,7 @@ def execute(
                 statements being submitted (or 0 if submitting an uncounted number) when using a multi-statement query.
             _force_qmark_paramstyle: Force the use of qmark paramstyle regardless of the connection's paramstyle.
             _dataframe_ast: Base64-encoded dataframe request abstract syntax tree.
+            snowflake_type: Type for parameter binding. Enables server-side semi-structured types binding if set to ARRAY or OBJECT. 
Returns: The cursor itself, or None if some error happened, or the response returned @@ -1000,7 +1004,7 @@ def execute( ) kwargs["binding_params"] = self._connection._process_params_qmarks( - params, self + params, self, snowflake_type=snowflake_type ) m = DESC_TABLE_RE.match(query) diff --git a/test/integ/test_bindings.py b/test/integ/test_bindings.py index e5820c199..c2da1ff51 100644 --- a/test/integ/test_bindings.py +++ b/test/integ/test_bindings.py @@ -2,6 +2,7 @@ from __future__ import annotations import calendar +import json import tempfile import time from datetime import date, datetime @@ -489,6 +490,100 @@ def test_binding_insert_date(conn_cnx, db_parameters): assert cursor.execute(bind_query, bind_variables_2).fetchall() == [(None,)] +@pytest.mark.skipolddriver +def test_binding_variant(conn_cnx): + pytest.skip("Server-side binding of VARIANT type is not supported") + bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" + with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: + snowflake_type = "VARIANT" + cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});") + cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=([1, 2, 3],), snowflake_type=snowflake_type) + cursor.execute( + bind_query, + params=("{'a': 1, 'b': 2, 'c': 3}",), + snowflake_type=snowflake_type, + ) + cursor.execute( + bind_query, + params=({"a": 1, "b": 2, "c": 3},), + snowflake_type=snowflake_type, + ) + + results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall() + + assert results[0][0] is None + assert results[1][0] is None + assert json.loads(results[2][0]) == [1, 2, 3] + assert json.loads(results[3][0]) == {"a": 1, "b": 2, "c": 3} + assert json.loads(results[4][0]) == {"a": 1, "b": 2, "c": 3} + + +@pytest.mark.skipolddriver +def test_binding_array_without_schema(conn_cnx): + bind_query = "INSERT INTO TEST_TABLE1 
SELECT (?)" + with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: + snowflake_type = "ARRAY" + cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});") + cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=("[1, 2, 3]",), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=([1, 2, 3],), snowflake_type=snowflake_type) + cursor.execute( + bind_query, params=(["a", "b", "c"],), snowflake_type=snowflake_type + ) + cursor.execute(bind_query, params=([1, "2", 3],), snowflake_type=snowflake_type) + + results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall() + + assert results[0][0] is None + assert results[1][0] is None + assert json.loads(results[2][0]) == [1, 2, 3] + assert json.loads(results[3][0]) == [1, 2, 3] + assert json.loads(results[4][0]) == ["a", "b", "c"] + assert json.loads(results[5][0]) == [1, "2", 3] + + +@pytest.mark.skipolddriver +def test_binding_object_without_schema(conn_cnx): + bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" + with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: + snowflake_type = "OBJECT" + cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});") + cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type) + cursor.execute( + bind_query, + params=("{'a': 1, 'b': 2, 'c': 3}",), + snowflake_type=snowflake_type, + ) + cursor.execute( + bind_query, + params=({"a": 1, "b": 2, "c": 3},), + snowflake_type=snowflake_type, + ) + + results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall() + + assert results[0][0] is None + assert results[1][0] is None + assert json.loads(results[2][0]) == {"a": 1, "b": 2, "c": 3} + assert json.loads(results[3][0]) == {"a": 1, "b": 2, "c": 3} + + +@pytest.mark.skipolddriver 
+@pytest.mark.parametrize("snowflake_type", ("ARRAY", "OBJECT")) +def test_semi_structured_binding_fails_when_invalid_type(conn_cnx, snowflake_type): + bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" + with pytest.raises(ProgrammingError, match=r"Attempted to insert value"): + with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: + cursor.execute( + f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});" + ) + cursor.execute(bind_query, params=({1},), snowflake_type=snowflake_type) + + @pytest.mark.skipolddriver def test_bulk_insert_binding_fallback(conn_cnx): """When stage creation fails, bulk inserts falls back to server side binding and disables stage optimization."""