Skip to content

Commit

Permalink
add property database interface (#58)
Browse files Browse the repository at this point in the history
* add property database interface

* add basic sqlite functionality

* add basic sql queries

* refactor and simplify queries

* update function references

* add license

* add unit tests

* improve field loading interface

* Update gqlalchemy/disk_storage.py

Co-authored-by: Jure Bajic <[email protected]>

* resolve review requests

* fix styling and linting

* add tests to delete properties

* remove default for for the on disk database file

* add test that enters a node into memgraph

* add failing test when inhereting an object whose fields cannot be pickled

* bump year to 2022

* add saving properties to on disk storage and loading properties from on disk storage when saving and loading graph objects

* remove unnecessary print statement

* add multiprocessing test

* use multiprocessing pool instead of individual processes

Co-authored-by: Marko Budiselić <[email protected]>

* separate loading and saving on disk properties in separate functions

* define a custom exception for a missing property on disk database

* add possibility to add on disk database to memgraph by providing an argument to the on disk databases init

* rename add_on_disk_storage to init_disk_storage

* handle memgraph write collisions

Co-authored-by: Jure Bajic <[email protected]>
Co-authored-by: Marko Budiselić <[email protected]>
  • Loading branch information
3 people authored Jan 19, 2022
1 parent eee17e0 commit f80a8b0
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# temp files
tests/on_disk_storage.db

# Byte-compiled / optimized / DLL files
**/__pycache__/
*.py[cod]
Expand Down
1 change: 1 addition & 0 deletions gqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Path,
Relationship,
)
from .disk_storage import SQLitePropertyDatabase # noqa F401
from .query_builder import ( # noqa F401
Call,
Create,
Expand Down
148 changes: 148 additions & 0 deletions gqlalchemy/disk_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright (c) 2016-2022 Memgraph Ltd. [https://memgraph.com]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sqlite3
import contextlib

from abc import ABC
from typing import Optional, List


class OnDiskPropertyDatabase(ABC):
def save_node_property(self, node_id: int, property_name: str, property_value: str) -> None:
pass

def load_node_property(self, node_id: int, property_name: str, property_value: str) -> Optional[str]:
pass

def delete_node_property(self, node_id: int, property_name: str, property_value: str) -> None:
pass

def save_relationship_property(self, relationship_id: int, property_name: str, property_value: str) -> None:
pass

def load_relationship_property(
self, relationship_id: int, property_name: str, property_value: str
) -> Optional[str]:
pass

def delete_relationship_property(self, node_id: int, property_name: str, property_value: str) -> None:
pass

def drop_database(self) -> None:
pass


class SQLitePropertyDatabase(OnDiskPropertyDatabase):
def __init__(self, database_path: str, memgraph: "Memgraph" = None): # noqa F821
self.database_name = database_path
self.create_node_property_table()
self.create_relationship_property_table()
if memgraph is not None:
memgraph.init_disk_storage(self)

def execute_query(self, query: str) -> List[str]:
with contextlib.closing(sqlite3.connect(self.database_name)) as conn:
with conn: # autocommit changes
with contextlib.closing(conn.cursor()) as cursor:
cursor.execute(query)
return cursor.fetchall()

def create_node_property_table(self) -> None:
self.execute_query(
"CREATE TABLE IF NOT EXISTS node_properties ("
"node_id integer NOT NULL,"
"property_name text NOT NULL,"
"property_value text NOT NULL,"
"PRIMARY KEY (node_id, property_name)"
");"
)

def create_relationship_property_table(self) -> None:
self.execute_query(
"CREATE TABLE IF NOT EXISTS relationship_properties ("
"relationship_id integer NOT NULL,"
"property_name text NOT NULL,"
"property_value text NOT NULL,"
"PRIMARY KEY (relationship_id, property_name)"
");"
)

def drop_database(self) -> None:
self.execute_query("DELETE FROM node_properties;")
self.execute_query("DELETE FROM relationship_properties;")

def save_node_property(self, node_id: int, property_name: str, property_value: str) -> None:
self.execute_query(
"INSERT INTO node_properties (node_id, property_name, property_value) "
f"VALUES({node_id}, '{property_name}', '{property_value}') "
"ON CONFLICT(node_id, property_name) "
"DO UPDATE SET property_value=excluded.property_value;"
)

def load_node_property(self, node_id: int, property_name: str) -> Optional[str]:
result = self.execute_query(
"SELECT property_value "
"FROM node_properties AS db "
f"WHERE db.node_id = {node_id} "
f"AND db.property_name = '{property_name}'"
)

if len(result) == 0:
return None

# primary key is unique
assert len(result) == 1 and len(result[0]) == 1

return result[0][0]

def delete_node_property(self, node_id: int, property_name: str) -> None:
self.execute_query(
"DELETE "
"FROM node_properties AS db "
f"WHERE db.node_id = {node_id} "
f"AND db.property_name = '{property_name}'"
)

def save_relationship_property(self, relationship_id: int, property_name: str, property_value: str) -> None:
self.execute_query(
"INSERT INTO relationship_properties (relationship_id, property_name, property_value) "
f"VALUES({relationship_id}, '{property_name}', '{property_value}') "
"ON CONFLICT(relationship_id, property_name) "
"DO UPDATE SET property_value=excluded.property_value;"
)

def load_relationship_property(self, relationship_id: int, property_name: str) -> Optional[str]:
result = self.execute_query(
"SELECT property_value "
"FROM relationship_properties AS db "
f"WHERE db.relationship_id = {relationship_id} "
f"AND db.property_name = '{property_name}'"
)

if len(result) == 0:
return None

# primary key is unique
assert len(result) == 1 and len(result[0]) == 1

return result[0][0]

def delete_relationship_property(self, relationship_id: int, property_name: str) -> None:
self.execute_query(
"DELETE "
"FROM relationship_properties AS db "
f"WHERE db.relationship_id = {relationship_id} "
f"AND db.property_name = '{property_name}'"
)
18 changes: 18 additions & 0 deletions gqlalchemy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@
'{cls.__name__}' will be used until you create a subclass.
"""

ON_DISK_PROPERTY_DATABASE_NOT_DEFINED_ERROR = """
Error: Saving a node with an on_disk property without specifying an on disk database.
Add an on_disk_db like this:
from gqlalchemy import Memgraph, SQLitePropertyDatabase
db = Memgraph()
SQLitePropertyDatabase("path-to-sqlite-db", db)
"""


class GQLAlchemyWarning(Warning):
pass
Expand All @@ -29,8 +40,15 @@ class GQLAlchemyUniquenessConstraintError(GQLAlchemyError):

class GQLAlchemyDatabaseMissingInFieldError(GQLAlchemyError):
def __init__(self, constraint, field, field_type):
super().__init__()
self.message = DATABASE_MISSING_IN_FIELD_ERROR_MESSAGE.format(
constraint=constraint,
field=field,
field_type=field_type,
)


class GQLAlchemyOnDiskPropertyDatabaseNotDefinedError(GQLAlchemyError):
def __init__(self):
super().__init__()
self.message = ON_DISK_PROPERTY_DATABASE_NOT_DEFINED_ERROR
104 changes: 91 additions & 13 deletions gqlalchemy/memgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# limitations under the License.

import os
import sqlite3
from typing import Any, Dict, Iterator, List, Optional, Union

from .connection import Connection
from .disk_storage import OnDiskPropertyDatabase
from .models import (
MemgraphConstraint,
MemgraphConstraintExists,
Expand All @@ -27,7 +29,11 @@
Relationship,
)

from .exceptions import GQLAlchemyError, GQLAlchemyUniquenessConstraintError
from .exceptions import (
GQLAlchemyError,
GQLAlchemyUniquenessConstraintError,
GQLAlchemyOnDiskPropertyDatabaseNotDefinedError,
)

__all__ = ("Memgraph",)

Expand Down Expand Up @@ -62,6 +68,7 @@ def __init__(
self._password = password
self._encrypted = encrypted
self._cached_connection: Optional[Connection] = None
self._on_disk_db = None

def execute_and_fetch(self, query: str, connection: Connection = None) -> Iterator[Dict[str, Any]]:
"""Executes Cypher query and returns iterator of results."""
Expand Down Expand Up @@ -201,6 +208,12 @@ def new_connection(self) -> Connection:
)
return Connection.create(**args)

def init_disk_storage(self, on_disk_db: OnDiskPropertyDatabase):
self.on_disk_db = on_disk_db

def remove_on_disk_storage(self):
self.on_disk_db = None

def _get_nodes_with_unique_fields(self, node: Node) -> Optional[Node]:
return self.execute_and_fetch(
f"MATCH (node: {node._label})"
Expand All @@ -226,8 +239,9 @@ def create_node(self, node: Node) -> Optional[Node]:
return self.get_variable_assume_one(results, "node")

def save_node(self, node: Node):
result = None
if node._id is not None:
return self._save_node_with_id(node)
result = self._save_node_with_id(node)
elif node.has_unique_fields():
matching_nodes = list(self._get_nodes_with_unique_fields(node))
if len(matching_nodes) > 1:
Expand All @@ -236,11 +250,25 @@ def save_node(self, node: Node):
)
elif len(matching_nodes) == 1:
node._id = matching_nodes[0]["node"]._id
return self.save_node_with_id(node)
result = self.save_node_with_id(node)
else:
return self.create_node(node)
result = self.create_node(node)
else:
return self.create_node(node)
result = self.create_node(node)

result = self._save_node_properties_on_disk(node, result)
return result

def _save_node_properties_on_disk(self, node: Node, result: Node) -> Node:
for field in node.__fields__:
value = getattr(node, field, None)
if value is not None and "on_disk" in node.__fields__[field].field_info.extra:
if self.on_disk_db is None:
raise GQLAlchemyOnDiskPropertyDatabaseNotDefinedError()
self.on_disk_db.save_node_property(result._id, field, value)
setattr(result, field, value)

return result

def save_node_with_id(self, node: Node) -> Optional[Node]:
results = self.execute_and_fetch(
Expand All @@ -254,14 +282,31 @@ def save_node_with_id(self, node: Node) -> Optional[Node]:

def load_node(self, node: Node) -> Optional[Node]:
if node._id is not None:
return self.load_node_with_id(node)
result = self.load_node_with_id(node)
elif node.has_unique_fields():
matching_node = self.get_variable_assume_one(
query_result=self._get_nodes_with_unique_fields(node), variable_name="node"
)
return matching_node
result = matching_node
else:
return self.load_node_with_all_properties(node)
result = self.load_node_with_all_properties(node)

result = self._load_node_properties_on_disk(result)
return result

def _load_node_properties_on_disk(self, result: Node) -> Node:
for field in result.__fields__:
value = getattr(result, field, None)
if "on_disk" in result.__fields__[field].field_info.extra:
if self.on_disk_db is None:
raise GQLAlchemyOnDiskPropertyDatabaseNotDefinedError()
try:
new_value = self.on_disk_db.load_node_property(result._id, field)
except sqlite3.OperationalError:
new_value = value
setattr(result, field, new_value)

return result

def load_node_with_all_properties(self, node: Node) -> Optional[Node]:
results = self.execute_and_fetch(
Expand All @@ -276,11 +321,27 @@ def load_node_with_id(self, node: Node) -> Optional[Node]:

def load_relationship(self, relationship: Relationship) -> Optional[Relationship]:
if relationship._id is not None:
return self.load_relationship_with_id(relationship)
result = self.load_relationship_with_id(relationship)
elif relationship._start_node_id is not None and relationship._end_node_id is not None:
return self.load_relationship_with_start_node_id_and_end_node_id(relationship)
result = self.load_relationship_with_start_node_id_and_end_node_id(relationship)
else:
raise GQLAlchemyError("Can't load a relationship without a start_node_id and end_node_id.")
result = self._load_relationship_properties_on_disk(result)
return result

def _load_relationship_properties_on_disk(self, result: Relationship) -> Relationship:
for field in result.__fields__:
value = getattr(result, field, None)
if "on_disk" in result.__fields__[field].field_info.extra:
if self.on_disk_db is None:
raise GQLAlchemyOnDiskPropertyDatabaseNotDefinedError()
try:
new_value = self.on_disk_db.load_relationship_property(result._id, field)
except sqlite3.OperationalError:
new_value = value
setattr(result, field, new_value)

return result

def load_relationship_with_id(self, relationship: Relationship) -> Optional[Relationship]:
results = self.execute_and_fetch(
Expand All @@ -295,11 +356,14 @@ def load_relationship_with_id(self, relationship: Relationship) -> Optional[Rela
def load_relationship_with_start_node_id_and_end_node_id(
self, relationship: Relationship
) -> Optional[Relationship]:
and_block = relationship._get_cypher_fields_and_block("relationship")
if and_block.strip():
and_block = " AND " + and_block
results = self.execute_and_fetch(
f"MATCH (start_node)-[relationship:{relationship._type}]->(end_node)"
+ f" WHERE id(start_node) = {relationship._start_node_id}"
+ f" AND id(end_node) = {relationship._end_node_id}"
+ f" AND {relationship._get_cypher_fields_and_block()}"
+ and_block
+ " RETURN relationship;"
)
return self.get_variable_assume_one(results, "relationship")
Expand All @@ -318,12 +382,26 @@ def load_relationship_with_start_node_and_end_node(

def save_relationship(self, relationship: Relationship) -> Optional[Relationship]:
if relationship._id is not None:
self.save_relationship_with_id(relationship)
result = self.save_relationship_with_id(relationship)
elif relationship._start_node_id is not None and relationship._end_node_id is not None:
return self.create_relationship(relationship)
result = self.create_relationship(relationship)
else:
raise GQLAlchemyError("Can't create a relationship without start_node_id and end_node_id.")

result = self._save_relationship_properties_on_disk(relationship, result)
return result

def _save_relationship_properties_on_disk(self, relationship: Relationship, result: Relationship) -> Relationship:
for field in relationship.__fields__:
value = getattr(relationship, field, None)
if value is not None and "on_disk" in relationship.__fields__[field].field_info.extra:
if self.on_disk_db is None:
raise GQLAlchemyOnDiskPropertyDatabaseNotDefinedError()
self.on_disk_db.save_relationship_property(result._id, field, value)
setattr(result, field, value)

return result

def save_relationship_with_id(self, relationship: Relationship) -> Optional[Relationship]:
results = self.execute_and_fetch(
f"MATCH (start_node)-[relationship: {relationship._type}]->(end_node)"
Expand Down
Loading

0 comments on commit f80a8b0

Please sign in to comment.