Commit e277c8f

fixed a bug in acceptReduceInputs causing failures on joins

wwcohen committed Dec 2, 2015
1 parent 75acdf1 commit e277c8f
Showing 13 changed files with 1,815 additions and 57 deletions.
4 changes: 3 additions & 1 deletion TODO.txt
@@ -14,7 +14,9 @@ TODO for mrs_gp.py:
 - check if input file is a directory or not, and write to a file, not directory, if --numReduceTasks = 1,
   so it can be used with regular gp - done
 
-- test with gpig, test on processes with errors, ....
+- test with gpig - done
+  - fix - gpig prefixes dirs in gpfs with / because it likes viewdir/foo.gp
+- test on processes with errors, ....
 - os.times() - can this show children's time/CPU pct so I can see actual load?
 - server timeout, for security, in case I leave it running...?
 - add an 'abort task' method for server...? or maybe kill client thread?
1 change: 1 addition & 0 deletions gpextras.py
@@ -100,6 +100,7 @@ def distributeCommands(self,task,gp,maybeRemoteCopy,localCopy):
     def simpleMapCommands(self,task,gp,mapCom,src,dst):
         """A map-only job with zero or one inputs."""
         assert src,'undefined src for this view? you may be using Wrap with target:mrs'
+        # parallelism is ignored for map-only tasks
         return [ "%s --input %s --output %s --mapper '%s'" % (self.mrsCommand,src,dst,mapCom) ]
 
     def simpleMapReduceCommands(self,task,gp,mapCom,reduceCom,src,dst):
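For concreteness, a sketch of the command line this method emits (Python 2, like the rest of the repo). The values of mrsCommand, src, dst, and mapCom below are made-up placeholders; only the format string comes from simpleMapCommands:

# placeholder values; only the format string is from simpleMapCommands
mrsCommand = 'python mrs_gp.py --task'
src,dst,mapCom = 'gpfs:corpus', 'gpfs:counts', 'wc -l'
print "%s --input %s --output %s --mapper '%s'" % (mrsCommand,src,dst,mapCom)
# python mrs_gp.py --task --input gpfs:corpus --output gpfs:counts --mapper 'wc -l'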
115 changes: 62 additions & 53 deletions mrs_gp.py
@@ -15,7 +15,6 @@
 import sys
 import threading
 import time
-import time
 import traceback
 import urllib
 
@@ -111,7 +110,7 @@ def listDirs(self,pretty=False):
     information, sort of like the output of ls -l.
     """
     result = sorted(self.filesIn.keys())
-    if not pretty:
+    if not pretty:
         return result
     else:
         def fmtdir(d): return '%s%3d %s' % (GPFileSystem.FILES_MARKER,len(FS.listFiles(d)),d)
@@ -159,7 +158,12 @@ def __str__(self):
 
     def _fixDir(self,d):
         """Strip the prefix gpfs: if it is present."""
-        return d if not GPFileSystem.inGPFS(d) else d[len("gpfs:"):]
+        if d.startswith("gpfs:/"):
+            return d[len("gpfs:/"):]
+        elif d.startswith("gpfs:"):
+            return d[len("gpfs:"):]
+        else:
+            return d
 
     @staticmethod
     def inGPFS(d):
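This rewrite handles the case flagged in TODO.txt above: gpig prefixes gpfs directories with a slash. A standalone sketch of old versus new behavior, assuming inGPFS simply tests for the gpfs: prefix (the real method lives on GPFileSystem):

# standalone sketch, not the class method itself
def fixDirOld(d):
    return d if not d.startswith("gpfs:") else d[len("gpfs:"):]
def fixDirNew(d):
    if d.startswith("gpfs:/"): return d[len("gpfs:/"):]
    elif d.startswith("gpfs:"): return d[len("gpfs:"):]
    else: return d
print fixDirOld("gpfs:/sharded")  # '/sharded' -- stray leading slash survives
print fixDirNew("gpfs:/sharded")  # 'sharded'  -- slash stripped as well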
@@ -322,7 +326,8 @@ def performTask(optdict):
 
 def key(line):
     """Extract the key for a line containing a tab-separated key,value pair."""
-    return line[:line.find("\t")]
+    k = line[:line.find("\t")]
+    return k
 
 def mapreduce(indirList,outdir,mapper,reducer,numReduceTasks,pipeThread):
     """Run a generic streaming map-reduce process. The mapper and reducer
@@ -344,9 +349,9 @@ def mapreduce(indirList,outdir,mapper,reducer,numReduceTasks,pipeThread):
     #names of the reduce tasks
     reduceTags = map(lambda j:'reducer-to-%s-%s' % (outdirs[j],outfiles[j]), range(numReduceTasks))
     reduceQs = map(lambda j:Queue.Queue(), range(numReduceTasks))
-    reducePipes = map(lambda j:makePipe("sort -k1,2 | "+reducer), range(numReduceTasks))
+    reducePipes = map(lambda j:makePipe("LC_COLLATE=C sort -k1,2 | "+reducer), range(numReduceTasks))
     reduceQThreads = map(
-        lambda j:threading.Thread(target=acceptReduceInputs, args=(numMapTasks,reduceQs[j],reducePipes[j])),
+        lambda j:threading.Thread(target=acceptReduceInputs, args=(reduceTags[j],numMapTasks,reduceQs[j],reducePipes[j])),
         range(numReduceTasks))
     TASK_STATS.end('_init reducers and queues')
 
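Pinning the sort locale matters for correctness, not just speed: locale-aware collation can order keys differently from the byte-wise comparisons a streaming reducer uses to detect key boundaries, which silently breaks merge-style joins. A minimal sketch of the difference (assumes en_US.UTF-8 is installed):

# minimal sketch: the same two lines sort differently under the C locale
# (byte order) and a natural-language locale
import os, subprocess
data = "B\t1\na\t2\n"
for loc in ("C", "en_US.UTF-8"):
    p = subprocess.Popen("sort -k1,2", shell=True,
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         env=dict(os.environ, LC_ALL=loc))
    out,_ = p.communicate(data)
    print loc, repr(out)
# C           'B\t1\na\t2\n'   -- 'B' (0x42) sorts before 'a' (0x61)
# en_US.UTF-8 'a\t2\nB\t1\n'   -- dictionary order puts 'a' first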
@@ -374,13 +379,8 @@ def mapreduce(indirList,outdir,mapper,reducer,numReduceTasks,pipeThread):
     TASK_STATS.start('_run mappers')
     for t in reduceQThreads: t.start()
     for t in mapThreads: t.start()
-    #print 'join mapThreads'
     for t in mapThreads: t.join()
-    #print 'join reduceQThreads'
     for t in reduceQThreads: t.join()
-    #print 'join reduceQs'
-    #for q in reduceQs: q.join()
-    #print 'joined....'
     TASK_STATS.end('_run mappers')
 
     reduceErrCollectors = map(
@@ -607,6 +607,8 @@ def __init__(self,tag,reduceQs):
         self.tag = tag
         self.reduceQs = reduceQs
         self.numReduceTasks = len(reduceQs)
+        #self.logfile = open(self.tag.replace("/","__")+".log", 'w')
+        #self.logfile.write("=== log "+repr(self)+" ===")
        # buffer what goes into the reducer queues, since they seem
        # very slow with lots of little inputs
         self.buffers = map(lambda i:cStringIO.StringIO(), reduceQs)
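The buffering noted in that comment is easy to quantify: Queue.Queue takes a lock on every put, so per-line puts are much slower than accumulating lines in a cStringIO buffer and handing each reducer one large string. A rough micro-benchmark sketch (absolute timings will vary):

# rough micro-benchmark of per-line puts vs. one buffered put
import Queue, cStringIO, time
N = 100000
q = Queue.Queue()
t0 = time.time()
for i in xrange(N): q.put("line %d\n" % i)      # one put per line
t1 = time.time()
buf = cStringIO.StringIO()
for i in xrange(N): buf.write("line %d\n" % i)  # buffer, then one big put
q.put(buf.getvalue())
t2 = time.time()
print "per-line puts %.3fs, buffered put %.3fs" % (t1-t0, t2-t1)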
@@ -634,40 +636,50 @@ def collect(self,str):
                 break
             else:
                 line = lines[lastNewline:nextNewline+1]
                 assert line.strip().find("\n") < 0
                 TASK_STATS.ioSize[self.tag]['stdin'] += len(line)
                 k = key(line)
                 #self.reduceQs[hash(k)%self.numReduceTasks].put(line)
                 self.buffers[hash(k)%self.numReduceTasks].write(line)
+                #self.logfile.write("echo:"+line)
+                #self.logfile.write("key: %s hash: %d queue: %d\n" % (k,hash(k),(hash(k) % self.numReduceTasks)))
                 lastNewline = nextNewline+1
 
     def close(self):
         assert not self.leftover, "collected data wasn't linefeed-terminated"
         global TASK_STATS
+        #self.logfile.write("=== end echo\n")
         for i in range(len(self.reduceQs)):
             # send the buffered-up data for the i-th queue
             bufi = self.buffers[i].getvalue()
             self.reduceQs[i].put(bufi)
+            #self.logfile.write("sent %d chars to reducer %d %r\n" % (len(bufi),i,self.reduceQs[i]))
             TASK_STATS.ioSize[self.tag]['stdout'] += len(bufi)
             # signal we're done with this queue
             self.reduceQs[i].put(None)
         TASK_STATS.end(self.tag)
+        #self.logfile.close()
 
 ####################
 # used with reducer queues
 
-def acceptReduceInputs(numMapTasks,reduceQ,reducePipe):
+def acceptReduceInputs(tag,numMapTasks,reduceQ,reducePipe):
     """Thread that monitors a queue of items to add to send to a reducer process."""
+    #logfile = open(tag.replace("/","__")+".log",'w')
     numPoison = 0 #number of mappers that have finished writing
+    #logfile.write('= accepting inputs for queue %r from %d shufflers\n' % (reduceQ,numMapTasks))
     while numPoison<numMapTasks:
         task = reduceQ.get()
-        if task:
+        if task==None:
+            #some mapper has indicated that it's finished
+            numPoison += 1
+            #logfile.write('= poison %d/%d shufflers\n' % (numPoison,numMapTasks))
+        else:
             line = task
             reducePipe.stdin.write(line)
+            #logfile.write('= accepted %d chars for queue %r\n' % (len(line),repr(reduceQ)))
             reduceQ.task_done()
-        else:
-            #some mapper has indicated that it's finished
-            numPoison += 1
     #now all mappers are finished so we can exit
+    #logfile.write('= exiting queue %r poisons %d/%d\n' % (reduceQ,numPoison,numMapTasks))
 
 ####################
 # access input/output files for mapreduce
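This is the join-breaking bug named in the commit message. Each shuffler sends every reducer queue exactly one buffered string followed by None as a poison pill. With the old test `if task:`, an empty buffer, which is falsy and routine in joins when none of one input's keys hash to a given reducer, was miscounted as a poison pill, so the thread could stop reading before every real buffer had arrived. A minimal self-contained sketch of the pattern and the fix (not the project's code):

import Queue

def consume(q, numMappers, poisonOnFalsy):
    # drain one queue until numMappers poison pills have been seen
    got, numPoison = [], 0
    while numPoison < numMappers:
        task = q.get()
        isPoison = (not task) if poisonOnFalsy else (task is None)
        if isPoison:
            numPoison += 1
        else:
            got.append(task)
        q.task_done()
    return got

def loadedQueue():
    q = Queue.Queue()
    for buf in ["a\t1\n", "", "b\t2\n"]:  # one shuffler sent an empty buffer
        q.put(buf)
        q.put(None)                       # ...followed by its poison pill
    return q

print consume(loadedQueue(), 3, poisonOnFalsy=True)   # ['a\t1\n'] -- 'b\t2\n' is lost
print consume(loadedQueue(), 3, poisonOnFalsy=False)  # ['a\t1\n', '', 'b\t2\n']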
@@ -679,54 +691,51 @@ def setupFiles(indirList,outdir,numReduceTasks):
     is determined by numReduceTasks: if it is -1, then this is
     interpreted as a map-only task, and the number of outputs is the
     same as the number of inputs.
+    Generally indirList is a list of directories or GPFS dirs, and the
+    input files are the contents of those directories. But indirList
+    could also contain only files. In this case, if there is a
+    single output, it will also be a file.
     """
     #clear space/make directory for output, if necessary
-    if os.path.exists(outdir):
+    if GPFileSystem.inGPFS(outdir):
+        global FS
+        FS.rmDir(outdir)
+    elif os.path.exists(outdir):
         logging.warn('removing %s' % (outdir))
-        if os.path.isdir(outdir):
-            shutil.rmtree(outdir)
-        else:
-            os.remove(outdir)
+        if os.path.isdir(outdir): shutil.rmtree(outdir)
+        else: os.remove(outdir)
 
     #collect input files and directories
     indirs = []
     infiles = []
-    outputToFile = False
-    if all(os.path.isfile(f) for f in indirList):
-        #ok if all inputs are files...
-        for f in indirList:
-            inhead,intail = os.path.split(f)
+    inputsAreFiles = True
+    for dir in indirList:
+        if GPFileSystem.inGPFS(dir):
+            files = FS.listFiles(dir)
+            infiles.extend(files)
+            indirs.extend([dir] * len(files))
+            inputsAreFiles = False
+        elif os.path.isdir(dir):
+            files = [f for f in os.listdir(dir)]
+            infiles.extend(files)
+            indirs.extend([dir] * len(files))
+            inputsAreFiles = False
+        elif os.path.isfile(dir):
+            inhead,intail = os.path.split(dir)
             indirs.append(inhead)
             infiles.append(intail)
-        #and if they are and there's one non-GPFS output, make that a file too
-        if (numReduceTasks==1 or (numReduceTasks==-1 and len(indirList)==1)) and not GPFileSystem.inGPFS(outdir):
-            #if there's one output, and all inputs are files, then make the output a file also
-            outhead,outtail = os.path.split(outdir)
-            outdirs=[outhead]
-            outfiles=[outtail]
-            outputToFile = True
         else:
             assert False,'illegal input location %s' % dir
 
+    numActualReduceTasks = len(infiles) if numReduceTasks<0 else numReduceTasks
+    if inputsAreFiles and numActualReduceTasks==1 and not GPFileSystem.inGPFS(outdir):
+        #make the output a file
+        outhead,outtail = os.path.split(outdir)
+        outdirs=[outhead]
+        outfiles=[outtail]
     else:
-        #collect all the input files
-        for dir in indirList:
-            if GPFileSystem.inGPFS(dir):
-                files = FS.listFiles(dir)
-            elif os.path.isdir(dir):
-                files = [f for f in os.listdir(dir)]
-            else:
-                assert False,'illegal input location %s' % dir
-            infiles.extend(files)
-            indirs.extend([dir] * len(files))
-        if not outputToFile:
-            os.makedirs(outdir)
+        # construct the list of output files
+        if not GPFileSystem.inGPFS(outdir):
+            os.makedirs(outdir)
         if numReduceTasks == -1:
             outfiles = infiles
         else:
-            outfiles = map(lambda j:'part%04d' % j, range(numReduceTasks))
+            outfiles = map(lambda j:'part%04d' % j, range(numActualReduceTasks))
         outdirs = [outdir]*len(outfiles)
     return indirs,infiles,outdirs,outfiles
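A sketch of the output-naming rule after this rewrite, with assumed values: numActualReduceTasks falls back to the number of input files for map-only jobs (numReduceTasks of -1), and part-file names are generated from it rather than from the raw numReduceTasks:

# assumed values; mirrors the naming logic in setupFiles above
def outputNames(infiles, numReduceTasks):
    numActualReduceTasks = len(infiles) if numReduceTasks < 0 else numReduceTasks
    if numReduceTasks == -1:
        return infiles                          # map-only: one output per input
    return ['part%04d' % j for j in range(numActualReduceTasks)]
print outputNames(['a.txt','b.txt'], -1)  # ['a.txt', 'b.txt']
print outputNames(['a.txt','b.txt'], 3)   # ['part0000', 'part0001', 'part0002']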

2 changes: 1 addition & 1 deletion mrs_test/Makefile
@@ -4,7 +4,7 @@ ASYNC = 0
 
 #
 clean:
-	rm -rf tmp tmp.txt sharded parallel-events.txt server.log test test2 gpig_views
+	rm -rf tmp tmp.txt sharded parallel-events.txt server.log test2 gpig_views shuffler-*.log reducer-*.log
 
 backup:
 	cp ../mrs_gp_v4.py ../backup/`date "+%H:%M:%S"`-mrs_gp_v4.py
2 changes: 1 addition & 1 deletion mrs_test/caterr.py
@@ -6,4 +6,4 @@
 if __name__ == "__main__":
     for line in sys.stdin:
         print line,
-        logging.warn("echo to stderr:"+line.strip()+"...")
+        logging.warn(line.strip())