From e277c8f646222497710ba5e8ea52122bf05edece Mon Sep 17 00:00:00 2001
From: William Cohen
Date: Wed, 2 Dec 2015 15:54:20 -0500
Subject: [PATCH] fixed a bug in acceptReduceInputs causing failures on joins

---
 TODO.txt                       |   4 +-
 gpextras.py                    |   1 +
 mrs_gp.py                      | 115 ++--
 mrs_test/Makefile              |   2 +-
 mrs_test/caterr.py             |   2 +-
 mrs_test/corpus.txt            | 964 +++++++++++++++++++++++++++++++++
 mrs_test/id-parks.txt          | 654 ++++++++++++++++++++++
 mrs_test/mrs-phirl-naive.py    |  72 +++
 mrs_test/mrs-wordcount.py      |  20 +
 mrs_test/setup-mrs-direct.bash |   9 +
 mrs_test/setup-mrs-server.bash |   7 +
 mrs_test/test/abc.txt          |  20 +
 tutorial/Makefile              |   2 +-
 13 files changed, 1815 insertions(+), 57 deletions(-)
 create mode 100644 mrs_test/corpus.txt
 create mode 100644 mrs_test/id-parks.txt
 create mode 100644 mrs_test/mrs-phirl-naive.py
 create mode 100644 mrs_test/mrs-wordcount.py
 create mode 100644 mrs_test/setup-mrs-direct.bash
 create mode 100644 mrs_test/setup-mrs-server.bash
 create mode 100644 mrs_test/test/abc.txt

diff --git a/TODO.txt b/TODO.txt
index 57ca4d7..9d83ef3 100644
--- a/TODO.txt
+++ b/TODO.txt
@@ -14,7 +14,9 @@ TODO for mrs_gp.py:
  - check if input file is a directory or not, and write to a file, not
    directory, if --numReduceTasks = 1, so it can be used with regular
    gp - done
- - test with gpig, test on processes with errors, ....
+ - test with gpig - done
+ - fix - gpig prefixes dirs in gpfs with / because it likes viewdir/foo.gp
+ - test on processes with errors, ....
  - os.times() - can this show children's time/CPU pct so I can see actual load?
  - server timeout, for security, in case I leave it running...?
  - add an 'abort task' method for server...? or maybe kill client thread?
diff --git a/gpextras.py b/gpextras.py
index 7fd72be..a60639c 100644
--- a/gpextras.py
+++ b/gpextras.py
@@ -100,6 +100,7 @@ def distributeCommands(self,task,gp,maybeRemoteCopy,localCopy):
     def simpleMapCommands(self,task,gp,mapCom,src,dst):
         """A map-only job with zero or one inputs."""
         assert src,'undefined src for this view? you may be using Wrap with target:mrs'
+        # parallelism is ignored for map-only tasks
         return [ "%s --input %s --output %s --mapper '%s'" % (self.mrsCommand,src,dst,mapCom) ]
 
     def simpleMapReduceCommands(self,task,gp,mapCom,reduceCom,src,dst):
diff --git a/mrs_gp.py b/mrs_gp.py
index c6de859..cd71dfa 100644
--- a/mrs_gp.py
+++ b/mrs_gp.py
@@ -15,7 +15,6 @@
 import sys
 import threading
 import time
-import time
 import traceback
 import urllib
 
@@ -111,7 +110,7 @@ def listDirs(self,pretty=False):
         information, sort of like the output of ls -l.
         """
         result = sorted(self.filesIn.keys())
-        if not pretty: 
+        if not pretty:
             return result
         else:
             def fmtdir(d): return '%s%3d %s' % (GPFileSystem.FILES_MARKER,len(FS.listFiles(d)),d)
@@ -159,7 +158,12 @@ def __str__(self):
 
     def _fixDir(self,d):
         """Strip the prefix gpfs: if it is present."""
-        return d if not GPFileSystem.inGPFS(d) else d[len("gpfs:"):]
+        if d.startswith("gpfs:/"):
+            return d[len("gpfs:/"):]
+        elif d.startswith("gpfs:"):
+            return d[len("gpfs:"):]
+        else:
+            return d
 
     @staticmethod
     def inGPFS(d):
@@ -322,7 +326,8 @@ def performTask(optdict):
 
 def key(line):
     """Extract the key for a line containing a tab-separated key,value pair."""
-    return line[:line.find("\t")]
+    k = line[:line.find("\t")]
+    return k
 
 def mapreduce(indirList,outdir,mapper,reducer,numReduceTasks,pipeThread):
     """Run a generic streaming map-reduce process.  The mapper and reducer
@@ -344,9 +349,9 @@ def mapreduce(indirList,outdir,mapper,reducer,numReduceTasks,pipeThread):
     #names of the reduce tasks
     reduceTags = map(lambda j:'reducer-to-%s-%s' % (outdirs[j],outfiles[j]), range(numReduceTasks))
     reduceQs = map(lambda j:Queue.Queue(), range(numReduceTasks))
-    reducePipes = map(lambda j:makePipe("sort -k1,2 | "+reducer), range(numReduceTasks))
+    reducePipes = map(lambda j:makePipe("LC_COLLATE=C sort -k1,2 | "+reducer), range(numReduceTasks))
     reduceQThreads = map(
-        lambda j:threading.Thread(target=acceptReduceInputs, args=(numMapTasks,reduceQs[j],reducePipes[j])),
+        lambda j:threading.Thread(target=acceptReduceInputs, args=(reduceTags[j],numMapTasks,reduceQs[j],reducePipes[j])),
         range(numReduceTasks))
     TASK_STATS.end('_init reducers and queues')
 
@@ -374,13 +379,8 @@ def mapreduce(indirList,outdir,mapper,reducer,numReduceTasks,pipeThread):
     TASK_STATS.start('_run mappers')
     for t in reduceQThreads: t.start()
     for t in mapThreads: t.start()
-    #print 'join mapThreads'
     for t in mapThreads: t.join()
-    #print 'join reduceQThreads'
     for t in reduceQThreads: t.join()
-    #print 'join reduceQs'
-    #for q in reduceQs: q.join()
-    #print 'joined....'
     TASK_STATS.end('_run mappers')
 
     reduceErrCollectors = map(
@@ -607,6 +607,8 @@ def __init__(self,tag,reduceQs):
         self.tag = tag
         self.reduceQs = reduceQs
         self.numReduceTasks = len(reduceQs)
+        #self.logfile = open(self.tag.replace("/","__")+".log", 'w')
+        #self.logfile.write("=== log "+repr(self)+" ===")
         # buffer what goes into the reducer queues, since they seem
         # very slow with lots of little inputs
         self.buffers = map(lambda i:cStringIO.StringIO(), reduceQs)
@@ -634,40 +636,50 @@ def collect(self,str):
                 break
             else:
                 line = lines[lastNewline:nextNewline+1]
+                assert line.strip().find("\n") < 0
                 TASK_STATS.ioSize[self.tag]['stdin'] += len(line)
                 k = key(line)
-                #self.reduceQs[hash(k)%self.numReduceTasks].put(line)
                 self.buffers[hash(k)%self.numReduceTasks].write(line)
+                #self.logfile.write("echo:"+line)
+                #self.logfile.write("key: %s hash: %d queue: %d\n" % (k,hash(k),(hash(k) % self.numReduceTasks)))
                 lastNewline = nextNewline+1
 
     def close(self):
         assert not self.leftover, "collected data wasn't linefeed-terminated"
         global TASK_STATS
+        #self.logfile.write("=== end echo\n")
         for i in range(len(self.reduceQs)):
             # send the buffered-up data for the i-th queue
             bufi = self.buffers[i].getvalue()
             self.reduceQs[i].put(bufi)
+            #self.logfile.write("sent %d chars to reducer %d %r\n" % (len(bufi),i,self.reduceQs[i]))
             TASK_STATS.ioSize[self.tag]['stdout'] += len(bufi)
             # signal we're done with this queue
             self.reduceQs[i].put(None)
         TASK_STATS.end(self.tag)
+        #self.logfile.close()
 
 ####################
 # used with reducer queues
 
-def acceptReduceInputs(numMapTasks,reduceQ,reducePipe):
+def acceptReduceInputs(tag,numMapTasks,reduceQ,reducePipe):
     """Thread that monitors a queue of items to add to send to a reducer process."""
+    #logfile = open(tag.replace("/","__")+".log",'w')
     numPoison = 0 #number of mappers that have finished writing
+    #logfile.write('= accepting inputs for queue %r from %d shufflers\n' % (reduceQ,numMapTasks))
     while numPoison
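Note on the shuffle path this patch touches: collect() hashes each line's
tab-separated key to pick a per-reducer queue, buffers lines per queue, and
on close() flushes each buffer followed by a None "poison pill"; each
acceptReduceInputs thread then counts pills until every mapper has finished.
Below is a minimal, self-contained sketch of that pattern, not the project's
code: the names (accept_reduce_inputs, NUM_MAP_TASKS, etc.) are hypothetical,
it is written for Python 3 rather than the repo's Python 2, and the loop
condition numPoison < numMapTasks is an assumption, since the last hunk above
is cut off mid-line.

# shuffle_sketch.py -- illustrative only; hypothetical names throughout
import queue
import threading

NUM_MAP_TASKS = 2
NUM_REDUCE_TASKS = 2

def key(line):
    # same key extraction as mrs_gp.py: everything before the first tab
    return line[:line.find("\t")]

def accept_reduce_inputs(num_map_tasks, reduce_q):
    # Drain one reducer's queue until every mapper has sent its poison
    # pill (None); the real code would feed each chunk to a sort|reducer pipe.
    chunks = []
    num_poison = 0
    while num_poison < num_map_tasks:  # assumed continuation of the truncated loop
        item = reduce_q.get()
        if item is None:
            num_poison += 1            # one mapper finished writing
        else:
            chunks.append(item)
    print("reducer input:", repr("".join(chunks)))

reduce_qs = [queue.Queue() for _ in range(NUM_REDUCE_TASKS)]
reducers = [threading.Thread(target=accept_reduce_inputs, args=(NUM_MAP_TASKS, q))
            for q in reduce_qs]
for t in reducers:
    t.start()

# Two "mappers": route each line by hash of its key, then poison every
# queue to signal this mapper is done writing.
for mapper_output in (["apple\t1\n", "bee\t1\n"], ["apple\t2\n", "cat\t1\n"]):
    for line in mapper_output:
        reduce_qs[hash(key(line)) % NUM_REDUCE_TASKS].put(line)
    for q in reduce_qs:
        q.put(None)

for t in reducers:
    t.join()

Within a single process hash() is consistent, so every line sharing a key
(e.g. "apple") lands in the same reducer's queue regardless of which mapper
produced it, which is what a streaming join requires; the real collect()
additionally batches each queue's lines in a cStringIO buffer and sends one
large chunk per mapper, the step the patch's commented-out logging instruments.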