Thanks for fastapi-taskflow — really enjoying it. Hitting one rough edge integrating it into an asyncpg-based FastAPI app and wanted to flag it.
PostgresBackend (backends/postgres.py) is built around sync psycopg2 wrapped in asyncio.to_thread. For apps that are already on asyncpg (and don't otherwise pull in psycopg2) this has a few costs:
- Adds an unwanted DB driver —
fastapi-taskflow[postgres] brings in psycopg2-binary, which is otherwise not in our dep tree.
- No connection reuse — every public method calls
self._connect() → conn.close(). At sustained task volume that's a fresh PG handshake per save / load / claim_pending call, which adds latency and connection-slot pressure.
- Threadpool consumption — every backend call eats an
asyncio.to_thread worker. asyncio's default executor maxes out at min(32, cpu+4) threads; under load the snapshot writer competes with anything else the app puts on the default executor.
- Can't share the app's existing pool — the constructor takes a DSN only, so even apps that already maintain a healthy asyncpg pool can't hand it in.
A few directions that would solve this (any of them would be great):
- Native asyncpg backend alongside
PostgresBackend, e.g. AsyncpgBackend(url=..., pool=None, min_size=1, max_size=5).
- Accept an injected pool:
PostgresBackend(pool=existing_asyncpg_pool) as an alternative to url=, with the existing psycopg2 path kept for users who want the simple "give me a DSN" story.
- Internal connection pooling for the current psycopg2 path (e.g.
psycopg2.pool.ThreadedConnectionPool) — wouldn't fix the dep concern but would address #2 + #3.
Happy to put up a PR for the asyncpg backend if it's a direction you're open to — would just want to align on the API shape (separate class vs PostgresBackend(driver="asyncpg" | "psycopg2")) before writing it.
Thanks for fastapi-taskflow — really enjoying it. Hitting one rough edge integrating it into an asyncpg-based FastAPI app and wanted to flag it.
PostgresBackend(backends/postgres.py) is built around syncpsycopg2wrapped inasyncio.to_thread. For apps that are already onasyncpg(and don't otherwise pull in psycopg2) this has a few costs:fastapi-taskflow[postgres]brings inpsycopg2-binary, which is otherwise not in our dep tree.self._connect()→conn.close(). At sustained task volume that's a fresh PG handshake persave/load/claim_pendingcall, which adds latency and connection-slot pressure.asyncio.to_threadworker.asyncio's default executor maxes out atmin(32, cpu+4)threads; under load the snapshot writer competes with anything else the app puts on the default executor.A few directions that would solve this (any of them would be great):
PostgresBackend, e.g.AsyncpgBackend(url=..., pool=None, min_size=1, max_size=5).PostgresBackend(pool=existing_asyncpg_pool)as an alternative tourl=, with the existing psycopg2 path kept for users who want the simple "give me a DSN" story.psycopg2.pool.ThreadedConnectionPool) — wouldn't fix the dep concern but would address #2 + #3.Happy to put up a PR for the asyncpg backend if it's a direction you're open to — would just want to align on the API shape (separate class vs
PostgresBackend(driver="asyncpg" | "psycopg2")) before writing it.