diff --git a/TODO.txt b/TODO.txt
index 9d83ef3..7f3437e 100644
--- a/TODO.txt
+++ b/TODO.txt
@@ -9,13 +9,6 @@
 that it would be better with more asynchronous behavior.  mrs_gp.py is the working version.
 
 TODO for mrs_gp.py:
- - restrict opened files to a subdirectory - "./" + foo, forbid ".." - done
- - add --port option, default port 8000? - done
- - check if input file is a directory or not, and write to a file, not directory, if --numReduceTasks = 1,
-   so it can be used with regular gp - done
-
- - test with gpig - done
- - fix - gpig prefixes dirs in gpfs with / because it likes viewdir/foo.gp
 - test on processes with errors, ....
 - os.times() - can this show children's time/CPU pct so I can see actual load?
 - server timeout, for security, in case I leave it running...?
@@ -28,19 +21,20 @@ ROBUSTNESS/ERRORS/DOCS:
 -- document it needs unix, python 2.7
 -- document lack of UDF's
 
-FUNCTIONALITY - to add
+FUNCTIONALITY to add:
+
- - add ComputedView(fun=f, shell=sh, inputs=[], ship=[]) --- just invokes a python/shell command....do I need any of the
-   StreamingMapReduce stuff below if I add this?  I guess if I will allow optimizations....
- - add StreamingMapReduce(view1, mapper='shell command', reducer='shell command', combiner='shell command', sideviews=..., shipping=[f1,..,fk])
- -- opt(1)
- --- define class StreamingMapReduce(MapReduce): stores the commands
- --- make this a special case for the compiler - abstract compiler _coreCommand(self,step,gp) would just be mapper-command or reduce-command
- -- opt(2)
+ - allow --make as synonym for --store
+ - write and document an efficient Count (see mrs_test/mrs-naive-bayes.py)
+ - OutputOfShellCommand(command=[],sideviews=[],shipping=[])
+ - OutputOfStreamingMapReduce(view1, mapper='shell command', reducer='shell command', combiner='shell command', sideviews=..., shipping=[f1,..,fk])
 --- define class StreamingMapReduce(MapReduce): which knows how to doExternalMap and doExternalReduce, etc via subprocesses
 --- this could be a gpextras view + subclass of Augment, where inners=[], and rowGenerator runs the
+    shell command as subprocess, stores the result, and then uses the
+    ReadLines generator to produce the result
+ - add user-defined Reuse(FILE) ? (why do I want this again? for make-style pipelines? is it really needed?)
 
 - gpextras, for debugging:
 -- PPrint?
@@ -57,39 +51,4 @@ DOCS:
 - some longer examples for the tutorial (phirl-naive? tfidfs?)
 - document planner.ship, planner.setEvaluator
 
-TODO - MAJOR
-
- -- a GPig.registerCompiler('key',factoryClass), for adding new targets other than hadoop?
-  - compiler for marime.py map-reducer with ramdisks (note: diskutil erasevolume HFS+ 'RAMDisk' `hdiutil attach -nomount ram://10315776`,
-    size is in 2048-byte blocks)
- -- multithreading ideas
-
-  1. for CPU intensive steps, include multithreaded ReplaceEach and Flatten ops,
-     which sets up a work queue, and for each row, adds a task, and
-     removes any completed tasks.
-
-  2. add another compiler, which generates marime/mime.py steps.
-
-  3. Implement: marime mr -m MAP -r RED -i IN -o OUT -k K
-     and: marime putsplit -i IN -o OUT -k K
-          marime getmerge -i IN -o OUT
-     where in and out are ramdisk directories
-
-     map setup if there are J files in I:
-       K queues, Qi for shard i  ie {k,v : H(k)%K == i}
-       J processes Pj, each will run 'MAP < in/shard.j | ...'
-       -- or could use threads (subprocesses would be more modular)
-       J threads to read from Pj.stdout and route (k,v) to appropriate Qi
-       K threads, each to process inputs from one queue, into a shardBuffer
-
-     when all map processes are done:
-       K subprocesses, Ri, to run '... | RED > out/shard.k' -- or could use threads (subprocesses more modular)
-       K threads to print from the shardBuffer to Ri
-
- - DESCRIBE(...) - could be just a pretty_print?
-
- - ILLUSTRATE(view,[outputs]) - using definition of view, select the
-   inputs from the innerviews that produce those outputs.  Then, do that
-   recursively to get a test case.
-
diff --git a/mrs_test/Makefile b/mrs_test/Makefile
index 5e46087..a76507d 100644
--- a/mrs_test/Makefile
+++ b/mrs_test/Makefile
@@ -122,7 +122,7 @@ parallel-events:
 	time $(MRS) --task --input unsharded --output gpfs:sharded --mapper cat --reducer cat --numReduceTasks $(K)
 	echo training naive bayes in parallel
 	time $(MRS) --task --input gpfs:sharded --output gpfs:events --numReduceTasks $(K) --async $(ASYNC) \
-		--mapper 'python streamNaiveBayesLearner.py --streamTrain 100' \
+		--mapper 'python streamNaiveBayesLearner.py --streamTrain 0' \
 		--reducer 'python sum-events.py'
 	echo
 	time $(MRS) --task --input gpfs:sharded --output gpfs:events --mapper 'python streamNaiveBayesLearner.py --streamTrain 100' --async $(ASYNC)
 	echo export
diff --git a/mrs_test/mrs-naive-bayes.py b/mrs_test/mrs-naive-bayes.py
new file mode 100644
index 0000000..866f8ec
--- /dev/null
+++ b/mrs_test/mrs-naive-bayes.py
@@ -0,0 +1,32 @@
+from guineapig import *
+import sys
+import gpextras
+
+def partitionCounter(rows):
+    n = 0
+    for r in rows: n += 1
+    yield n
+
+class NaiveBayes(Planner):
+
+    D = GPig.getArgvParams(required=['train'])
+    def sumEventCounts(v):
+        return Group(v, by=lambda (e,n):e, retaining=lambda (e,n):n, reducingTo=ReduceToSum(), combiningTo=ReduceToSum())
+    def count(v,tag):
+        return ReplaceEachPartition(v, by=partitionCounter) | Group(by=lambda n:tag, reducingTo=ReduceToSum())
+
+    # format: (id, classLabelList, tokenList)
+    data = ReadLines(D['train']) \
+        | ReplaceEach(by=lambda line:line.strip().split("\t")) \
+        | ReplaceEach(by=lambda parts:(parts[0],parts[1].split(","),parts[2:]))
+    labelEvents = sumEventCounts(Flatten(data, by=lambda (docid,ys,ws): [(y,1) for y in ys]))
+    wordLabelEvents = sumEventCounts(Flatten(data, by=lambda (docid,ys,ws): [(y+'/'+w,1) for y in ys for w in ws]))
+    totalLines = count(data,'#lines')
+    totalWords = count(Flatten(data, by=lambda (docid,ys,ws): ws), '#words')
+
+# always end like this
+if __name__ == "__main__":
+    p = NaiveBayes()
+    p.registerCompiler('mrs',gpextras.MRSCompiler)
+    p.main(sys.argv)
+
diff --git a/mrs_test/setup-mrs-server.bash b/mrs_test/setup-mrs-server.bash
index 6d3f869..be92efb 100644
--- a/mrs_test/setup-mrs-server.bash
+++ b/mrs_test/setup-mrs-server.bash
@@ -3,5 +3,5 @@
 export GPHOME=`(cd ..;pwd)`
 export PYTHONPATH=$GPHOME
 export GP_MRS_COMMAND="python $GPHOME/mrs_gp.py --task"
 rm -f server.log
-python $GPHOME/mrs_gp.py --serve &
+pypy $GPHOME/mrs_gp.py --serve &
diff --git a/mrs_test/setup-mrs.bash b/mrs_test/setup-mrs.bash
new file mode 100644
index 0000000..600e7c2
--- /dev/null
+++ b/mrs_test/setup-mrs.bash
@@ -0,0 +1,2 @@
+export PYTHONPATH=$PYTHONPATH:~/shared-home/code/GuineaPig
+alias mrs='pypy ~/shared-home/code/GuineaPig/mrs_gp.py'
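
Note on the OutputOfShellCommand / OutputOfStreamingMapReduce items in the TODO.txt hunk above: the row-generator mechanism they describe (run the shell command as a subprocess, store the result, then replay it the way the ReadLines generator does) can be sketched independently of the Augment wiring. Below is a minimal sketch, assuming Python 2.7 and a Unix shell; shellCommandRows is a hypothetical name, not an existing guineapig or gpextras API, and the real version would become the rowGenerator of the gpextras Augment subclass.

# Sketch only (Python 2.7, Unix): the subprocess-backed row generator
# described in the TODO notes.  shellCommandRows is a hypothetical name.
import os
import subprocess
import tempfile

def shellCommandRows(command):
    """Run a shell command, store its output in a temp file, then yield
    the output back one line at a time, ReadLines-style."""
    tmp = tempfile.NamedTemporaryFile(delete=False)
    try:
        # run the command as a subprocess, stdout captured in the temp file
        subprocess.check_call(command, shell=True, stdout=tmp)
        tmp.close()
        # replay the stored result as rows
        with open(tmp.name) as f:
            for line in f:
                yield line.rstrip('\n')
    finally:
        os.unlink(tmp.name)

For OutputOfStreamingMapReduce, the same generator could be handed a composed pipeline string (e.g. mapper | sort | reducer), which is what the doExternalMap/doExternalReduce methods mentioned above would presumably build.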
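
The multithreaded map phase in the deleted TODO - MAJOR section is specified in enough detail to record as code: J mapper subprocesses, J router threads that hash each key k to queue H(k)%K, and K collector threads filling per-shard buffers. The sketch below is only an illustration of that deleted design, assuming Python 2.7 and tab-separated key/value rows; mapPhase and everything else here are hypothetical names that do not exist in the repo.

# Sketch of the deleted marime map-phase design (Python 2.7; hypothetical).
# J mapper subprocesses, J router threads, K queues, K collector threads.
import subprocess, threading, Queue

def mapPhase(mapCommand, inputFiles, K):
    queues = [Queue.Queue() for _ in range(K)]   # Qi for shard i
    shardBuffers = [[] for _ in range(K)]        # filled by collector threads

    def route(proc):
        # read (k,v) lines from one mapper's stdout; route to Qi, i = H(k)%K
        for line in proc.stdout:
            k = line.split('\t', 1)[0]
            queues[hash(k) % K].put(line)

    def collect(i):
        # drain queue i into its shard buffer until the poison pill arrives
        while True:
            line = queues[i].get()
            if line is None: break
            shardBuffers[i].append(line)

    # one subprocess per input shard, i.e. 'MAP < in/shard.j'
    procs = [subprocess.Popen(mapCommand, shell=True,
                              stdin=open(f), stdout=subprocess.PIPE)
             for f in inputFiles]
    routers = [threading.Thread(target=route, args=(p,)) for p in procs]
    collectors = [threading.Thread(target=collect, args=(i,)) for i in range(K)]
    for t in routers + collectors: t.start()
    for t in routers: t.join()       # all mapper output has been routed
    for p in procs: p.wait()
    for q in queues: q.put(None)     # poison pills stop the collectors
    for t in collectors: t.join()
    # the reduce phase would pipe each shardBuffers[k] into 'RED > out/shard.k'
    return shardBuffers

The subprocesses do the CPU-heavy MAP/RED work, so the Python-side threads stay I/O-bound, which is the point of the "subprocesses would be more modular" remark in the notes.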