diff --git a/TODO.txt b/TODO.txt
index 81a5765..d6151d0 100644
--- a/TODO.txt
+++ b/TODO.txt
@@ -2,6 +2,7 @@ TODO - priorities
 
 FUNCTIONALITY
 
+ - get rid of the KeyError for improper usage of params
 - option(storedIn=FILE) - so you can retrieve and store work on s3
 - add Stream(view1, through='shell command', shipping=[f1,..,fk])
 - add StreamingMapReduce(view1, mapper='shell command', reducer='shell command', combiner='shell command', shipping=[f1,..,fk])
diff --git a/data/a.csv b/data/a.csv
new file mode 100644
index 0000000..8a473d4
--- /dev/null
+++ b/data/a.csv
@@ -0,0 +1,9 @@
+w,aw
+x,ax
+akey,akey-val
+y,ay1
+y,ay2
+akey2,akey2-val1
+akey2,akey2-val2
+z,az1
+z,az2
diff --git a/data/a.txt b/data/a.txt
new file mode 100644
index 0000000..f89b33b
--- /dev/null
+++ b/data/a.txt
@@ -0,0 +1,9 @@
+akey akey-val
+akey2 akey2-val1
+akey2 akey2-val2
+w aw
+x ax
+y ay1
+y ay2
+z az1
+z az2
diff --git a/data/b.txt b/data/b.txt
new file mode 100644
index 0000000..a62705d
--- /dev/null
+++ b/data/b.txt
@@ -0,0 +1,9 @@
+bkey bkey-val
+bkey2 bkey2-val1
+bkey2 bkey2-val2
+w bw1
+w bw2
+x bx
+y by
+z bz1
+z bz2
diff --git a/data/c.txt b/data/c.txt
new file mode 100644
index 0000000..91d6769
--- /dev/null
+++ b/data/c.txt
@@ -0,0 +1,5 @@
+w cw
+x cx1
+x cx2
+z cz
+
diff --git a/data/d.txt b/data/d.txt
new file mode 100644
index 0000000..7ed6ff8
--- /dev/null
+++ b/data/d.txt
@@ -0,0 +1 @@
+5
diff --git a/data/mytest.txt b/data/mytest.txt
new file mode 100644
index 0000000..33fa896
--- /dev/null
+++ b/data/mytest.txt
@@ -0,0 +1,10 @@
+r1000.txt acq a again already amount acquired acquisition alameda
+r10011.txt crude,nat-gas agency billion billion billion billion canada canadian canadian
+r10015.txt veg-oil a added added against agricultural annual begins begins
+r10037.txt earn cts cts cts cts mln mln mln mln
+r10038.txt gas allow barrels barrels barrels basis beaumont beaumont beaumont
+r1003.txt earn accounting adjustments august before company company cts cts
+r10049.txt acq abuse abuse access access across actions advocated advocated
+r10053.txt earn available cts cts cts data loss loss loss
+r10057.txt earn already also april capital commonwealth commonwealth commonwealth commonwealth
+r10058.txt acq acquired acquired acquisition alameda alameda alameda alameda california
diff --git a/data/xA.txt b/data/xA.txt
new file mode 100644
index 0000000..60f2ab9
--- /dev/null
+++ b/data/xA.txt
@@ -0,0 +1,9 @@
+0
+1
+2
+3
+4
+6
+7
+8
+9
diff --git a/guineapig.py b/guineapig.py
index 4c75460..c71e617 100644
--- a/guineapig.py
+++ b/guineapig.py
@@ -24,14 +24,15 @@ class GPig(object):
     SORT_COMMAND = 'LC_COLLATE=C sort' # use standard ascii ordering, not locale-specific one
     HADOOP_LOC = 'hadoop' # assume hadoop is on the path at planning time
     MY_LOC = 'guineapig.py' # the name of this file
-    VERSION = '1.3.2'
+    VERSION = '1.3.3'
     COPYRIGHT = '(c) William Cohen 2014,2015'
 
     #Global options for Guinea Pig can be passed in with the --opts
    #command-line option, and these are the default values
    #The location of the streaming jar is a special case,
    #in that it's also settable via an environment variable.
-    defaultJar = '/home/hadoop/contrib/streaming/hadoop-streaming.jar'
+    #defaultJar = '/home/hadoop/contrib/streaming/hadoop-streaming.jar'
+    defaultJar = '/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar'
     envjar = os.environ.get('GP_STREAMJAR', defaultJar)
     defaultViewDir = 'gpig_views'
     envViewDir = os.environ.get('GP_VIEWDIR',defaultViewDir )
@@ -103,7 +104,7 @@ def onlyRowOf(view):
 
     @staticmethod
     class SafeEvaluator(object):
-        """Evaluates expressions that correzpond to serialized guinea pig rows."""
+        """Evaluates expressions that correspond to serialized guinea pig rows."""
         def __init__(self,restrictedBindings={}):
             self.restrictedBindings = restrictedBindings
         def eval(self,s):
@@ -267,6 +268,8 @@ def nonrecursiveStoragePlan(self):
         plan = Plan()
         plan.includeStepsOf(self.checkpointPlan())
         plan.append(TransformStep(view=self,whatToDo='doStoreRows',srcs=[self.checkpoint()],dst=self.storedFile(),why=self.explanation()))
+        if self.storeMe=='distributedCache':
+            plan.append(DistributeStep(self))
         return plan
 
     def applyDict(self,mapping,innerviewsOnly=False):
@@ -337,6 +340,7 @@ def pprint(self,depth=0,alreadyPrinted=None,sideview=False):
             print tabStr + sideviewIndicator + tagStr + ' = ' + '...'
         else:
             sideviewInfo = " sideviews: {"+",".join(map(lambda x:x.tag, self.sideviews))+"}" if self.sideviews else ""
