Skip to content

Commit e2798de

Browse files
authored
Merge pull request #3676 from EPajares/main
fix(processes): prevent memory accumulation in DuckLake connections
2 parents bad7752 + 76c7155 commit e2798de

File tree

2 files changed

+71
-3
lines changed

2 files changed

+71
-3
lines changed

apps/processes/src/processes/services/analytics_service.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,21 @@ def preview_sql(
758758
logger.exception("Error previewing SQL")
759759
return {"success": False, "columns": [], "rows": [], "error": str(e)}
760760
finally:
761+
# Explicitly DETACH DuckLake before closing to ensure the extension
762+
# cleans up its internal PostgreSQL connection, libpq buffers, and
763+
# SSL contexts. Without this, con.close() may not fully release
764+
# extension-internal memory, causing slow leaks in long-running services.
765+
if ducklake_layers:
766+
try:
767+
con.execute("DETACH lake")
768+
except Exception:
769+
pass
761770
con.close()
771+
# Force Python GC to trigger C++ destructors for DuckDB/extension
772+
# objects that may hold memory outside Python's allocator
773+
import gc
774+
775+
gc.collect()
762776

763777

764778
# Singleton instance

packages/python/goatlib/src/goatlib/storage/ducklake.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,23 @@ class DuckLakeSettings(Protocol):
136136

137137

138138
class BaseDuckLakeManager:
139-
"""Single DuckDB connection with lock for thread-safety."""
139+
"""Single DuckDB connection with lock for thread-safety.
140+
141+
Connections are automatically recycled after MAX_CONNECTION_AGE_SECONDS
142+
to prevent accumulation of DuckLake metadata cache, libpq buffers,
143+
and SSL contexts in long-running services.
144+
"""
140145

141146
REQUIRED_EXTENSIONS = ["spatial", "httpfs", "postgres", "ducklake"]
142147

148+
# Max age before connection is recycled. Prevents unbounded growth of
149+
# DuckLake metadata cache and libpq/SSL state in long-running processes.
150+
MAX_CONNECTION_AGE_SECONDS = 300 # 5 minutes
151+
143152
def __init__(self: "BaseDuckLakeManager", read_only: bool = False) -> None:
144153
self._connection: duckdb.DuckDBPyConnection | None = None
145154
self._lock = threading.Lock()
155+
self._created_at: float = 0.0
146156
self._postgres_uri: str | None = None
147157
self._storage_path: str | None = None
148158
self._catalog_schema: str | None = None
@@ -208,6 +218,8 @@ def init_from_params(
208218

209219
def _create_connection(self: "BaseDuckLakeManager") -> None:
210220
"""Create and configure the DuckDB connection."""
221+
import time
222+
211223
con = duckdb.connect()
212224
if self._memory_limit:
213225
con.execute(f"SET memory_limit='{self._memory_limit}'")
@@ -223,10 +235,15 @@ def _create_connection(self: "BaseDuckLakeManager") -> None:
223235
self._setup_s3(con)
224236
self._attach_ducklake(con)
225237
self._connection = con
238+
self._created_at = time.time()
226239

227240
def close(self: "BaseDuckLakeManager") -> None:
228-
"""Close the connection."""
241+
"""Close the connection, explicitly detaching DuckLake first."""
229242
if self._connection:
243+
try:
244+
self._connection.execute("DETACH lake")
245+
except Exception:
246+
pass
230247
self._connection.close()
231248
self._connection = None
232249
logger.info("DuckLake connection closed")
@@ -245,14 +262,47 @@ def attach_catalog(
245262
self._setup_s3(con)
246263
self._attach_ducklake(con)
247264

265+
def _recycle_if_stale(self: "BaseDuckLakeManager") -> None:
266+
"""Recreate connection if it has exceeded MAX_CONNECTION_AGE_SECONDS.
267+
268+
Must be called while holding self._lock.
269+
Prevents unbounded growth of DuckLake metadata cache, libpq buffers,
270+
and SSL contexts in long-running services.
271+
"""
272+
import time
273+
274+
if not self._connection or not self._created_at:
275+
return
276+
age = time.time() - self._created_at
277+
if age > self.MAX_CONNECTION_AGE_SECONDS:
278+
logger.info(
279+
"Recycling DuckLake connection (age %.0fs > %ds)",
280+
age,
281+
self.MAX_CONNECTION_AGE_SECONDS,
282+
)
283+
try:
284+
self._connection.execute("DETACH lake")
285+
except Exception:
286+
pass
287+
try:
288+
self._connection.close()
289+
except Exception:
290+
pass
291+
self._create_connection()
292+
248293
@contextmanager
249294
def connection(
250295
self: "BaseDuckLakeManager",
251296
) -> Generator[duckdb.DuckDBPyConnection, None, None]:
252-
"""Get DuckDB connection (with lock)."""
297+
"""Get DuckDB connection (with lock).
298+
299+
Automatically recycles the connection if it has exceeded
300+
MAX_CONNECTION_AGE_SECONDS to prevent memory accumulation.
301+
"""
253302
if not self._connection:
254303
raise RuntimeError("DuckLakeManager not initialized")
255304
with self._lock:
305+
self._recycle_if_stale()
256306
yield self._connection
257307

258308
@contextmanager
@@ -289,6 +339,10 @@ def reconnect(self: "BaseDuckLakeManager") -> None:
289339
"""Reconnect to DuckLake."""
290340
with self._lock:
291341
if self._connection:
342+
try:
343+
self._connection.execute("DETACH lake")
344+
except Exception:
345+
pass
292346
try:
293347
self._connection.close()
294348
except Exception:

0 commit comments

Comments
 (0)