Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: wellcomecollection/catalogue-pipeline
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 965f8aef4af38347c8c116bc79c1d9cf2b89fa9c
Choose a base ref
..
head repository: wellcomecollection/catalogue-pipeline
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: ad99305f2cba82628f2b34bd436119b08a3a8abb
Choose a head ref
3 changes: 1 addition & 2 deletions catalogue_graph/src/models/indexable_concept.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@
from pydantic import BaseModel

from models.catalogue_concept import CatalogueConcept
from models.graph_node import ConceptSource

# Query

@@ -32,7 +31,7 @@ class ConceptDisplayIdentifierType(BaseModel):
type: str = "IdentifierType"

@classmethod
def from_source_type(cls, source_type: ConceptSource) -> "ConceptDisplayIdentifierType":
def from_source_type(cls, source_type: str) -> "ConceptDisplayIdentifierType":
if source_type == "label-derived":
label = "Identifier derived from the label of the referent"
elif source_type == "nlm-mesh":
4 changes: 2 additions & 2 deletions catalogue_graph/src/sources/catalogue/concepts_source.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
from sources.gzip_source import GZipSource


def extract_concepts_from_work(raw_work: dict) -> Generator[(dict, WorkConceptKey)]:
def extract_concepts_from_work(raw_work: dict) -> Generator[tuple[dict, WorkConceptKey]]:
for subject in raw_work.get("subjects", []):
for concept in subject.get("concepts", []):
yield concept, "subjects"
@@ -24,7 +24,7 @@ class CatalogueConceptsSource(BaseSource):
def __init__(self, url: str):
self.url = url

def stream_raw(self) -> Generator[dict]:
def stream_raw(self) -> Generator[tuple[dict, WorkConceptKey]]:
"""Streams raw concept nodes from a work's subjects, genres, and contributors."""
catalogue_source = GZipSource(self.url)
for work in catalogue_source.stream_raw():
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ def __init__(self, url: str):
)
self.id_lookup: set = set()

def transform_node(self, raw_data: (dict, WorkConceptKey)) -> Concept | None:
def transform_node(self, raw_data: tuple[dict, WorkConceptKey]) -> Concept | None:
raw_concept = RawCatalogueConcept(raw_data[0], self.id_label_checker)

if not raw_concept.is_concept:
@@ -37,7 +37,7 @@ def transform_node(self, raw_data: (dict, WorkConceptKey)) -> Concept | None:
type=raw_concept.type,
)

def extract_edges(self, raw_data: (dict, WorkConceptKey)) -> Generator[ConceptHasSourceConcept]:
def extract_edges(self, raw_data: tuple[dict, WorkConceptKey]) -> Generator[ConceptHasSourceConcept]:
raw_concept = RawCatalogueConcept(raw_data[0], self.id_label_checker)

if not raw_concept.is_concept:
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ def _add_catalogue_request() -> None:
)


def _check_edge(all_edges: list[BaseEdge], from_id: str, to_id: str, expected_edge: BaseEdge):
def _check_edge(all_edges: list[BaseEdge], from_id: str, to_id: str, expected_edge: BaseEdge) -> None:
filtered_edges = [edge for edge in all_edges if edge.from_id == from_id and edge.to_id == to_id]
assert len(filtered_edges) == 1
assert filtered_edges[0] == expected_edge
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
from transformers.catalogue.works_transformer import CatalogueWorksTransformer


def _check_edge(all_edges: list[BaseEdge], from_id: str, to_id: str, expected_edge: BaseEdge):
def _check_edge(all_edges: list[BaseEdge], from_id: str, to_id: str, expected_edge: BaseEdge) -> None:
filtered_edges = [edge for edge in all_edges if edge.from_id == from_id and edge.to_id == to_id]
assert len(filtered_edges) == 1
assert filtered_edges[0] == expected_edge