diff --git a/spyk.py b/spyk.py index e003ce6..09a1cd9 100644 --- a/spyk.py +++ b/spyk.py @@ -31,6 +31,7 @@ class SpykContext(object): def __init__(self,**kw): self.planner = guineapig.Planner(**kw) self.tagCodeIndex = 0 + self.cachedViews = set() def setSerializer(self,setSerializer): """Delegate to the SpykContext's planner.""" @@ -62,9 +63,12 @@ def list(self): def finalize(self): """Declare the SpykRDD and all RDD definitions complete. This must be - called in the __name__=="__main__" part of the code, because - it also executes substeps when called recursively.""" + called in the __name__=="__main__" part of the code, after all + transformations have been defined, because it also executes + substeps when called as part of a plan.""" self.planner.setup() + for rdd in self.cachedViews: + rdd.view.opts(stored=True) if guineapig.Planner.partOfPlan(sys.argv): self.planner.main(sys.argv) @@ -84,17 +88,13 @@ def __init__(self,tag,context,view): self.context.tagCodeIndex += 1 self.context.planner._setView("%s__%d" % (tag,self.context.tagCodeIndex), view) - #TODO this doesn't work, need to use a different mechanism, - #maybe with a wrapper around plan/execute def cache(self): - self.view = self.view.opts(stored=True) + """Mark this as to-be-cached on disk.""" + self.context.cachedViews.add(self) return self #transformations, which return new SpykRDD's - #TODO - #cogroup - def map(self,mapfun): """Analogous to the corresponding Spark transformation.""" return SpykRDD('map', self.context, guineapig.ReplaceEach(self.view,by=mapfun)) @@ -162,11 +162,8 @@ def countByKey(self): reducingTo = guineapig.ReduceToCount())) ############################################################################### - # # actions, which setup(), execute a plan, and retrieve the results. - # # TODO: include steps to download HDFS output - # ############################################################################### def collect(self): @@ -186,6 +183,7 @@ def _take(self,n): plan = self.view.storagePlan() plan.execute(self.context.planner, echo=self.context.planner.opts['echo']) k = 0 + #TODO: download from hdfs if needed for line in open(self.view.storedFile()): k += 1 if n<0 or k<=n: diff --git a/testspyk.py b/testspyk.py index d05a701..1c08760 100644 --- a/testspyk.py +++ b/testspyk.py @@ -3,7 +3,6 @@ if __name__ == "__main__" : sc = spyk.SpykContext() - xA = sc.textFile('data/xA.txt').map(lambda line:int(line.strip())) xHiA = xA.filter(lambda r:r>=3) xLoA = xA.filter(lambda r:r<=7) @@ -18,16 +17,19 @@ xDistinctInputs = xUnionPairs.map(lambda (a,b):a) xDistinctPairs = xUnionPairs.map(lambda (a,b):a).distinct() xSample = xA.sample(False,0.5) - + yA = sc.textFile('data/xA.txt').map(lambda line:int(line.strip())) + yB = yA.filter(lambda n:n%2==1) + yC = yB.map(lambda n:(n,n+1,2*n)) + yB.cache() # triples (id, classList, words) corpus = sc.textFile('data/mytest.txt') \ .map(lambda line:line.strip().split("\t")) \ .map(lambda parts:(parts[0],parts[1].split(","),parts[2:])) docterm = corpus.flatMap(lambda (docid,classes,words):[(docid,w) for w in words]) - sc.finalize() if sc.usermain(): + print '= xA',list(xA.collect()) print '= xHiA',list(xHiA.collect()) print '= xLoA',list(xLoA.collect()) @@ -43,26 +45,13 @@ print '= count xDistinctPairs',xDistinctPairs.count() print '= count xSample',list(xSample.collect()) print '= xA reduce to sum', xA.reduce(lambda a,b:a+b) - - tmp = [] - xA.foreach(lambda a:tmp.append(a)) - print '= xA copy',tmp - + print '= docterm',list(docterm.take(10)) + aCopy = [] + xA.foreach(lambda a:aCopy.append(a)) + print '= aCopy',aCopy xMidAPairs.save('tmp.txt') + midAPairCopy = [] for line in open('tmp.txt'): - print '= saved:',line.strip() - - print '= docterm',list(docterm.take(10)) - - xDistinctPairs.pprint() - print 'list xDistinctPairs',xDistinctPairs.plan() - - def myPrint(msg,xs): - for x in xs: - print msg,x - myPrint('plan:', xDistinctPairs.plan()) - - myPrint('xDistinctPairs step', xDistinctPairs.steps()) - myPrint('xDistinctPairs task', xDistinctPairs.tasks()) - myPrint('defined view', sc.list()) - + midAPairCopy.append(line.strip()) + print '= midAPairCopy',midAPairCopy +