-
Notifications
You must be signed in to change notification settings - Fork 4
Alternative proposal for concurrent.futures support in effectful
#400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging-llm
Are you sure you want to change the base?
Changes from 4 commits
6fde909
3dd752d
42a4d81
58bc40b
ba879e3
5c6f3c4
5af3568
bd650b7
bfb7d7a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,255 @@ | ||
| """ | ||
| Futures handler for effectful - provides integration with concurrent.futures. | ||
|
|
||
| This module provides operations for working with concurrent.futures, allowing | ||
| effectful operations to be executed asynchronously in thread pools with | ||
| automatic preservation of interpretation context. | ||
| """ | ||
|
|
||
| import concurrent.futures as futures | ||
| import functools | ||
| from collections.abc import Callable, Iterable | ||
| from concurrent.futures import Future, ThreadPoolExecutor | ||
| from dataclasses import dataclass | ||
| from typing import Literal | ||
|
|
||
| from effectful.ops.semantics import defop | ||
| from effectful.ops.syntax import ObjectInterpretation, defdata, implements | ||
| from effectful.ops.types import NotHandled, Term | ||
|
|
||
|
|
||
| class Executor: | ||
| """Namespace for executor-related operations.""" | ||
|
|
||
| @staticmethod | ||
| @defop # type: ignore | ||
| def submit[**P, T]( | ||
| task: Callable[P, T], *args: P.args, **kwargs: P.kwargs | ||
| ) -> Future[T]: | ||
| """ | ||
| Submit a task for asynchronous execution. | ||
|
|
||
| This operation should be handled by providing a FuturesInterpretation | ||
| which automatically preserves the interpretation context across thread boundaries. | ||
|
|
||
| :param task: The callable to execute asynchronously | ||
| :param args: Positional arguments for the task | ||
| :param kwargs: Keyword arguments for the task | ||
| :return: A Future representing the asynchronous computation | ||
|
|
||
| Example: | ||
| >>> from concurrent.futures import ThreadPoolExecutor | ||
| >>> from effectful.handlers.futures import ThreadPoolFuturesInterpretation | ||
| >>> from effectful.ops.semantics import handler | ||
| >>> | ||
| >>> pool = ThreadPoolExecutor() | ||
| >>> with handler(ThreadPoolFuturesInterpretation(pool)): | ||
| >>> future = Executor.submit(my_function, arg1, arg2) | ||
| """ | ||
| raise NotHandled | ||
|
|
||
| @staticmethod | ||
| @defop | ||
| def map[T, R]( | ||
| func: Callable[[T], R], | ||
| *iterables: Iterable[T], | ||
| timeout: float | None = None, | ||
| chunksize: int = 1, | ||
| ) -> Iterable[R]: | ||
| """ | ||
| Map a function over iterables, executing asynchronously. | ||
|
|
||
| Returns an iterator yielding results as they complete. Equivalent to | ||
| map(func, *iterables) but executes asynchronously. | ||
|
|
||
| This operation should be handled by providing a FuturesInterpretation | ||
| which automatically preserves the interpretation context across thread boundaries. | ||
|
|
||
| :param func: The function to map over the iterables | ||
| :param iterables: One or more iterables to map over | ||
| :param timeout: Maximum time to wait for a result (default: None) | ||
| :param chunksize: Size of chunks for ProcessPoolExecutor (default: 1) | ||
| :return: An iterator yielding results | ||
|
|
||
| Example: | ||
| >>> from effectful.handlers.futures import ThreadPoolFuturesInterpretation | ||
| >>> from effectful.ops.semantics import handler | ||
| >>> | ||
| >>> def square(x): | ||
| >>> return x ** 2 | ||
| >>> | ||
| >>> with handler(ThreadPoolFuturesInterpretation()): | ||
| >>> results = list(Executor.map(square, range(10))) | ||
| >>> print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] | ||
| """ | ||
| raise NotHandled | ||
|
|
||
|
|
||
| class FuturesInterpretation(ObjectInterpretation): | ||
| """ | ||
| Base interpretation for concurrent.futures executors. | ||
|
|
||
| This interpretation automatically preserves the effectful interpretation context | ||
| when submitting tasks to worker threads, ensuring that effectful operations | ||
| work correctly across thread boundaries. | ||
| """ | ||
|
|
||
| def __init__(self, executor: futures.Executor): | ||
| """ | ||
| Initialize the futures interpretation. | ||
|
|
||
| :param executor: The executor to use (ThreadPoolExecutor or ProcessPoolExecutor) | ||
| """ | ||
| super().__init__() | ||
| self.executor: futures.Executor = executor | ||
|
|
||
| def shutdown(self, *args, **kwargs): | ||
| self.executor.shutdown(*args, **kwargs) | ||
|
|
||
| @implements(Executor.submit) | ||
| def submit(self, task: Callable, *args, **kwargs) -> Future: | ||
| """ | ||
| Submit a task to the executor with automatic context preservation. | ||
|
|
||
| Captures the current interpretation context and ensures it is restored | ||
| in the worker thread before executing the task. | ||
| """ | ||
| from effectful.internals.runtime import get_interpretation, interpreter | ||
|
|
||
| # Capture the current interpretation context | ||
| context = get_interpretation() | ||
|
|
||
| @functools.wraps(task) | ||
| def wrapped_task(*task_args, **task_kwargs): | ||
| # Restore the interpretation context in the worker thread | ||
| with interpreter(context): | ||
| return task(*task_args, **task_kwargs) | ||
|
|
||
| # Submit the wrapped task to the underlying executor | ||
| return self.executor.submit(wrapped_task, *args, **kwargs) | ||
|
|
||
| @implements(Executor.map) | ||
| def map(self, func: Callable, *iterables, timeout=None, chunksize=1): | ||
| """ | ||
| Map a function over iterables with automatic context preservation. | ||
|
|
||
| Captures the current interpretation context and ensures it is restored | ||
| in each worker thread before executing the function. | ||
| """ | ||
| from effectful.internals.runtime import get_interpretation, interpreter | ||
|
|
||
| # Capture the current interpretation context | ||
| context = get_interpretation() | ||
|
|
||
| @functools.wraps(func) | ||
| def wrapped_func(*args, **kwargs): | ||
| # Restore the interpretation context in the worker thread | ||
| with interpreter(context): | ||
| return func(*args, **kwargs) | ||
|
|
||
| # Call the executor's map with the wrapped function | ||
| return self.executor.map( | ||
| wrapped_func, *iterables, timeout=timeout, chunksize=chunksize | ||
| ) | ||
|
|
||
|
|
||
| class ThreadPoolFuturesInterpretation(FuturesInterpretation): | ||
| """ | ||
| Interpretation for ThreadPoolExecutor with automatic context preservation. | ||
|
|
||
| Example: | ||
| >>> from concurrent.futures import ThreadPoolExecutor, Future | ||
| >>> from effectful.ops.semantics import defop, handler | ||
| >>> from effectful.handlers.futures import Executor, ThreadPoolFuturesInterpretation | ||
| >>> | ||
| >>> @defop | ||
| >>> def async_pow(n: int, k: int) -> Future[int]: | ||
| >>> return Executor.submit(pow, n, k) | ||
| >>> | ||
| >>> pool = ThreadPoolExecutor() | ||
| >>> with handler(ThreadPoolFuturesInterpretation(pool)): | ||
| >>> result = async_pow(2, 10).result() | ||
| >>> print(result) # 1024 | ||
| """ | ||
|
|
||
| def __init__(self, max_workers=None): | ||
|
kiranandcode marked this conversation as resolved.
Outdated
|
||
| """ | ||
| Initialize with a ThreadPoolExecutor. | ||
|
|
||
| :param max_workers: Maximum number of worker threads (default: None, uses default from ThreadPoolExecutor) | ||
| """ | ||
| super().__init__(ThreadPoolExecutor(max_workers=max_workers)) | ||
|
|
||
|
|
||
| type ReturnOptions = Literal["All_COMPLETED", "FIRST_COMPLETED", "FIRST_EXCEPTION"] | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class DoneAndNotDoneFutures[T]: | ||
| done: set[Future[T]] | ||
| not_done: set[Future[T]] | ||
|
Comment on lines
+181
to
+184
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this defined in the concurrent.futures library? Do we need our own dataclass definition?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's a namedtuple in the library, I added this dataclass because
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There shouldn't be a behavioral difference in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yep will do! |
||
|
|
||
|
|
||
| @defdata.register(DoneAndNotDoneFutures) | ||
| class _DoneAndNotDoneFuturesTerm[T](Term[DoneAndNotDoneFutures[T]]): | ||
| """Term representing a DoneAndNotDoneFutures result.""" | ||
|
|
||
| def __init__(self, op, *args, **kwargs): | ||
| self._op = op | ||
| self._args = args | ||
| self._kwargs = kwargs | ||
|
|
||
| @property | ||
| def op(self): | ||
| return self._op | ||
|
|
||
| @property | ||
| def args(self): | ||
| return self._args | ||
|
|
||
| @property | ||
| def kwargs(self): | ||
| return self._kwargs | ||
|
|
||
| @defop # type: ignore[prop-decorator] | ||
| @property | ||
| def done(self) -> set[Future[T]]: | ||
| """Get the set of done futures.""" | ||
| if not isinstance(self, Term): | ||
| return self.done | ||
| else: | ||
| raise NotHandled | ||
|
|
||
| @defop # type: ignore[prop-decorator] | ||
| @property | ||
| def not_done(self) -> set[Future[T]]: | ||
| """Get the set of not done futures.""" | ||
| if not isinstance(self, Term): | ||
| return self.not_done | ||
| else: | ||
| raise NotHandled | ||
|
|
||
|
|
||
| @defop | ||
| def wait[T]( | ||
| fs: Iterable[Future[T]], | ||
| timeout: int | None = None, | ||
| return_when: ReturnOptions = futures.ALL_COMPLETED, # type: ignore | ||
| ) -> DoneAndNotDoneFutures[T]: | ||
| if ( | ||
| isinstance(timeout, Term) | ||
| or isinstance(return_when, Term) | ||
| or any(not isinstance(t, Future) for t in fs) | ||
| ): | ||
| raise NotHandled | ||
| return futures.wait(fs, timeout, return_when) # type: ignore | ||
|
|
||
|
|
||
| @defop | ||
| def as_completed[T]( | ||
| fs: Iterable[Future[T]], | ||
| timeout: int | None = None, | ||
| ) -> Iterable[Future[T]]: | ||
| if isinstance(timeout, Term) or any(isinstance(t, Term) for t in fs): | ||
| raise NotHandled | ||
| return futures.as_completed(fs, timeout) | ||
Uh oh!
There was an error while loading. Please reload this page.