Skip to content

Commit 573bd5a

Browse files
authored
Merge 96ac92e into blathers/backport-release-25.3-155945
2 parents bd61646 + 96ac92e commit 573bd5a

File tree

10 files changed

+248
-20
lines changed

10 files changed

+248
-20
lines changed

pkg/sql/apply_join.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type applyJoinNode struct {
3535
// The data source with no outer columns.
3636
singleInputPlanNode
3737

38+
// forwarder allows propagating the ProducerMetadata towards the
39+
// DistSQLReceiver.
40+
forwarder metadataForwarder
41+
3842
// pred represents the join predicate.
3943
pred *joinPredicate
4044

@@ -248,12 +252,13 @@ func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree
248252
}
249253
plan := p.(*planComponents)
250254
rowResultWriter := NewRowResultWriter(&a.run.rightRows)
251-
if err := runPlanInsidePlan(
252-
ctx, params, plan, rowResultWriter,
253-
nil /* deferredRoutineSender */, "", /* stmtForDistSQLDiagram */
254-
); err != nil {
255+
queryStats, err := runPlanInsidePlan(
256+
ctx, params, plan, rowResultWriter, nil /* deferredRoutineSender */, "", /* stmtForDistSQLDiagram */
257+
)
258+
if err != nil {
255259
return err
256260
}
261+
forwardInnerQueryStats(a.forwarder, queryStats)
257262
a.run.rightRowsIterator = newRowContainerIterator(ctx, a.run.rightRows)
258263
return nil
259264
}
@@ -267,7 +272,7 @@ func runPlanInsidePlan(
267272
resultWriter rowResultWriter,
268273
deferredRoutineSender eval.DeferredRoutineSender,
269274
stmtForDistSQLDiagram string,
270-
) error {
275+
) (topLevelQueryStats, error) {
271276
defer plan.close(ctx)
272277
execCfg := params.ExecCfg()
273278
recv := MakeDistSQLReceiver(
@@ -286,6 +291,11 @@ func runPlanInsidePlan(
286291
// before we can produce any "outer" rows to be returned to the client, so
287292
// we make sure to unset pausablePortal field on the planner.
288293
plannerCopy.pausablePortal = nil
294+
// Avoid any possible metadata confusion by unsetting the
295+
// routineMetadataForwarder (if there is a routine in the inner plan that
296+
// needs it, then the plannerCopy will be updated during the inner plan
297+
// setup).
298+
plannerCopy.routineMetadataForwarder = nil
289299

290300
// planner object embeds the extended eval context, so we will modify that
291301
// (which won't affect the outer planner's extended eval context), and we'll
@@ -301,6 +311,8 @@ func runPlanInsidePlan(
301311
// return from this method (after the main query is executed).
302312
subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
303313
defer subqueryResultMemAcc.Close(ctx)
314+
// Note that planAndRunSubquery updates recv.stats with top-level
315+
// subquery stats.
304316
if !execCfg.DistSQLPlanner.PlanAndRunSubqueries(
305317
ctx,
306318
&plannerCopy,
@@ -311,7 +323,7 @@ func runPlanInsidePlan(
311323
false, /* skipDistSQLDiagramGeneration */
312324
params.p.mustUseLeafTxn(),
313325
) {
314-
return resultWriter.Err()
326+
return recv.stats, resultWriter.Err()
315327
}
316328
}
317329

@@ -338,10 +350,10 @@ func runPlanInsidePlan(
338350

339351
// Check if there was an error interacting with the resultWriter.
340352
if recv.commErr != nil {
341-
return recv.commErr
353+
return recv.stats, recv.commErr
342354
}
343355
if resultWriter.Err() != nil {
344-
return resultWriter.Err()
356+
return recv.stats, resultWriter.Err()
345357
}
346358

347359
plannerCopy.autoCommit = false
@@ -358,10 +370,10 @@ func runPlanInsidePlan(
358370
// need to update the plan for cleanup purposes before proceeding.
359371
*plan = plannerCopy.curPlan.planComponents
360372
if recv.commErr != nil {
361-
return recv.commErr
373+
return recv.stats, recv.commErr
362374
}
363375

364-
return resultWriter.Err()
376+
return recv.stats, resultWriter.Err()
365377
}
366378

367379
func (a *applyJoinNode) Values() tree.Datums {

pkg/sql/conn_executor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3957,6 +3957,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
39573957
p.noticeSender = nil
39583958
p.preparedStatements = ex.getPrepStmtsAccessor()
39593959
p.sqlCursors = ex.getCursorAccessor()
3960+
p.routineMetadataForwarder = nil
39603961
p.storedProcTxnState = ex.getStoredProcTxnStateAccessor()
39613962
p.createdSequences = ex.getCreatedSequencesAccessor()
39623963

pkg/sql/conn_executor_exec.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3496,6 +3496,8 @@ type topLevelQueryStats struct {
34963496
// client receiving the PGWire protocol messages (as well as construcing
34973497
// those messages).
34983498
clientTime time.Duration
3499+
// NB: when adding another field here, consider whether
3500+
// forwardInnerQueryStats method needs an adjustment.
34993501
}
35003502

35013503
func (s *topLevelQueryStats) add(other *topLevelQueryStats) {

pkg/sql/distsql_physical_planner.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4371,7 +4371,7 @@ func (dsp *DistSQLPlanner) wrapPlan(
43714371
// expecting is in fact RowsAffected. RowsAffected statements return a single
43724372
// row with the number of rows affected by the statement, and are the only
43734373
// types of statement where it's valid to invoke a plan's fast path.
4374-
wrapper := newPlanNodeToRowSource(
4374+
wrapper, err := newPlanNodeToRowSource(
43754375
n,
43764376
runParams{
43774377
extendedEvalCtx: &evalCtx,
@@ -4380,6 +4380,9 @@ func (dsp *DistSQLPlanner) wrapPlan(
43804380
useFastPath,
43814381
firstNotWrapped,
43824382
)
4383+
if err != nil {
4384+
return nil, err
4385+
}
43834386

43844387
localProcIdx := p.AddLocalProcessor(wrapper)
43854388
var input []execinfrapb.InputSyncSpec

pkg/sql/distsql_running.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,15 @@ func (dsp *DistSQLPlanner) Run(
10051005
return
10061006
}
10071007

1008+
if len(flows) == 1 && evalCtx.Txn != nil && evalCtx.Txn.Type() == kv.RootTxn {
1009+
// If we have a fully local plan and a RootTxn, we don't expect any
1010+
// concurrency, so it's safe to use the DistSQLReceiver to push the
1011+
// metadata into directly from routines.
1012+
if planCtx.planner != nil {
1013+
planCtx.planner.routineMetadataForwarder = recv
1014+
}
1015+
}
1016+
10081017
if finishedSetupFn != nil {
10091018
finishedSetupFn(flow)
10101019
}
@@ -1131,6 +1140,8 @@ type DistSQLReceiver struct {
11311140
}
11321141
}
11331142

1143+
var _ metadataForwarder = &DistSQLReceiver{}
1144+
11341145
// rowResultWriter is a subset of CommandResult to be used with the
11351146
// DistSQLReceiver. It's implemented by RowResultWriter.
11361147
type rowResultWriter interface {
@@ -1503,6 +1514,35 @@ func (r *DistSQLReceiver) checkConcurrentError() {
15031514
}
15041515
}
15051516

1517+
type metadataForwarder interface {
1518+
forwardMetadata(metadata *execinfrapb.ProducerMetadata)
1519+
}
1520+
1521+
// forwardInnerQueryStats propagates the query stats of "inner" plans as
1522+
// metadata via the forwarder.
1523+
func forwardInnerQueryStats(f metadataForwarder, stats topLevelQueryStats) {
1524+
if !buildutil.CrdbTestBuild && f == nil {
1525+
// Safety measure in production builds in case the forwarder is nil for
1526+
// some reason.
1527+
return
1528+
}
1529+
meta := execinfrapb.GetProducerMeta()
1530+
meta.Metrics = execinfrapb.GetMetricsMeta()
1531+
meta.Metrics.BytesRead = stats.bytesRead
1532+
meta.Metrics.RowsRead = stats.rowsRead
1533+
meta.Metrics.RowsWritten = stats.rowsWritten
1534+
// stats.networkEgressEstimate and stats.clientTime are ignored since they
1535+
// only matter at the "true" top-level query (and actually should be zero
1536+
// here anyway).
1537+
f.forwardMetadata(meta)
1538+
}
1539+
1540+
func (r *DistSQLReceiver) forwardMetadata(metadata *execinfrapb.ProducerMetadata) {
1541+
// Note that we don't use pushMeta method directly in order to go through
1542+
// the testing callback path.
1543+
r.Push(nil /* row */, metadata)
1544+
}
1545+
15061546
// pushMeta takes in non-empty metadata object and pushes it to the result
15071547
// writer. Possibly updated status is returned.
15081548
func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus {

pkg/sql/distsql_running_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,3 +1214,127 @@ SELECT id, details FROM jobs AS j INNER JOIN cte1 ON id = job_id WHERE id = 1;
12141214
require.Equal(t, 1, id)
12151215
require.Equal(t, 1, details)
12161216
}
1217+
1218+
// TestTopLevelQueryStats verifies that top-level query stats are collected
1219+
// correctly, including when the query executes "plans inside plans".
1220+
func TestTopLevelQueryStats(t *testing.T) {
1221+
defer leaktest.AfterTest(t)()
1222+
defer log.Scope(t).Close(t)
1223+
1224+
// testQuery will be updated throughout the test to the current target.
1225+
var testQuery atomic.Value
1226+
// The callback will send number of rows read and rows written (for each
1227+
// ProducerMetadata.Metrics object) on these channels, respectively.
1228+
rowsReadCh, rowsWrittenCh := make(chan int64), make(chan int64)
1229+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
1230+
Knobs: base.TestingKnobs{
1231+
SQLExecutor: &ExecutorTestingKnobs{
1232+
DistSQLReceiverPushCallbackFactory: func(_ context.Context, query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
1233+
if target := testQuery.Load(); target == nil || target.(string) != query {
1234+
return nil
1235+
}
1236+
return func(row rowenc.EncDatumRow, batch coldata.Batch, meta *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
1237+
if meta != nil && meta.Metrics != nil {
1238+
rowsReadCh <- meta.Metrics.RowsRead
1239+
rowsWrittenCh <- meta.Metrics.RowsWritten
1240+
}
1241+
return row, batch, meta
1242+
}
1243+
},
1244+
},
1245+
},
1246+
})
1247+
defer s.Stopper().Stop(context.Background())
1248+
1249+
if _, err := sqlDB.Exec(`
1250+
CREATE TABLE t (k INT PRIMARY KEY);
1251+
INSERT INTO t SELECT generate_series(1, 10);
1252+
CREATE FUNCTION no_reads() RETURNS INT AS 'SELECT 1' LANGUAGE SQL;
1253+
CREATE FUNCTION reads() RETURNS INT AS 'SELECT count(*) FROM t' LANGUAGE SQL;
1254+
CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x' LANGUAGE SQL;
1255+
`); err != nil {
1256+
t.Fatal(err)
1257+
}
1258+
1259+
for _, tc := range []struct {
1260+
name string
1261+
query string
1262+
expRowsRead int64
1263+
expRowsWritten int64
1264+
}{
1265+
{
1266+
name: "simple read",
1267+
query: "SELECT k FROM t",
1268+
expRowsRead: 10,
1269+
expRowsWritten: 0,
1270+
},
1271+
{
1272+
name: "simple write",
1273+
query: "INSERT INTO t SELECT generate_series(11, 42)",
1274+
expRowsRead: 0,
1275+
expRowsWritten: 32,
1276+
},
1277+
{
1278+
name: "read with apply join",
1279+
query: `SELECT (
1280+
WITH foo AS MATERIALIZED (SELECT k FROM t AS x WHERE x.k = y.k)
1281+
SELECT * FROM foo
1282+
) FROM t AS y`,
1283+
expRowsRead: 84, // scanning the table twice
1284+
expRowsWritten: 0,
1285+
},
1286+
{
1287+
name: "routine, no reads",
1288+
query: "SELECT no_reads()",
1289+
expRowsRead: 0,
1290+
expRowsWritten: 0,
1291+
},
1292+
{
1293+
name: "routine, reads",
1294+
query: "SELECT reads()",
1295+
expRowsRead: 42,
1296+
expRowsWritten: 0,
1297+
},
1298+
{
1299+
name: "routine, write",
1300+
query: "SELECT write(43)",
1301+
expRowsRead: 0,
1302+
expRowsWritten: 1,
1303+
},
1304+
{
1305+
name: "routine, multiple reads and writes",
1306+
query: "SELECT reads(), write(44), reads(), write(45), write(46), reads()",
1307+
expRowsRead: 133, // first read is 43 rows, second is 44, third is 46
1308+
expRowsWritten: 3,
1309+
},
1310+
} {
1311+
t.Run(tc.name, func(t *testing.T) {
1312+
testQuery.Store(tc.query)
1313+
errCh := make(chan error)
1314+
// Spin up the worker goroutine which will actually execute the
1315+
// query.
1316+
go func() {
1317+
defer close(errCh)
1318+
_, err := sqlDB.Exec(tc.query)
1319+
errCh <- err
1320+
}()
1321+
// In the main goroutine, loop until the query is completed while
1322+
// accumulating the top-level query stats.
1323+
var rowsRead, rowsWritten int64
1324+
LOOP:
1325+
for {
1326+
select {
1327+
case read := <-rowsReadCh:
1328+
rowsRead += read
1329+
case written := <-rowsWrittenCh:
1330+
rowsWritten += written
1331+
case err := <-errCh:
1332+
require.NoError(t, err)
1333+
break LOOP
1334+
}
1335+
}
1336+
require.Equal(t, tc.expRowsRead, rowsRead)
1337+
require.Equal(t, tc.expRowsWritten, rowsWritten)
1338+
})
1339+
}
1340+
}

pkg/sql/plan_node_to_row_source.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,6 @@ import (
2323
"github.com/cockroachdb/errors"
2424
)
2525

26-
type metadataForwarder interface {
27-
forwardMetadata(metadata *execinfrapb.ProducerMetadata)
28-
}
29-
3026
type planNodeToRowSource struct {
3127
execinfra.ProcessorBase
3228

@@ -50,6 +46,7 @@ type planNodeToRowSource struct {
5046
var _ execinfra.LocalProcessor = &planNodeToRowSource{}
5147
var _ execreleasable.Releasable = &planNodeToRowSource{}
5248
var _ execopnode.OpNode = &planNodeToRowSource{}
49+
var _ metadataForwarder = &planNodeToRowSource{}
5350

5451
var planNodeToRowSourcePool = sync.Pool{
5552
New: func() interface{} {
@@ -59,7 +56,7 @@ var planNodeToRowSourcePool = sync.Pool{
5956

6057
func newPlanNodeToRowSource(
6158
source planNode, params runParams, fastPath bool, firstNotWrapped planNode,
62-
) *planNodeToRowSource {
59+
) (*planNodeToRowSource, error) {
6360
p := planNodeToRowSourcePool.Get().(*planNodeToRowSource)
6461
*p = planNodeToRowSource{
6562
ProcessorBase: p.ProcessorBase,
@@ -85,7 +82,39 @@ func newPlanNodeToRowSource(
8582
} else {
8683
p.row = make(rowenc.EncDatumRow, len(p.outputTypes))
8784
}
88-
return p
85+
// Find any planNodes that need a way to propagate metadata before we get to
86+
// the firstNotWrapped - this planNodeToRowSource adapter will be the
87+
// forwarder for all of them.
88+
var setForwarder func(parent planNode) error
89+
setForwarder = func(parent planNode) error {
90+
switch t := parent.(type) {
91+
case *applyJoinNode:
92+
t.forwarder = p
93+
case *recursiveCTENode:
94+
t.forwarder = p
95+
}
96+
for i, n := 0, parent.InputCount(); i < n; i++ {
97+
child, err := parent.Input(i)
98+
if err != nil {
99+
return err
100+
}
101+
if child == p.firstNotWrapped {
102+
// Once we get to the firstNotWrapped, we stop the recursion
103+
// since all remaining planNodes aren't our responsibility.
104+
return nil
105+
}
106+
if err = setForwarder(child); err != nil {
107+
return err
108+
}
109+
}
110+
return nil
111+
}
112+
err := setForwarder(p.node)
113+
if err != nil {
114+
p.Release()
115+
return nil, err
116+
}
117+
return p, nil
89118
}
90119

91120
// MustBeStreaming implements the execinfra.Processor interface.

0 commit comments

Comments
 (0)