Skip to content

Commit 38a0486

Browse files
committed
fix issue with VALUES clause containing multiple variables
1 parent 2235214 commit 38a0486

File tree

6 files changed

+146
-223
lines changed

6 files changed

+146
-223
lines changed

DeTrusty/Decomposer/Decomposer.py

+9-26
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
from enum import Enum
55
from functools import partial
66

7-
from DeTrusty.Logger import get_logger
87
from DeTrusty.Decomposer import Tree, utils
8+
from DeTrusty.Logger import get_logger
99
from DeTrusty.Sparql.Parser.services import Service, Triple, Filter, Optional, UnionBlock, JoinBlock, Values, Bind, Argument
1010

1111
logger = get_logger(__name__, '.decompositions.log')
12+
13+
1214
class Decomposer(object):
1315

1416
def __init__(self, query, config, decompType="STAR", joinstarslocally=True, sparql_one_dot_one=False):
@@ -105,7 +107,7 @@ def decomposeJoinBlock(self, jb):
105107
return None
106108

107109
fl1 = self.includeFilter(sl, fl)
108-
fl = list(set(fl) - set(fl1)) # in case VALUES with variables from multiple sources, the VALUES object won't be removed
110+
fl = list(set(fl) - set(fl1)) # in case VALUES with variables from multiple sources, the VALUES object won't be removed
109111
if sl:
110112
if len(sl) == 1 and isinstance(sl[0], UnionBlock) and fl != []:
111113
sl[0] = self.updateFilters(sl[0], fl)
@@ -695,10 +697,6 @@ def includeFilterAux(self, f, sl):
695697
if set(vars_s) & set(vars_f) == set(vars_f):
696698
s.include_filter(f)
697699
fl1 = fl1 + [f]
698-
elif type(f) is Values and set(vars_s) & set(vars_f) != set():
699-
fl2 = f.instantiate(set(vars_s) & set(vars_f))
700-
s.include_filter(fl2) # the new decomposed clause need to be reconsidered
701-
fl1 = fl1 + [fl2]
702700
return fl1
703701

704702
def includeFilterUnionBlock(self, jb, f):
@@ -708,11 +706,7 @@ def includeFilterUnionBlock(self, jb, f):
708706
if isinstance(jbUS, Service):
709707
vars_s = set(jbUS.getVars())
710708
vars_f = f.getVars()
711-
if type(f) is Values and set(vars_s) & set(vars_f) != set():
712-
fl2 = f.instantiate(set(vars_s) & set(vars_f))
713-
jbUS.include_filter(fl2)
714-
fl1 = fl1 + [fl2]
715-
elif set(vars_s) & set(vars_f) == set(vars_f):
709+
if set(vars_s) & set(vars_f) == set(vars_f):
716710
jbUS.include_filter(f)
717711
fl1 = fl1 + [f]
718712
return fl1
@@ -741,33 +735,22 @@ def includeFilterAuxSK(self, f, sl, sr):
741735
for s in sl:
742736
bgpvars.update(set(utils.getVars(s)))
743737
vars_s = set()
744-
if (isinstance(s, Triple)):
738+
if isinstance(s, Triple):
745739
vars_s.update(set(utils.getVars(s)))
746740
else:
747741
for t in s.triples:
748742
vars_s.update(set(utils.getVars(t)))
749743

750-
if set(vars_s) & set(vars_f) == set(vars_f): # Bind supposed to fall to this category
751-
serviceFilter = True
752-
753-
if type(f) is Values and set(vars_s) & set(vars_f) != set():
754-
fl2 = f.instantiate(set(vars_s) & set(vars_f))
744+
if set(vars_s) & set(vars_f) == set(vars_f): # Bind supposed to fall to this category
755745
serviceFilter = True
756746

757747
for v in bgpvars:
758748
if v in fvars:
759749
fvars[v] = True
760750

761-
# if type(f) is Bind: # why is this here? Investigate later
762-
# fvars[f.alias] = True
763-
764751
if serviceFilter:
765-
if type(f) is Values:
766-
sr.include_filter(fl2)
767-
fl1 = fl1 + [fl2]
768-
else:
769-
sr.include_filter(f)
770-
fl1 = fl1 + [f]
752+
sr.include_filter(f)
753+
fl1 = fl1 + [f]
771754
else:
772755
fs = [v for v in fvars if not fvars[v]]
773756
if len(fs) == 0:

