Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ FalkorDB Python client

see [docs](http://falkordb-py.readthedocs.io/)

## Installation
```sh
pip install FalkorDB
```
## Installation
```sh
pip install FalkorDB
```

Embedded mode (local in-process server via optional binaries):
```sh
pip install "FalkorDB[lite]"
```

## Usage

Expand All @@ -32,8 +37,8 @@ Or use [FalkorDB Cloud](https://app.falkordb.cloud)
```python
from falkordb import FalkorDB

# Connect to FalkorDB
db = FalkorDB(host='localhost', port=6379)
# Connect to FalkorDB
db = FalkorDB(host='localhost', port=6379)

# Select the social graph
g = db.select_graph('social')
Expand All @@ -49,9 +54,14 @@ nodes = g.ro_query('MATCH (n) RETURN n LIMIT 10').result_set
# Copy the Graph
copy_graph = g.copy('social_copy')

# Delete the Graph
g.delete()
```
# Delete the Graph
g.delete()

# Embedded FalkorDB (no external server)
with FalkorDB(embedded=True, db_path="/tmp/social.rdb") as embedded_db:
eg = embedded_db.select_graph("embedded_social")
eg.query('CREATE (:Person {name: "Alice"})')
```

### Asynchronous Example

Expand All @@ -63,8 +73,8 @@ from redis.asyncio import BlockingConnectionPool
async def main():

# Connect to FalkorDB
pool = BlockingConnectionPool(max_connections=16, timeout=None, decode_responses=True)
db = FalkorDB(connection_pool=pool)
pool = BlockingConnectionPool(max_connections=16, timeout=None, decode_responses=True)
db = FalkorDB(connection_pool=pool)

# Select the social graph
g = db.select_graph('social')
Expand All @@ -90,8 +100,13 @@ async def main():
print(f"Created Alice: {results[1].result_set[0][0]}")
print(f"Created Bob: {results[2].result_set[0][0]}")

# Close the connection when done
await pool.aclose()
# Close the connection when done
await pool.aclose()

# Embedded mode (same API)
async with FalkorDB(embedded=True) as embedded_db:
embedded_graph = embedded_db.select_graph("embedded_social")
await embedded_graph.query('CREATE (:Person {name: "Bob"})')

# Run the async example
if __name__ == "__main__":
Expand Down
107 changes: 83 additions & 24 deletions falkordb/asyncio/falkordb.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import redis.asyncio as redis
from .cluster import *
from .graph import AsyncGraph
Expand Down Expand Up @@ -64,31 +65,77 @@ def __init__(
reinitialize_steps=5,
read_from_replicas=False,
address_remap=None,
embedded=False,
db_path=None,
embedded_config=None,
startup_timeout=10.0,
connection_acquire_timeout=5.0,
):
self._embedded_server = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

AttributeError: 'FalkorDB' object has no attribute '_embedded_server' — CI test failure.

Although __init__ sets self._embedded_server = None at line 80, the CI test fails with AttributeError at line 282, indicating instances are reaching aclose() without _embedded_server set (e.g., via object.__new__, a patched __init__, or mock-created instances in existing tests). Add a class-level declaration as a safe default:

🐛 Proposed fix
 class FalkorDB:
     """
     Asynchronous FalkorDB Class for interacting with a FalkorDB server.
     ...
     """
+    _embedded_server = None
+
     def __init__(
         self,
         ...
🧰 Tools
🪛 GitHub Actions: Lint

[error] Ruff format check would reformat this file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@falkordb/asyncio/falkordb.py` at line 80, The class lacks a class-level
default for _embedded_server which leads to AttributeError when instances are
created without running __init__ (f.e. in tests); add a class attribute
declaration on the FalkorDB class like "_embedded_server = None" so any instance
(including ones created via object.__new__ or mocks) always has this attribute
defined, and ensure the aclose() method still checks self._embedded_server
before using it (referencing FalkorDB, _embedded_server, and aclose).


conn = redis.Redis(host=host, port=port, db=0, password=password,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
connection_pool=connection_pool,
unix_socket_path=unix_socket_path,
encoding=encoding, encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error, ssl=ssl,
ssl_keyfile=ssl_keyfile, ssl_certfile=ssl_certfile,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ca_data=ssl_ca_data,
ssl_check_hostname=ssl_check_hostname,
max_connections=max_connections,
single_connection_client=single_connection_client,
health_check_interval=health_check_interval,
client_name=client_name, lib_name=lib_name,
lib_version=lib_version, username=username,
retry=retry, redis_connect_func=connect_func,
credential_provider=credential_provider,
protocol=protocol)
if embedded:
from ..lite.server import EmbeddedServer

if max_connections is None:
max_connections = 16

server = EmbeddedServer(
db_path=db_path,
config=embedded_config,
startup_timeout=startup_timeout,
)
self._embedded_server = server
connection_pool = redis.BlockingConnectionPool(
connection_class=redis.UnixDomainSocketConnection,
path=server.unix_socket_path,
max_connections=max_connections,
timeout=connection_acquire_timeout,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
encoding=encoding,
encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error,
retry=retry,
health_check_interval=health_check_interval,
client_name=client_name,
lib_name=lib_name,
lib_version=lib_version,
username=username,
password=password,
credential_provider=credential_provider,
protocol=protocol,
)
Comment on lines +84 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Blocking synchronous calls inside async __init__ will stall the event loop.

EmbeddedServer() (line 80) is fully synchronous — it spawns a subprocess and polls with time.sleep() in a loop for up to startup_timeout seconds (server.py lines 52–82). Calling this directly in the __init__ of an async client blocks the running event loop for the entire startup duration.

Consider wrapping the server startup in asyncio.to_thread() (or loop.run_in_executor()) and providing an async factory method instead of doing this work in __init__:

Example: async factory method
`@classmethod`
async def create(cls, *, embedded=False, db_path=None, embedded_config=None,
                 startup_timeout=10.0, max_connections=None, **kwargs):
    if embedded:
        import asyncio
        from ..lite.server import EmbeddedServer

        if max_connections is None:
            max_connections = 16

        server = await asyncio.to_thread(
            EmbeddedServer,
            db_path=db_path,
            config=embedded_config,
            startup_timeout=startup_timeout,
        )
        kwargs["connection_pool"] = redis.BlockingConnectionPool(
            connection_class=redis.UnixDomainSocketConnection,
            path=server.unix_socket_path,
            max_connections=max_connections,
            ...
        )
        instance = cls(**kwargs)
        instance._embedded_server = server
        return instance
    else:
        return cls(**kwargs)


if embedded:
conn = redis.Redis(connection_pool=connection_pool,
single_connection_client=single_connection_client)
else:
conn = redis.Redis(host=host, port=port, db=0, password=password,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
connection_pool=connection_pool,
unix_socket_path=unix_socket_path,
encoding=encoding, encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error, ssl=ssl,
ssl_keyfile=ssl_keyfile, ssl_certfile=ssl_certfile,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ca_data=ssl_ca_data,
ssl_check_hostname=ssl_check_hostname,
max_connections=max_connections,
single_connection_client=single_connection_client,
health_check_interval=health_check_interval,
client_name=client_name, lib_name=lib_name,
lib_version=lib_version, username=username,
retry=retry, redis_connect_func=connect_func,
credential_provider=credential_provider,
protocol=protocol)

if Is_Cluster(conn):
conn = Cluster_Conn(
Expand All @@ -106,6 +153,19 @@ def __init__(
self.flushdb = conn.flushdb
self.execute_command = conn.execute_command

async def close(self):
if hasattr(self, "connection") and self.connection is not None:
await self.connection.aclose()
if self._embedded_server is not None:
await asyncio.to_thread(self._embedded_server.stop)
self._embedded_server = None

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
await self.close()

@classmethod
def from_url(cls, url: str, **kwargs) -> "FalkorDB":
"""
Expand Down Expand Up @@ -287,4 +347,3 @@ async def udf_delete(self, lib: str):
resp = await self.connection.execute_command(UDF_CMD, "DELETE", lib)

return resp

148 changes: 107 additions & 41 deletions falkordb/falkordb.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,48 +72,95 @@ def __init__(
dynamic_startup_nodes=True,
url=None,
address_remap=None,
embedded=False,
db_path=None,
embedded_config=None,
startup_timeout=10.0,
):
self._embedded_server = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

AttributeError: 'FalkorDB' object has no attribute '_embedded_server' — CI test failure.

Same root cause as the async variant: instances reaching close() (line 204) without _embedded_server set, because __init__ was bypassed by test infrastructure. Add a class-level default:

🐛 Proposed fix
 class FalkorDB:
     """
     FalkorDB Class for interacting with a FalkorDB server.
     ...
     """
+    _embedded_server = None
+
     def __init__(
         self,
         ...
🧰 Tools
🪛 GitHub Actions: Lint

[error] Ruff format check would reformat this file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@falkordb/falkordb.py` at line 86, The class FalkorDB is missing a class-level
default for _embedded_server causing AttributeError when __init__ is bypassed;
add a class attribute _embedded_server = None on the FalkorDB class (so
instances that skip __init__ still have the attribute) and leave the existing
instance assignments in __init__ and the close() method unchanged; reference the
FalkorDB class, the _embedded_server attribute, the __init__ method, and the
close method when making this change.


conn = redis.Redis(
host=host,
port=port,
db=0,
password=password,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
connection_pool=connection_pool,
unix_socket_path=unix_socket_path,
encoding=encoding,
encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error,
ssl=ssl,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ca_path=ssl_ca_path,
ssl_ca_data=ssl_ca_data,
ssl_check_hostname=ssl_check_hostname,
ssl_password=ssl_password,
ssl_validate_ocsp=ssl_validate_ocsp,
ssl_validate_ocsp_stapled=ssl_validate_ocsp_stapled,
ssl_ocsp_context=ssl_ocsp_context,
ssl_ocsp_expected_cert=ssl_ocsp_expected_cert,
max_connections=max_connections,
single_connection_client=single_connection_client,
health_check_interval=health_check_interval,
client_name=client_name,
lib_name=lib_name,
lib_version=lib_version,
username=username,
retry=retry,
redis_connect_func=connect_func,
credential_provider=credential_provider,
protocol=protocol,
)
if embedded:
from .lite.server import EmbeddedServer

if max_connections is None:
max_connections = 16

server = EmbeddedServer(
db_path=db_path,
config=embedded_config,
startup_timeout=startup_timeout,
)
self._embedded_server = server

connection_pool = redis.ConnectionPool(
connection_class=redis.UnixDomainSocketConnection,
path=server.unix_socket_path,
max_connections=max_connections,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
encoding=encoding,
encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error,
retry=retry,
health_check_interval=health_check_interval,
client_name=client_name,
lib_name=lib_name,
lib_version=lib_version,
username=username,
password=password,
credential_provider=credential_provider,
protocol=protocol,
)

if embedded:
conn = redis.Redis(
connection_pool=connection_pool,
single_connection_client=single_connection_client,
)
else:
conn = redis.Redis(
host=host,
port=port,
db=0,
password=password,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
connection_pool=connection_pool,
unix_socket_path=unix_socket_path,
encoding=encoding,
encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error,
ssl=ssl,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ca_path=ssl_ca_path,
ssl_ca_data=ssl_ca_data,
ssl_check_hostname=ssl_check_hostname,
ssl_password=ssl_password,
ssl_validate_ocsp=ssl_validate_ocsp,
ssl_validate_ocsp_stapled=ssl_validate_ocsp_stapled,
ssl_ocsp_context=ssl_ocsp_context,
ssl_ocsp_expected_cert=ssl_ocsp_expected_cert,
max_connections=max_connections,
single_connection_client=single_connection_client,
health_check_interval=health_check_interval,
client_name=client_name,
lib_name=lib_name,
lib_version=lib_version,
username=username,
retry=retry,
redis_connect_func=connect_func,
credential_provider=credential_provider,
protocol=protocol,
)

if Is_Sentinel(conn):
self.sentinel, self.service_name = Sentinel_Conn(conn, ssl)
Expand All @@ -137,6 +184,26 @@ def __init__(
self.flushdb = conn.flushdb
self.execute_command = conn.execute_command

def close(self) -> None:
"""Close the current DB connection and stop the embedded server if present."""
if hasattr(self, "connection") and self.connection is not None:
self.connection.close()
if self._embedded_server is not None:
self._embedded_server.stop()
self._embedded_server = None

def __enter__(self) -> "FalkorDB":
return self

def __exit__(self, exc_type, exc, tb):
self.close()

def __del__(self):
try:
self.close()
except Exception:
pass

@classmethod
def from_url(cls, url: str, **kwargs) -> "FalkorDB":
"""
Expand Down Expand Up @@ -326,4 +393,3 @@ def udf_delete(self, lib: str):
resp = self.connection.execute_command(UDF_CMD, "DELETE", lib)

return resp

7 changes: 7 additions & 0 deletions falkordb/lite/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
Embedded FalkorDB support.

Install with:
pip install falkordb[lite]
"""

Loading