Skip to content

Commit

Permalink
more complete version of spyk
Browse files Browse the repository at this point in the history
  • Loading branch information
William Cohen committed Sep 30, 2015
1 parent b047297 commit caa7188
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 23 deletions.
3 changes: 3 additions & 0 deletions guineapig.py
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,9 @@ def setEvaluator(self,rowEvaluator):
# rest of the API for the planner
#

def serialize(self,row):
    """Return the string encoding of row, using this planner's configured serializer."""
    serializer = self._serializer
    return serializer.toString(row)

@staticmethod
def partOfPlan(argv):
"""True if the command line was generated as part of a storage plan."""
Expand Down
152 changes: 129 additions & 23 deletions spyk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,58 @@
import random

class SpykContext(object):
"""Analogous to a SparkContext, this allows a Spark-like syntax for
GuineaPig programs. A sample program would be:
import spyk
if __name__ == "__main__" :
sc = spyk.SpykContext()
#view definitions, using sc.textFile(), rdd.map(...) and other transformations
...
sc.finalize()
if sc.usermain():
#actions, eg collect(), take(n), first(), count(), ...
...
You cannot interleave actions and transformations - all
transformations need to be defined before finalize() is
called, and all actions afterward.
Currently, actions can only be executed locally,
i.e., with target=shell.
"""

def __init__(self,**kw):
    """Create a SpykContext; keyword options are passed through to the GuineaPig planner."""
    # counter used for generating unique view tags
    self.tagCodeIndex = 0
    self.planner = guineapig.Planner(**kw)

#(setSerializer, setEvaluator, and ship are implemented below)

def setSerializer(self,setSerializer):
    """Delegate to the SpykContext's planner."""
    planner = self.planner
    planner.setSerializer(setSerializer)

def setEvaluator(self,setEvaluator):
    """Delegate to the SpykContext's planner."""
    planner = self.planner
    planner.setEvaluator(setEvaluator)

def ship(self,fileName):
    """Delegate to the SpykContext's planner."""
    planner = self.planner
    planner.ship(fileName)

#returns a SpykRDD
def textFile(self,fileName):
    """Return a SpykRDD that contains the lines in the textfile."""
    return SpykRDD('textFile', self, guineapig.ReadLines(fileName))

def wholeTextFiles(self,dirName):
    """Not implemented: would return one (fileName,contents) row per file in dirName.

    Raises NotImplementedError, so a call fails loudly even under python -O
    (the old `assert False` would be stripped, silently returning None).
    """
    #TODO find this in royals, and make it a gpextra
    raise NotImplementedError('wholeTextFiles is not implemented')

#not in spark

def list(self):
    """Return a list of the names of all views defined so far."""
    planner = self.planner
    return planner.listViewNames()

