From 786f1e9eedb676c2b43874d513588d8e9bdecc98 Mon Sep 17 00:00:00 2001 From: William Cohen Date: Fri, 15 Sep 2017 13:11:58 -0400 Subject: [PATCH] mpaside join added --- gpextras.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/gpextras.py b/gpextras.py index 51b2afb..e447447 100644 --- a/gpextras.py +++ b/gpextras.py @@ -82,6 +82,27 @@ def logprogress(rowValue,rowIndex): Log.__init__(self, inner=inner, logfun=logprogress) +def MapsideJoin(jinForLargeView, jinForSmallView): + """ A map-side join using Augment + """ + def smallViewLoader(view): + smallViewDict = {} + for line in open(view.distributableFile()): + row = view.planner._serializer.fromString(line.strip()) + key = (jinForSmallView.joinBy)(row) + smallViewDict[key] = row + return smallViewDict + def joiner((rowFromLargeView,smallViewDict)): + key = (jinForLargeView.joinBy)(rowFromLargeView) + if key in smallViewDict: + return [(rowFromLargeView,smallViewDict[key])] + else: + return [] + + return Augment( jinForLargeView.view, sideview=jinForSmallView.view, loadedBy=smallViewLoader) \ + | Flatten( by=joiner ) + + ############################################################################## # extension to use mrs_gp, a local map-reduce for streaming intended # mainly for use on ramdisks