TL;DR: How can I trigger loading the initial data from the source streams using the Scala API?
Using DBToaster 2.2 (rev. 3387), I was experimenting with the Scala API. I went through the following steps:
Compilation and run from the command line
Following the docs in Sec. 1, Compiling and running a query, I successfully compiled and ran the examples/queries/simple/rst.sql query from the command line, which gave the expected result:
$ bin/dbtoaster -l scala -c test.jar examples/queries/simple/rst.sql
$ java -classpath "test.jar:lib/dbt_scala/*" ddbt.gen.Dbtoaster
Java 1.8.0_162, Scala 2.10.2-20140310-140650-2481f036a5
Time: 0.002s (30/0)
ATIMESD:
306
Using the Scala API
Then I moved on to the example code in Sec. 2, Scala API Guide, and created a project to use the example query from Scala, which gave exactly the same output as in the docs:
Insert a tuple into R.
Insert a tuple into S.
Insert a tuple into T.
Result after this step: 20
Insert another tuple into T.
Final Result: 45
The Scala project has access to the initial stream data at the path specified in the query, but the results show it does not load that data: the command-line run over the initial data gave ATIMESD = 306, while the Scala calls resulted in 20 and then 45 after inserting a few more tuples, without deleting any.
Is there a way to trigger loading the initial stream data specified in the CREATE STREAM statement through the Scala API?
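As a workaround I considered replaying the data files manually. Below is a minimal sketch of that idea; it assumes the generated query object exposes per-relation insert callbacks named onAddR, onAddS, and onAddT (those names appear in the generated vscala code, but I have not verified that the packaged Scala API exposes them the same way):

```scala
import scala.io.Source

// Hypothetical replay helper: parses the comma-separated data files and
// pushes each row through the query object's insert callbacks. Whether the
// Scala API actually exposes onAddR/onAddS/onAddT is an assumption.
object InitialLoad {
  // Parse one CSV line such as "3,7" into a pair of Longs.
  def parseLine(line: String): (Long, Long) = {
    val cols = line.split(",")
    (cols(0).trim.toLong, cols(1).trim.toLong)
  }

  // Feed every non-empty row of the file at `path` to the given callback.
  def replay(path: String)(onAdd: (Long, Long) => Unit): Unit = {
    val src = Source.fromFile(path)
    try src.getLines().filter(_.nonEmpty).foreach { line =>
      val (a, b) = parseLine(line)
      onAdd(a, b)
    } finally src.close()
  }
}

// Usage, where q is the query object from the Scala API guide example:
// InitialLoad.replay("examples/data/simple/r.dat")(q.onAddR)
// InitialLoad.replay("examples/data/simple/s.dat")(q.onAddS)
// InitialLoad.replay("examples/data/simple/t.dat")(q.onAddT)
```

This duplicates what the generated execute method already does, though, so a supported way to trigger the initial load would be preferable.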
Inspecting the vanilla Scala generated code
I tried to inspect the generated vanilla Scala code for the query:
$ bin/dbtoaster -l vscala -o rst.scala examples/queries/simple/rst.sql
I found that def main of the main class invokes def execute, which feeds the streaming relations from the input files, but I found no way to trigger this through the API by sending events.
def main and def execute from the generated vscala code:
def execute(args: Array[String], f: List[Any] => Unit) = bench(args, (dataset: String, parallelMode: Int, timeout: Long, batchSize: Int) => run[Rst](Seq(
(new java.io.FileInputStream("examples/data/simple/r.dat"),new Adaptor.CSV("R","long,long","\\Q,\\E", if (dataset.endsWith("_del")) "ins + del" else "insert"),Split()),
(new java.io.FileInputStream("examples/data/simple/s.dat"),new Adaptor.CSV("S","long,long","\\Q,\\E", if (dataset.endsWith("_del")) "ins + del" else "insert"),Split()),
(new java.io.FileInputStream("examples/data/simple/t.dat"),new Adaptor.CSV("T","long,long","\\Q,\\E", if (dataset.endsWith("_del")) "ins + del" else "insert"),Split())
), parallelMode, timeout, batchSize), f)
def main(args: Array[String]) {
execute(args, (res: List[Any]) => {
println("ATIMESD:\n" + M3Map.toStr(res(0))+"\n")
})
}
I was wondering whether the StreamInit event is intended to initialize the stream from the predefined stream sources, or whether some events are missing:
def receive = {
case TupleEvent(TupleInsert, "R", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onAddR(v0,v1)
case TupleEvent(TupleDelete, "R", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onDelR(v0,v1)
case TupleEvent(TupleInsert, "S", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onAddS(v0,v1)
case TupleEvent(TupleDelete, "S", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onDelS(v0,v1)
case TupleEvent(TupleInsert, "T", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onAddT(v0,v1)
case TupleEvent(TupleDelete, "T", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onDelT(v0,v1)
case StreamInit(timeout) =>
onSystemReady();
t0 = System.nanoTime;
if (timeout > 0) t1 = t0 + timeout * 1000000L
case EndOfStream | GetSnapshot(_) =>
t1 = System.nanoTime;
sender ! (StreamStat(t1 - t0, tN, tS), List(ATIMESD))
}
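Reading the receive block, the plausible sequence would be StreamInit first (it calls onSystemReady and starts the clock), then one TupleEvent per initial row, and finally EndOfStream or GetSnapshot to obtain the result. The following sketch only illustrates that message order with stand-in case classes; the real types live in the dbt_scala runtime, and I have not checked their actual definitions:

```scala
// Stand-in definitions mirroring the event protocol seen in the generated
// actor. These are NOT the real ddbt.lib types, only an illustration of the
// order in which one would apparently have to send messages.
object EventSketch {
  sealed trait Event
  case class StreamInit(timeout: Long) extends Event
  case class TupleEvent(op: Int, relation: String, data: List[Any]) extends Event
  case object EndOfStream extends Event
  val TupleInsert = 1 // assumed encoding of the insert opcode

  // Build the full event sequence for a set of initial rows per relation:
  // StreamInit, then one insert per tuple, then EndOfStream.
  def initialEvents(rows: List[(String, List[(Long, Long)])]): List[Event] =
    StreamInit(0L) ::
      rows.flatMap { case (rel, tuples) =>
        tuples.map { case (a, b) => TupleEvent(TupleInsert, rel, List(a, b)) }
      } ::: List(EndOfStream)
}
```

If that is the intended protocol, it would be good to know whether the Scala API is supposed to send StreamInit (and replay the CREATE STREAM sources) on behalf of the caller, or whether the caller must do so explicitly.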