Skip to content

Commit c8ce574

Browse files
committed
Added debugging for repeat queries
1 parent 172c362 commit c8ce574

File tree

6 files changed

+41
-27
lines changed

6 files changed

+41
-27
lines changed

src/flink/scala/edu/uta/diql/QueryCodeGenerator.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ abstract class QueryCodeGenerator {
5050
if (diql_explain)
5151
println("Optimized term:\n"+pretty_print(oe.toString))
5252
cg.typecheck(oe,env)
53-
Provenance.exprs = Nil
5453
val de = if (debug)
5554
normalizeAll(Call("debug",
5655
List(Provenance.embedLineage(oe,cg.isDistributed(_)),

src/main/scala/edu/uta/diql/Lineage.scala

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ object Provenance {
2222
/** The nodes of the query AST */
2323
var exprs: List[String] = Nil
2424

25+
/** variables in the repeat pattern */
26+
private var repeatVars: List[String] = Nil
27+
2528
private def label ( e: Expr ): Int = {
2629
exprs = exprs:+nodeLabel(e)
2730
exprs.length-1
@@ -48,6 +51,18 @@ object Provenance {
4851
}
4952
}
5053

54+
/** return the value from the result */
55+
private def value ( e: Expr ): Expr
56+
= flatMap(Lambda(CallPat("ResultValue",List(VarPat("v"),VarPat("p"))),
57+
Elem(Var("v"))),
58+
e)
59+
60+
/** return the lineage from the result */
61+
private def lineage ( e: Expr ): Expr
62+
= flatMap(Lambda(CallPat("ResultValue",List(VarPat("v"),VarPat("p"))),
63+
Elem(Var("p"))),
64+
e)
65+
5166
/** map {((k,v),p)} to {(k,(v,p))} */
5267
private def flip ( e: Expr ): Expr
5368
= flatMap(Lambda(CallPat("ResultValue",List(TuplePat(List(VarPat("k"),VarPat("v"))),VarPat("p"))),
@@ -90,9 +105,6 @@ object Provenance {
90105
}
91106
}
92107

93-
private def prov1 ( expr: Expr, value: Expr, provenance: Expr ): Expr
94-
= prov(expr,value,Call("List",List(provenance)))
95-
96108
private def prov ( expr: Expr, value: Expr, left: Expr, right: Expr ): Expr = {
97109
val loc = label(expr)
98110
value match {
@@ -112,10 +124,12 @@ object Provenance {
112124
}
113125

114126
/** Lift the expression e of type {t} to {(t,provenance)} */
115-
def embed ( e: Expr ): Expr
127+
private def embed ( e: Expr ): Expr
116128
= e match {
117129
case repeat(Lambda(p,u),x,Lambda(_,c),n)
118-
=> repeat(Lambda(p,embed(u)),x,Lambda(p,embed(c)),n)
130+
=> repeatVars = patvars(p)++repeatVars
131+
val nc = repeatVars.foldRight(c){ case (x,r) => subst(x,value(Var(x)),r) }
132+
repeat(Lambda(p,embed(u)),embed(x),Lambda(p,nc),n)
119133
case flatMap(Lambda(p,b),x)
120134
=> val v = newvar
121135
val w = newvar
@@ -193,12 +207,7 @@ object Provenance {
193207
=> val nv = newvar
194208
MatchE(embed(x),
195209
List(Case(VarPat(nv),BoolConst(true),
196-
prov(e,reduce(m,flatMap(Lambda(CallPat("ResultValue",List(VarPat("v"),VarPat("p"))),
197-
Elem(Var("v"))),
198-
Var(nv))),
199-
flatMap(Lambda(CallPat("ResultValue",List(VarPat("v"),VarPat("p"))),
200-
Elem(Var("p"))),
201-
Var(nv))))))
210+
prov(e,reduce(m,value(Var(nv))),lineage(Var(nv))))))
202211
case SmallDataSet(x)
203212
=> embed(x)
204213
case Merge(x,y)
@@ -218,6 +227,7 @@ object Provenance {
218227
Var(xv))),
219228
flatMap(Lambda(VarPat("v"),Call("propagateLineage",List(Var("v")))),
220229
Var(yv)))))))))
230+
case Var(v) if repeatVars.contains(v) => e
221231
case _
222232
=> val loc = label(e)
223233
flatMap(Lambda(VarPat("v"),
@@ -229,69 +239,76 @@ object Provenance {
229239
}
230240

231241
/** Lift the expression e of type t to (t,provenance) */
232-
def embedLineage ( e: Expr, isDistr: Expr => Boolean ): Expr
242+
private def embed_lineage ( e: Expr, isDistr: Expr => Boolean ): Expr
233243
= if (isDistr(e))
234244
embed(e)
235245
else e match {
236246
case Tuple(s)
237-
=> val ns = s.map(embedLineage(_,isDistr))
247+
=> val ns = s.map(embed_lineage(_,isDistr))
238248
val vs = s.map(_ => (newvar,newvar))
239249
MatchE(Tuple(ns),
240250
List(Case(TuplePat(vs.map{ case (v,p) => CallPat("ResultValue",List(VarPat(v),VarPat(p))) }),
241251
BoolConst(true),
242252
prov(e,Tuple(vs.map{ case (v,p) => Var(v) }),
243253
Call("List",vs.map{ case (v,p) => Var(p) })))))
244254
case Call(f,s)
245-
=> val ns = s.map(embedLineage(_,isDistr))
255+
=> val ns = s.map(embed_lineage(_,isDistr))
246256
val vs = s.map(_ => (newvar,newvar))
247257
MatchE(Tuple(ns),
248258
List(Case(TuplePat(vs.map{ case (v,p) => CallPat("ResultValue",List(VarPat(v),VarPat(p))) }),
249259
BoolConst(true),
250260
prov(e,Call(f,vs.map{ case (v,p) => Var(v) }),
251261
Call("List",vs.map{ case (v,p) => Var(p) })))))
252262
case Constructor(f,s)
253-
=> val ns = s.map(embedLineage(_,isDistr))
263+
=> val ns = s.map(embed_lineage(_,isDistr))
254264
val vs = s.map(_ => (newvar,newvar))
255265
MatchE(Tuple(ns),
256266
List(Case(TuplePat(vs.map{ case (v,p) => CallPat("ResultValue",List(VarPat(v),VarPat(p))) }),
257267
BoolConst(true),
258268
prov(e,Constructor(f,vs.map{ case (v,p) => Var(v) }),
259269
Call("List",vs.map{ case (v,p) => Var(p) })))))
260270
case MethodCall(o,m,s)
261-
=> val os = embedLineage(o,isDistr)
262-
val ns = s.map(embedLineage(_,isDistr))
271+
=> val os = embed_lineage(o,isDistr)
272+
val ns = s.map(embed_lineage(_,isDistr))
263273
val vs = (o::s).map(_ => (newvar,newvar))
264274
MatchE(Tuple(os::ns),
265275
List(Case(TuplePat(vs.map{ case (v,p) => CallPat("ResultValue",List(VarPat(v),VarPat(p))) }),
266276
BoolConst(true),
267277
prov(e,MethodCall(Var(vs(0)._1),m,vs.tail.map{ case (v,p) => Var(v) }),
268278
Call("List",vs.map{ case (v,p) => Var(p) })))))
269279
case IfE(p,t,f)
270-
=> MatchE(Tuple(List(embedLineage(p,isDistr),embedLineage(t,isDistr),embedLineage(f,isDistr))),
280+
=> MatchE(Tuple(List(embed_lineage(p,isDistr),embed_lineage(t,isDistr),embed_lineage(f,isDistr))),
271281
List(Case(TuplePat(List(VarPat("p"),VarPat("t"),VarPat("f"))),
272282
BoolConst(true),
273283
prov(e,IfE(Nth(Var("p"),1),Nth(Var("t"),1),Nth(Var("e"),1)),
274284
Call("List",List(Nth(Var("p"),2),Nth(Var("t"),2),Nth(Var("e"),2)))))))
275285
case MatchE(x,cs)
276-
=> val ns = cs.map{ case Case(p,b,n) => Case(p,b,embedLineage(n,isDistr)) }
286+
=> val ns = cs.map{ case Case(p,b,n) => Case(p,b,embed_lineage(n,isDistr)) }
277287
val vs = (x::cs).map(_ => (newvar,newvar))
278-
MatchE(embedLineage(x,isDistr),
288+
MatchE(embed_lineage(x,isDistr),
279289
List(Case(VarPat("x"),
280290
BoolConst(true),
281291
MatchE(MatchE(Nth(Var("x"),1),ns),
282292
List(Case(VarPat("v"),
283293
BoolConst(true),
284294
prov(e,Nth(Var("v"),1),Nth(Var("x"),2),Nth(Var("v"),2))))))))
285295
case Nth(x,n)
286-
=> MatchE(embedLineage(x,isDistr),
296+
=> MatchE(embed_lineage(x,isDistr),
287297
List(Case(VarPat("x"),
288298
BoolConst(true),
289299
prov(e,Nth(Nth(Var("x"),1),n),Nth(Var("x"),2)))))
290300
case Elem(x)
291-
=> MatchE(embedLineage(x,isDistr),
301+
=> MatchE(embed_lineage(x,isDistr),
292302
List(Case(VarPat("x"),
293303
BoolConst(true),
294304
prov(e,Elem(Nth(Var("x"),1)),Nth(Var("x"),2)))))
295305
case _ => MatchE(e,List(Case(VarPat("v"),BoolConst(true),prov(e,Var("v"),Call("List",List())))))
296306
}
307+
308+
/** Lift the expression e of type t to (t,provenance) */
309+
def embedLineage ( e: Expr, isDistr: Expr => Boolean ): Expr = {
310+
exprs = Nil
311+
repeatVars = Nil
312+
embed_lineage(e,isDistr)
313+
}
297314
}

src/scalding/scala/edu/uta/diql/QueryCodeGenerator.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ abstract class QueryCodeGenerator {
5050
if (diql_explain)
5151
println("Optimized term:\n"+pretty_print(oe.toString))
5252
cg.typecheck(oe,env)
53-
Provenance.exprs = Nil
5453
val de = if (debug)
5554
normalizeAll(Call("debug",
5655
List(Provenance.embedLineage(oe,cg.isDistributed(_)),

src/spark/scala/edu/uta/diql/QueryCodeGenerator.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ abstract class QueryCodeGenerator {
5050
if (diql_explain)
5151
println("Optimized term:\n"+pretty_print(oe.toString))
5252
cg.typecheck(oe,env)
53-
Provenance.exprs = Nil
5453
val de = if (debug)
5554
normalizeAll(Call("debug",
5655
List(Provenance.embedLineage(oe,cg.isDistributed(_)),

tests/flink/build

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ for I in ${FLINK_HOME}/lib/*.jar; do
1212
done
1313

1414
mkdir -p classes
15-
env JAVA_OPTS="-Xmx1G" scalac -d classes -cp ${JARS}:${DIQL_HOME}/lib/diql-flink.jar $*
15+
env JAVA_OPTS="-Xmx2G" scalac -d classes -cp ${JARS}:${DIQL_HOME}/lib/diql-flink.jar $*

tests/scalding/test.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ object Test extends ExecutionApp {
9595
select (x,y,z) from x <- S, y <- R, z <- S where x._2==y._2 && y._2== z._1;
9696
select (x,y) from x <- S, y <- R where y._1=="x2" && x._2==y._2 && x._1==45;
9797
select (k,avg/j) from x <-- S, (i,j,s) <- S where x._2==j group by k: x._1;
98-
let x = (select x from x <- S where x._1<2) in x++x;
98+
// let x = (select x from x <- S where x._1<2) in x++x;
9999
repeat s = select i from (i,_,_) <- S step select i+1 from i <- s limit 10;
100100
repeat s = List(1,2,3) step s.map(_+1) until (+/s) > 60;
101101
// repeat s = select i from (i,_,_) <- S step select i+1 from i <- s until all x<-s: x<10 limit 10;

0 commit comments

Comments
 (0)