SNOW-2013774 support server side semi structured types bindings #2269

Open
wants to merge 4 commits into base: main

1 change: 1 addition & 0 deletions DESCRIPTION.md
@@ -23,6 +23,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
- Allow the connector to inherit a UUID4 generated upstream, provided in statement parameters (field: `requestId`), rather than automatically generate a UUID4 to use for the HTTP Request ID.
- Fix expired S3 credentials update and increment retry when expired credentials are found.
- Added `client_fetch_threads` experimental parameter to better utilize threads for fetching query results.
- Added `snowflake_type` parameter to the cursor's `execute` method. When set to `OBJECT` or `ARRAY`, it enables server-side binding of these semi-structured types.

- v3.14.0(March 03, 2025)
- Bumped pyOpenSSL dependency upper boundary from <25.0.0 to <26.0.0.
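A minimal usage sketch of the new parameter, mirroring the integration tests added in test/integ/test_bindings.py below; the table name and data are illustrative, and conn is assumed to be a connection opened with paramstyle="qmark":

# Sketch only: "DEMO_SEMI" is an illustrative table name.
with conn.cursor() as cur:
    cur.execute("CREATE OR REPLACE TABLE DEMO_SEMI (col1 OBJECT)")
    # The dict is JSON-serialized and bound server-side as an OBJECT.
    cur.execute(
        "INSERT INTO DEMO_SEMI SELECT (?)",
        params=({"a": 1, "b": 2},),
        snowflake_type="OBJECT",
    )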
119 changes: 99 additions & 20 deletions src/snowflake/connector/connection.py
@@ -3,6 +3,7 @@

import atexit
import collections.abc
import json
import logging
import os
import pathlib
@@ -28,6 +29,8 @@
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey

from snowflake.connector.errorcode import ER_NOT_SUPPORT_DATA_TYPE

from . import errors, proxy
from ._query_context_cache import QueryContextCache
from .auth import (
@@ -1778,36 +1781,112 @@ def _get_snowflake_type_and_binding(
self.converter.to_snowflake_bindings(snowflake_type, v),
)

@staticmethod
def _server_side_binding_supported(snowflake_type: str):
if snowflake_type == "VARIANT":
logger.warning("Server side binding for VARIANT type is not supported.")
return snowflake_type in ("OBJECT", "ARRAY")

def _process_server_side_semi_structured_bindings(
self, cursor: SnowflakeCursor | None, params: Sequence, snowflake_type: str
) -> dict[str, dict[str, str]]:
processed_params = {}

if snowflake_type == "OBJECT":
for idx, v in enumerate(params):
if isinstance(v, dict):
processed_params[str(idx + 1)] = {
"type": snowflake_type,
"fmt": "json",
"value": json.dumps(v),
}
elif isinstance(v, str) or v is None:
processed_params[str(idx + 1)] = {
"type": snowflake_type,
"fmt": "json",
"value": v,
}
else:
Error.errorhandler_wrapper(
self,
cursor,
ProgrammingError,
{
"msg": f"Attempted to insert value {v} as {snowflake_type} but it's of an unsupported type: {type(v)}.",
"errno": ER_NOT_SUPPORT_DATA_TYPE,
},
)
elif snowflake_type == "ARRAY":
for idx, v in enumerate(params):
if isinstance(v, (tuple, list)):
processed_params[str(idx + 1)] = {
"type": snowflake_type,
"fmt": "json",
"value": json.dumps(v),
}
elif isinstance(v, str) or v is None:
processed_params[str(idx + 1)] = {
"type": snowflake_type,
"fmt": "json",
"value": v,
}
else:
Error.errorhandler_wrapper(
self,
cursor,
ProgrammingError,
{
"msg": f"Attempted to insert value {v} as {snowflake_type} but it's of an unsupported type: {type(v)}.",
"errno": ER_NOT_SUPPORT_DATA_TYPE,
},
)

return processed_params

# TODO we could probably rework this to not make dicts like this: {'1': 'value', '2': '13'}
def _process_params_qmarks(
self,
params: Sequence | None,
cursor: SnowflakeCursor | None = None,
snowflake_type: str | None = None,
) -> dict[str, dict[str, str]] | None:
if not params:
return None
processed_params = {}

get_type_and_binding = partial(self._get_snowflake_type_and_binding, cursor)

for idx, v in enumerate(params):
if isinstance(v, list):
snowflake_type = self.converter.snowflake_type(v)
all_param_data = list(map(get_type_and_binding, v))
first_type = all_param_data[0].type
# if all elements have the same snowflake type, update snowflake_type
if all(param_data.type == first_type for param_data in all_param_data):
snowflake_type = first_type
processed_params[str(idx + 1)] = {
"type": snowflake_type,
"value": [param_data.binding for param_data in all_param_data],
}
else:
snowflake_type, snowflake_binding = get_type_and_binding(v)
processed_params[str(idx + 1)] = {
"type": snowflake_type,
"value": snowflake_binding,
}
if self._server_side_binding_supported(snowflake_type):
processed_params = self._process_server_side_semi_structured_bindings(
cursor, params, snowflake_type
)

else:
get_type_and_binding = partial(self._get_snowflake_type_and_binding, cursor)

for idx, v in enumerate(params):
if isinstance(v, list):
inferred_snowflake_type = self.converter.snowflake_type(v)
all_param_data = list(map(get_type_and_binding, v))
first_type = all_param_data[0].type
# if all elements have the same snowflake type, update snowflake_type
if all(
param_data.type == first_type for param_data in all_param_data
):
inferred_snowflake_type = first_type
if snowflake_type and inferred_snowflake_type != snowflake_type:
logger.warning(
f"Inferred snowflake type: {inferred_snowflake_type} is different than provided: {snowflake_type}. Proceeding with provided one. Omit this parameter in order to proceed with inferred type",
)
target_type = inferred_snowflake_type
processed_params[str(idx + 1)] = {
"type": target_type,
"value": [param_data.binding for param_data in all_param_data],
}
else:
target_type, snowflake_binding = get_type_and_binding(v)
processed_params[str(idx + 1)] = {
"type": target_type,
"value": snowflake_binding,
}
if logger.getEffectiveLevel() <= logging.DEBUG:
for k, v in processed_params.items():
logger.debug("idx: %s, type: %s", k, v.get("type"))
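For reference, a rough sketch of the binding payload that the new server-side path produces; the shape follows _process_server_side_semi_structured_bindings above and the value is illustrative:

# Illustrative result of _process_params_qmarks(({"a": 1},), cursor, snowflake_type="OBJECT");
# keys are 1-based parameter indices and values are JSON-serialized strings.
{
    "1": {
        "type": "OBJECT",
        "fmt": "json",
        "value": '{"a": 1}',
    }
}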
6 changes: 5 additions & 1 deletion src/snowflake/connector/cursor.py
@@ -839,6 +839,7 @@ def execute(
file_stream: IO[bytes] | None = None,
num_statements: int | None = None,
_dataframe_ast: str | None = None,
snowflake_type: str | None = None,
) -> Self | None: ...

@overload
@@ -869,6 +870,7 @@ def execute(
file_stream: IO[bytes] | None = None,
num_statements: int | None = None,
_dataframe_ast: str | None = None,
snowflake_type: str | None = None,
) -> dict[str, Any] | None: ...

def execute(
@@ -899,6 +901,7 @@ def execute(
num_statements: int | None = None,
_force_qmark_paramstyle: bool = False,
_dataframe_ast: str | None = None,
snowflake_type: str | None = None,
) -> Self | dict[str, Any] | None:
"""Executes a command/query.

@@ -935,6 +938,7 @@
statements being submitted (or 0 if submitting an uncounted number) when using a multi-statement query.
_force_qmark_paramstyle: Force the use of qmark paramstyle regardless of the connection's paramstyle.
_dataframe_ast: Base64-encoded dataframe request abstract syntax tree.
snowflake_type: Explicit Snowflake type to use for parameter binding. Enables server-side binding of semi-structured types when set to ARRAY or OBJECT; VARIANT is not yet supported server-side and falls back to the default binding path.

Returns:
The cursor itself, or None if some error happened, or the response returned
@@ -1000,7 +1004,7 @@
)

kwargs["binding_params"] = self._connection._process_params_qmarks(
params, self
params, self, snowflake_type=snowflake_type
)

m = DESC_TABLE_RE.match(query)
95 changes: 95 additions & 0 deletions test/integ/test_bindings.py
@@ -2,6 +2,7 @@
from __future__ import annotations

import calendar
import json
import tempfile
import time
from datetime import date, datetime
@@ -489,6 +490,100 @@ def test_binding_insert_date(conn_cnx, db_parameters):
assert cursor.execute(bind_query, bind_variables_2).fetchall() == [(None,)]


@pytest.mark.skipolddriver
def test_binding_variant(conn_cnx):
pytest.skip("Server-side binding of VARIANT type is not supported")
bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)"
with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor:
snowflake_type = "VARIANT"
cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});")
cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type)
cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type)
cursor.execute(bind_query, params=([1, 2, 3],), snowflake_type=snowflake_type)
cursor.execute(
bind_query,
params=("{'a': 1, 'b': 2, 'c': 3}",),
snowflake_type=snowflake_type,
)
cursor.execute(
bind_query,
params=({"a": 1, "b": 2, "c": 3},),
snowflake_type=snowflake_type,
)

results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall()

assert results[0][0] is None
assert results[1][0] is None
assert json.loads(results[2][0]) == [1, 2, 3]
assert json.loads(results[3][0]) == {"a": 1, "b": 2, "c": 3}
assert json.loads(results[4][0]) == {"a": 1, "b": 2, "c": 3}


@pytest.mark.skipolddriver
def test_binding_array_without_schema(conn_cnx):
bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)"
with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor:
snowflake_type = "ARRAY"
cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});")
cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type)
cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type)
cursor.execute(bind_query, params=("[1, 2, 3]",), snowflake_type=snowflake_type)
cursor.execute(bind_query, params=([1, 2, 3],), snowflake_type=snowflake_type)
cursor.execute(
bind_query, params=(["a", "b", "c"],), snowflake_type=snowflake_type
)
cursor.execute(bind_query, params=([1, "2", 3],), snowflake_type=snowflake_type)
Collaborator:
Was there also an option to introduce wrapper types or functions to indicate that a semi-structured binding should be used? In that case this line would look like:
cursor.execute(bind_query, params=(snow_array([1, "2", 3]),))
Or snow_object in the test below.

Contributor Author:
I didn't think of it, but that's a pretty elegant idea; let me adjust the code.
(A hypothetical sketch of this wrapper idea appears after this test.)

results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall()

assert results[0][0] is None
assert results[1][0] is None
assert json.loads(results[2][0]) == [1, 2, 3]
assert json.loads(results[3][0]) == [1, 2, 3]
assert json.loads(results[4][0]) == ["a", "b", "c"]
assert json.loads(results[5][0]) == [1, "2", 3]
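
Regarding the wrapper-type suggestion in the review thread above, a purely hypothetical sketch of what such helpers could look like; snow_array and snow_object are not part of this change or of the connector API:

# Hypothetical helpers (not implemented in this PR): they would tag a value with
# its semi-structured type instead of passing snowflake_type to execute().
class snow_array(list):
    snowflake_type = "ARRAY"


class snow_object(dict):
    snowflake_type = "OBJECT"


# Usage as proposed in the review comment (illustrative only):
# cursor.execute(bind_query, params=(snow_array([1, "2", 3]),))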


@pytest.mark.skipolddriver
def test_binding_object_without_schema(conn_cnx):
bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)"
with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor:
snowflake_type = "OBJECT"
cursor.execute(f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});")
cursor.execute(bind_query, params=(None,), snowflake_type=snowflake_type)
cursor.execute(bind_query, params=("",), snowflake_type=snowflake_type)
cursor.execute(
bind_query,
params=("{'a': 1, 'b': 2, 'c': 3}",),
snowflake_type=snowflake_type,
)
cursor.execute(
bind_query,
params=({"a": 1, "b": 2, "c": 3},),
snowflake_type=snowflake_type,
)

results = cursor.execute("SELECT * FROM TEST_TABLE1").fetchall()

assert results[0][0] is None
assert results[1][0] is None
assert json.loads(results[2][0]) == {"a": 1, "b": 2, "c": 3}
assert json.loads(results[3][0]) == {"a": 1, "b": 2, "c": 3}


@pytest.mark.skipolddriver
@pytest.mark.parametrize("snowflake_type", ("ARRAY", "OBJECT"))
def test_semi_structured_binding_fails_when_invalid_type(conn_cnx, snowflake_type):
bind_query = "INSERT INTO TEST_TABLE1 SELECT (?)"
with pytest.raises(ProgrammingError, match=r"Attempted to insert value"):
with conn_cnx(paramstyle="qmark") as cnx, cnx.cursor() as cursor:
cursor.execute(
f"CREATE OR REPLACE TABLE TEST_TABLE1 (col1 {snowflake_type});"
)
cursor.execute(bind_query, params=({1},), snowflake_type=snowflake_type)


@pytest.mark.skipolddriver
def test_bulk_insert_binding_fallback(conn_cnx):
"""When stage creation fails, bulk inserts falls back to server side binding and disables stage optimization."""