
Commit

a new test case and docs
wwcohen committed Dec 3, 2015
1 parent dfa2100 commit 2510598
Showing 5 changed files with 45 additions and 52 deletions.
59 changes: 9 additions & 50 deletions TODO.txt
@@ -9,13 +9,6 @@ that it would be better with more asynchronous behavior. mrs_gp.py is
the working version.

TODO for mrs_gp.py:
- restrict opened files to a subdirectory - "./" + foo, forbid ".." -done
- add --port option, default port 8000? -done
- check if input file is a directory or not, and write to a file, not directory, if --numReduceTasks = 1,
so it can be used with regular gp - done

- test with gpig - done
- fix - gpig prefixes dirs in gpfs with / because it likes viewdir/foo.gp
- test on processes with errors, ....
- os.times() - can this show children's time/CPU pct so I can see actual load?
- server timeout, for security, in case I leave it running...?
@@ -28,19 +21,20 @@ ROBUSTNESS/ERRORS/DOCS:
-- document that it requires Unix and Python 2.7
-- document the lack of UDFs

-FUNCTIONALITY - to add
+FUNCTIONALITY

to add:
- add ComputedView(fun=f, shell=sh, inputs=[], ship=[]) --- just invokes a python/shell command. Do I need any of the
StreamingMapReduce stuff below if I add this? I guess I do, if I will allow optimizations.
- add StreamingMapReduce(view1, mapper='shell command', reducer='shell command', combiner='shell command', sideviews=..., shipping=[f1,..,fk])
-- opt(1)
--- define class StreamingMapReduce(MapReduce): stores the commands
--- make this a special case for the compiler - abstract compiler _coreCommand(self,step,gp) would just be mapper-command or reduce-command
-- opt(2)
- allow --make as synonym for --store
- write and document an efficient Count (see mrs_test/mrs-naive-bayes.py)
- OutputOfShellCommand(command=[],sideviews=[],shipping=[])
- OutputOfStreamingMapReduce(view1, mapper='shell command', reducer='shell command', combiner='shell command', sideviews=..., shipping=[f1,..,fk])
--- define class StreamingMapReduce(MapReduce): which knows how to doExternalMap and doExternalReduce, etc via subprocesses
--- this could be a gpextras view

a subclass of Augment, with inners=[], whose rowGenerator runs the
shell command as a subprocess, stores the result, and then uses the
ReadLines generator to produce the rows
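The rowGenerator idea above can be sketched standalone; the function name here is illustrative, not the actual GuineaPig API, and a Unix shell is assumed (as the docs require):

```python
import subprocess

def shellCommandRows(command):
    # Hypothetical sketch of the rowGenerator above: run the shell
    # command as a subprocess, store its output, then yield one row
    # per line, ReadLines-style.
    output = subprocess.check_output(command, shell=True)
    if isinstance(output, bytes):        # bytes under Python 3
        output = output.decode('utf-8')  # already str under Python 2.7
    for line in output.splitlines():
        yield line
```

A view built this way would stay lazy from the caller's perspective even though the subprocess output is materialized first.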

- add user-defined Reuse(FILE) ? (why do I want this again? for make-style pipelines? is it really needed?)
- gpextras, for debugging:
-- PPrint?
@@ -57,39 +51,4 @@ DOCS:
- some longer examples for the tutorial (phirl-naive? tfidfs?)
- document planner.ship, planner.setEvaluator

TODO - MAJOR

- a GPig.registerCompiler('key',factoryClass), for adding new targets other than hadoop?
- compiler for marime.py map-reducer with ramdisks (note: diskutil erasevolume HFS+ 'RAMDisk' `hdiutil attach -nomount ram://10315776`,
size is in 2048-byte blocks)

- multithreading ideas

1. for CPU intensive steps, include multithreaded ReplaceEach and Flatten ops,
which sets up a work queue, and for each row, adds a task, and
removes any completed tasks.

2. add another compiler, which generates marime/mime.py steps.

3. Implement: marime mr -m MAP -r RED -i IN -o OUT -k K
and: marime putsplit -i IN -o OUT -k K
marime getmerge -i IN -o OUT
where in and out are ramdisk directories

map setup, if there are J files in input directory I:
  K queues, Qi for shard i - i.e. {(k,v) : H(k)%K == i}
  J processes Pj, each will run 'MAP < in/shard.j | ...' -- or could use threads (subprocesses would be more modular)
  J threads to read from Pj.stdout and route each (k,v) to the appropriate Qi
  K threads, each processing inputs from one queue into a shardBuffer

when all map processes are done:
  K subprocesses, Ri, to run '... | RED > out/shard.k' -- or could use threads (subprocesses more modular)
  K threads to print from the shardBuffer to Ri
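A toy, in-memory sketch of this threading plan (the shardBuffer and the Qi/Pj names come from the notes above; the function shape and the mapper signature are invented for illustration):

```python
import threading, queue

def shardedMap(inputShards, mapper, K):
    # K queues, Qi, one per output shard i = hash(k) % K
    queues = [queue.Queue() for _ in range(K)]
    shardBuffer = [[] for _ in range(K)]

    def reader(shard):
        # plays the role of Pj plus its router thread: apply MAP to each
        # row and route each (k,v) to the appropriate Qi
        for row in shard:
            for k, v in mapper(row):
                queues[hash(k) % K].put((k, v))

    def consumer(i):
        # drains Qi into shardBuffer[i] until a sentinel arrives
        while True:
            item = queues[i].get()
            if item is None:
                return
            shardBuffer[i].append(item)

    readers = [threading.Thread(target=reader, args=(s,)) for s in inputShards]
    consumers = [threading.Thread(target=consumer, args=(i,)) for i in range(K)]
    for t in readers + consumers:
        t.start()
    for t in readers:           # "when all map processes are done"
        t.join()
    for q in queues:            # one sentinel per consumer
        q.put(None)
    for t in consumers:
        t.join()
    return shardBuffer          # ready to feed the K reduce subprocesses
```

The real design would use subprocesses for MAP and RED as noted; this only exercises the queue-and-thread routing.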

- DESCRIBE(...) - could be just a pretty_print?

- ILLUSTRATE(view,[outputs]) - using definition of view, select the
inputs from the innerviews that produce those outputs. Then, do that
recursively to get a test case.


2 changes: 1 addition & 1 deletion mrs_test/Makefile
@@ -122,7 +122,7 @@ parallel-events:
	time $(MRS) --task --input unsharded --output gpfs:sharded --mapper cat --reducer cat --numReduceTasks $(K)
	echo training naive bayes in parallel
	time $(MRS) --task --input gpfs:sharded --output gpfs:events --numReduceTasks $(K) --async $(ASYNC) \
-		--mapper 'python streamNaiveBayesLearner.py --streamTrain 100' \
+		--mapper 'python streamNaiveBayesLearner.py --streamTrain 0' \
		--reducer 'python sum-events.py'
	echo time $(MRS) --task --input gpfs:sharded --output gpfs:events --mapper 'python streamNaiveBayesLearner.py --streamTrain 100' --async $(ASYNC)
	echo export
32 changes: 32 additions & 0 deletions mrs_test/mrs-naive-bayes.py
@@ -0,0 +1,32 @@
from guineapig import *
import sys
import gpextras

def partitionCounter(rows):
    n = 0
    for r in rows: n += 1
    yield n

class NaiveBayes(Planner):

    D = GPig.getArgvParams(required=['train'])
    def sumEventCounts(v):
        return Group(v, by=lambda (e,n):e, retaining=lambda (e,n):n, reducingTo=ReduceToSum(), combiningTo=ReduceToSum())
    def count(v,tag):
        return ReplaceEachPartition(v, by=partitionCounter) | Group(by=lambda n:tag, reducingTo=ReduceToSum())

    #format: (id,classLabelList,tokenList)
    data = ReadLines(D['train']) \
        | ReplaceEach(by=lambda line:line.strip().split("\t")) \
        | ReplaceEach(by=lambda parts:(parts[0],parts[1].split(","),parts[2:]))
    labelEvents = sumEventCounts(Flatten(data, by=lambda (docid,ys,ws): [(y,1) for y in ys]))
    wordLabelEvents = sumEventCounts(Flatten(data, by=lambda (docid,ys,ws): [(y+'/'+w,1) for y in ys for w in ws]))
    totalLines = count(data,'#lines')
    totalWords = count(Flatten(data, lambda (docid,ys,ws): ws), '#words')

# always end like this
if __name__ == "__main__":
    p = NaiveBayes()
    p.registerCompiler('mrs',gpextras.MRSCompiler)
    p.main(sys.argv)
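The partitionCounter idiom in this new test case - emit one count per partition, then sum the per-partition counts - can be exercised on plain lists, as a minimal sketch of the "efficient Count" mentioned in TODO.txt:

```python
def partitionCounter(rows):
    # same idea as in mrs-naive-bayes.py: one output row per partition,
    # carrying the count of that partition's input rows
    n = 0
    for r in rows:
        n += 1
    yield n

# simulate count(): run per partition, then reduce the counts to a sum
partitions = [['a', 'b'], ['c'], ['d', 'e', 'f']]
perPartition = [c for p in partitions for c in partitionCounter(p)]
total = sum(perPartition)  # 2 + 1 + 3 = 6
```

This is efficient because each mapper ships a single integer to the reducer instead of one row per input record.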

2 changes: 1 addition & 1 deletion mrs_test/setup-mrs-server.bash
@@ -3,5 +3,5 @@ export GPHOME=`(cd ..;pwd)`
export PYTHONPATH=$GPHOME
export GP_MRS_COMMAND="python $GPHOME/mrs_gp.py --task"
rm -f server.log
-python $GPHOME/mrs_gp.py --serve &
+pypy $GPHOME/mrs_gp.py --serve &

2 changes: 2 additions & 0 deletions mrs_test/setup-mrs.bash
@@ -0,0 +1,2 @@
export PYTHONPATH=$PYTHONPATH:~/shared-home/code/GuineaPig
alias mrs='pypy ~/shared-home/code/GuineaPig/mrs_gp.py'
