-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsimple.py
225 lines (182 loc) · 7.92 KB
/
simple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""
Non-RDF interfaces to the thesaurus.
"""
from os.path import basename
import re
from dotenv import load_dotenv
from rdflib import OWL, SKOS, URIRef, Literal
from .thesaurus import BASE, Termset, Thesaurus
from collections.abc import Generator
# Read variables from a .env file (if one is found) into the process
# environment. This runs as a side effect of importing this module.
load_dotenv()
# The Homosaurus vocabulary, parsed once at import time and shared by
# resolve_homosaurus_term() and SimpleThesaurus.
# NOTE(review): 'homosaurus.ttl' is a relative path, so it presumably resolves
# against the current working directory — confirm how the app is started.
# NOTE(review): assumes Thesaurus.parse() returns the parsed graph itself.
HOMOSAURUS = Thesaurus().parse('homosaurus.ttl')
class Tokenizer:
    """Splits phrases into word tokens on a fixed set of delimiter characters."""

    # NOTE: inside the character class, ` -/` is a *range* from space (0x20)
    # through slash (0x2F). Besides space, hyphen, slash and the parentheses,
    # the pattern therefore also matches ! " # $ % & ' * + , and .
    DELIMITER = re.compile(r'[ -/()]')

    @classmethod
    def split(cls, phrase):
        """Return an iterator over the non-empty pieces of `phrase` between delimiters."""
        pieces = cls.DELIMITER.split(phrase)
        # Leading, trailing or adjacent delimiters produce empty strings; drop them.
        return filter(None, pieces)
def name_to_ref(name: str) -> URIRef:
    """Build a term reference by appending the unqualified `name` to `BASE`."""
    qualified = BASE + name
    return URIRef(qualified)
def ref_to_name(ref: URIRef) -> str:
    """Return the final path component of `ref`, as computed by `os.path.basename`."""
    name = basename(ref)
    return name
def resolve_external_term(ref):
    """Build a SimpleTerm for a term referenced from outside this thesaurus.

    References starting with the Homosaurus v3 prefix are handed to
    resolve_homosaurus_term(); any other reference yields a term that
    carries only its `uri`.
    """
    is_homosaurus = ref.startswith('https://homosaurus.org/v3/')
    if not is_homosaurus:
        return SimpleTerm(uri=str(ref))
    return resolve_homosaurus_term(ref)
def resolve_homosaurus_term(ref):
    """Build a SimpleTerm for a Homosaurus term from the HOMOSAURUS graph.

    The result has the keys `uri`, `prefLabel` and `altLabels`. When the
    graph holds no preferred label for `ref`, `prefLabel` is None.

    NOTE(review): assumes HOMOSAURUS.value() returns None when nothing
    matches, as rdflib's Graph.value() does by default — confirm for Thesaurus.
    """
    pref_label = HOMOSAURUS.value(ref, SKOS.prefLabel)
    return SimpleTerm(
        uri=str(ref),
        # str(None) would produce the text "None", which SimpleTerm.get_labels()
        # would then yield as if it were a real label. Keep a missing label as None.
        prefLabel=str(pref_label) if pref_label is not None else None,
        altLabels=[str(label) for label in HOMOSAURUS.objects(ref, SKOS.altLabel)],
    )
class SimpleTerm(dict):
    """A term represented as a plain dict of strings, lists and nested SimpleTerms."""

    @staticmethod
    def from_subject(termset: Termset, subject: URIRef) -> "SimpleTerm":
        """Collect the SKOS properties of `subject` in `termset` into a SimpleTerm."""

        def texts(predicate) -> list[str]:
            # Literal values of a predicate, as plain strings
            return [str(value) for value in termset.objects(subject, predicate)]

        def names(predicate) -> list[str]:
            # Targets of a relation, as unqualified names
            return [ref_to_name(target) for target in termset.objects(subject, predicate)]

        def externals(predicate) -> list["SimpleTerm"]:
            # Targets of a relation, as SimpleTerms for external terms
            return [resolve_external_term(target) for target in termset.objects(subject, predicate)]

        return SimpleTerm(
            name=ref_to_name(subject),
            uri=str(subject),
            # NOTE(review): if value() returns None for a missing property,
            # str() turns it into the text "None" — confirm whether callers rely on that.
            prefLabel=str(termset.value(subject, SKOS.prefLabel)),
            altLabels=texts(SKOS.altLabel),
            hiddenLabels=texts(SKOS.hiddenLabel),
            scopeNote=str(termset.value(subject, SKOS.scopeNote)),
            # Relations to QLIT terms
            broader=names(SKOS.broader),
            narrower=names(SKOS.narrower),
            related=names(SKOS.related),
            # Relations to external terms
            exactMatch=externals(SKOS.exactMatch),
            closeMatch=externals(SKOS.closeMatch),
        )

    @staticmethod
    def from_termset(termset: Termset) -> list["SimpleTerm"]:
        """Make a SimpleTerm for every ref in `termset`, sorted by lowercased prefLabel."""
        return sorted(
            (SimpleTerm.from_subject(termset, ref) for ref in termset.refs()),
            key=lambda term: term['prefLabel'].lower(),
        )

    def get_labels(self) -> Generator[Literal]:
        """Yield labels for the term and for its matched terms, most relevant first.

        Order: own prefLabel, labels of exactMatch terms, own altLabels,
        own hiddenLabels, labels of closeMatch terms.
        """
        preferred = self.get('prefLabel')
        if preferred:
            yield preferred
        for exact in self.get('exactMatch', ()):
            yield from exact.get_labels()
        yield from self.get('altLabels') or ()
        yield from self.get('hiddenLabels') or ()
        for close in self.get('closeMatch', ()):
            yield from close.get_labels()

    def get_words(self) -> Generator[str]:
        """Yield every word of every label from get_labels(), lowercased."""
        for label in self.get_labels():
            yield from (word.lower() for word in Tokenizer.split(label))
class SimpleThesaurus():
    """Like Thesaurus but with unqualified names as inputs and dicts as output."""

    def __init__(self, thesaurus: Thesaurus):
        """Wrap `thesaurus` and prepare a combined graph that also holds Homosaurus."""
        self.t = thesaurus
        # Own terms plus Homosaurus in one graph; used by search()
        self.th = Thesaurus()
        self.th += self.t + HOMOSAURUS

    def _existing_ref(self, name: str) -> URIRef:
        """Turn an unqualified name into a ref, after `assert_term_exists` has accepted it."""
        ref = name_to_ref(name)
        self.t.assert_term_exists(ref)
        return ref

    def get(self, name: str) -> SimpleTerm:
        """Look up a single term by its unqualified name."""
        return SimpleTerm.from_subject(self.t, self._existing_ref(name))

    def get_roots(self) -> list[SimpleTerm]:
        """Find all terms without parents."""
        termset = self.t.get_roots()
        return SimpleTerm.from_termset(termset)

    def get_narrower(self, broader: str) -> list[SimpleTerm]:
        """List the terms narrower than the term named `broader`."""
        termset = self.t.get_narrower(self._existing_ref(broader))
        return SimpleTerm.from_termset(termset)

    def get_broader(self, narrower: str) -> list[SimpleTerm]:
        """List the terms broader than the term named `narrower`."""
        termset = self.t.get_broader(self._existing_ref(narrower))
        return SimpleTerm.from_termset(termset)

    def get_related(self, other: str) -> list[SimpleTerm]:
        """List the terms related to the term named `other`."""
        termset = self.t.get_related(self._existing_ref(other))
        return SimpleTerm.from_termset(termset)

    def search(self, s: str) -> list[SimpleTerm]:
        """Find terms matching a user-given incremental (startswith) search string.

        Returns SimpleTerms with an added `score` key, highest score first and
        ordered by prefLabel within the same score. Terms with a truthy
        `owl:deprecated` value are left out.
        """
        # A tuple so it can be passed directly to str.startswith()
        query_words = tuple(Tokenizer.split(s.lower()))

        def match(label: str) -> float:
            # Score by the position of the first label word that begins with
            # any query word; no such word means no match.
            for position, label_word in enumerate(Tokenizer.split(label.lower())):
                if label_word.startswith(query_words):
                    # Score more if match appears early in label
                    return 10 - min(position, 5)
            return 0

        hits = {}

        def add_hit(ref, score):
            # Keep only the best score seen for each term
            hits[ref] = max(hits.get(ref, 0), score)

        # The different label fields should give different scores
        fields = {
            SKOS.prefLabel: 1,
            SKOS.altLabel: .8,
            SKOS.hiddenLabel: .6,
        }

        # Check all QLIT/Homosaurus terms
        for ref in self.th.concepts():
            for predicate, relevance in fields.items():
                # Score each label against the search string
                for label in self.th[ref:predicate]:
                    score = match(label) * relevance
                    if not score:
                        continue
                    # Is a QLIT term: Record score for it
                    if ref.startswith("https://queerlit"):
                        add_hit(ref, score)
                    # Terms pointing at this one through exactMatch/closeMatch
                    # (in practice QLIT terms matching a Homosaurus term)
                    # receive a reduced share of the score.
                    for sref in self.th.subjects(SKOS.exactMatch, ref):
                        add_hit(sref, score * .8)
                    for sref in self.th.subjects(SKOS.closeMatch, ref):
                        add_hit(sref, score * .5)

        scored_hits = []
        for ref, score in hits.items():
            if self.th.value(ref, OWL.deprecated):
                continue
            term = SimpleTerm.from_subject(self.t, ref)
            term['score'] = score
            scored_hits.append(term)
        # Highest score first; equal scores ordered by prefLabel
        scored_hits.sort(key=lambda term: (-term['score'], term['prefLabel']))
        return scored_hits

    def get_collections(self):
        """List all collections as dicts with `name`, `uri` and `prefLabel`, sorted by label."""
        g = self.t.get_collections()
        dicts = [
            dict(
                name=ref_to_name(ref),
                uri=str(ref),
                prefLabel=g.value(ref, SKOS.prefLabel),
            )
            for ref in g.collections()
        ]
        dicts.sort(key=lambda term: term['prefLabel'].lower())
        return dicts

    def get_collection(self, name, tree=False):
        """List the member terms of the collection called `name`.

        With `tree` set, each term's `narrower` names are replaced by fully
        expanded terms, recursively.
        """
        ref = self._existing_ref(name)
        termset = self.t.terms_if(lambda term: self.t[ref:SKOS.member:term])
        terms = SimpleTerm.from_termset(termset)
        if tree:
            self.expand_narrower(terms)
        return terms

    def get_labels(self):
        """All term labels, keyed by corresponding term identifiers."""
        return {
            ref_to_name(ref): label
            for ref, label in self.t.subject_objects(SKOS.prefLabel)
        }

    def expand_narrower(self, terms: list[SimpleTerm]):
        """Instead of string names, look up and inflate narrower terms recursively.

        Modifies the given terms in place.
        """
        for term in terms:
            expanded = [self.get(name) for name in term["narrower"]]
            self.expand_narrower(expanded)
            term["narrower"] = expanded