Skip to content

Commit de162ca

Browse files
committed
mock-up version
0 parents  commit de162ca

16 files changed

+1244
-0
lines changed

DeTrusty/Configuration/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
__author__ = "Philipp D. Rohde"
2+
3+
4+
def get_datasource_description(endpoints, path='/DeTrusty/Config/datasource_description.txt'):
    """Persist the endpoint URLs that make up the datasource description.

    At this point in time, only the endpoint URLs are stored.
    In a later version, a more sophisticated datasource description will be implemented.

    Args:
        endpoints: Comma-separated string of endpoint URLs. Surrounding
            whitespace is removed from every entry; entries that are empty
            or consist only of whitespace are dropped.
        path: File the URLs are written to, one per line. The file is
            truncated on every call. Defaults to the location this function
            has always written to.
    """
    urls = [url.strip() for url in endpoints.split(',')]
    with open(path, 'w') as f:
        # Filter *after* stripping: an entry such as " " is truthy before the
        # strip and would otherwise be written as an empty line.
        f.writelines(url + "\n" for url in urls if url)
11+
12+
13+
def parse_datasource_file(datasource_file):
    """Read the endpoint URLs stored in a datasource description file.

    Args:
        datasource_file: Path of a text file holding one endpoint URL per line.

    Returns:
        List of the URLs with surrounding whitespace removed. Blank lines are
        skipped so that callers never receive an empty URL.
    """
    # The context manager closes the handle; the previous bare open() left
    # that to the garbage collector.
    with open(datasource_file, 'r') as f:
        stripped = (line.strip() for line in f)
        return [url for url in stripped if url]

DeTrusty/Decomposer/Query.py

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
__author__ = "Philipp D. Rohde"
2+
3+
4+
class Query:
    """Simple representation of a SPARQL query. Object to be changed in later versions.

    The constructor extracts a few facts from the query text:

    * ``query_string`` - the text exactly as passed in.
    * ``distinct`` - True when the SELECT clause starts with DISTINCT.
    * ``variables`` - names of the projected variables without the leading
      ``?``/``$``, in order of appearance and with their original case;
      ``['*']`` for ``SELECT *``.
    * ``prefixes`` - list of ``(name, iri)`` tuples, one per ``PREFIX`` line
      found before the SELECT keyword. ``iri`` is the text following the
      first colon of the line, including any leading whitespace.
    """

    def __init__(self, query_string):
        # Imported here because this module has no import section of its own.
        import re

        self.query_string = query_string

        # SPARQL keywords are case-insensitive. The look-behind stops
        # variables such as ?select or ?somewhere from being mistaken for a
        # keyword.
        select = re.search(r"(?<![?$\w])select\b", query_string, re.IGNORECASE)
        if select is None:
            # Not a SELECT query: nothing is projected, but the text may
            # still carry PREFIX declarations.
            prologue, projection = query_string, ""
        else:
            prologue = query_string[:select.start()]
            remainder = query_string[select.end():]
            # The projection ends at WHERE or, when WHERE is omitted, at the
            # opening brace of the group graph pattern.
            body = re.search(r"(?<![?$\w])where\b|\{", remainder, re.IGNORECASE)
            projection = remainder if body is None else remainder[:body.start()]

        # Only a DISTINCT directly after SELECT counts; the word appearing in
        # a variable name or an IRI does not.
        self.distinct = re.match(r"\s*distinct\b", projection, re.IGNORECASE) is not None

        # Variable names are case-sensitive in SPARQL, so they are kept as written.
        self.variables = re.findall(r"[?$](\w+)", projection)
        if not self.variables and "*" in projection:
            self.variables = ["*"]

        self.prefixes = []
        for line in prologue.split("\n"):
            line = line.strip()
            colon = line.find(":")
            # Skip blank lines and anything that is not a PREFIX declaration.
            if line[:6].lower() != "prefix" or colon == -1:
                continue
            self.prefixes.append((line[6:colon].strip(), line[colon + 1:]))

    def get_variables(self):
        """Return the list of projected variable names."""
        return self.variables

    def get_query_string(self):
        """Return the query text as it was passed to the constructor."""
        return self.query_string

    def get_distinct(self):
        """Return True if the SELECT clause uses DISTINCT."""
        return self.distinct

    def get_prefixes(self):
        """Return the list of ``(name, iri)`` prefix tuples."""
        return self.prefixes

DeTrusty/Decomposer/__init__.py

Whitespace-only changes.

