From 6f7f2f2032c7bc1badfed900b12d186b34ec9655 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kubik?= Date: Wed, 9 Apr 2025 16:57:46 +0200 Subject: [PATCH 1/4] first iteration of server-side semi structured types bindings # Conflicts: # src/snowflake/connector/connection.py --- src/snowflake/connector/connection.py | 104 +++++++++++++++++++++----- src/snowflake/connector/cursor.py | 6 +- test/integ/test_bindings.py | 94 +++++++++++++++++++++++ 3 files changed, 183 insertions(+), 21 deletions(-) diff --git a/src/snowflake/connector/connection.py b/src/snowflake/connector/connection.py index c1bcd7753..391ef403c 100644 --- a/src/snowflake/connector/connection.py +++ b/src/snowflake/connector/connection.py @@ -3,6 +3,7 @@ import atexit import collections.abc +import json import logging import os import pathlib @@ -1783,31 +1784,94 @@ def _process_params_qmarks( self, params: Sequence | None, cursor: SnowflakeCursor | None = None, + snowflake_type: str | None = None, ) -> dict[str, dict[str, str]] | None: if not params: return None processed_params = {} - get_type_and_binding = partial(self._get_snowflake_type_and_binding, cursor) - - for idx, v in enumerate(params): - if isinstance(v, list): - snowflake_type = self.converter.snowflake_type(v) - all_param_data = list(map(get_type_and_binding, v)) - first_type = all_param_data[0].type - # if all elements have the same snowflake type, update snowflake_type - if all(param_data.type == first_type for param_data in all_param_data): - snowflake_type = first_type - processed_params[str(idx + 1)] = { - "type": snowflake_type, - "value": [param_data.binding for param_data in all_param_data], - } - else: - snowflake_type, snowflake_binding = get_type_and_binding(v) - processed_params[str(idx + 1)] = { - "type": snowflake_type, - "value": snowflake_binding, - } + if snowflake_type == "OBJECT": + for idx, v in enumerate(params): + if isinstance(v, dict): + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": json.dumps(v), + } + elif isinstance(v, str): + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": v, + } + else: + raise ValueError() + elif snowflake_type == "ARRAY": + for idx, v in enumerate(params): + if isinstance(v, (tuple, list)): + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": json.dumps(v), + } + elif isinstance(v, str): + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": v, + } + else: + raise ValueError() + elif snowflake_type == "VARIANT": + for idx, v in enumerate(params): + # TODO: handle None values + if isinstance(v, str): + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": v, + } + else: + try: + value = json.dumps(v) + except TypeError: + raise ValueError() + + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": value, + } + else: + get_type_and_binding = partial(self._get_snowflake_type_and_binding, cursor) + + for idx, v in enumerate(params): + if isinstance(v, list): + inferred_snowflake_type = self.converter.snowflake_type(v) + all_param_data = list(map(get_type_and_binding, v)) + first_type = all_param_data[0].type + # if all elements have the same snowflake type, update snowflake_type + if all( + param_data.type == first_type for param_data in all_param_data + ): + inferred_snowflake_type = first_type + if inferred_snowflake_type != snowflake_type: + logger.warning( + "Inferred snowflake 
type: {} is different than provided: {}. Proceeding with provided one. Omit this parameter in order to proceed with inferred type", + inferred_snowflake_type, + snowflake_type, + ) + snowflake_type = inferred_snowflake_type + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "value": [param_data.binding for param_data in all_param_data], + } + else: + snowflake_type, snowflake_binding = get_type_and_binding(v) + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "value": snowflake_binding, + } if logger.getEffectiveLevel() <= logging.DEBUG: for k, v in processed_params.items(): logger.debug("idx: %s, type: %s", k, v.get("type")) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index ae94d4747..e87fa12d5 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -839,6 +839,7 @@ def execute( file_stream: IO[bytes] | None = None, num_statements: int | None = None, _dataframe_ast: str | None = None, + snowflake_type: str | None = None, ) -> Self | None: ... @overload @@ -869,6 +870,7 @@ def execute( file_stream: IO[bytes] | None = None, num_statements: int | None = None, _dataframe_ast: str | None = None, + snowflake_type: str | None = None, ) -> dict[str, Any] | None: ... def execute( @@ -899,6 +901,7 @@ def execute( num_statements: int | None = None, _force_qmark_paramstyle: bool = False, _dataframe_ast: str | None = None, + snowflake_type: str | None = None, ) -> Self | dict[str, Any] | None: """Executes a command/query. @@ -935,6 +938,7 @@ def execute( statements being submitted (or 0 if submitting an uncounted number) when using a multi-statement query. _force_qmark_paramstyle: Force the use of qmark paramstyle regardless of the connection's paramstyle. _dataframe_ast: Base64-encoded dataframe request abstract syntax tree. + snowflake_type: Type for parameter binding. Enables server-side semi-structured types binding if set to ARRAY, OBJECT or VARIANT. 
Returns: The cursor itself, or None if some error happened, or the response returned @@ -1000,7 +1004,7 @@ def execute( ) kwargs["binding_params"] = self._connection._process_params_qmarks( - params, self + params, self, snowflake_type=snowflake_type ) m = DESC_TABLE_RE.match(query) diff --git a/test/integ/test_bindings.py b/test/integ/test_bindings.py index e5820c199..e3064af34 100644 --- a/test/integ/test_bindings.py +++ b/test/integ/test_bindings.py @@ -2,6 +2,7 @@ from __future__ import annotations import calendar +import json import tempfile import time from datetime import date, datetime @@ -489,6 +490,99 @@ def test_binding_insert_date(conn_cnx, db_parameters): assert cursor.execute(bind_query, bind_variables_2).fetchall() == [(None,)] +@pytest.mark.skipolddriver +def test_binding_variant(conn_cnx): + bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" + with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: + snowflake_type = "VARIANT" + cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});") + cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=([1, 2, 3],), snowflake_type=snowflake_type) + cursor.execute( + bind_query, + params=("{'a': 1, 'b': 2, 'c': 3}",), + snowflake_type=snowflake_type, + ) + cursor.execute( + bind_query, + params=({"a": 1, "b": 2, "c": 3},), + snowflake_type=snowflake_type, + ) + + results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall() + + assert results[0][0] is None + assert results[1][0] is None + assert json.loads(results[2][0]) == [1, 2, 3] + assert json.loads(results[3][0]) == {"a": 1, "b": 2, "c": 3} + assert json.loads(results[4][0]) == {"a": 1, "b": 2, "c": 3} + + +@pytest.mark.skipolddriver +def test_binding_array_without_schema(conn_cnx): + bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" + with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: + snowflake_type = "ARRAY" + cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});") + cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=("[1, 2, 3]",), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=([1, 2, 3],), snowflake_type=snowflake_type) + cursor.execute( + bind_query, params=(["a", "b", "c"],), snowflake_type=snowflake_type + ) + cursor.execute(bind_query, params=([1, "2", 3],), snowflake_type=snowflake_type) + + results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall() + + assert results[0][0] is None + assert results[1][0] is None + assert json.loads(results[2][0]) == [1, 2, 3] + assert json.loads(results[3][0]) == [1, 2, 3] + assert json.loads(results[4][0]) == ["a", "b", "c"] + assert json.loads(results[5][0]) == [1, "2", 3] + + +@pytest.mark.skipolddriver +def test_binding_object_without_schema(conn_cnx): + bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" + with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: + snowflake_type = "OBJECT" + cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});") + cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type) + cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type) + cursor.execute( + bind_query, + params=("{'a': 1, 'b': 2, 'c': 3}",), + snowflake_type=snowflake_type, + ) + cursor.execute( + bind_query, + 
params=({"a": 1, "b": 2, "c": 3},), + snowflake_type=snowflake_type, + ) + + results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall() + + assert results[0][0] is None + assert results[1][0] is None + assert json.loads(results[2][0]) == {"a": 1, "b": 2, "c": 3} + assert json.loads(results[3][0]) == {"a": 1, "b": 2, "c": 3} + + +@pytest.mark.skipolddriver +def test_binding_of_primitives(conn_cnx): + bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" + with conn_cnx(paramstyle="numeric") as cnx, cnx.cursor() as cursor: + cursor.execute("CREATE OR REPLACE TABLE TEST_TABLE1 (col1 ARRAY); ") + cursor.execute(bind_query, params=([1, 2, 3],)) + + results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall() + + assert json.loads(results[0][0]) == 1 + + @pytest.mark.skipolddriver def test_bulk_insert_binding_fallback(conn_cnx): """When stage creation fails, bulk inserts falls back to server side binding and disables stage optimization.""" From 7aaf1d2572e2ba6d233bd6fda3b7e39c8c97f1a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kubik?= Date: Mon, 14 Apr 2025 15:37:42 +0200 Subject: [PATCH 2/4] different processing of None values, throwing proper errors --- src/snowflake/connector/connection.py | 103 ++++++++++++++++++-------- test/integ/test_bindings.py | 16 ++-- 2 files changed, 82 insertions(+), 37 deletions(-) diff --git a/src/snowflake/connector/connection.py b/src/snowflake/connector/connection.py index 391ef403c..0c5bc7066 100644 --- a/src/snowflake/connector/connection.py +++ b/src/snowflake/connector/connection.py @@ -29,6 +29,8 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from snowflake.connector.errorcode import ER_NOT_SUPPORT_DATA_TYPE + from . 
import errors, proxy from ._query_context_cache import QueryContextCache from .auth import ( @@ -1779,15 +1781,13 @@ def _get_snowflake_type_and_binding( self.converter.to_snowflake_bindings(snowflake_type, v), ) - # TODO we could probably rework this to not make dicts like this: {'1': 'value', '2': '13'} - def _process_params_qmarks( - self, - params: Sequence | None, - cursor: SnowflakeCursor | None = None, - snowflake_type: str | None = None, - ) -> dict[str, dict[str, str]] | None: - if not params: - return None + @staticmethod + def _is_semi_structured_type(snowflake_type: str): + return snowflake_type in ("OBJECT", "ARRAY", "VARIANT") + + def _process_server_side_semi_structured_bindings( + self, cursor: SnowflakeCursor | None, params: Sequence, snowflake_type: str + ) -> dict[str, dict[str, str]]: processed_params = {} if snowflake_type == "OBJECT": @@ -1798,14 +1798,22 @@ def _process_params_qmarks( "fmt": "json", "value": json.dumps(v), } - elif isinstance(v, str): + elif isinstance(v, str) or v is None: processed_params[str(idx + 1)] = { "type": snowflake_type, "fmt": "json", "value": v, } else: - raise ValueError() + Error.errorhandler_wrapper( + self, + cursor, + ProgrammingError, + { + "msg": f"Attempted to insert value {v} as {snowflake_type} but it's of an unsupported type: {type(v)}.", + "errno": ER_NOT_SUPPORT_DATA_TYPE, + }, + ) elif snowflake_type == "ARRAY": for idx, v in enumerate(params): if isinstance(v, (tuple, list)): @@ -1814,34 +1822,73 @@ def _process_params_qmarks( "fmt": "json", "value": json.dumps(v), } - elif isinstance(v, str): + elif isinstance(v, str) or v is None: processed_params[str(idx + 1)] = { "type": snowflake_type, "fmt": "json", "value": v, } else: - raise ValueError() + Error.errorhandler_wrapper( + self, + cursor, + ProgrammingError, + { + "msg": f"Attempted to insert value {v} as {snowflake_type} but it's of an unsupported type: {type(v)}.", + "errno": ER_NOT_SUPPORT_DATA_TYPE, + }, + ) elif snowflake_type == "VARIANT": + snowflake_type = "TEXT" for idx, v in enumerate(params): - # TODO: handle None values - if isinstance(v, str): + if isinstance(v, str) or v is None: processed_params[str(idx + 1)] = { "type": snowflake_type, "fmt": "json", "value": v, } else: + value = None try: value = json.dumps(v) except TypeError: - raise ValueError() + Error.errorhandler_wrapper( + self, + cursor, + ProgrammingError, + { + "msg": "Attempted to insert value {} as {} but the it's of an unsupported type: {}.".format( + v, snowflake_type, type(v) + ), + "errno": ER_NOT_SUPPORT_DATA_TYPE, + }, + ) + + if value is not None: + processed_params[str(idx + 1)] = { + "type": snowflake_type, + "fmt": "json", + "value": value, + } + + return processed_params + + # TODO we could probably rework this to not make dicts like this: {'1': 'value', '2': '13'} + def _process_params_qmarks( + self, + params: Sequence | None, + cursor: SnowflakeCursor | None = None, + snowflake_type: str | None = None, + ) -> dict[str, dict[str, str]] | None: + if not params: + return None + processed_params = {} + + if self._is_semi_structured_type(snowflake_type): + processed_params = self._process_server_side_semi_structured_bindings( + cursor, params, snowflake_type + ) - processed_params[str(idx + 1)] = { - "type": snowflake_type, - "fmt": "json", - "value": value, - } else: get_type_and_binding = partial(self._get_snowflake_type_and_binding, cursor) @@ -1855,21 +1902,19 @@ def _process_params_qmarks( param_data.type == first_type for param_data in all_param_data ): 
inferred_snowflake_type = first_type
-                        if inferred_snowflake_type != snowflake_type:
+                        if snowflake_type and inferred_snowflake_type != snowflake_type:
                             logger.warning(
-                                "Inferred snowflake type: {} is different than provided: {}. Proceeding with provided one. Omit this parameter in order to proceed with inferred type",
-                                inferred_snowflake_type,
-                                snowflake_type,
+                                f"Inferred Snowflake type {inferred_snowflake_type} differs from the provided type {snowflake_type}; proceeding with the inferred type for this parameter.",
                             )
-                        snowflake_type = inferred_snowflake_type
+                        target_type = inferred_snowflake_type
                         processed_params[str(idx + 1)] = {
-                            "type": snowflake_type,
+                            "type": target_type,
                             "value": [param_data.binding for param_data in all_param_data],
                         }
                     else:
-                        snowflake_type, snowflake_binding = get_type_and_binding(v)
+                        target_type, snowflake_binding = get_type_and_binding(v)
                         processed_params[str(idx + 1)] = {
-                            "type": snowflake_type,
+                            "type": target_type,
                             "value": snowflake_binding,
                         }
         if logger.getEffectiveLevel() <= logging.DEBUG:
diff --git a/test/integ/test_bindings.py b/test/integ/test_bindings.py
index e3064af34..e3909b316 100644
--- a/test/integ/test_bindings.py
+++ b/test/integ/test_bindings.py
@@ -572,15 +572,15 @@ def test_binding_object_without_schema(conn_cnx):
 
 
 @pytest.mark.skipolddriver
-def test_binding_of_primitives(conn_cnx):
+@pytest.mark.parametrize("snowflake_type", ("ARRAY", "OBJECT"))
+def test_semi_structured_binding_fails_when_invalid_type(conn_cnx, snowflake_type):
     bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)"
-    with conn_cnx(paramstyle="numeric") as cnx, cnx.cursor() as cursor:
-        cursor.execute("CREATE OR REPLACE TABLE TEST_TABLE1 (col1 ARRAY); ")
-        cursor.execute(bind_query, params=([1, 2, 3],))
-
-        results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall()
-
-        assert json.loads(results[0][0]) == 1
+    with pytest.raises(ProgrammingError, match=r"Attempted to insert value"):
+        with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor:
+            cursor.execute(
+                f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});"
+            )
+            cursor.execute(bind_query, params=({1},), snowflake_type=snowflake_type)
 
 
 @pytest.mark.skipolddriver
From e3dab8231138e7da4ac1d14fddecfd0c673189cf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Miko=C5=82aj=20Kubik?=
Date: Mon, 14 Apr 2025 17:00:37 +0200
Subject: [PATCH 3/4] remove implementation for variant type, mark variant
 tests as skipped

---
 DESCRIPTION.md                        |  1 +
 src/snowflake/connector/connection.py | 40 ++++-----------------------
 test/integ/test_bindings.py           |  1 +
 3 files changed, 7 insertions(+), 35 deletions(-)

diff --git a/DESCRIPTION.md b/DESCRIPTION.md
index eac6d7643..0ae980e5b 100644
--- a/DESCRIPTION.md
+++ b/DESCRIPTION.md
@@ -23,6 +23,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
   - Allow the connector to inherit a UUID4 generated upstream, provided in statement parameters (field: `requestId`), rather than automatically generate a UUID4 to use for the HTTP Request ID.
   - Fix expired S3 credentials update and increment retry when expired credentials are found.
   - Added `client_fetch_threads` experimental parameter to better utilize threads for fetching query results.
+  - Added `snowflake_type` parameter to the cursor's `execute` method. When set to `OBJECT` or `ARRAY`, it enables server-side binding of these semi-structured types.
 - v3.14.0(March 03, 2025)
   - Bumped pyOpenSSL dependency upper boundary from <25.0.0 to <26.0.0. 
diff --git a/src/snowflake/connector/connection.py b/src/snowflake/connector/connection.py index 0c5bc7066..1e39981a1 100644 --- a/src/snowflake/connector/connection.py +++ b/src/snowflake/connector/connection.py @@ -1782,8 +1782,10 @@ def _get_snowflake_type_and_binding( ) @staticmethod - def _is_semi_structured_type(snowflake_type: str): - return snowflake_type in ("OBJECT", "ARRAY", "VARIANT") + def _server_side_binding_supported(snowflake_type: str): + if snowflake_type == "VARIANT": + logger.warning("Server side binding for VARIANT type is not supported.") + return snowflake_type in ("OBJECT", "ARRAY") def _process_server_side_semi_structured_bindings( self, cursor: SnowflakeCursor | None, params: Sequence, snowflake_type: str @@ -1838,38 +1840,6 @@ def _process_server_side_semi_structured_bindings( "errno": ER_NOT_SUPPORT_DATA_TYPE, }, ) - elif snowflake_type == "VARIANT": - snowflake_type = "TEXT" - for idx, v in enumerate(params): - if isinstance(v, str) or v is None: - processed_params[str(idx + 1)] = { - "type": snowflake_type, - "fmt": "json", - "value": v, - } - else: - value = None - try: - value = json.dumps(v) - except TypeError: - Error.errorhandler_wrapper( - self, - cursor, - ProgrammingError, - { - "msg": "Attempted to insert value {} as {} but the it's of an unsupported type: {}.".format( - v, snowflake_type, type(v) - ), - "errno": ER_NOT_SUPPORT_DATA_TYPE, - }, - ) - - if value is not None: - processed_params[str(idx + 1)] = { - "type": snowflake_type, - "fmt": "json", - "value": value, - } return processed_params @@ -1884,7 +1854,7 @@ def _process_params_qmarks( return None processed_params = {} - if self._is_semi_structured_type(snowflake_type): + if self._server_side_binding_supported(snowflake_type): processed_params = self._process_server_side_semi_structured_bindings( cursor, params, snowflake_type ) diff --git a/test/integ/test_bindings.py b/test/integ/test_bindings.py index e3909b316..da6e38fe0 100644 --- a/test/integ/test_bindings.py +++ b/test/integ/test_bindings.py @@ -492,6 +492,7 @@ def test_binding_insert_date(conn_cnx, db_parameters): @pytest.mark.skipolddriver def test_binding_variant(conn_cnx): + pytest.skip("Server side binding of VARIANT type is not supported") bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: snowflake_type = "VARIANT" From 930573f73a0feb552fa45e221f556ded1e329622 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Kubik?= Date: Mon, 14 Apr 2025 17:00:45 +0200 Subject: [PATCH 4/4] remove implementation for variant type, mark variant tests as skipped --- test/integ/test_bindings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integ/test_bindings.py b/test/integ/test_bindings.py index da6e38fe0..c2da1ff51 100644 --- a/test/integ/test_bindings.py +++ b/test/integ/test_bindings.py @@ -492,7 +492,7 @@ def test_binding_insert_date(conn_cnx, db_parameters): @pytest.mark.skipolddriver def test_binding_variant(conn_cnx): - pytest.skip("Server side binding of VARIANT type is not supported") + pytest.skip("Server-side binding of VARIANT type is not supported") bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)" with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor: snowflake_type = "VARIANT"
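Usage sketch (not part of the patch series): a minimal example of the new binding path, mirroring the integration tests added above. The connection parameters (account, user, password) are placeholders; paramstyle="qmark" is required, as in the tests, and only the snowflake_type argument to cursor.execute comes from this change.

import json

import snowflake.connector

# qmark paramstyle enables server-side binding, matching the integration tests.
with snowflake.connector.connect(
    account="<account>",  # placeholder
    user="<user>",  # placeholder
    password="<password>",  # placeholder
    paramstyle="qmark",
) as cnx, cnx.cursor() as cursor:
    cursor.execute("CREATE OR REPLACE TEMPORARY TABLE test_table1 (col1 OBJECT)")
    # With snowflake_type="OBJECT", the dict is serialized to JSON and sent as an
    # OBJECT bind (fmt="json") instead of going through client-side type inference.
    cursor.execute(
        "INSERT INTO test_table1 SELECT (?)",
        params=({"a": 1, "b": 2, "c": 3},),
        snowflake_type="OBJECT",
    )
    row = cursor.execute("SELECT col1 FROM test_table1").fetchone()
    print(json.loads(row[0]))  # {'a': 1, 'b': 2, 'c': 3}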