@@ -745,7 +745,7 @@ func (dsp *DistSQLPlanner) Run(
 	// the line.
 	localState.EvalContext = evalCtx
 	localState.IsLocal = planCtx.isLocal
-	localState.MustUseLeaf = planCtx.mustUseLeafTxn
+	localState.AddConcurrency(planCtx.flowConcurrency)
 	localState.Txn = txn
 	localState.LocalProcs = plan.LocalProcessors
 	localState.LocalVectorSources = plan.LocalVectorSources
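
Note: the hunks in this diff lean on a small bitmask API in the distsql package (ConcurrencyKind, the Concurrency* constants, and the AddConcurrency/GetConcurrency methods on LocalState) that is not itself shown here. Below is a minimal, self-contained sketch of what that API plausibly looks like; every definition is an assumption modeled on the call sites in this diff, not the actual distsql code:

    package main

    import "fmt"

    // ConcurrencyKind is a bitmask recording which kinds of concurrency are
    // present in a locally-executed flow. The names mirror this diff; the
    // definitions are assumptions.
    type ConcurrencyKind uint8

    const (
        // ConcurrencyHasParallelProcessors: the flow contains processors
        // running in parallel goroutines.
        ConcurrencyHasParallelProcessors ConcurrencyKind = 1 << iota
        // ConcurrencyStreamer: the Streamer API is in use (concurrency at
        // the KV client level and below).
        ConcurrencyStreamer
        // ConcurrencyWithOuterPlan: an "inner" plan (e.g. a subquery) runs
        // concurrently with its "outer" plan.
        ConcurrencyWithOuterPlan
        // ConcurrencyParallelChecks: post-query CHECKs run in parallel.
        ConcurrencyParallelChecks
    )

    // LocalState stands in for distsql.LocalState.
    type LocalState struct {
        concurrency ConcurrencyKind
    }

    // AddConcurrency records the given kinds of concurrency (a no-op for
    // bits that are already set).
    func (s *LocalState) AddConcurrency(kind ConcurrencyKind) {
        s.concurrency |= kind
    }

    // GetConcurrency returns all kinds of concurrency recorded so far.
    func (s *LocalState) GetConcurrency() ConcurrencyKind {
        return s.concurrency
    }

    func main() {
        var ls LocalState
        ls.AddConcurrency(ConcurrencyStreamer | ConcurrencyHasParallelProcessors)
        fmt.Printf("%04b\n", ls.GetConcurrency()) // prints 0011
    }

A bitmask is a natural fit here because several kinds of concurrency can be present in one flow at the same time, and later call sites need to ask "is there any concurrency outside this safe set?" with a single mask test.
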
@@ -768,15 +768,19 @@ func (dsp *DistSQLPlanner) Run(
 		// cannot create a LeafTxn, so we cannot parallelize scans.
 		planCtx.parallelizeScansIfLocal = false
 		for _, flow := range flows {
-			localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow)
+			if execinfra.HasParallelProcessors(flow) {
+				localState.AddConcurrency(distsql.ConcurrencyHasParallelProcessors)
+			}
 		}
 	} else {
 		if planCtx.isLocal && noMutations && planCtx.parallelizeScansIfLocal {
 			// Even though we have a single flow on the gateway node, we might
 			// have decided to parallelize the scans. If that's the case, we
 			// will need to use the Leaf txn.
 			for _, flow := range flows {
-				localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow)
+				if execinfra.HasParallelProcessors(flow) {
+					localState.AddConcurrency(distsql.ConcurrencyHasParallelProcessors)
+				}
 			}
 		}
 		if noMutations {
@@ -877,7 +881,7 @@ func (dsp *DistSQLPlanner) Run(
 					// Both index and lookup joins, with and without
 					// ordering, are executed via the Streamer API that has
 					// concurrency.
-					localState.HasConcurrency = true
+					localState.AddConcurrency(distsql.ConcurrencyStreamer)
 					break
 				}
 			}
@@ -1000,11 +1004,37 @@ func (dsp *DistSQLPlanner) Run(
 		return
 	}
 
-	if len(flows) == 1 && evalCtx.Txn != nil && evalCtx.Txn.Type() == kv.RootTxn {
-		// If we have a fully local plan and a RootTxn, we don't expect any
-		// concurrency, so it's safe to use the DistSQLReceiver to push the
-		// metadata into directly from routines.
-		if planCtx.planner != nil {
+	if len(flows) == 1 && planCtx.planner != nil {
+		// We have a fully local plan, so check whether it'll be safe to use the
+		// DistSQLReceiver to push the metadata into directly from routines
+		// (which is the case when we don't have any concurrency between
+		// routines themselves, nor between a routine and the "head" processor
+		// - the one pushing into the DistSQLReceiver).
+		var safe bool
+		if evalCtx.Txn != nil && evalCtx.Txn.Type() == kv.RootTxn {
+			// We have a RootTxn, so we don't expect any concurrency whatsoever.
+			safe = true
+		} else {
+			// We have a LeafTxn, so we need to examine what kind of concurrency
+			// is present in the flow.
+			var safeConcurrency distsql.ConcurrencyKind
+			// We don't care whether we use the Streamer API - it has
+			// concurrency only at the KV client level and below.
+			safeConcurrency |= distsql.ConcurrencyStreamer
+			// If we have "outer plan" concurrency, the "inner" and the
+			// "outer" plans have their own DistSQLReceivers.
+			//
+			// Note that the same is the case with parallel CHECKs concurrency,
+			// but then planCtx.planner is shared between goroutines, so we'll
+			// avoid mutating it. (We can't have routines in post-query CHECKs
+			// since only FK and UNIQUE checks are run in parallel.)
+			safeConcurrency |= distsql.ConcurrencyWithOuterPlan
+			unsafeConcurrency := ^safeConcurrency
+			if localState.GetConcurrency()&unsafeConcurrency == 0 {
+				safe = true
+			}
+		}
+		if safe {
 			planCtx.planner.routineMetadataForwarder = recv
 		}
 	}
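
The new branch above reduces to plain mask arithmetic: accumulate the kinds of concurrency that are harmless for direct metadata forwarding into a "safe" mask, complement it, and require that the flow has no bits in the complement. A toy version of the check, reusing the assumed ConcurrencyKind definitions from the earlier sketch:

    // safeToForwardMetadata reports whether the recorded concurrency
    // consists only of kinds that are safe for pushing metadata directly
    // into the DistSQLReceiver. (Toy version; constant names are the
    // assumed ones from the sketch above.)
    func safeToForwardMetadata(have ConcurrencyKind) bool {
        var safe ConcurrencyKind
        safe |= ConcurrencyStreamer      // concurrency is below the KV client
        safe |= ConcurrencyWithOuterPlan // inner/outer plans have separate receivers
        unsafe := ^safe
        return have&unsafe == 0
    }

With these definitions, safeToForwardMetadata(ConcurrencyStreamer) returns true, while safeToForwardMetadata(ConcurrencyStreamer|ConcurrencyParallelChecks) returns false, matching the comment in the hunk: parallel CHECKs share planCtx.planner across goroutines, so the forwarder must not be installed.
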
@@ -1859,7 +1889,7 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
 		// Skip the diagram generation since on this "main" query path we
 		// can get it via the statement bundle.
 		true, /* skipDistSQLDiagramGeneration */
-		false, /* mustUseLeafTxn */
+		false, /* innerPlansMustUseLeafTxn */
 	) {
 		return recv.commErr
 	}
@@ -1926,7 +1956,7 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
 	recv *DistSQLReceiver,
 	subqueryResultMemAcc *mon.BoundAccount,
 	skipDistSQLDiagramGeneration bool,
-	mustUseLeafTxn bool,
+	innerPlansMustUseLeafTxn bool,
 ) bool {
 	for planIdx, subqueryPlan := range subqueryPlans {
 		if err := dsp.planAndRunSubquery(
@@ -1939,7 +1969,7 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
 			recv,
 			subqueryResultMemAcc,
 			skipDistSQLDiagramGeneration,
-			mustUseLeafTxn,
+			innerPlansMustUseLeafTxn,
 		); err != nil {
 			recv.SetError(err)
 			return false
@@ -1963,7 +1993,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
 	recv *DistSQLReceiver,
 	subqueryResultMemAcc *mon.BoundAccount,
 	skipDistSQLDiagramGeneration bool,
-	mustUseLeafTxn bool,
+	innerPlansMustUseLeafTxn bool,
 ) error {
 	subqueryDistribution, distSQLProhibitedErr := planner.getPlanDistribution(ctx, subqueryPlan.plan)
 	distribute := DistributionType(LocalDistribution)
@@ -1975,7 +2005,9 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
 	subqueryPlanCtx.stmtType = tree.Rows
 	subqueryPlanCtx.skipDistSQLDiagramGeneration = skipDistSQLDiagramGeneration
 	subqueryPlanCtx.subOrPostQuery = true
-	subqueryPlanCtx.mustUseLeafTxn = mustUseLeafTxn
+	if innerPlansMustUseLeafTxn {
+		subqueryPlanCtx.flowConcurrency = distsql.ConcurrencyWithOuterPlan
+	}
 	if planner.instrumentation.ShouldSaveFlows() {
 		subqueryPlanCtx.saveFlows = getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery)
 	}
@@ -2590,7 +2622,9 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
 	}
 	postqueryPlanCtx.associateNodeWithComponents = associateNodeWithComponents
 	postqueryPlanCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
-	postqueryPlanCtx.mustUseLeafTxn = parallelCheck
+	if parallelCheck {
+		postqueryPlanCtx.flowConcurrency = distsql.ConcurrencyParallelChecks
+	}
 
 	postqueryPhysPlan, physPlanCleanup, err := dsp.createPhysPlan(ctx, postqueryPlanCtx, postqueryPlan)
 	defer physPlanCleanup()
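
Taken together, the plumbing works like this: planning records a ConcurrencyKind on the planning context (where the old code set the mustUseLeafTxn boolean), and Run folds it into the flow's local state alongside the bits discovered during physical planning. A short trace under the same assumed definitions as the sketches above; planningCtx is a hypothetical stand-in for planCtx:

    // planningCtx is a hypothetical stand-in for the planner's planCtx,
    // whose flowConcurrency field replaces the old mustUseLeafTxn boolean.
    type planningCtx struct {
        flowConcurrency ConcurrencyKind
    }

    func trace() {
        // Planning a parallel post-query check: record the concurrency
        // kind instead of setting mustUseLeafTxn = true.
        planCtx := planningCtx{flowConcurrency: ConcurrencyParallelChecks}

        // Run: fold the planning-time bits into the flow-level state,
        // where they join any bits discovered later (parallel processors,
        // Streamer usage).
        var ls LocalState
        ls.AddConcurrency(planCtx.flowConcurrency)

        // The metadata-forwarding check refuses, since parallel CHECKs
        // share planCtx.planner between goroutines.
        fmt.Println(safeToForwardMetadata(ls.GetConcurrency())) // false
    }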