DeTrusty/Operators/Xdistinct.py

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""
2+
Created on Dec 11, 2013
3+
Implements the Xdistinct operator.
4+
The intermediate results are represented in a queue.
5+
@author: Maribel Acosta Deibe
6+
"""
7+
from multiprocessing import Queue
8+
9+
10+
class Xdistinct(object):
    """Duplicate-eliminating operator working on queues of result tuples.

    Tuples are read from an input queue until the string "EOF" arrives; each
    tuple seen for the first time is forwarded to the output queue.
    """

    def __init__(self, vars):
        # NOTE: `vars` shadows the builtin but is part of the constructor's
        # signature, so the name is kept for callers passing it by keyword.
        self.qresults = Queue()
        self.vars = vars
        # Keys are the canonical string form of every tuple forwarded so far.
        self.bag = {}

    def execute(self, left, dummy, out, processqueue=None):
        """Forward every distinct tuple from `left` to `out`.

        Args:
            left: Queue to read from. Items are expected to offer `.items()`
                (i.e. dicts); the stream is terminated by the string "EOF".
            dummy: Not used by this operator.
            out: Queue receiving the distinct tuples followed by "EOF".
            processqueue: Not used by this operator. The default is None
                rather than a `Queue()` instance: a default is evaluated once
                at definition time, which allocated a queue on import and
                shared it between all calls.
        """
        self.left = left
        self.qresults = out

        item = self.left.get(True)
        while item != "EOF":
            # Sorting the items makes the key independent of key order.
            key = str(sorted(item.items()))
            if key not in self.bag:
                self.qresults.put(item)
                self.bag[key] = True
            item = self.left.get(True)

        # Put EOF in queue and exit.
        self.qresults.put("EOF")

DeTrusty/Operators/Xunion.py

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
"""
2+
Created on Jul 10, 2011
3+
Implements the Xunion operator.
4+
The intermediate results are represented in a queue.
5+
@author: Maribel Acosta Deibe
6+
"""
7+
from multiprocessing import Queue
8+
from multiprocessing.queues import Empty
9+
10+
11+
class Xunion(object):
    """Union operator merging two input queues into one output queue.

    Both inputs are streams of tuples (dicts) terminated by the string "EOF".
    The inputs are polled alternately without blocking, so tuples are
    forwarded as soon as either side delivers them.
    """

    def __init__(self, vars_left, vars_right):
        self.left = Queue()
        self.right = Queue()
        self.qresults = Queue()
        # The set difference in instantiate()/instantiateFilter() assumes
        # both of these are sets - TODO confirm against callers.
        self.vars_left = vars_left
        self.vars_right = vars_right
        # Number of tuples forwarded to the output queue.
        self.count = 0

    def instantiate(self, d):
        """Return a new Xunion without the variables that are keys of `d`."""
        bound = set(d.keys())
        return Xunion(self.vars_left - bound, self.vars_right - bound)

    def instantiateFilter(self, instantiated_vars, filter_str):
        """Return a new Xunion without `instantiated_vars`; `filter_str` is not used."""
        bound = set(instantiated_vars)
        return Xunion(self.vars_left - bound, self.vars_right - bound)

    def execute(self, left, right, out, processqueue=None):
        """Merge `left` and `right` into `out` and terminate `out` with "EOF".

        Args:
            left: Queue delivering the tuples of the left operand.
            right: Queue delivering the tuples of the right operand.
            out: Queue receiving the merged tuples.
            processqueue: Not used by this operator. The default is None
                rather than a `Queue()` instance: a default is evaluated once
                at definition time, which allocated a queue on import and
                shared it between all calls.
        """
        self.left = left
        self.right = right
        self.qresults = out

        # Identify the kind of union to perform.
        if self.vars_left == self.vars_right:
            self.sameVariables()
        else:
            self.differentVariables()

        # Put EOF in queue and exit.
        self.qresults.put("EOF")

    def sameVariables(self):
        """Run the union when both operands bind the same variables.

        Tuples are forwarded unchanged.
        """
        self._merge(None, None)

    def differentVariables(self):
        """Run the union when the operands bind different variables.

        Every tuple is extended with an empty string for each variable of the
        other operand before it is forwarded.
        """
        pad_left = dict.fromkeys(self.vars_right, '')
        pad_right = dict.fromkeys(self.vars_left, '')
        self._merge(pad_left, pad_right)

    def _merge(self, pad_left, pad_right):
        # Poll both inputs alternately until each one has delivered EOF.
        # NOTE: this is a non-blocking poll loop; it spins while both inputs
        # are momentarily empty.
        left_done = False
        right_done = False
        while not (left_done and right_done):
            if not left_done:
                left_done = self._forward_one(self.left, pad_left)
            if not right_done:
                right_done = self._forward_one(self.right, pad_right)

    def _forward_one(self, source, padding):
        # Move at most one tuple from `source` to the output queue.
        # Returns True once `source` has delivered EOF.
        try:
            item = source.get(False)
        except Empty:
            # Nothing available right now; try again on the next round.
            # Only Empty is expected here - any other error is a real
            # problem and must not be silently discarded.
            return False

        if item == "EOF":
            return True

        if padding is not None:
            # Values of the tuple take precedence over the empty padding.
            padded = dict(padding)
            padded.update(item)
            item = padded

        self.count += 1
        self.qresults.put(item)
        return False

DeTrusty/Operators/__init__.py

Whitespace-only changes.
+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import json
import logging
import urllib.parse
import urllib.request
4+
5+
6+
# Module-wide logging setup: messages of level INFO and above are written by
# a dedicated stream handler, prefixed with timestamp, logger name and level.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
13+
14+
15+
def contact_source(server, query, queue, buffersize=16384, limit=-1):
    """Send `query` to the endpoint `server` and stream the answer into `queue`.

    Every tuple in the answer is represented as a Python dictionary and is
    stored in `queue`. The string "EOF" is always put last, even when an
    error occurs, so that a consumer waiting on the queue can terminate.

    Args:
        server: URL of the endpoint.
        query: Text of the query to execute.
        queue: Queue receiving the result tuples followed by "EOF".
        buffersize: Not used; kept so existing callers keep working.
        limit: Page size. With a positive value the result is retrieved
            incrementally by appending LIMIT/OFFSET to `query` until a page
            comes back with fewer than `limit` tuples. Any value <= 0
            (default -1) sends the query once, unmodified; previously 0 and
            values below -1 made the paging loop run forever.

    Returns:
        The boolean answer reported by the last request (relevant for ASK
        queries), or None.
    """
    logger.info("Contacting endpoint: %s", server)
    b = None
    referer = server

    try:
        # urlsplit copes with https:// and scheme-less input, where the old
        # split on "http://" raised IndexError before EOF was sent.
        parts = urllib.parse.urlsplit(server)
        host = parts.netloc
        path = parts.path.lstrip("/")
        port = parts.port or (443 if parts.scheme == "https" else 80)

        if limit is None or limit <= 0:
            b, _ = contact_source_aux(referer, host, path, port, query, queue)
        else:
            # Contacts the datasource (i.e. real endpoint) incrementally,
            # retrieving partial result sets combining the SPARQL sequence
            # modifiers LIMIT and OFFSET.
            offset = 0
            while True:
                page_query = query + " LIMIT " + str(limit) + " OFFSET " + str(offset)
                b, card = contact_source_aux(referer, host, path, port, page_query, queue)
                # A short page means the endpoint has no further results.
                if card < limit:
                    break
                offset += limit
    finally:
        # Close the queue
        queue.put("EOF")

    return b
54+
55+
56+
def contact_source_aux(referer, server, path, port, query, queue):
    """Execute one request against an endpoint and enqueue the result tuples.

    The query is sent as a form-encoded POST to `referer`, asking for SPARQL
    JSON results. Each binding of the answer is turned into a dict mapping
    the variable name to a string: the value, followed by
    ``^^<datatype>`` for typed literals or ``@lang`` for language-tagged
    literals. "EOF" is not put on the queue here.

    Args:
        referer: Full URL the request is sent to.
        server: Host part of the endpoint; used only in log messages.
        path: Not used by this function.
        port: Not used by this function.
        query: Text of the query to execute.
        queue: Queue receiving one dict per result tuple.

    Returns:
        Tuple ``(b, cardinality)``: `b` is the value of the ``boolean`` key
        of the answer (None when absent), `cardinality` the number of tuples
        put on `queue`. Errors are logged instead of raised; the values
        collected up to that point are returned.
    """
    # Setting variables to return.
    b = None
    cardinality = 0

    # NOTE(review): only the name used in log messages is rewritten here;
    # the request itself still goes to `referer` unchanged.
    if '0.0.0.0' in server:
        server = server.replace('0.0.0.0', 'localhost')

    js = "application/sparql-results+json"
    params = {'query': query, 'format': js}
    headers = {"User-Agent":
                   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36",
               "Accept": js}

    # This is the I/O boundary towards a remote service: callers rely on
    # getting a result back rather than an exception, hence the broad catch.
    try:
        data = urllib.parse.urlencode(params).encode('utf-8')
        req = urllib.request.Request(referer, data, headers)
        with urllib.request.urlopen(req) as response:
            content_type = response.headers.get("Content-Type")
            resp = response.read().decode('utf-8')

        # SECURITY: the body used to be passed to eval() after rewriting
        # "true"/"false" to Python spelling. That executed whatever the
        # endpoint returned and also altered literal values containing those
        # words. The answer is JSON, so it is parsed as JSON.
        try:
            res = json.loads(resp)
        except ValueError:
            res = None

        if not isinstance(res, dict):
            logger.warning("the source %s answered in %s format, instead of"
                           " the JSON format required, then that answer will be ignored",
                           server, content_type)
            return b, cardinality

        b = res.get('boolean', None)

        if 'results' in res:
            for binding in res['results']['bindings']:
                row = {}
                for key, props in binding.items():
                    # Handle typed-literals and language tags. The datatype
                    # is honoured whenever present: the standard JSON results
                    # format marks typed literals as type "literal" plus a
                    # "datatype" key, while some endpoints use the
                    # non-standard type "typed-literal".
                    datatype = props.get('datatype')
                    if datatype:
                        suffix = "^^<" + datatype + ">"
                    elif "xml:lang" in props:
                        suffix = '@' + props['xml:lang']
                    else:
                        suffix = ''
                    row[key] = props['value'] + suffix

                # Every tuple is added to the queue.
                queue.put(row)
                cardinality += 1
    except Exception as e:
        logger.error("Exception while sending request to %s msg: %s", referer, e)

    return b, cardinality

DeTrusty/Wrapper/__init__.py

Whitespace-only changes.

DeTrusty/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)