Skip to content

Commit 40b2c0d

Browse files
sfc-gh-mmishchenkosfc-gh-mkubik
authored andcommitted
SNOW-2028051 introduce a new client_fetch_threads connection parameter to decouple threads number limitations on fetching and pre-fetching (#2255)
1 parent e2fd4a4 commit 40b2c0d

File tree

4 files changed

+25
-1
lines changed

4 files changed

+25
-1
lines changed

DESCRIPTION.md

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
2222
- Lower log levels from info to debug for some of the messages to make the output easier to follow.
2323
- Allow the connector to inherit a UUID4 generated upstream, provided in statement parameters (field: `requestId`), rather than automatically generate a UUID4 to use for the HTTP Request ID.
2424
- Fix expired S3 credentials update and increment retry when expired credentials are found.
25+
- Added `client_fetch_threads` experimental parameter to better utilize threads for fetching query results.
2526
- Add `snowflake_type` parameter to execute method of a cursor. When set to `OBJECT` or `ARRAY` it allows binding of these semi-structured types.
2627

2728
- v3.14.0(March 03, 2025)

src/snowflake/connector/connection.py

+13
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@
128128

129129
DEFAULT_CLIENT_PREFETCH_THREADS = 4
130130
MAX_CLIENT_PREFETCH_THREADS = 10
131+
MAX_CLIENT_FETCH_THREADS = 1024
131132
DEFAULT_BACKOFF_POLICY = exponential_backoff()
132133

133134

@@ -231,6 +232,7 @@ def _get_private_bytes_from_file(
231232
(type(None), int),
232233
), # snowflake
233234
"client_prefetch_threads": (4, int), # snowflake
235+
"client_fetch_threads": (None, (type(None), int)),
234236
"numpy": (False, bool), # snowflake
235237
"ocsp_response_cache_filename": (None, (type(None), str)), # snowflake internal
236238
"converter_class": (DefaultConverterClass(), SnowflakeConverter),
@@ -420,6 +422,7 @@ class SnowflakeConnection:
420422
See the backoff_policies module for details and implementation examples.
421423
client_session_keep_alive_heartbeat_frequency: Heartbeat frequency to keep connection alive in seconds.
422424
client_prefetch_threads: Number of threads to download the result set.
425+
client_fetch_threads: Number of threads to fetch staged query results.
423426
rest: Snowflake REST API object. Internal use only. Maybe removed in a later release.
424427
application: Application name to communicate with Snowflake as. By default, this is "PythonConnector".
425428
errorhandler: Handler used with errors. By default, an exception will be raised on error.
@@ -684,6 +687,16 @@ def client_prefetch_threads(self, value) -> None:
684687
self._client_prefetch_threads = value
685688
self._validate_client_prefetch_threads()
686689

690+
@property
691+
def client_fetch_threads(self) -> int | None:
692+
return self._client_fetch_threads
693+
694+
@client_fetch_threads.setter
695+
def client_fetch_threads(self, value: None | int) -> None:
696+
if value is not None:
697+
value = min(max(1, value), MAX_CLIENT_FETCH_THREADS)
698+
self._client_fetch_threads = value
699+
687700
@property
688701
def rest(self) -> SnowflakeRestful | None:
689702
return self._rest

src/snowflake/connector/cursor.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1212,7 +1212,8 @@ def _init_result_and_meta(self, data: dict[Any, Any]) -> None:
12121212
self._result_set = ResultSet(
12131213
self,
12141214
result_chunks,
1215-
self._connection.client_prefetch_threads,
1215+
self._connection.client_fetch_threads
1216+
or self._connection.client_prefetch_threads,
12161217
)
12171218
self._rownumber = -1
12181219
self._result_state = ResultState.VALID

test/integ/test_connection.py

+9
Original file line numberDiff line numberDiff line change
@@ -1141,6 +1141,15 @@ def test_client_prefetch_threads_setting(conn_cnx):
11411141
assert conn.client_prefetch_threads == new_thread_count
11421142

11431143

1144+
@pytest.mark.skipolddriver
1145+
def test_client_fetch_threads_setting(conn_cnx):
1146+
"""Tests whether client_fetch_threads is None by default and setting the parameter has effect."""
1147+
with conn_cnx() as conn:
1148+
assert conn.client_fetch_threads is None
1149+
conn.client_fetch_threads = 32
1150+
assert conn.client_fetch_threads == 32
1151+
1152+
11441153
@pytest.mark.external
11451154
def test_client_failover_connection_url(conn_cnx):
11461155
with conn_cnx("client_failover") as conn:

0 commit comments

Comments
 (0)