diff --git a/gpextras.py b/gpextras.py index 51b2afb..e447447 100644 --- a/gpextras.py +++ b/gpextras.py @@ -82,6 +82,27 @@ def logprogress(rowValue,rowIndex): Log.__init__(self, inner=inner, logfun=logprogress) +def MapsideJoin(jinForLargeView, jinForSmallView): + """ A map-side join using Augment + """ + def smallViewLoader(view): + smallViewDict = {} + for line in open(view.distributableFile()): + row = view.planner._serializer.fromString(line.strip()) + key = (jinForSmallView.joinBy)(row) + smallViewDict[key] = row + return smallViewDict + def joiner((rowFromLargeView,smallViewDict)): + key = (jinForLargeView.joinBy)(rowFromLargeView) + if key in smallViewDict: + return [(rowFromLargeView,smallViewDict[key])] + else: + return [] + + return Augment( jinForLargeView.view, sideview=jinForSmallView.view, loadedBy=smallViewLoader) \ + | Flatten( by=joiner ) + + ############################################################################## # extension to use mrs_gp, a local map-reduce for streaming intended # mainly for use on ramdisks