Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 80 additions & 49 deletions arango_rdf/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ class ArangoRDF(AbstractArangoRDF):
:param logging_lvl: Defaults to logging.INFO. Other useful options are
logging.DEBUG (more verbose), and logging.WARNING (less verbose).
:type logging_lvl: str | int
:param rdf_attribute_prefix: The prefix for RDF attributes (e.g., _uri, _value,
_rdftype, etc.). Defaults to the original "_" symbol. NOTE: attribute names
that start with an underscore "_" are treated as ArangoDB system attributes;
"$" is an alternative, non-system prefix.
:type rdf_attribute_prefix: str
:raise TypeError: On invalid parameter types
"""

Expand All @@ -71,6 +76,7 @@ def __init__(
db: StandardDatabase,
controller: ArangoRDFController = ArangoRDFController(),
logging_lvl: Union[str, int] = logging.INFO,
rdf_attribute_prefix: str = "_",
):
self.set_logging(logging_lvl)

Expand All @@ -86,6 +92,18 @@ def __init__(
self.__cntrl = controller
self.__cntrl.db = db

# Set the RDF attribute prefix
self.__rdf_attribute_prefix = rdf_attribute_prefix

# RDF attribute names using the configurable prefix
self.__rdf_uri_attr = f"{rdf_attribute_prefix}uri"
self.__rdf_value_attr = f"{rdf_attribute_prefix}value"
self.__rdf_type_attr = f"{rdf_attribute_prefix}rdftype"
self.__rdf_label_attr = f"{rdf_attribute_prefix}label"
self.__rdf_sub_graph_uri_attr = f"{rdf_attribute_prefix}sub_graph_uri"
self.__rdf_lang_attr = f"{rdf_attribute_prefix}lang"
self.__rdf_datatype_attr = f"{rdf_attribute_prefix}datatype"

# An RDF to ArangoDB variable used as a buffer
# to store the to-be-inserted ArangoDB documents (RDF-to-ArangoDB).
self.__adb_docs: ADBDocs
Expand Down Expand Up @@ -148,6 +166,10 @@ def db(self) -> StandardDatabase:
def controller(self) -> ArangoRDFController:
return self.__cntrl # pragma: no cover

# Read-only accessor for the RDF attribute prefix stored by the constructor
# ("_" unless a different **rdf_attribute_prefix** was passed in).
@property
def rdf_attribute_prefix(self) -> str:
return self.__rdf_attribute_prefix # pragma: no cover

def set_logging(self, level: Union[int, str]) -> None:
logger.setLevel(level)

Expand Down Expand Up @@ -281,13 +303,13 @@ def arangodb_to_rdf(
"_id",
"_key",
"_rev",
"_rdftype",
"_uri",
"_value",
"_label",
"_from",
"_to",
"_sub_graph_uri",
self.__rdf_type_attr,
self.__rdf_uri_attr,
self.__rdf_value_attr,
self.__rdf_label_attr,
self.__rdf_sub_graph_uri_attr,
}

adb_e_cols = set(metagraph.get("edgeCollections", {}))
Expand All @@ -301,9 +323,11 @@ def arangodb_to_rdf(
if self.db.has_collection("Property"):
doc: Json
for doc in self.db.collection("Property"):
if doc.keys() >= {"_uri", "_label"}:
if doc.keys() >= {self.__rdf_uri_attr, self.__rdf_label_attr}:
# TODO: What if 2+ URIs have the same local name?
self.__uri_map[doc["_label"]] = URIRef(doc["_uri"])
self.__uri_map[doc[self.__rdf_label_attr]] = URIRef(
doc[self.__rdf_uri_attr]
)

# Re-bind the namespace prefixes of **rdf_graph**
if namespace_collection_name:
Expand Down Expand Up @@ -1118,7 +1142,7 @@ def write_adb_col_statements(
raise ValueError(m)

for doc in self.__db.collection(uri_map_collection_name):
uri = URIRef(doc["_uri"])
uri = URIRef(doc[self.__rdf_uri_attr])
collection = str(doc["collection"])
self.__add_adb_col_statement(uri, collection, True)

Expand Down Expand Up @@ -1288,8 +1312,8 @@ def migrate_edges_to_attributes(
edge_collection_name: str,
attribute_name: Optional[str] = None,
edge_direction: str = "OUTBOUND",
sort_clause: Optional[str] = "v._label",
return_clause: str = "v._label",
sort_clause: Optional[str] = None,
return_clause: Optional[str] = None,
) -> int:
"""RDF --> ArangoDB (PGT): Migrate all edges in the specified edge collection to
attributes. This method is useful when combined with the
Expand All @@ -1305,18 +1329,20 @@ def migrate_edges_to_attributes(
:param edge_collection_name: The name of the edge collection to migrate.
:type edge_collection_name: str
:param attribute_name: The name of the attribute to migrate the edges to.
Defaults to **edge_collection_name**, prefixed with an underscore (_).
Defaults to **edge_collection_name**, prefixed with the
**rdf_attribute_prefix** parameter set in the constructor.
:type attribute_name: Optional[str]
:param edge_direction: The direction of the edges to migrate.
Defaults to **OUTBOUND**.
:type edge_direction: str
:param sort_clause: A SORT statement to order the traversed vertices.
Defaults to "v._label". If set to None, the vertex values will
be ordered based on their traversal order.
Defaults to None, in which case the vertices are sorted by their label
attribute, i.e. "v.<rdf_attribute_prefix>label" ("v._label" with the default prefix).
:type sort_clause: Optional[str]
:param return_clause: A RETURN statement to return the specific value
to add as an attribute from the traversed vertices.
Defaults to "v._label". Another option can be "v._uri".
Defaults to None, in which case "v.<rdf_attribute_prefix>label" is used
("v._label" with the default prefix). Another option is
"v.<rdf_attribute_prefix>uri" ("v._uri" with the default prefix).
:type return_clause: Optional[str]
:return: The number of documents updated.
:rtype: int
Expand All @@ -1328,9 +1354,6 @@ def migrate_edges_to_attributes(
if edge_direction.upper() not in {"OUTBOUND", "INBOUND", "ANY"}:
raise ValueError(f"Invalid edge direction: {edge_direction}")

if not return_clause:
raise ValueError("**return_clause** cannot be empty")

graph = self.db.graph(graph_name)

target_e_d = {}
Expand All @@ -1344,7 +1367,13 @@ def migrate_edges_to_attributes(
raise ValueError(m)

if not attribute_name:
attribute_name = f"_{edge_collection_name}"
attribute_name = f"{self.__rdf_attribute_prefix}{edge_collection_name}"

if not sort_clause:
sort_clause = f"v.{self.__rdf_label_attr}"

if not return_clause:
return_clause = f"v.{self.__rdf_label_attr}"

with_cols = set(target_e_d["to_vertex_collections"])
with_cols_str = "WITH " + ", ".join(with_cols)
Expand Down Expand Up @@ -1580,7 +1609,7 @@ def __process_adb_vertex(
if type(term) is Literal:
return term

sg = URIRef(adb_v.get("_sub_graph_uri", "")) or None
sg = URIRef(adb_v.get(self.__rdf_sub_graph_uri_attr, "")) or None
self.__unpack_adb_doc(adb_v, v_col, term, sg)

if self.__infer_type_from_adb_v_col:
Expand Down Expand Up @@ -1625,12 +1654,12 @@ def __process_adb_edge(
"""
_from: str = adb_e["_from"]
_to: str = adb_e["_to"]
_uri = adb_e.get("_uri", "")
_uri = adb_e.get(self.__rdf_uri_attr, "")

subject = self.__get_rdf_term_of_adb_doc(_from)
predicate = URIRef(_uri) or e_col_uri
object = self.__get_rdf_term_of_adb_doc(_to)
sg = URIRef(adb_e.get("_sub_graph_uri", "")) or None
sg = URIRef(adb_e.get(self.__rdf_sub_graph_uri_attr, "")) or None

# TODO: Revisit when rdflib introduces RDF-star support
# edge_uri = (subject, predicate, object, sg)
Expand Down Expand Up @@ -1668,12 +1697,12 @@ def __adb_doc_to_rdf_term(self, doc: Json, col: str) -> RDFTerm:
:rtype: URIRef | BNode | Literal
"""
key_map = {
"URIRef": "_uri",
"Literal": "_value",
"URIRef": self.__rdf_uri_attr,
"Literal": self.__rdf_value_attr,
"BNode": "_key",
}

rdf_type = doc.get("_rdftype", "URIRef") # Default to URIRef
rdf_type = doc.get(self.__rdf_type_attr, "URIRef") # Default to URIRef
val = doc.get(key_map[rdf_type], f"{self.__graph_ns}/{col}#{doc['_key']}")

if rdf_type == "URIRef":
Expand All @@ -1683,11 +1712,11 @@ def __adb_doc_to_rdf_term(self, doc: Json, col: str) -> RDFTerm:
return BNode(val)

elif rdf_type == "Literal":
if "_lang" in doc:
return Literal(val, lang=doc["_lang"])
if self.__rdf_lang_attr in doc:
return Literal(val, lang=doc[self.__rdf_lang_attr])

elif "_datatype" in doc:
return Literal(val, datatype=doc["_datatype"])
elif self.__rdf_datatype_attr in doc:
return Literal(val, datatype=doc[self.__rdf_datatype_attr])

else:
return Literal(val)
Expand Down Expand Up @@ -2008,18 +2037,18 @@ def __rpt_process_term(self, t: RDFTerm) -> RDFTermMeta:

self.__adb_docs[t_col][t_key] = {
"_key": t_key,
"_uri": t_str,
"_label": t_label,
"_rdftype": "URIRef",
self.__rdf_uri_attr: t_str,
self.__rdf_label_attr: t_label,
self.__rdf_type_attr: "URIRef",
}

elif type(t) is BNode:
t_col = self.__BNODE_COL

self.__adb_docs[t_col][t_key] = {
"_key": t_key,
"_label": "",
"_rdftype": "BNode",
self.__rdf_label_attr: "",
self.__rdf_type_attr: "BNode",
}

elif type(t) is Literal:
Expand All @@ -2028,18 +2057,20 @@ def __rpt_process_term(self, t: RDFTerm) -> RDFTermMeta:
t_label = t_value

self.__adb_docs[t_col][t_key] = {
"_value": t_value,
"_label": t_label, # TODO: REVISIT
"_rdftype": "Literal",
self.__rdf_value_attr: t_value,
self.__rdf_label_attr: t_label, # TODO: REVISIT
self.__rdf_type_attr: "Literal",
}

if self.__use_hashed_literals_as_keys:
self.__adb_docs[t_col][t_key]["_key"] = t_key

if t.language:
self.__adb_docs[t_col][t_key]["_lang"] = t.language
self.__adb_docs[t_col][t_key][self.__rdf_lang_attr] = t.language
elif t.datatype:
self.__adb_docs[t_col][t_key]["_datatype"] = str(t.datatype)
self.__adb_docs[t_col][t_key][self.__rdf_datatype_attr] = str(
t.datatype
)

else:
raise ValueError(f"Unable to process {t}") # pragma: no cover
Expand Down Expand Up @@ -2467,9 +2498,9 @@ def __pgt_process_rdf_term(
elif type(t) is URIRef:
self.__adb_docs[t_col][t_key] = {
"_key": t_key,
"_uri": str(t),
"_label": t_label,
"_rdftype": "URIRef",
self.__rdf_uri_attr: str(t),
self.__rdf_label_attr: t_label,
self.__rdf_type_attr: "URIRef",
}

if (
Expand All @@ -2480,14 +2511,14 @@ def __pgt_process_rdf_term(
self.__adb_docs[uri_col][t_key] = {
"_key": t_key,
"collection": t_col,
"_uri": str(t),
self.__rdf_uri_attr: str(t),
}

elif type(t) is BNode:
self.__adb_docs[t_col][t_key] = {
"_key": t_key,
"_label": "",
"_rdftype": "BNode",
self.__rdf_label_attr: "",
self.__rdf_type_attr: "BNode",
}

elif type(t) is Literal and all([s_col, s_key, p_label]):
Expand Down Expand Up @@ -2534,7 +2565,7 @@ def __pgt_process_rdf_literal(
self.__pgt_rdf_val_to_adb_val(doc, p_label, val, process_val_as_serialized_list)

if sg_str:
doc["_sub_graph_uri"] = sg_str
doc[self.__rdf_sub_graph_uri_attr] = sg_str

def __pgt_process_object(
self, s_meta: RDFTermMeta, p_meta: RDFTermMeta, o_meta: RDFTermMeta, sg_str: str
Expand Down Expand Up @@ -3162,13 +3193,13 @@ def __add_adb_edge(
"_key": key,
"_from": _from,
"_to": _to,
"_uri": _uri,
"_label": _label,
"_rdftype": "URIRef",
self.__rdf_uri_attr: _uri,
self.__rdf_label_attr: _label,
self.__rdf_type_attr: "URIRef",
}

if _sg:
self.__adb_docs[col][key]["_sub_graph_uri"] = _sg
self.__adb_docs[col][key][self.__rdf_sub_graph_uri_attr] = _sg

def __build_explicit_type_map(
self, adb_adb_col_statement: Callable[..., None] = empty_func
Expand Down