+            sideviewInfo += " *sideviews: {"+",".join(map(lambda x:x.tag, self.sideviewsNeeded()))+"}" if self.sideviewsNeeded() else ""
             print tabStr + sideviewIndicator + tagStr + ' = ' + str(self) + sideviewInfo
         alreadyPrinted.add(self.tag)
         for inner in self.inners:
@@ -517,6 +521,7 @@ def __init__(self,inner=None,sideviews=None,sideview=None,loadedBy=lambda v:list
 
     def enforceStorageConstraints(self):
         for sv in self.sideviews:
+            logging.info('marking '+sv.tag+' to be placed in the distributedCache')
             sv.storeMe = 'distributedCache'
 
     def rowGenerator(self):
@@ -528,10 +533,9 @@ def checkpointPlan(self):
         plan = Plan()
         plan.includeStepsOf(self.inner.checkpointPlan())
         #the sideviews should have been stored by the top-level
-        #planner already, but they will need to be moved to a
-        #distributable location
-        for sv in self.sideviews:
-            plan.append(DistributeStep(sv))
+        #planner already, and distributed, if marked as storeMe==distributedCache
+        #for sv in self.sideviews:
+        #    plan.append(DistributeStep(sv))
         return plan
 
     def explanation(self):
@@ -891,7 +895,7 @@ def explain(self):
 #
 
 class DistributeStep(Step):
-    """Prepare a stored view for the dDistributed cache."""
+    """Prepare a stored view for the distributed cache."""
 
     def __init__(self,view):
         Step.__init__(self,view)
@@ -1109,6 +1113,7 @@ def simpleMapCommands(self,task,gp,mapCom,src,dst):
         assert src,'Wrap not supported for hadoop'
         hcom = self.HadoopCommandBuf(gp,task)
         hcom.extendDef('-D','mapred.reduce.tasks=0')
+        hcom.extend('-cmdenv','PYTHONPATH=.')
         hcom.extend('-input',src,'-output',dst)
         hcom.extend("-mapper '%s'" % mapCom)
         return [ self._hadoopCleanCommand(gp,dst), hcom.asEcho(), hcom.asString() ]
@@ -1128,6 +1133,7 @@ def midi(i): return midpoint + '-' + str(i)
         for i in range(len(srcs)):
             hcom = self.HadoopCommandBuf(gp,task)
             hcom.extendDef('-D','mapred.reduce.tasks=%d' % gp.opts['parallel'])
+            hcom.extend('-cmdenv','PYTHONPATH=.')
             hcom.extend('-input',srcs[i], '-output',midi(i))
             hcom.extend("-mapper","'%s'" % mapComs[i])
             subplan += [ self._hadoopCleanCommand(gp,midi(i)), hcom.asEcho(), hcom.asString() ]
@@ -1137,6 +1143,7 @@ def midi(i): return midpoint + '-' + str(i)
         hcombineCom.extendDef('-jobconf','num.key.fields.for.partition=1')
         for i in range(len(srcs)):
             hcombineCom.extend('-input',midi(i))
+        hcombineCom.extend('-cmdenv','PYTHONPATH=.')
         hcombineCom.extend('-output',dst)
         hcombineCom.extend('-mapper','cat')
         hcombineCom.extend('-reducer',"'%s'" % reduceCom)
diff --git a/tutorial/tfidf.py b/tutorial/tfidf.py
index b235fd1..06ab267 100644
--- a/tutorial/tfidf.py
+++ b/tutorial/tfidf.py
@@ -7,7 +7,8 @@
 
 class TFIDF(Planner):
 
-    idDoc = ReadLines("idcorpus.txt") | Map(by=lambda line:line.strip().split("\t"))
+    D = GPig.getArgvParams()
+    idDoc = ReadLines(D.get('corpus','idcorpus.txt')) | Map(by=lambda line:line.strip().split("\t"))
 
     idWords = Map(idDoc, by=lambda (docid,doc): (docid,doc.lower().split()))
     data = FlatMap(idWords, by=lambda (docid,words): map(lambda w:(docid,w),words))
@@ -22,17 +23,15 @@ class TFIDF(Planner):
 
     udocvec1 = Join( Jin(data,by=lambda(docid,term):term), Jin(docFreq,by=lambda(term,df):term) )
     udocvec2 = Map(udocvec1, by=lambda((docid,term1),(term2,df)):(docid,term1,df))
-    udocvec3 = Join( Jin(udocvec2,by=lambda row:'const'), Jin(ndoc,by=lambda row:'const'))
+    udocvec3 = Augment(udocvec2, sideview=ndoc, loadedBy=lambda v:GPig.onlyRowOf(v))
     udocvec = Map(udocvec3, by=lambda((docid,term,df),(dummy,ndoc)):(docid,term,math.log(ndoc/df)))
 
-    sumSquareWeights = ReduceTo(float, lambda accum,(docid,term,weight): accum+weight*weight)
-
     norm = Group( udocvec, by=lambda(docid,term,weight):docid,
                   retaining=lambda(docid,term,weight):weight*weight,
                   reducingTo=ReduceToSum() )
 
     docvec = Join( Jin(norm,by=lambda(docid,z):docid), Jin(udocvec,by=lambda(docid,term,weight):docid) ) \
-        | Map( by=lambda((docid1,z),(docid2,term,weight)): (docid1,term,weight/math.sqrt(z)) ) 
+        | Map( by=lambda((docid1,z),(docid2,term,weight)): (docid1,term,weight/math.sqrt(z)) )
 
 # always end like this
 if __name__ == "__main__":