diff --git a/_test_unstructured_client/integration/test_integration.py b/_test_unstructured_client/integration/test_integration.py
index 0f6e8d7b..5692a896 100644
--- a/_test_unstructured_client/integration/test_integration.py
+++ b/_test_unstructured_client/integration/test_integration.py
@@ -50,18 +50,9 @@ def test_partition_strategies(split_pdf, strategy, client, doc_path):
     assert len(response.elements)
 
 
-@pytest.fixture(scope="session")
-def event_loop():
-    """Make the loop session scope to use session async fixtures."""
-    policy = asyncio.get_event_loop_policy()
-    loop = policy.new_event_loop()
-    yield loop
-    loop.close()
-
-
 @pytest.mark.parametrize("split_pdf", [True, False])
 @pytest.mark.parametrize("error", [(500, ServerError), (403, SDKError), (422, HTTPValidationError)])
-def test_partition_handling_server_error(error, split_pdf, monkeypatch, doc_path, event_loop):
+def test_partition_handling_server_error(error, split_pdf, monkeypatch, doc_path):
     """
     Mock different error responses, assert that the client throws the correct error
     """
diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py
index 12792c69..9d951604 100644
--- a/_test_unstructured_client/unit/test_split_pdf_hook.py
+++ b/_test_unstructured_client/unit/test_split_pdf_hook.py
@@ -339,6 +339,7 @@ def test_unit_get_page_range_returns_valid_range(page_range, expected_result):
 
 async def _request_mock(
         async_client: httpx.AsyncClient,  # not used by mock
+        limiter: asyncio.Semaphore,  # not used by mock
         fails: bool,
         content: str) -> requests.Response:
     response = requests.Response()
@@ -405,6 +406,7 @@ async def test_unit_disallow_failed_coroutines(
 
 async def _fetch_canceller_error(
         async_client: httpx.AsyncClient,  # not used by mock
+        limiter: asyncio.Semaphore,  # not used by mock
         fails: bool,
         content: str,
         cancelled_counter: Counter):
@@ -414,7 +416,7 @@ async def _fetch_canceller_error(
             print("Doesn't fail")
         else:
             print("Fails")
-        return await _request_mock(async_client=async_client, fails=fails, content=content)
+        return await _request_mock(async_client=async_client, limiter=limiter, fails=fails, content=content)
     except asyncio.CancelledError:
         cancelled_counter.update(["cancelled"])
         print(cancelled_counter["cancelled"])
diff --git a/gen.yaml b/gen.yaml
index 17a4452f..04b4f767 100644
--- a/gen.yaml
+++ b/gen.yaml
@@ -25,7 +25,6 @@ python:
       aiofiles: '>=24.1.0'
       cryptography: '>=3.1'
       httpx: '>=0.27.0'
-      nest-asyncio: '>=1.6.0'
       pypdf: '>=4.0'
       requests-toolbelt: '>=1.0.0'
   authors:
diff --git a/poetry.lock b/poetry.lock
index 8445ceae..7b55b1b7 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -585,18 +585,6 @@ files = [
     {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
 ]
 
-[[package]]
-name = "nest-asyncio"
-version = "1.6.0"
-description = "Patch asyncio to allow nested event loops"
-optional = false
-python-versions = ">=3.5"
-groups = ["main"]
-files = [
-    {file = "nest_asyncio-1.6.0-py3-none-any.whl", hash = "sha256:87af6efd6b5e897c81050477ef65c62e2b2f35d51703cae01aff2905b1852e1c"},
-    {file = "nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe"},
-]
-
 [[package]]
 name = "orderly-set"
 version = "5.2.2"
@@ -1140,4 +1128,4 @@ test = ["aiohttp (>=3.10.5)", "flake8 (>=5.0,<6.0)", "mypy (>=0.800)", "psutil",
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.9"
-content-hash = "4898d7795a3536100b31253940f9356d99ba3c82debbb25cf03426e6fb0627dc"
+content-hash = "0ccecae9ed522811dc4b06d8ee7e64246d0727d7c88b7f2a3bb41e73f5a7caa1"
diff --git a/pyproject.toml b/pyproject.toml
index d9e443bc..c94976b8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -10,7 +10,6 @@ dependencies = [
     "cryptography >=3.1",
     "eval-type-backport >=0.2.0",
     "httpx >=0.27.0",
-    "nest-asyncio >=1.6.0",
    "pydantic >=2.10.3",
     "pypdf >=4.0",
     "python-dateutil >=2.8.2",
diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py
index 0b6cff20..3b126464 100644
--- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py
+++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py
@@ -9,13 +9,13 @@ import tempfile
 import uuid
 from collections.abc import Awaitable
+from concurrent import futures
 from functools import partial
 from pathlib import Path
 from typing import Any, Coroutine, Optional, Tuple, Union, cast, Generator, BinaryIO
 
 import aiofiles
 import httpx
-import nest_asyncio  # type: ignore
 from httpx import AsyncClient
 from pypdf import PdfReader, PdfWriter
@@ -56,6 +56,11 @@ HI_RES_STRATEGY = 'hi_res'
 MAX_PAGE_LENGTH = 4000
 
 
+def _run_coroutines_in_separate_thread(
+    coroutines_task: Coroutine[Any, Any, list[tuple[int, httpx.Response]]],
+) -> list[tuple[int, httpx.Response]]:
+    return asyncio.run(coroutines_task)
+
+
 async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]:
     response = await coro
@@ -64,7 +69,7 @@ async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Respons
 
 async def run_tasks(
     coroutines: list[partial[Coroutine[Any, Any, httpx.Response]]],
-    allow_failed: bool = False
+    allow_failed: bool = False,
+    concurrency_level: int = 10,
 ) -> list[tuple[int, httpx.Response]]:
     """Run a list of coroutines in parallel and return the results in order.
@@ -80,13 +86,14 @@ async def run_tasks(
     # Use a variable to adjust the httpx client timeout, or default to 30 minutes
     # When we're able to reuse the SDK to make these calls, we can remove this var
     # The SDK timeout will be controlled by parameter
+    limiter = asyncio.Semaphore(concurrency_level)
     client_timeout_minutes = 60
     if timeout_var := os.getenv("UNSTRUCTURED_CLIENT_TIMEOUT_MINUTES"):
         client_timeout_minutes = int(timeout_var)
     client_timeout = httpx.Timeout(60 * client_timeout_minutes)
 
     async with httpx.AsyncClient(timeout=client_timeout) as client:
-        armed_coroutines = [coro(async_client=client) for coro in coroutines]  # type: ignore
+        armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines]  # type: ignore
         if allow_failed:
             responses = await asyncio.gather(*armed_coroutines, return_exceptions=False)
             return list(enumerate(responses, 1))
@@ -163,8 +170,10 @@ def __init__(self) -> None:
         self.coroutines_to_execute: dict[
             str, list[partial[Coroutine[Any, Any, httpx.Response]]]
         ] = {}
+        self.concurrency_level: dict[str, int] = {}
         self.api_successful_responses: dict[str, list[httpx.Response]] = {}
         self.api_failed_responses: dict[str, list[httpx.Response]] = {}
+        self.executors: dict[str, futures.ThreadPoolExecutor] = {}
         self.tempdirs: dict[str, tempfile.TemporaryDirectory] = {}
         self.allow_failed: bool = DEFAULT_ALLOW_FAILED
         self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA
@@ -268,10 +277,6 @@ def before_request(
             logger.warning("Splitting is currently incompatible with uvloop. Continuing without splitting.")
             return request
 
-        # This allows us to use an event loop in an env with an existing loop
-        # Temporary fix until we can improve the async splitting behavior
-        nest_asyncio.apply()
-
         # This is our key into coroutines_to_execute
         # We need to pass it on to after_success so
         # we know which results are ours
@@ -315,13 +320,15 @@ def before_request(
             fallback_value=DEFAULT_ALLOW_FAILED,
         )
 
-        concurrency_level = form_utils.get_split_pdf_concurrency_level_param(
+        self.concurrency_level[operation_id] = form_utils.get_split_pdf_concurrency_level_param(
             form_data,
             key=PARTITION_FORM_CONCURRENCY_LEVEL_KEY,
             fallback_value=DEFAULT_CONCURRENCY_LEVEL,
             max_allowed=MAX_CONCURRENCY_LEVEL,
         )
-        limiter = asyncio.Semaphore(concurrency_level)
+
+        executor = futures.ThreadPoolExecutor(max_workers=1)
+        self.executors[operation_id] = executor
 
         self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data(
             form_data,
@@ -344,7 +351,7 @@ def before_request(
         page_count = page_range_end - page_range_start + 1
 
         split_size = get_optimal_split_size(
-            num_pages=page_count, concurrency_level=concurrency_level
+            num_pages=page_count, concurrency_level=self.concurrency_level[operation_id]
         )
 
         # If the doc is small enough, and we aren't slicing it with a page range:
@@ -387,7 +394,6 @@ def before_request(
             # in `after_success`.
             coroutine = partial(
                 self.call_api_partial,
-                limiter=limiter,
                 operation_id=operation_id,
                 pdf_chunk_request=pdf_chunk_request,
                 pdf_chunk_file=pdf_chunk_file,
@@ -605,10 +611,16 @@ def _await_elements(self, operation_id: str) -> Optional[list]:
         if tasks is None:
             return None
 
-        ioloop = asyncio.get_event_loop()
-        task_responses: list[tuple[int, httpx.Response]] = ioloop.run_until_complete(
-            run_tasks(tasks, allow_failed=self.allow_failed)
-        )
+        concurrency_level = self.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL)
+        coroutines = run_tasks(tasks, allow_failed=self.allow_failed, concurrency_level=concurrency_level)
+
+        # sending the coroutines to a separate thread to avoid blocking the current event loop
+        # this operation should be removed when the SDK is updated to support async hooks
+        executor = self.executors.get(operation_id)
+        if executor is None:
+            raise RuntimeError("Executor not found for operation_id")
+        task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines)
+        task_responses = task_responses_future.result()
 
         if task_responses is None:
             return None
@@ -715,6 +727,10 @@ def _clear_operation(self, operation_id: str) -> None:
         """
         self.coroutines_to_execute.pop(operation_id, None)
        self.api_successful_responses.pop(operation_id, None)
+        self.concurrency_level.pop(operation_id, None)
+        executor = self.executors.pop(operation_id, None)
+        if executor is not None:
+            executor.shutdown(wait=True)
         tempdir = self.tempdirs.pop(operation_id, None)
         if tempdir:
             tempdir.cleanup()
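
Not part of the diff: a minimal, self-contained sketch of the pattern this change adopts, assuming nothing beyond the standard library. A semaphore caps how many coroutines do work at once (the `concurrency_level` idea in `run_tasks`), and the whole batch is executed with `asyncio.run()` on a single-worker `ThreadPoolExecutor`, so the caller's event loop, if one exists, is never re-entered; that is what `nest_asyncio.apply()` previously papered over. The names `_worker`, `run_in_separate_thread`, and the `asyncio.sleep` stand-in are illustrative, not the SDK's API.

```python
import asyncio
from concurrent import futures
from functools import partial


async def _worker(index: int, limiter: asyncio.Semaphore) -> tuple[int, str]:
    # The semaphore caps how many coroutine bodies run at the same time,
    # mirroring the concurrency_level limit passed to run_tasks.
    async with limiter:
        await asyncio.sleep(0.01)  # stand-in for the real HTTP request
        return index, f"page-{index}"


async def run_tasks(coroutines: list[partial], concurrency_level: int = 10) -> list[tuple[int, str]]:
    # Arm each partial with the shared limiter, then gather results in order.
    limiter = asyncio.Semaphore(concurrency_level)
    armed = [coro(limiter=limiter) for coro in coroutines]
    return await asyncio.gather(*armed)


def run_in_separate_thread(coroutines_task):
    # asyncio.run() needs a thread with no running event loop, so the batch is
    # handed to a one-worker executor instead of patching the caller's loop.
    return asyncio.run(coroutines_task)


if __name__ == "__main__":
    tasks = [partial(_worker, i) for i in range(1, 6)]
    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(run_in_separate_thread, run_tasks(tasks, concurrency_level=2))
        print(future.result())  # [(1, 'page-1'), (2, 'page-2'), ..., (5, 'page-5')]
```

Because the worker thread owns its own event loop for the lifetime of the `asyncio.run()` call, this pattern is safe whether the SDK is invoked from synchronous code or from inside an already-running loop, which is why the hook keeps one dedicated executor per operation and shuts it down in `_clear_operation`.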