DeTrusty/Decomposer/Planner.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
from DeTrusty.Operators.AnapsidOperators.Xorderby import Xorderby
1919
from DeTrusty.Operators.AnapsidOperators.Xproject import Xproject
2020
from DeTrusty.Operators.AnapsidOperators.Xunion import Xunion
21+
from DeTrusty.Operators.AnapsidOperators.Xvalues import Xvalues
2122
from DeTrusty.Operators.BlockingOperators.Union import Union
2223
from DeTrusty.Operators.NonBlockingOperators.NestedHashJoinFilter import NestedHashJoinFilter as NestedHashJoin
2324
from DeTrusty.Operators.NonBlockingOperators.NestedHashOptionalFilter import NestedHashOptionalFilter as NestedHashOptional
24-
from DeTrusty.Sparql.Parser.services import Bind, Filter, Service, Optional, UnionBlock, JoinBlock
25+
from DeTrusty.Sparql.Parser.services import Bind, Filter, Values, Service, Optional, UnionBlock, JoinBlock
2526

2627

2728
class Planner(object):
@@ -52,7 +53,7 @@ def createPlan(self):
5253
raise SyntaxError('Query contains projections that are neither grouping variables nor aggregates.')
5354

5455
# Adds the group by operator to the plan.
55-
if (len(query.group_by) > 0 or over_all_triples):
56+
if (len(query.group_by) > 0) or over_all_triples:
5657
operatorTree = TreePlan(Xgroupby(query.group_by, over_all_triples), operatorTree.vars, operatorTree)
5758

5859
# Adds the having operator to the plan.
@@ -188,6 +189,8 @@ def includePhysicalOperators(self, tree):
188189
if set(n.vars) & set(vars_f) == set(vars_f):
189190
if isinstance(f, Filter):
190191
n = TreePlan(Xfilter(f), n.vars, n)
192+
elif isinstance(f, Values):
193+
n = TreePlan(Xvalues(f), n.vars, n)
191194
if isinstance(f, Bind):
192195
n.vars = set(n.vars) | set(vars_f)
193196
if n.vars & set(vars_f) == set(vars_f):

DeTrusty/Operators/AnapsidOperators/Xvalues.py

+26-21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
from multiprocessing import Queue
21
import datetime
2+
from multiprocessing import Queue
3+
4+
from DeTrusty.Sparql.Parser.services import Argument
35

46
data_types = {
57
'integer': (int, 'numerical'),
@@ -23,6 +25,8 @@
2325
'positiveInteger': (int, 'numerical')
2426
}
2527

28+
UNDEF = Argument('UNDEF')
29+
2630

2731
class Xvalues(object):
2832

@@ -31,49 +35,50 @@ class Xvalues(object):
3135
def __init__(self, values):
3236
self.input = Queue()
3337
self.qresults = Queue()
34-
self.values = values
38+
self.values = values
39+
self.left = None
3540

3641
def execute(self, left, dummy, out, processqueue=Queue()):
3742
self.left = left
3843
self.qresults = out
39-
tuple = self.left.get(True)
44+
tuple_ = self.left.get(True)
4045

41-
while (tuple != "EOF"):
42-
shouldInclude = self.filterByValues(tuple)
43-
if shouldInclude:
44-
self.qresults.put(tuple)
46+
while tuple_ != "EOF":
47+
should_include = self.filterByValues(tuple_)
48+
if should_include:
49+
self.qresults.put(tuple_)
4550

46-
tuple = self.left.get(True)
51+
tuple_ = self.left.get(True)
4752

4853
# Put EOF in queue and exit.
4954
self.qresults.put("EOF")
5055

51-
def filterByValues(self, tuple):
56+
def filterByValues(self, tuple_):
5257
for row in self.values.data_block_val:
53-
isValid = True
58+
is_valid = True
5459
for idx, variable in enumerate(self.values.var):
55-
value = tuple[variable.name[1:]]
56-
rowArg = row[idx]
57-
if rowArg is None:
60+
value = tuple_[variable.name[1:]]['value']
61+
row_arg = row[idx]
62+
if row_arg == UNDEF:
5863
continue
59-
extractedRowValue = self.extractValue(rowArg.name)
60-
rowValue = extractedRowValue[1:-1]
64+
extracted_row_value = self.extractValue(row_arg.name)
65+
row_value = extracted_row_value[1:-1]
6166

62-
if value != rowValue:
63-
isValid = False
67+
if value != row_value:
68+
is_valid = False
6469
break
65-
if isValid:
70+
if is_valid:
6671
return True
6772
return False
6873

6974
def extractValue(self, val):
7075
pos = val.find("^^")
7176
# Handles when the literal is typed.
72-
if (pos > -1):
77+
if pos > -1:
7378
for t in data_types.keys():
74-
if (t in val[pos:]):
79+
if t in val[pos:]:
7580
(python_type, general_type) = data_types[t]
76-
if (general_type == bool):
81+
if general_type == bool:
7782
return val[:pos]
7883
else:
7984
return python_type(val[:pos])

0 commit comments

Comments
 (0)