diff --git a/DESCRIPTION.md b/DESCRIPTION.md index 4d7fd13fc..247548afe 100644 --- a/DESCRIPTION.md +++ b/DESCRIPTION.md @@ -8,6 +8,10 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne # Release Notes +- v3.12.0(TBD) + - Optimized `to_pandas()` performance by fully parallel downloading logic. + + - v3.11.0(June 17,2024) - Added support for `token_file_path` connection parameter to read an OAuth token from a file when connecting to Snowflake. - Added support for `debug_arrow_chunk` connection parameter to allow debugging raw arrow data in case of arrow data parsing failure. diff --git a/src/snowflake/connector/result_set.py b/src/snowflake/connector/result_set.py index 5b40e7d57..89fd756f8 100644 --- a/src/snowflake/connector/result_set.py +++ b/src/snowflake/connector/result_set.py @@ -6,7 +6,7 @@ import inspect from collections import deque -from concurrent.futures import Future +from concurrent.futures import ALL_COMPLETED, Future, wait from concurrent.futures.thread import ThreadPoolExecutor from logging import getLogger from typing import ( @@ -63,41 +63,20 @@ def result_set_iterator( """ with ThreadPoolExecutor(prefetch_thread_num) as pool: - # Fill up window - logger.debug("beginning to schedule result batch downloads") - - for _ in range(min(prefetch_thread_num, len(unfetched_batches))): + while unfetched_batches: logger.debug( f"queuing download of result batch id: {unfetched_batches[0].id}" ) - unconsumed_batches.append( - pool.submit(unfetched_batches.popleft().create_iter, **kw) - ) - + future = pool.submit(unfetched_batches.popleft().create_iter, **kw) + unconsumed_batches.append(future) yield from first_batch_iter - + _, _ = wait(unconsumed_batches, return_when=ALL_COMPLETED) i = 1 while unconsumed_batches: - logger.debug(f"user requesting to consume result batch {i}") - - # Submit the next un-fetched batch to the pool - if unfetched_batches: - logger.debug( - f"queuing download of result batch id: {unfetched_batches[0].id}" - ) - future = pool.submit(unfetched_batches.popleft().create_iter, **kw) - unconsumed_batches.append(future) - - future = unconsumed_batches.popleft() - - # this will raise an exception if one has occurred - batch_iterator = future.result() - logger.debug(f"user began consuming result batch {i}") - yield from batch_iterator - logger.debug(f"user finished consuming result batch {i}") - + yield from unconsumed_batches.popleft().result() + logger.debug(f"user began consuming result batch {i}") i += 1 final()