def finalize(self):
"""Declare the SpykRDD and all RDD definitions complete. This must be
Expand All @@ -32,12 +69,15 @@ def finalize(self):
self.planner.main(sys.argv)

def usermain(self):
    """Use this in an if statement in the __main__ of a program,
    before any Spyk actions, but after all transformations have been
    defined."""
    # True only when this process is the user's top-level run, not a
    # subprocess spawned as part of a GuineaPig storage plan
    return not guineapig.Planner.partOfPlan(sys.argv)

class SpykRDD(object):

def __init__(self,tag,context,view):
"""Should not be called directly by users."""
self.view = view
self.context = context
self.view.planner = context.planner
Expand All @@ -53,68 +93,92 @@ def cache(self):
#transformations, which return new SpykRDD's

#TODO
#cogroup - for keyed views only
#(union and intersection are implemented below)

def map(self,mapfun):
    """Analogous to the corresponding Spark transformation."""
    # removed a leftover duplicate return line that made this docstring
    # (and the intended return) unreachable
    return SpykRDD('map', self.context, guineapig.ReplaceEach(self.view,by=mapfun))

def flatMap(self,mapfun):
    """Analogous to the corresponding Spark transformation."""
    # removed a leftover duplicate return line that made this docstring
    # (and the intended return) unreachable
    return SpykRDD('flatMap', self.context, guineapig.Flatten(self.view,by=mapfun))

def groupByKey(self):
    """Analogous to the corresponding Spark transformation."""
    # rows are (key,value) pairs; group on the key, keep the values
    grouped = guineapig.Group(self.view,
                              by=lambda kv: kv[0],
                              retaining=lambda kv: kv[1])
    return SpykRDD('groupByKey', self.context, grouped)

def reduceByKey(self,initValue,reduceOp):
    """Analogous to the corresponding Spark transformation.
    initValue is the starting accumulator value and reduceOp(accum,val)
    folds each value for a key into the accumulator."""
    # removed a leftover superseded `reducingTo=` line from the diff; the
    # kept form wraps initValue in a thunk — ReduceTo appears to expect a
    # factory producing a fresh initial value per group (TODO confirm
    # against guineapig.ReduceTo)
    return SpykRDD('reduceByKey',
                   self.context,
                   guineapig.Group(self.view,
                                   by=lambda kv: kv[0],
                                   retaining=lambda kv: kv[1],
                                   reducingTo=guineapig.ReduceTo(lambda:initValue,reduceOp)))
def filter(self,filterfun):
    """Analogous to the corresponding Spark transformation."""
    filtered = guineapig.Filter(self.view, by=filterfun)
    return SpykRDD('filter', self.context, filtered)

def sample(self,withReplacement,fraction):
    """Analogous to the corresponding Spark transformation, but defined only when withReplacement==False"""
    assert not withReplacement, 'sampling with replacement is not implemented'
    # keep each row independently with probability `fraction`
    coinFlip = lambda x: 1 if random.random() < fraction else 0
    return SpykRDD('sample', self.context, guineapig.Filter(self.view, by=coinFlip))

def union(self,rdd):
    """Analogous to the corresponding Spark transformation."""
    combined = guineapig.Union(self.view, rdd.view)
    return SpykRDD('union', self.context, combined)

def intersection(self,rdd):
    """Analogous to the corresponding Spark transformation."""
    # self-join on the whole row, then keep one copy of each matched pair
    joined = guineapig.Join(guineapig.Jin(self.view, by=lambda row: row),
                            guineapig.Jin(rdd.view, by=lambda row: row))
    keepLeft = joined | guineapig.ReplaceEach(by=lambda pair: pair[0])
    return SpykRDD('intersection', self.context, keepLeft)

def join(self,rdd):
    """Analogous to the corresponding Spark transformation."""
    # rows are (key,value) pairs; join on key, emit (key,(leftVal,rightVal))
    joined = guineapig.Join(guineapig.Jin(self.view, by=lambda kv: kv[0]),
                            guineapig.Jin(rdd.view, by=lambda kv: kv[0]))
    reshaped = joined | guineapig.ReplaceEach(
        by=lambda rows: (rows[0][0], (rows[0][1], rows[1][1])))
    return SpykRDD('join', self.context, reshaped)

def distinct(self):
    """Analogous to the corresponding Spark transformation."""
    deduped = guineapig.Distinct(self.view)
    return SpykRDD('distinct', self.context, deduped)


#TODO
#actions, which setup(), store and return a python data structure
#can setup() be called more than once? note we need to initialize
#the argument view as part of sc.planner, and I guess mark it as
#re-useable.

#(reduce, save, countByKey, and foreach are implemented below)
def countByKey(self):
    """Analogous to the corresponding Spark operation: one (key,count) row per distinct key."""
    # rows are (key,value) pairs; group on the key and reduce each group to its size
    counted = guineapig.Group(self.view,
                              by=lambda kv: kv[0],
                              reducingTo=guineapig.ReduceToCount())
    return SpykRDD('countByKey', self.context, counted)

###############################################################################
#
# actions, which setup(), execute a plan, and retrieve the results.
#
# TODO: include steps to download HDFS output
#
###############################################################################

def collect(self):
    """Action which returns a generator of the rows in this transformation."""
    # removed a superseded one-line docstring left above this one by the
    # diff, which had turned the real docstring into a dead statement
    for x in self._take(-1):
        yield x

def take(self,n):
    """Action which returns a list of the first n rows produced by collect()."""
    firstRows = self._take(n)
    return list(firstRows)

def first(self):
    """Action which returns the first thing produced by collect()."""
    # take(1) already returns a list, so no extra list() conversion is needed
    return self.take(1)[0]

def _take(self,n):
Expand All @@ -127,14 +191,56 @@ def _take(self,n):
if n<0 or k<=n:
yield self.view.planner._serializer.fromString(line.strip())

def foreach(self,function):
    """Action which calls function(row) on every row produced by collect()."""
    for item in self.collect():
        function(item)

def reduce(self,reduceFunction):
    """Action which applies the pairwise reduction to each row produced by collect().

    Returns None when there are no rows.  Uses a sentinel rather than
    truthiness to detect "no accumulator yet", so falsy rows (0, '', [])
    no longer restart the reduction mid-stream.
    """
    _missing = object()
    accum = _missing
    for row in self.collect():
        accum = row if accum is _missing else reduceFunction(accum, row)
    return None if accum is _missing else accum

def count(self):
    """Action which counts the number of rows produced by collect()."""
    # materialize the view, then count lines in its stored output file
    plan = self.view.storagePlan()
    plan.execute(self.context.planner, echo=self.context.planner.opts['echo'])
    # `with` guarantees the output file is closed (the old code leaked it)
    with open(self.view.storedFile()) as fp:
        return sum(1 for line in fp)

#(debug helpers — pprint, steps, tasks, plan — appear below, after save)
def save(self,path):
    """Action which saves the rows produced by collect() in a local file,
    one serialized row per line."""
    # `with` guarantees the file is closed even if collect() raises;
    # also avoids shadowing the builtin `str` as the old code did
    with open(path,'w') as fp:
        for row in self.collect():
            fp.write(self.context.planner.serialize(row) + '\n')

#not in spark

def store(self):
    """Execute the storage plan for this view, like Guineapig's --store option."""
    plan = self.view.storagePlan()
    plan.execute(self.context.planner)

