cleanup print statements and docs
wwcohen committed Dec 2, 2015
Commit c25c857 (1 parent: e37f700)
Showing 1 changed file, mrs_gp.py, with 8 additions and 25 deletions.
@@ -167,17 +167,21 @@ def _fixDir(self,d):
 
     @staticmethod
     def inGPFS(d):
+        """Returns True if this is a name of a directory in the
+        GPFileSystem.
+        """
         return d.startswith("gpfs:")
 
     @staticmethod
     def fmtNumChars(n):
         """"Format a large number of characters readably by also including the
-        equivalent size in megabytes."""
+        equivalent size in megabytes.
+        """
         mb = n/(1024*1024.0)
         return "%d(%.1fM)" % (n,mb)
 
 
-# global file system used by map-reduce system
+# global file system object used by map-reduce system
 
 FS = GPFileSystem()
 
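As a quick sanity check of the two methods touched above (a sketch: the import path and the gpfs: paths are assumptions, not taken from this commit):

    from mrs_gp import GPFileSystem   # assumed import path for this sketch

    print(GPFileSystem.inGPFS("gpfs:sharded/corpus"))   # True  (hypothetical name)
    print(GPFileSystem.inGPFS("/tmp/corpus"))           # False
    print(GPFileSystem.fmtNumChars(3 * 1024 * 1024))    # 3145728(3.0M)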
@@ -257,7 +261,7 @@ def report(self,includeLogs=True):
 TASK_LOCK = threading.Lock()
 
 ##############################################################################
-# main map-reduce algorithm(s)
+# Main map-reduce algorithm(s)
 #
 # maponly is very simple: it sets up K independent mapper processes,
 # one for each shard, which read from that shard and write to the
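The maponly description above continues past the fold; as a rough illustration of the pattern it names, one independent mapper process per shard, here is a minimal sketch (a hypothetical helper, not this file's actual implementation):

    import subprocess

    def maponly_sketch(mapper_cmd, input_shards, output_shards):
        # one mapper process per shard: shard i of the input is piped
        # through the mapper command and written to shard i of the output
        procs = []
        for src, dst in zip(input_shards, output_shards):
            fin, fout = open(src, 'rb'), open(dst, 'wb')
            procs.append((subprocess.Popen(mapper_cmd, shell=True,
                                           stdin=fin, stdout=fout), fin, fout))
        for p, fin, fout in procs:
            p.wait()   # the K mappers run concurrently; join each in turn
            fin.close()
            fout.close()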
@@ -513,15 +517,10 @@ def asyncPipeThread(tag,pipe,inbuf,outCollector,errCollector):
     TASK_STATS.start(tag)
     while True:
 
-        # print 'stdin',inbufPtr,'stdout',len(result['stdout'].getvalue()), \
-        #     'stderr',len(result['stderr'].getvalue())
-
         readable,writeable,exceptional = \
             select.select(activeOutputFPs.keys(), activeInputs, activeOutputFPs.keys()+activeInputs, 0)
         assert not exceptional,'exceptional files + %r' % exceptional
 
-        #print 'loop r,w',readable,writeable
-
         progress = False
         for fp in readable:
             # key will be string 'stdout' or 'stdin'
@@ -543,9 +542,7 @@ def asyncPipeThread(tag,pipe,inbuf,outCollector,errCollector):
         if pipe.stdin in writeable:
             # figure out how much I can write...
             hi = min(inbufPtr+MIN_PIPE_BUFFER_SIZE, len(inbuf))
-            #print '+','stdin',hi,len(inbuf)
             n = os.write(writeable[0].fileno(), inbuf[inbufPtr:hi])
-            #print 'w',n
             if n>0:
                 inbufPtr += n
                 TASK_STATS.ioSize[tag]['stdin'] += n
@@ -557,14 +554,12 @@ def asyncPipeThread(tag,pipe,inbuf,outCollector,errCollector):
             activeInputs = []
 
         if progress:
-            #print '.',
             pass
         elif pipe.poll()!=None:
             #process finished
             break
         else:
             #wait for process to get some output ready
-            #print '?..'
             time.sleep(SLEEP_DURATION)
 
     #finished the loop
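The three hunks above are all inside asyncPipeThread's select() loop, which services the child's stdin and stdout simultaneously so that neither pipe fills up and deadlocks the process. A stripped-down sketch of the same pattern (a hypothetical helper; no stderr handling, collectors, or stats):

    import os, select, subprocess

    def pump(cmd, data):
        # feed `data` to cmd's stdin and drain its stdout via select(),
        # so neither pipe can fill up and wedge the child
        p = subprocess.Popen(cmd, shell=True,
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        out, sent = [], 0
        if not data:
            p.stdin.close()
        while True:
            writable = [p.stdin] if sent < len(data) else []
            r, w, _ = select.select([p.stdout], writable, [], 0.1)
            if r:
                chunk = os.read(p.stdout.fileno(), 65536)
                if not chunk:          # EOF: the child closed its stdout
                    break
                out.append(chunk)
            if w:
                sent += os.write(p.stdin.fileno(), data[sent:sent+65536])
                if sent >= len(data):
                    p.stdin.close()    # send EOF so the child can finish
        p.wait()
        return b''.join(out)

    # e.g. pump("tr a-z A-Z", b"hello\n") returns b'HELLO\n'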
@@ -607,8 +602,6 @@ def __init__(self,tag,reduceQs):
         self.tag = tag
         self.reduceQs = reduceQs
         self.numReduceTasks = len(reduceQs)
-        #self.logfile = open(self.tag.replace("/","__")+".log", 'w')
-        #self.logfile.write("=== log "+repr(self)+" ===")
         # buffer what goes into the reducer queues, since they seem
         # very slow with lots of little inputs
         self.buffers = map(lambda i:cStringIO.StringIO(), reduceQs)
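The comment kept above records the design constraint behind this class: many small Queue.put() calls are slow, so output is buffered per reducer and handed over in one piece. A rough sketch of that batching plus the hash partitioning used in collect() below (hypothetical key function; Python 2, matching the file's cStringIO use):

    import cStringIO

    def shuffle(lines, queues, key=lambda line: line.split("\t", 1)[0]):
        # route each line to a reducer by hashing its key, but buffer
        # locally so each queue sees one big put() instead of many tiny ones
        buffers = [cStringIO.StringIO() for _ in queues]
        for line in lines:
            buffers[hash(key(line)) % len(queues)].write(line)
        for q, buf in zip(queues, buffers):
            q.put(buf.getvalue())   # one large put per queue
            q.put(None)             # poison pill: this mapper is done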
@@ -640,46 +633,37 @@ def collect(self,str):
             TASK_STATS.ioSize[self.tag]['stdin'] += len(line)
             k = key(line)
             self.buffers[hash(k)%self.numReduceTasks].write(line)
-            #self.logfile.write("echo:"+line)
-            #self.logfile.write("key: %s hash: %d queue: %d\n" % (k,hash(k),(hash(k) % self.numReduceTasks)))
             lastNewline = nextNewline+1
 
     def close(self):
         assert not self.leftover, "collected data wasn't linefeed-terminated"
         global TASK_STATS
-        #self.logfile.write("=== end echo\n")
         for i in range(len(self.reduceQs)):
             # send the buffered-up data for the i-th queue
             bufi = self.buffers[i].getvalue()
             self.reduceQs[i].put(bufi)
-            #self.logfile.write("sent %d chars to reducer %d %r\n" % (len(bufi),i,self.reduceQs[i]))
             TASK_STATS.ioSize[self.tag]['stdout'] += len(bufi)
             # signal we're done with this queue
             self.reduceQs[i].put(None)
         TASK_STATS.end(self.tag)
-        #self.logfile.close()
 
 ####################
 # used with reducer queues
 
 def acceptReduceInputs(tag,numMapTasks,reduceQ,reducePipe):
     """Thread that monitors a queue of items to send to a reducer process."""
-    #logfile = open(tag.replace("/","__")+".log",'w')
     numPoison = 0 #number of mappers that have finished writing
-    #logfile.write('= accepting inputs for queue %r from %d shufflers\n' % (reduceQ,numMapTasks))
     while numPoison<numMapTasks:
         task = reduceQ.get()
         if task==None:
+            #some mapper has indicated that it's finished
             numPoison += 1
-            #logfile.write('= poison %d/%d shufflers\n' % (numPoison,numMapTasks))
         else:
             line = task
             reducePipe.stdin.write(line)
-            #logfile.write('= accepted %d chars for queue %r\n' % (len(line),repr(reduceQ)))
 
         reduceQ.task_done()
     #now all mappers are finished so we can exit
-    #logfile.write('= exiting queue %r poisons %d/%d\n' % (reduceQ,numPoison,numMapTasks))
 
 ####################
 # access input/output files for mapreduce
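close() and acceptReduceInputs() together implement a poison-pill handshake: every mapper ends by putting one None on each reducer queue, and the reducer-side thread exits once it has seen one None per mapper. A tiny self-contained demo of the protocol (hypothetical names; Python 2 stdlib):

    import Queue, threading

    def consume(q, num_producers):
        poisons = 0
        while poisons < num_producers:
            item = q.get()
            if item is None:
                poisons += 1      # one more producer has finished
            # else: process `item` here
            q.task_done()

    q = Queue.Queue()
    t = threading.Thread(target=consume, args=(q, 2))
    t.start()
    for producer in (0, 1):
        q.put("data from producer %d\n" % producer)
        q.put(None)               # poison pill: this producer is done
    q.join()                      # returns once everything is consumed
    t.join()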
@@ -1002,7 +986,6 @@ def usage():
         logging.basicConfig(filename="server.log",level=logging.INFO)
         runServer()
     else:
-        logging.basicConfig(level=logging.INFO)
         if "--send" in optdict:
             sendRequest(optdict['--send'])
         elif "--shutdown" in optdict: