@@ -11,6 +11,7 @@ import (
1111
1212 "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
1313 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
14+ "github.com/cockroachdb/cockroach/pkg/sql/distsql"
1415 "github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
1516 "github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
1617 "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -35,6 +36,10 @@ type applyJoinNode struct {
3536 // The data source with no outer columns.
3637 singleInputPlanNode
3738
39+ // forwarder allows propagating the ProducerMetadata towards the
40+ // DistSQLReceiver.
41+ forwarder metadataForwarder
42+
3843 // pred represents the join predicate.
3944 pred * joinPredicate
4045
@@ -248,12 +253,13 @@ func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree
248253 }
249254 plan := p .(* planComponents )
250255 rowResultWriter := NewRowResultWriter (& a .run .rightRows )
251- if err := runPlanInsidePlan (
252- ctx , params , plan , rowResultWriter ,
253- nil /* deferredRoutineSender */ , "" , /* stmtForDistSQLDiagram */
254- ); err != nil {
256+ queryStats , err := runPlanInsidePlan (
257+ ctx , params , plan , rowResultWriter , nil /* deferredRoutineSender */ , "" , /* stmtForDistSQLDiagram */
258+ )
259+ if err != nil {
255260 return err
256261 }
262+ forwardInnerQueryStats (a .forwarder , queryStats )
257263 a .run .rightRowsIterator = newRowContainerIterator (ctx , a .run .rightRows )
258264 return nil
259265}
@@ -267,7 +273,7 @@ func runPlanInsidePlan(
267273 resultWriter rowResultWriter ,
268274 deferredRoutineSender eval.DeferredRoutineSender ,
269275 stmtForDistSQLDiagram string ,
270- ) error {
276+ ) ( topLevelQueryStats , error ) {
271277 defer plan .close (ctx )
272278 execCfg := params .ExecCfg ()
273279 recv := MakeDistSQLReceiver (
@@ -286,6 +292,11 @@ func runPlanInsidePlan(
286292 // before we can produce any "outer" rows to be returned to the client, so
287293 // we make sure to unset pausablePortal field on the planner.
288294 plannerCopy .pausablePortal = nil
295+ // Avoid any possible metadata confusion by unsetting the
296+ // routineMetadataForwarder (if there is a routine in the inner plan that
297+ // needs it, then the plannerCopy will be updated during the inner plan
298+ // setup).
299+ plannerCopy .routineMetadataForwarder = nil
289300
290301 // planner object embeds the extended eval context, so we will modify that
291302 // (which won't affect the outer planner's extended eval context), and we'll
@@ -301,6 +312,8 @@ func runPlanInsidePlan(
301312 // return from this method (after the main query is executed).
302313 subqueryResultMemAcc := params .p .Mon ().MakeBoundAccount ()
303314 defer subqueryResultMemAcc .Close (ctx )
315+ // Note that planAndRunSubquery updates recv.stats with top-level
316+ // subquery stats.
304317 if ! execCfg .DistSQLPlanner .PlanAndRunSubqueries (
305318 ctx ,
306319 & plannerCopy ,
@@ -309,9 +322,9 @@ func runPlanInsidePlan(
309322 recv ,
310323 & subqueryResultMemAcc ,
311324 false , /* skipDistSQLDiagramGeneration */
312- params .p .mustUseLeafTxn (),
325+ params .p .innerPlansMustUseLeafTxn (),
313326 ) {
314- return resultWriter .Err ()
327+ return recv . stats , resultWriter .Err ()
315328 }
316329 }
317330
@@ -324,7 +337,9 @@ func runPlanInsidePlan(
324337 planCtx := execCfg .DistSQLPlanner .NewPlanningCtx (ctx , evalCtx , & plannerCopy , plannerCopy .txn , distributeType )
325338 planCtx .distSQLProhibitedErr = distSQLProhibitedErr
326339 planCtx .stmtType = recv .stmtType
327- planCtx .mustUseLeafTxn = params .p .mustUseLeafTxn ()
340+ if params .p .innerPlansMustUseLeafTxn () {
341+ planCtx .flowConcurrency = distsql .ConcurrencyWithOuterPlan
342+ }
328343 planCtx .stmtForDistSQLDiagram = stmtForDistSQLDiagram
329344
330345 // Wrap PlanAndRun in a function call so that we clean up immediately.
@@ -338,10 +353,10 @@ func runPlanInsidePlan(
338353
339354 // Check if there was an error interacting with the resultWriter.
340355 if recv .commErr != nil {
341- return recv .commErr
356+ return recv .stats , recv . commErr
342357 }
343358 if resultWriter .Err () != nil {
344- return resultWriter .Err ()
359+ return recv . stats , resultWriter .Err ()
345360 }
346361
347362 plannerCopy .autoCommit = false
@@ -358,10 +373,10 @@ func runPlanInsidePlan(
358373 // need to update the plan for cleanup purposes before proceeding.
359374 * plan = plannerCopy .curPlan .planComponents
360375 if recv .commErr != nil {
361- return recv .commErr
376+ return recv .stats , recv . commErr
362377 }
363378
364- return resultWriter .Err ()
379+ return recv . stats , resultWriter .Err ()
365380}
366381
367382func (a * applyJoinNode ) Values () tree.Datums {
0 commit comments