From c25c8575802d0351a756d28a046d198c9371b6ef Mon Sep 17 00:00:00 2001 From: William Cohen Date: Wed, 2 Dec 2015 16:59:50 -0500 Subject: [PATCH] cleanup print statements and docs --- mrs_gp.py | 33 ++++++++------------------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/mrs_gp.py b/mrs_gp.py index cd71dfa..9aa3ebc 100644 --- a/mrs_gp.py +++ b/mrs_gp.py @@ -167,17 +167,21 @@ def _fixDir(self,d): @staticmethod def inGPFS(d): + """Returns True if this is a name of a directory in the + GPFileSystem. + """ return d.startswith("gpfs:") @staticmethod def fmtNumChars(n): """"Format a large number of characters readably by also including the - equivalent size in megabytes.""" + equivalent size in megabytes. + """ mb = n/(1024*1024.0) return "%d(%.1fM)" % (n,mb) -# global file system used by map-reduce system +# global file system object used by map-reduce system FS = GPFileSystem() @@ -257,7 +261,7 @@ def report(self,includeLogs=True): TASK_LOCK = threading.Lock() ############################################################################## -# main map-reduce algorithm(s) +# Main map-reduce algorithm(s) # # maponly is very simple: it sets up K independent mapper processes, # one for each shard, which read from that shard and write to the @@ -513,15 +517,10 @@ def asyncPipeThread(tag,pipe,inbuf,outCollector,errCollector): TASK_STATS.start(tag) while True: -# print 'stdin',inbufPtr,'stdout',len(result['stdout'].getvalue()), \ -# 'stderr',len(result['stderr'].getvalue()) - readable,writeable,exceptional = \ select.select(activeOutputFPs.keys(), activeInputs, activeOutputFPs.keys()+activeInputs, 0) assert not exceptional,'exceptional files + %r' % exceptional - #print 'loop r,w',readable,writeable - progress = False for fp in readable: # key will be string 'stdout' or 'stdin' @@ -543,9 +542,7 @@ def asyncPipeThread(tag,pipe,inbuf,outCollector,errCollector): if pipe.stdin in writeable: # figure out how much I can write... hi = min(inbufPtr+MIN_PIPE_BUFFER_SIZE, len(inbuf)) - #print '+','stdin',hi,len(inbuf) n = os.write(writeable[0].fileno(), inbuf[inbufPtr:hi]) - #print 'w',n if n>0: inbufPtr += n TASK_STATS.ioSize[tag]['stdin'] += n @@ -557,14 +554,12 @@ def asyncPipeThread(tag,pipe,inbuf,outCollector,errCollector): activeInputs = [] if progress: - #print '.', pass elif pipe.poll()!=None: #process finished break else: #wait for process to get some output ready - #print '?..' time.sleep(SLEEP_DURATION) #finished the loop @@ -607,8 +602,6 @@ def __init__(self,tag,reduceQs): self.tag = tag self.reduceQs = reduceQs self.numReduceTasks = len(reduceQs) - #self.logfile = open(self.tag.replace("/","__")+".log", 'w') - #self.logfile.write("=== log "+repr(self)+" ===") # buffer what goes into the reducer queues, since they seem # very slow with lots of little inputs self.buffers = map(lambda i:cStringIO.StringIO(), reduceQs) @@ -640,46 +633,37 @@ def collect(self,str): TASK_STATS.ioSize[self.tag]['stdin'] += len(line) k = key(line) self.buffers[hash(k)%self.numReduceTasks].write(line) - #self.logfile.write("echo:"+line) - #self.logfile.write("key: %s hash: %d queue: %d\n" % (k,hash(k),(hash(k) % self.numReduceTasks))) lastNewline = nextNewline+1 def close(self): assert not self.leftover, "collected data wasn't linefeed-terminated" global TASK_STATS - #self.logfile.write("=== end echo\n") for i in range(len(self.reduceQs)): # send the buffered-up data for the i-th queue bufi = self.buffers[i].getvalue() self.reduceQs[i].put(bufi) - #self.logfile.write("sent %d chars to reducer %d %r\n" % (len(bufi),i,self.reduceQs[i])) TASK_STATS.ioSize[self.tag]['stdout'] += len(bufi) # signal we're done with this queue self.reduceQs[i].put(None) TASK_STATS.end(self.tag) - #self.logfile.close() #################### # used with reducer queues def acceptReduceInputs(tag,numMapTasks,reduceQ,reducePipe): """Thread that monitors a queue of items to add to send to a reducer process.""" - #logfile = open(tag.replace("/","__")+".log",'w') numPoison = 0 #number of mappers that have finished writing - #logfile.write('= accepting inputs for queue %r from %d shufflers\n' % (reduceQ,numMapTasks)) while numPoison