bug fix in augment/sideviews and default path for opencloud
William Cohen committed Sep 24, 2015
1 parent aa42697 commit 4e1c581
Showing 10 changed files with 72 additions and 13 deletions.
1 change: 1 addition & 0 deletions TODO.txt
@@ -2,6 +2,7 @@ TODO - priorities

 FUNCTIONALITY
 
+- get rid of the KeyError for improper usage of params
 - option(storedIn=FILE) - so you can retrieve and store work on s3
 - add Stream(view1, through='shell command', shipping=[f1,..,fk])
 - add StreamingMapReduce(view1, mapper='shell command', reducer='shell command', combiner='shell command', shipping=[f1,..,fk])
9 changes: 9 additions & 0 deletions data/a.csv
@@ -0,0 +1,9 @@
w,aw
x,ax
akey,akey-val
y,ay1
y,ay2
akey2,akey2-val1
akey2,akey2-val2
z,az1
z,az2
9 changes: 9 additions & 0 deletions data/a.txt
@@ -0,0 +1,9 @@
akey akey-val
akey2 akey2-val1
akey2 akey2-val2
w aw
x ax
y ay1
y ay2
z az1
z az2
9 changes: 9 additions & 0 deletions data/b.txt
@@ -0,0 +1,9 @@
bkey bkey-val
bkey2 bkey2-val1
bkey2 bkey2-val2
w bw1
w bw2
x bx
y by
z bz1
z bz2
5 changes: 5 additions & 0 deletions data/c.txt
@@ -0,0 +1,5 @@
w cw
x cx1
x cx2
z cz

1 change: 1 addition & 0 deletions data/d.txt
@@ -0,0 +1 @@
5
10 changes: 10 additions & 0 deletions data/mytest.txt
@@ -0,0 +1,10 @@
r1000.txt acq a again already amount acquired acquisition alameda
r10011.txt crude,nat-gas agency billion billion billion billion canada canadian canadian
r10015.txt veg-oil a added added against agricultural annual begins begins
r10037.txt earn cts cts cts cts mln mln mln mln
r10038.txt gas allow barrels barrels barrels basis beaumont beaumont beaumont
r1003.txt earn accounting adjustments august before company company cts cts
r10049.txt acq abuse abuse access access across actions advocated advocated
r10053.txt earn available cts cts cts data loss loss loss
r10057.txt earn already also april capital commonwealth commonwealth commonwealth commonwealth
r10058.txt acq acquired acquired acquisition alameda alameda alameda alameda california
9 changes: 9 additions & 0 deletions data/xA.txt
@@ -0,0 +1,9 @@
0
1
2
3
4
6
7
8
9
23 changes: 15 additions & 8 deletions guineapig.py
@@ -24,14 +24,15 @@ class GPig(object):
     SORT_COMMAND = 'LC_COLLATE=C sort' # use standard ascii ordering, not locale-specific one
     HADOOP_LOC = 'hadoop' # assume hadoop is on the path at planning time
     MY_LOC = 'guineapig.py' # the name of this file
-    VERSION = '1.3.2'
+    VERSION = '1.3.3'
     COPYRIGHT = '(c) William Cohen 2014,2015'
 
     #Global options for Guinea Pig can be passed in with the --opts
     #command-line option, and these are the default values
     #The location of the streaming jar is a special case,
     #in that it's also settable via an environment variable.
-    defaultJar = '/home/hadoop/contrib/streaming/hadoop-streaming.jar'
+    #defaultJar = '/home/hadoop/contrib/streaming/hadoop-streaming.jar'
+    defaultJar = '/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar'
     envjar = os.environ.get('GP_STREAMJAR', defaultJar)
     defaultViewDir = 'gpig_views'
     envViewDir = os.environ.get('GP_VIEWDIR',defaultViewDir )
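Note: GP_STREAMJAR still takes precedence over the new Cloudera-parcel default. A minimal sketch of the lookup, using the same names as guineapig.py (the export line in the comment shows how to restore the old EMR-style path; it is illustrative, not part of the commit):

    import os

    # the new opencloud default, from this commit
    defaultJar = '/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar'
    # setting GP_STREAMJAR overrides it, e.g.:
    #   export GP_STREAMJAR=/home/hadoop/contrib/streaming/hadoop-streaming.jar
    envjar = os.environ.get('GP_STREAMJAR', defaultJar)
    print envjar  # the jar that will be passed to 'hadoop jar'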
@@ -103,7 +104,7 @@ def onlyRowOf(view):

 @staticmethod
 class SafeEvaluator(object):
-    """Evaluates expressions that correspond to serialized guinea pig rows."""
+    """Evaluates expressions that correzpond to serialized guinea pig rows."""
     def __init__(self,restrictedBindings={}):
         self.restrictedBindings = restrictedBindings
     def eval(self,s):
@@ -267,6 +268,8 @@ def nonrecursiveStoragePlan(self):
         plan = Plan()
         plan.includeStepsOf(self.checkpointPlan())
         plan.append(TransformStep(view=self,whatToDo='doStoreRows',srcs=[self.checkpoint()],dst=self.storedFile(),why=self.explanation()))
+        if self.storeMe=='distributedCache':
+            plan.append(DistributeStep(self))
         return plan
 
     def applyDict(self,mapping,innerviewsOnly=False):
@@ -337,6 +340,7 @@ def pprint(self,depth=0,alreadyPrinted=None,sideview=False):
             print tabStr + sideviewIndicator + tagStr + ' = ' + '...'
         else:
             sideviewInfo = " sideviews: {"+",".join(map(lambda x:x.tag, self.sideviews))+"}" if self.sideviews else ""
+            sideviewInfo += " *sideviews: {"+",".join(map(lambda x:x.tag, self.sideviewsNeeded()))+"}" if self.sideviewsNeeded() else ""
             print tabStr + sideviewIndicator + tagStr + ' = ' + str(self) + sideviewInfo
             alreadyPrinted.add(self.tag)
             for inner in self.inners:
@@ -517,6 +521,7 @@ def __init__(self,inner=None,sideviews=None,sideview=None,loadedBy=lambda v:list

     def enforceStorageConstraints(self):
         for sv in self.sideviews:
+            logging.info('marking '+sv.tag+' to be placed in the distributedCache')
             sv.storeMe = 'distributedCache'
 
     def rowGenerator(self):
@@ -528,10 +533,9 @@ def checkpointPlan(self):
         plan = Plan()
         plan.includeStepsOf(self.inner.checkpointPlan())
         #the sideviews should have been stored by the top-level
-        #planner already, but they will need to be moved to a
-        #distributable location
-        for sv in self.sideviews:
-            plan.append(DistributeStep(sv))
+        #planner already, and distributed, if marked as storeMe==distributedCache
+        #for sv in self.sideviews:
+        # plan.append(DistributeStep(sv))
         return plan
 
     def explanation(self):
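Taken together, Augment now marks each sideview with storeMe='distributedCache' at planning time, and the storage plan for the sideview itself appends the DistributeStep, instead of checkpointPlan emitting one per use. A minimal runnable sketch of that path, using the data files added above (the Demo planner and its view names are hypothetical; wrapping GPig.rowsOf in list() is the stock loadedBy idiom):

    import sys
    from guineapig import *

    class Demo(Planner):
        # adict becomes a sideview of pairs: Augment marks it as
        # storeMe='distributedCache', and with this fix the plan that
        # stores it also appends the DistributeStep that ships it out
        adict = ReadLines('data/a.txt') | Map(by=lambda line:line.strip().split(' '))
        pairs = ReadLines('data/b.txt') \
            | Augment(sideview=adict, loadedBy=lambda v:list(GPig.rowsOf(v))) \
            | Map(by=lambda (line,side): (line.strip(), len(side)))

    # always end like this
    if __name__ == "__main__":
        Demo().main(sys.argv)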
@@ -891,7 +895,7 @@ def explain(self):
 #
 
 class DistributeStep(Step):
-    """Prepare a stored view for the dDistributed cache."""
+    """Prepare a stored view for the distributed cache."""
 
     def __init__(self,view):
         Step.__init__(self,view)
@@ -1109,6 +1113,7 @@ def simpleMapCommands(self,task,gp,mapCom,src,dst):
         assert src,'Wrap not supported for hadoop'
         hcom = self.HadoopCommandBuf(gp,task)
         hcom.extendDef('-D','mapred.reduce.tasks=0')
+        hcom.extend('-cmdenv','PYTHONPATH=.')
         hcom.extend('-input',src,'-output',dst)
         hcom.extend("-mapper '%s'" % mapCom)
         return [ self._hadoopCleanCommand(gp,dst), hcom.asEcho(), hcom.asString() ]
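The new -cmdenv line makes Hadoop export PYTHONPATH=. to each task, so a shipped guineapig.py sitting next to the mapper is importable on the workers. Schematically, the generated streaming command now looks like this (the paths and mapper string are illustrative placeholders, not literal planner output):

    hadoop jar $GP_STREAMJAR \
        -D mapred.reduce.tasks=0 \
        -cmdenv PYTHONPATH=. \
        -input gpig_views/src.gp -output gpig_views/dst.gp \
        -mapper 'python myscript.py ...'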
@@ -1128,6 +1133,7 @@ def midi(i): return midpoint + '-' + str(i)
         for i in range(len(srcs)):
             hcom = self.HadoopCommandBuf(gp,task)
             hcom.extendDef('-D','mapred.reduce.tasks=%d' % gp.opts['parallel'])
+            hcom.extend('-cmdenv','PYTHONPATH=.')
             hcom.extend('-input',srcs[i], '-output',midi(i))
             hcom.extend("-mapper","'%s'" % mapComs[i])
             subplan += [ self._hadoopCleanCommand(gp,midi(i)), hcom.asEcho(), hcom.asString() ]
@@ -1137,6 +1143,7 @@ def midi(i): return midpoint + '-' + str(i)
         hcombineCom.extendDef('-jobconf','num.key.fields.for.partition=1')
         for i in range(len(srcs)):
             hcombineCom.extend('-input',midi(i))
+        hcombineCom.extend('-cmdenv','PYTHONPATH=.')
         hcombineCom.extend('-output',dst)
         hcombineCom.extend('-mapper','cat')
         hcombineCom.extend('-reducer',"'%s'" % reduceCom)
9 changes: 4 additions & 5 deletions tutorial/tfidf.py
@@ -7,7 +7,8 @@

 class TFIDF(Planner):
 
-    idDoc = ReadLines("idcorpus.txt") | Map(by=lambda line:line.strip().split("\t"))
+    D = GPig.getArgvParams()
+    idDoc = ReadLines(D.get('corpus','idcorpus.txt')) | Map(by=lambda line:line.strip().split("\t"))
     idWords = Map(idDoc, by=lambda (docid,doc): (docid,doc.lower().split()))
     data = FlatMap(idWords, by=lambda (docid,words): map(lambda w:(docid,w),words))

@@ -22,17 +23,15 @@ class TFIDF(Planner):

     udocvec1 = Join( Jin(data,by=lambda(docid,term):term), Jin(docFreq,by=lambda(term,df):term) )
     udocvec2 = Map(udocvec1, by=lambda((docid,term1),(term2,df)):(docid,term1,df))
-    udocvec3 = Join( Jin(udocvec2,by=lambda row:'const'), Jin(ndoc,by=lambda row:'const'))
+    udocvec3 = Augment(udocvec2, sideview=ndoc, loadedBy=lambda v:GPig.onlyRowOf(v))
     udocvec = Map(udocvec3, by=lambda((docid,term,df),(dummy,ndoc)):(docid,term,math.log(ndoc/df)))
 
-    sumSquareWeights = ReduceTo(float, lambda accum,(docid,term,weight): accum+weight*weight)
-
     norm = Group( udocvec, by=lambda(docid,term,weight):docid,
                   retaining=lambda(docid,term,weight):weight*weight,
                   reducingTo=ReduceToSum() )
 
     docvec = Join( Jin(norm,by=lambda(docid,z):docid), Jin(udocvec,by=lambda(docid,term,weight):docid) ) \
-        | Map( by=lambda((docid1,z),(docid2,term,weight)): (docid1,term,weight/math.sqrt(z)) )
+        | Map( by=lambda((docid1,z),(docid2,term,weight)): (docid1,term,weight/math.sqrt(z)) )
 
 # always end like this
 if __name__ == "__main__":
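With getArgvParams in place, the corpus is now a command-line parameter rather than a hard-coded filename. A hypothetical invocation, assuming guineapig's usual --params key:value syntax and the mytest.txt data file added in this commit:

    python tfidf.py --params corpus:data/mytest.txt --store docvec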
