Skip to content

Commit

Permalink
mpaside join added
Browse files Browse the repository at this point in the history
  • Loading branch information
wwcohen committed Sep 15, 2017
1 parent a15b390 commit 786f1e9
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions gpextras.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,27 @@ def logprogress(rowValue,rowIndex):
Log.__init__(self, inner=inner, logfun=logprogress)


def MapsideJoin(jinForLargeView, jinForSmallView):
""" A map-side join using Augment
"""
def smallViewLoader(view):
smallViewDict = {}
for line in open(view.distributableFile()):
row = view.planner._serializer.fromString(line.strip())
key = (jinForSmallView.joinBy)(row)
smallViewDict[key] = row
return smallViewDict
def joiner((rowFromLargeView,smallViewDict)):
key = (jinForLargeView.joinBy)(rowFromLargeView)
if key in smallViewDict:
return [(rowFromLargeView,smallViewDict[key])]
else:
return []

return Augment( jinForLargeView.view, sideview=jinForSmallView.view, loadedBy=smallViewLoader) \
| Flatten( by=joiner )


##############################################################################
# extension to use mrs_gp, a local map-reduce for streaming intended
# mainly for use on ramdisks
Expand Down

0 comments on commit 786f1e9

Please sign in to comment.