Skip to content

Commit

Permalink
tweaking spyk
Browse files Browse the repository at this point in the history
  • Loading branch information
William Cohen committed Sep 30, 2015
1 parent caa7188 commit 962e7be
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 35 deletions.
20 changes: 9 additions & 11 deletions spyk.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class SpykContext(object):
def __init__(self,**kw):
self.planner = guineapig.Planner(**kw)
self.tagCodeIndex = 0
self.cachedViews = set()

def setSerializer(self,setSerializer):
"""Delegate to the SpykContext's planner."""
Expand Down Expand Up @@ -62,9 +63,12 @@ def list(self):

def finalize(self):
"""Declare the SpykRDD and all RDD definitions complete. This must be
called in the __name__=="__main__" part of the code, because
it also executes substeps when called recursively."""
called in the __name__=="__main__" part of the code, after all
transformations have been defined, because it also executes
substeps when called as part of a plan."""
self.planner.setup()
for rdd in self.cachedViews:
rdd.view.opts(stored=True)
if guineapig.Planner.partOfPlan(sys.argv):
self.planner.main(sys.argv)

Expand All @@ -84,17 +88,13 @@ def __init__(self,tag,context,view):
self.context.tagCodeIndex += 1
self.context.planner._setView("%s__%d" % (tag,self.context.tagCodeIndex), view)

#TODO this doesn't work, need to use a different mechanism,
#maybe with a wrapper around plan/execute
def cache(self):
self.view = self.view.opts(stored=True)
"""Mark this as to-be-cached on disk."""
self.context.cachedViews.add(self)
return self

#transformations, which return new SpykRDD's

#TODO
#cogroup

def map(self,mapfun):
"""Analogous to the corresponding Spark transformation."""
return SpykRDD('map', self.context, guineapig.ReplaceEach(self.view,by=mapfun))
Expand Down Expand Up @@ -162,11 +162,8 @@ def countByKey(self):
reducingTo = guineapig.ReduceToCount()))

###############################################################################
#
# actions, which setup(), execute a plan, and retrieve the results.
#
# TODO: include steps to download HDFS output
#
###############################################################################

def collect(self):
Expand All @@ -186,6 +183,7 @@ def _take(self,n):
plan = self.view.storagePlan()
plan.execute(self.context.planner, echo=self.context.planner.opts['echo'])
k = 0
#TODO: download from hdfs if needed
for line in open(self.view.storedFile()):
k += 1
if n<0 or k<=n:
Expand Down
37 changes: 13 additions & 24 deletions testspyk.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
if __name__ == "__main__" :

sc = spyk.SpykContext()

xA = sc.textFile('data/xA.txt').map(lambda line:int(line.strip()))
xHiA = xA.filter(lambda r:r>=3)
xLoA = xA.filter(lambda r:r<=7)
Expand All @@ -18,16 +17,19 @@
xDistinctInputs = xUnionPairs.map(lambda (a,b):a)
xDistinctPairs = xUnionPairs.map(lambda (a,b):a).distinct()
xSample = xA.sample(False,0.5)

yA = sc.textFile('data/xA.txt').map(lambda line:int(line.strip()))
yB = yA.filter(lambda n:n%2==1)
yC = yB.map(lambda n:(n,n+1,2*n))
yB.cache()
# triples (id, classList, words)
corpus = sc.textFile('data/mytest.txt') \
.map(lambda line:line.strip().split("\t")) \
.map(lambda parts:(parts[0],parts[1].split(","),parts[2:]))
docterm = corpus.flatMap(lambda (docid,classes,words):[(docid,w) for w in words])

sc.finalize()

if sc.usermain():

print '= xA',list(xA.collect())
print '= xHiA',list(xHiA.collect())
print '= xLoA',list(xLoA.collect())
Expand All @@ -43,26 +45,13 @@
print '= count xDistinctPairs',xDistinctPairs.count()
print '= count xSample',list(xSample.collect())
print '= xA reduce to sum', xA.reduce(lambda a,b:a+b)

tmp = []
xA.foreach(lambda a:tmp.append(a))
print '= xA copy',tmp

print '= docterm',list(docterm.take(10))
aCopy = []
xA.foreach(lambda a:aCopy.append(a))
print '= aCopy',aCopy
xMidAPairs.save('tmp.txt')
midAPairCopy = []
for line in open('tmp.txt'):
print '= saved:',line.strip()

print '= docterm',list(docterm.take(10))

xDistinctPairs.pprint()
print 'list xDistinctPairs',xDistinctPairs.plan()

def myPrint(msg,xs):
for x in xs:
print msg,x
myPrint('plan:', xDistinctPairs.plan())

myPrint('xDistinctPairs step', xDistinctPairs.steps())
myPrint('xDistinctPairs task', xDistinctPairs.tasks())
myPrint('defined view', sc.list())

midAPairCopy.append(line.strip())
print '= midAPairCopy',midAPairCopy

0 comments on commit 962e7be

Please sign in to comment.