diff --git a/gpextras.py b/gpextras.py
index f4c8722..7fd72be 100644
--- a/gpextras.py
+++ b/gpextras.py
@@ -105,18 +105,18 @@ def simpleMapCommands(self,task,gp,mapCom,src,dst):
     def simpleMapReduceCommands(self,task,gp,mapCom,reduceCom,src,dst):
         """A map-reduce job with one input."""
         p = task.reduceParallel(gp)
-        return [ "%s --input %s --output %s --mapper '%s' --numReduceTasks %d --reducer '%s'" \
-                 % (self.mrsCommand,src,dst,mapCom,p,reduceCom) ]
+        return [ "%s --input %s --output %s --numReduceTasks %d --mapper '%s' --reducer '%s'" \
+                 % (self.mrsCommand,src,dst,p,mapCom,reduceCom) ]
     def joinCommands(self,task,gp,mapComs,reduceCom,srcs,midpoint,dst):
         """A map-reduce job with several inputs."""
         p = task.reduceParallel(gp)
-        def midi(i): return midpoint + '-' + str(i)
+        def mid(i): return midpoint + '-' + str(i)
         subplan = []
         for i in range(len(srcs)):
             subplan.append("%s --input %s --output %s --mapper '%s'" \
                            % (self.mrsCommand,srcs[i],mid(i),mapComs[i]))
         allMidpoints = ",".join([mid(i) for i in range(len(srcs))])
-        subplan.append("%s --inputs %s --output %s --mapper cat --numReduceTasks %d --reducer '%s'" \
+        subplan.append("%s --inputs %s --output %s --numReduceTasks %d --mapper cat --reducer '%s'" \
                        % (self.mrsCommand,allMidpoints,dst,p,reduceCom))
         return subplan
 
diff --git a/mrs_gp.py b/mrs_gp.py
index 923b3cf..c6de859 100644
--- a/mrs_gp.py
+++ b/mrs_gp.py
@@ -686,9 +686,12 @@ def setupFiles(indirList,outdir,numReduceTasks):
     single output, it will also be a file.
     """
     #clear space/make directory for output, if necessary
-    if os.path.exists(outdir): 
+    if os.path.exists(outdir):
         logging.warn('removing %s' % (outdir))
-        shutil.rmtree(outdir)
+        if os.path.isdir(outdir):
+            shutil.rmtree(outdir)
+        else:
+            os.remove(outdir)
 
     indirs = []
     infiles = []
diff --git a/mrs_test/Makefile b/mrs_test/Makefile
index f20f7a5..c9ea1cf 100644
--- a/mrs_test/Makefile
+++ b/mrs_test/Makefile
@@ -4,7 +4,7 @@ ASYNC = 0
 #
 clean:
-	rm -rf tmp/* parallel-events.txt
+	rm -rf tmp tmp.txt sharded parallel-events.txt server.log test test2 gpig_views
 
 backup:
 	cp ../mrs_gp_v4.py ../backup/`date "+%H:%M:%S"`-mrs_gp_v4.py
diff --git a/mrs_test/Makefile.stress b/mrs_test/Makefile.stress
deleted file mode 100644
index 76fb3d6..0000000
--- a/mrs_test/Makefile.stress
+++ /dev/null
@@ -1,79 +0,0 @@
-#comments: with full_test.txt, about 100M
-#  disk and ram times are comparable
-#  ram is fastest w/o parallelism (K=1), then it's about 2x speed of disk
-# with full_train.txt, about 1G
-#  ram is much faster
-#  not much speedup w parallelism (K=1 is best)
-
-
-RAM=/mnt/ramdisk/mrs/
-K=60
-BIGFILE=/afs/cs.cmu.edu/user/wcohen/bigML/RCV1/RCV1.full_test.txt
-#BIGFILE=/afs/cs.cmu.edu/user/wcohen/bigML/RCV1/RCV1.full_train.txt
-DIR=$(RAM)
-NBCOUNT='python streamNaiveBayesLearner.py --streamTrain 100'
-#NBCOUNT='python streamNaiveBayesLearner.py --train'
-SUM='python sum-events.py'
-
-MRS=pypy ../mrs_gp.py
-
-simple-disk-times:
-	echo == disk stream - about 3 sec for 100M, 2:03 for 1G
-	time cat $(BIGFILE) | python fixrcv.py > bigfile1/test.txt
-	echo == sharding disk-disk - about 10-11 sec for 100M with K=10, 1:18 for 1G with K=5
-	time $(MRS) --input bigfile1 --output bigfile1-sharded --mapper cat --reducer cat --numReduceTasks $(K)
-	echo == copy sharded disk-disk - about 8-10 sec for 100M with K=10, 1:16 for 1G with K=5
-	time $(MRS) --input bigfile1-sharded --output bigfile1-copy --mapper cat --reducer cat --numReduceTasks $(K)
-
-simple-ram-times:
-	echo == disk to ram stream - about 3 sec for 100M, 3:19 for 1G with K=5
-	time cat /afs/cs.cmu.edu/user/wcohen/bigML/RCV1/RCV1.full_test.txt | python fixrcv.py > $(RAM)/bigfile1/test.txt
-	echo == sharding ram-ram - about 10-11 sec for 100M , 0:08 for 1G with K=5
-	time $(MRS) --input $(RAM)/bigfile1 --output $(RAM)/bigfile1-sharded --mapper cat --reducer cat --numReduceTasks $(K)
-	echo == copy sharded ram-ram - about 8-10 sec for 100M, 0:07 for 1G with K=5
-	time $(MRS) --input $(RAM)/bigfile1-sharded --output $(RAM)/bigfile1-copy --mapper cat --reducer cat --numReduceTasks $(K)
-	echo == resharded ram-ram - about 8.7 sec for 100M with k=30
-	time $(MRS) --input $(RAM)/bigfile1-sharded --output $(RAM)/bigfile1-copy --mapper cat --reducer cat --numReduceTasks `expr $(K) / 2`
-
-# for train data - 1G
-# k	shard	copy
-# 1	5.7	5.4
-# 3	7.7	7.3
-# 5	7.9	7.5
-# 10	9.0	8.2
-# 20	9.7	8.8
-
-simple-ram-times-sweep-k:
-	time $(MRS) --input $(RAM)/bigfile1 --output $(RAM)/bigfile1-sharded --mapper cat --reducer cat --numReduceTasks 1
-	time $(MRS) --input $(RAM)/bigfile1-sharded --output $(RAM)/bigfile1-copy --mapper cat --reducer cat --numReduceTasks 1
-	time $(MRS) --input $(RAM)/bigfile1 --output $(RAM)/bigfile1-sharded --mapper cat --reducer cat --numReduceTasks 3
-	time $(MRS) --input $(RAM)/bigfile1-sharded --output $(RAM)/bigfile1-copy --mapper cat --reducer cat --numReduceTasks 3
-	time $(MRS) --input $(RAM)/bigfile1 --output $(RAM)/bigfile1-sharded --mapper cat --reducer cat --numReduceTasks 5
-	time $(MRS) --input $(RAM)/bigfile1-sharded --output $(RAM)/bigfile1-copy --mapper cat --reducer cat --numReduceTasks 5
-	time $(MRS) --input $(RAM)/bigfile1 --output $(RAM)/bigfile1-sharded --mapper cat --reducer cat --numReduceTasks 10
-	time $(MRS) --input $(RAM)/bigfile1-sharded --output $(RAM)/bigfile1-copy --mapper cat --reducer cat --numReduceTasks 10
-	time $(MRS) --input $(RAM)/bigfile1 --output $(RAM)/bigfile1-sharded --mapper cat --reducer cat --numReduceTasks 20
-	time $(MRS) --input $(RAM)/bigfile1-sharded --output $(RAM)/bigfile1-copy --mapper cat --reducer cat --numReduceTasks 20
-
-# for test data - 100M --train
-#	version 1	version 2	pypy v2
-# k	shard	learn	shard	learn	shard	learn
-# 1	5.7	3:44	2.8	3:05	1:27	3:46
-# 3	6.7	8:12	3.0	3:38	1:28	7:34
-# 10	8.5	14:02
-# 20	9.0	20:40	3.5	7:10	1:30	26:32
-
-# for test data - 100M --streamTrain 100 pypy
-# k	shard	learn	%learnCPU
-# 1	1:31	26:49	102
-# 3	1:29	1:01:35	250
-# 10
-# 20
-
-learn-ram-times:
-	echo sharding
-	$(MRS) --serve
-	time $(MRS) --input $(RAM)/bigfile1 --output $(RAM)/bigfile1-sharded --mapper cat --reducer cat --numReduceTasks $(K)
-	echo learning
-	time $(MRS) --input $(RAM)/bigfile1-sharded --output $(RAM)/events --numReduceTasks $(K) --mapper $(NBCOUNT) --reducer $(SUM)
-	$(MRS) --shutdown
diff --git a/mrs_test/test/small.txt b/mrs_test/test/small.txt
deleted file mode 100644
index b29f9a6..0000000
--- a/mrs_test/test/small.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-input 1
-input 2
-input 3
-input 4
-input 5
-input 6
-input 7
-input 8
-input 9
-input 10
-input 11
-input 12
-input 13
-input 14
-input 15
-input 16
-input 17
-input 18
-input 19
-input 20
diff --git a/mrs_test/test2/small.txt b/mrs_test/test2/small.txt
deleted file mode 100644
index 7ed0fb2..0000000
--- a/mrs_test/test2/small.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-secondary-input 1
-secondary-input 2
-secondary-input 3
-secondary-input 4
-secondary-input 5
-secondary-input 6
-secondary-input 7
-secondary-input 8
-secondary-input 9
-secondary-input 10
-secondary-input 11
-secondary-input 12
-secondary-input 13
-secondary-input 14
-secondary-input 15
-secondary-input 16
-secondary-input 17
-secondary-input 18
-secondary-input 19
-secondary-input 20
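
Two things change in the gpextras.py hunk above: the inner helper that names intermediate outputs was defined as midi but every call site uses mid, so joinCommands would raise a NameError the first time it built a plan; and --numReduceTasks now precedes --mapper/--reducer in the generated command lines. A minimal sketch of the command string the fixed simpleMapReduceCommands emits, with placeholder values (in-dir, out-dir, cat, and the sort|uniq reducer are illustrative, not from the repo; the real values come from the calling task):

    # Placeholder values, chosen only to show the flag ordering.
    mrsCommand = 'python mrs_gp.py'
    src, dst, p = 'in-dir', 'out-dir', 3
    mapCom, reduceCom = 'cat', 'sort | uniq -c'

    cmd = "%s --input %s --output %s --numReduceTasks %d --mapper '%s' --reducer '%s'" \
          % (mrsCommand, src, dst, p, mapCom, reduceCom)
    print(cmd)
    # python mrs_gp.py --input in-dir --output out-dir --numReduceTasks 3 --mapper 'cat' --reducer 'sort | uniq -c'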
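
The mrs_gp.py hunk makes setupFiles tolerate an output path left behind as a plain file: per its docstring, with a single reduce task the output is written as a file rather than a directory, and shutil.rmtree raises NotADirectoryError on a plain file, so rerunning a job over a stale single-file output would crash. A self-contained sketch of the guarded cleanup, assuming the same semantics (clear_output is a hypothetical helper name, not the function in mrs_gp.py):

    import logging
    import os
    import shutil

    def clear_output(outdir):
        # Hypothetical helper: remove a stale output path whether it is a
        # directory (numReduceTasks > 1) or a single file (numReduceTasks == 1);
        # shutil.rmtree alone fails with NotADirectoryError on a plain file.
        if os.path.exists(outdir):
            logging.warning('removing %s' % outdir)
            if os.path.isdir(outdir):
                shutil.rmtree(outdir)
            else:
                os.remove(outdir)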