Skip to content

Commit

Permalink
fixes after testing some simple gpig-mrs interactions
Browse files Browse the repository at this point in the history
  • Loading branch information
wwcohen committed Nov 30, 2015
1 parent 00e9e68 commit 76edc06
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
10 changes: 5 additions & 5 deletions gpextras.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class MRSCompiler(MRCompiler):
"""

def __init__(self):
self.mrs_loc = 'mrs_gp'
self.mrsCommand = os.environ.get('GP_MRS_COMMAND','mrs_gp')

def distributeCommands(self,task,gp,maybeRemoteCopy,localCopy):
"""Distribute the remote copy to the local directory."""
Expand All @@ -100,13 +100,13 @@ def distributeCommands(self,task,gp,maybeRemoteCopy,localCopy):
def simpleMapCommands(self,task,gp,mapCom,src,dst):
"""A map-only job with zero or one inputs."""
assert src,'undefined src for this view? you may be using Wrap with target:mrs'
return [ "%s --input %s --output %s --mapper '%s'" % (self.mrs_loc,src,dst,mapCom) ]
return [ "%s --input %s --output %s --mapper '%s'" % (self.mrsCommand,src,dst,mapCom) ]

def simpleMapReduceCommands(self,task,gp,mapCom,reduceCom,src,dst):
"""A map-reduce job with one input."""
p = task.reduceParallel(gp)
return [ "%s --input %s --output %s --mapper '%s' --numReduceTasks %d --reducer '%s'" \
% (self.mrs_loc,src,dst,mapCom,p,reduceCom) ]
% (self.mrsCommand,src,dst,mapCom,p,reduceCom) ]

def joinCommands(self,task,gp,mapComs,reduceCom,srcs,midpoint,dst):
"""A map-reduce job with several inputs."""
Expand All @@ -115,8 +115,8 @@ def midi(i): return midpoint + '-' + str(i)
subplan = []
for i in range(len(srcs)):
subplan.append("%s --input %s --output %s --mapper '%s'" \
% (self.mrs_loc,srcs[i],mid(i),mapComs[i]))
% (self.mrsCommand,srcs[i],mid(i),mapComs[i]))
allMidpoints = ",".join([mid(i) for i in range(len(srcs))])
subplan.append("%s --inputs %s --output %s --mapper cat --numReduceTasks %d --reducer '%s'" \
% (self.mrs_loc,allMidpoints,dst,p,reduceCom))
% (self.mrsCommand,allMidpoints,dst,p,reduceCom))
return subplan
24 changes: 14 additions & 10 deletions mrs_gp.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,21 +685,29 @@ def setupFiles(indirList,outdir,numReduceTasks):
could also contains only files. In this case, if there is a
single output, it will also be a file.
"""
#clear space/make directory for output, if necessary
if os.path.exists(outdir):
logging.warn('removing %s' % (outdir))
shutil.rmtree(outdir)

indirs = []
infiles = []
outputToFile = False
if all(os.path.isfile(f) for f in indirList):
#ok if all inputs are files...
for f in indirList:
inhead,intail = os.path.split(f)
indirs.append(inhead)
infiles.append(intail)
#and if they are and there's one non-GPFS output, make that a file too
if (numReduceTasks==1 or (numReduceTasks==-1 and len(indirList)==1)) and not GPFileSystem.inGPFS(outdir):
#if there's one output, and all inputs are files, then make the output a file also
outhead,outtail = os.path.split(outdir)
outdirs=[outhead]
outfiles=[outtail]
outputToFile = True
else:
#collect all the input files
for dir in indirList:
if GPFileSystem.inGPFS(dir):
files = FS.listFiles(dir)
Expand All @@ -709,18 +717,14 @@ def setupFiles(indirList,outdir,numReduceTasks):
assert False,'illegal input location %s' % dir
infiles.extend(files)
indirs.extend([dir] * len(files))
#clear space/make directory for output, if necessary
if os.path.exists(outdir):
logging.warn('removing %s' % (outdir))
shutil.rmtree(outdir)
if not outputToFile:
os.makedirs(outdir)
# construct the list of output files
if numReduceTasks == -1:
outfiles = infiles
else:
outfiles = map(lambda j:'part04%d' % j, range(numReduceTasks))
outdirs = [outdir]*len(outfiles)
# construct the list of output files
if numReduceTasks == -1:
outfiles = infiles
else:
outfiles = map(lambda j:'part%04d' % j, range(numReduceTasks))
outdirs = [outdir]*len(outfiles)
return indirs,infiles,outdirs,outfiles

def getInput(indir,f):
Expand Down
2 changes: 1 addition & 1 deletion tutorial/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
update:
cp ../emr-extras/*-sample.sh
cp ../guineapig.py .
cp ../guineapig.py ../mrs_gp.py ../gpextras.py .

clean:
rm -rf gpig_views
Expand Down

0 comments on commit 76edc06

Please sign in to comment.