def pprint(self):
    """Pretty-print the underlying Guineapig view."""
    view = self.view
    view.pprint()

def steps(self):
    """Return list of steps in the storage plan for this view, like Guineapig's --steps option."""
    plan = self.view.storagePlan()
    return plan.steps

def tasks(self):
    """Return list of tasks in the storage plan for this view, like Guineapig's --tasks option."""
    storagePlan = self.view.storagePlan()
    storagePlan.buildTasks()
    return storagePlan.tasks

def plan(self):
    """Return list of shell commands in the storage plan for this view, like Guineapig's --plan option."""
    storagePlan = self.view.storagePlan()
    return storagePlan.compile(self.context.planner)

68 changes: 68 additions & 0 deletions testspyk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import spyk

# Smoke test / demo for spyk (Python 2): defines a pipeline of
# transformations, finalizes the plan, then exercises every action locally.
# Expects input files data/xA.txt (one int per line) and data/mytest.txt
# (tab-separated: docid, comma-separated class list, then words).
# NOTE(review): indentation was lost in this capture; the statements below
# belong inside the __main__ / usermain() blocks.
if __name__ == "__main__" :

sc = spyk.SpykContext()

# -- transformations: all must be defined before sc.finalize() is called --
xA = sc.textFile('data/xA.txt').map(lambda line:int(line.strip()))
xHiA = xA.filter(lambda r:r>=3)
xLoA = xA.filter(lambda r:r<=7)
xMidA = xHiA.intersection(xLoA)
xHiAPairs = xHiA.map(lambda r:(r,2*r))
xLoAPairs = xLoA.map(lambda r:(r,3*r))
xMidAPairs = xHiAPairs.join(xLoAPairs)
xUnionPairs = xHiAPairs.union(xLoAPairs)
xGroupPairs = xUnionPairs.groupByKey()
xGroupSizes = xUnionPairs.countByKey()
xSumPairs = xUnionPairs.reduceByKey(0,lambda accum,x:accum+x)
xDistinctInputs = xUnionPairs.map(lambda (a,b):a)
xDistinctPairs = xUnionPairs.map(lambda (a,b):a).distinct()
xSample = xA.sample(False,0.5)

# triples (id, classList, words)
corpus = sc.textFile('data/mytest.txt') \
.map(lambda line:line.strip().split("\t")) \
.map(lambda parts:(parts[0],parts[1].split(","),parts[2:]))
docterm = corpus.flatMap(lambda (docid,classes,words):[(docid,w) for w in words])

# declare all view definitions complete
sc.finalize()

# -- actions: only legal after finalize(), and only in the user's main --
if sc.usermain():
print '= xA',list(xA.collect())
print '= xHiA',list(xHiA.collect())
print '= xLoA',list(xLoA.collect())
print '= xMidA',list(xMidA.collect())
print '= xMidAPairs',list(xMidAPairs.collect())
print '= xUnionPairs',list(xUnionPairs.collect())
print '= xGroupPairs',list(xGroupPairs.collect())
print '= xGroupSizes',list(xGroupSizes.collect())
print '= xSumPairs',list(xSumPairs.collect())
print '= xDistinctPairs',list(xDistinctPairs.collect())
print '= xDistinctInputs',list(xDistinctInputs.collect())
print '= count xDistinctInputs',xDistinctInputs.count()
print '= count xDistinctPairs',xDistinctPairs.count()
print '= count xSample',list(xSample.collect())
print '= xA reduce to sum', xA.reduce(lambda a,b:a+b)

# foreach with a side-effecting function copies the rows into tmp
tmp = []
xA.foreach(lambda a:tmp.append(a))
print '= xA copy',tmp

# round-trip: save to a local file, then read it back
xMidAPairs.save('tmp.txt')
for line in open('tmp.txt'):
print '= saved:',line.strip()

print '= docterm',list(docterm.take(10))

# debugging helpers: pprint, plan, steps, tasks, list
xDistinctPairs.pprint()
print 'list xDistinctPairs',xDistinctPairs.plan()

def myPrint(msg,xs):
for x in xs:
print msg,x
myPrint('plan:', xDistinctPairs.plan())

myPrint('xDistinctPairs step', xDistinctPairs.steps())
myPrint('xDistinctPairs task', xDistinctPairs.tasks())
myPrint('defined view', sc.list())

0 comments on commit caa7188

Please sign in to comment.