TaskIQ-Asyncpg is a plugin for taskiq that adds a new result backend and broker based on PostgreSQL and Asyncpg.
To use this project you must have installed core taskiq library:
pip install taskiqThis project can be installed using pip:
pip install taskiq-asyncpgusing poetry:
poetry add taskiq-asyncpgusing rye:
rye add taskiq-asyncpgusing uv:
uv add taskiq-asyncpgLet's see the example with the redis broker and PostgreSQL Asyncpg result backend:
# broker.py
import asyncio
from taskiq_asyncpg import AsyncpgResultBackend, AsyncpgBroker
result_backend = AsyncpgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
)
broker = AsyncpgBroker(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
).with_result_backend(result_backend)
@broker.task
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(5.5)
print("All problems are solved!")
async def main():
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())dsn: connection string to PostgreSQL.keep_results: flag to not remove results from Redis after reading.table_name: name of the table in PostgreSQL to store TaskIQ results.field_for_task_id: type of a field fortask_id, you may need it if you want to have length of task_id more than 255 symbols.serializer: type ofTaskiqAsyncpgSerializerdefault isPickleSerializer**connect_kwargs: additional connection parameters, you can read more about it in Asyncpg repository.