From caa7188d3fc3cf4f611641fab06238244c328e8b Mon Sep 17 00:00:00 2001 From: William Cohen Date: Wed, 30 Sep 2015 14:36:00 -0400 Subject: [PATCH] more complete version of spyk --- guineapig.py | 3 + spyk.py | 152 +++++++++++++++++++++++++++++++++++++++++++-------- testspyk.py | 68 +++++++++++++++++++++++ 3 files changed, 200 insertions(+), 23 deletions(-) create mode 100644 testspyk.py diff --git a/guineapig.py b/guineapig.py index c71e617..753df1b 100644 --- a/guineapig.py +++ b/guineapig.py @@ -1409,6 +1409,9 @@ def setEvaluator(self,rowEvaluator): # rest of the API for the planner # + def serialize(self,row): + return self._serializer.toString(row) + @staticmethod def partOfPlan(argv): """True if the command line was generated as part of a storage plan.""" diff --git a/spyk.py b/spyk.py index 6976dab..e003ce6 100644 --- a/spyk.py +++ b/spyk.py @@ -7,21 +7,58 @@ import random class SpykContext(object): + """Analogous to a SparkContext, this allows a Spark-like syntax for + GuineaPig programs. A sample program would be: + + import spyk + if __name__ == "__main__" : + sc = spyk.SpykContext() + #view definitions, using sc.textFile(), rdd.map(...) and other transformations + ... + sc.finalize() + if sc.usermain(): + #actions, eg collect(), take(n), first(), count(), ... + ... + + You cannot interleave actions and transformations - all + transformations need to be defined before finalize() is + called, and all actions afterward. + + Currently, actions can only be executed locally, + i.e., with target=shell. 
+ """ def __init__(self,**kw): self.planner = guineapig.Planner(**kw) self.tagCodeIndex = 0 - - #TODO setSerializer, setEvaluator, ship + + def setSerializer(self,setSerializer): + """Delegate to the SpykContext's planner.""" + self.planner.setSerializer(setSerializer) + + def setEvaluator(self,setEvaluator): + """Delegate to the SpykContext's planner.""" + self.planner.setEvaluator(setEvaluator) + + def ship(self,fileName): + """Delegate to the SpykContext's planner.""" + self.planner.ship(fileName) #returns a SpykRDD def textFile(self,fileName): + """Return a SpykRDD that contains the lines in the textfile.""" rdd = SpykRDD('textFile', self, guineapig.ReadLines(fileName)) return rdd def wholeTextFiles(self,dirName): #TODO find this in royals, and make it a gpextra - pass + assert False,'not implemented!' + + #not in spark + + def list(self): + """Return a list of the names of all defined views.""" + return self.planner.listViewNames() def finalize(self): """Declare the SpykRDD and all RDD definitions complete. This must be @@ -32,12 +69,15 @@ def finalize(self): self.planner.main(sys.argv) def usermain(self): - """Use this in an if statement before any Spyk actions.""" + """Use this in an if statement in the __main__ of a program, + before any Spyk actions, but after all transformations have been + defined.""" return not guineapig.Planner.partOfPlan(sys.argv) class SpykRDD(object): def __init__(self,tag,context,view): + """Should not be called directly by users.""" self.view = view self.context = context self.view.planner = context.planner @@ -53,18 +93,18 @@ def cache(self): #transformations, which return new SpykRDD's #TODO - #union - #intersection - gpextra? - # ... 
and for keyed views only #cogroup def map(self,mapfun): - return SpykRDD('map',self.context, guineapig.ReplaceEach(self.view,by=mapfun)) + """Analogous to the corresponding Spark transformation.""" + return SpykRDD('map', self.context, guineapig.ReplaceEach(self.view,by=mapfun)) def flatMap(self,mapfun): - return SpykRDD('flatMap',self.context, guineapig.Flatten(self.view,by=mapfun)) + """Analogous to the corresponding Spark transformation.""" + return SpykRDD('flatMap', self.context, guineapig.Flatten(self.view,by=mapfun)) def groupByKey(self): + """Analogous to the corresponding Spark transformation.""" return SpykRDD('groupByKey', self.context, guineapig.Group(self.view, @@ -72,20 +112,36 @@ def groupByKey(self): retaining=lambda (key,val):val)) def reduceByKey(self,initValue,reduceOp): + """Analogous to the corresponding Spark transformation.""" return SpykRDD('reduceByKey', self.context, guineapig.Group(self.view, by=lambda (key,val):key, retaining=lambda (key,val):val, - reducingTo=guineapig.ReduceTo(initValue,reduceOp))) + reducingTo=guineapig.ReduceTo(lambda:initValue,reduceOp))) def filter(self,filterfun): + """Analogous to the corresponding Spark transformation.""" return SpykRDD('filter',self.context, guineapig.Filter(self.view,by=filterfun)) def sample(self,withReplacement,fraction): + """Analogous to the corresponding Spark transformation, but defined only when withReplacement==False""" assert not withReplacement, 'sampling with replacement is not implemented' return SpykRDD('sample',self.context, guineapig.Filter(self.view,by=lambda x:1 if random.random()=3) + xLoA = xA.filter(lambda r:r<=7) + xMidA = xHiA.intersection(xLoA) + xHiAPairs = xHiA.map(lambda r:(r,2*r)) + xLoAPairs = xLoA.map(lambda r:(r,3*r)) + xMidAPairs = xHiAPairs.join(xLoAPairs) + xUnionPairs = xHiAPairs.union(xLoAPairs) + xGroupPairs = xUnionPairs.groupByKey() + xGroupSizes = xUnionPairs.countByKey() + xSumPairs = xUnionPairs.reduceByKey(0,lambda accum,x:accum+x) + xDistinctInputs = 
xUnionPairs.map(lambda (a,b):a) + xDistinctPairs = xUnionPairs.map(lambda (a,b):a).distinct() + xSample = xA.sample(False,0.5) + + # triples (id, classList, words) + corpus = sc.textFile('data/mytest.txt') \ + .map(lambda line:line.strip().split("\t")) \ + .map(lambda parts:(parts[0],parts[1].split(","),parts[2:])) + docterm = corpus.flatMap(lambda (docid,classes,words):[(docid,w) for w in words]) + + sc.finalize() + + if sc.usermain(): + print '= xA',list(xA.collect()) + print '= xHiA',list(xHiA.collect()) + print '= xLoA',list(xLoA.collect()) + print '= xMidA',list(xMidA.collect()) + print '= xMidAPairs',list(xMidAPairs.collect()) + print '= xUnionPairs',list(xUnionPairs.collect()) + print '= xGroupPairs',list(xGroupPairs.collect()) + print '= xGroupSizes',list(xGroupSizes.collect()) + print '= xSumPairs',list(xSumPairs.collect()) + print '= xDistinctPairs',list(xDistinctPairs.collect()) + print '= xDistinctInputs',list(xDistinctInputs.collect()) + print '= count xDistinctInputs',xDistinctInputs.count() + print '= count xDistinctPairs',xDistinctPairs.count() + print '= count xSample',list(xSample.collect()) + print '= xA reduce to sum', xA.reduce(lambda a,b:a+b) + + tmp = [] + xA.foreach(lambda a:tmp.append(a)) + print '= xA copy',tmp + + xMidAPairs.save('tmp.txt') + for line in open('tmp.txt'): + print '= saved:',line.strip() + + print '= docterm',list(docterm.take(10)) + + xDistinctPairs.pprint() + print 'list xDistinctPairs',xDistinctPairs.plan() + + def myPrint(msg,xs): + for x in xs: + print msg,x + myPrint('plan:', xDistinctPairs.plan()) + + myPrint('xDistinctPairs step', xDistinctPairs.steps()) + myPrint('xDistinctPairs task', xDistinctPairs.tasks()) + myPrint('defined view', sc.list()) +