Skip to content

Commit b0dd81d

Browse files
committed
sql: fix top-level query stats when "inner" plans are present
Previously, whenever we ran an "inner" plan (via `runPlanInsidePlan` helper), we ignored the top-level query stats for the "inner" plan. As a result, reads and writes performed by the inner plan weren't reflected in the "outer" top-level query stats. This is now fixed by adjusting the routines, apply joins, and recursive CTEs to propagate their metrics as ProducerMetadata objects. Note that for routines the access to the DistSQL infra is rather difficult, so we plumbed the access via the planner straight into the DistSQLReceiver, and even though it's ugly, it should work in practice. The only alternative I see is adding the necessary reference into the `eval.Context`, but then it gets tricky to actually set that and ensure the right copy of the eval context is observed by all routines (plus we'd need to make the copy or restore the original state somehow), so I chose to not pursue it. Release note (bug fix): Previously, CockroachDB didn't include reads and writes performed by routines (user-defined functions and stored procedures) as well as apply joins into `bytes read`, `rows read`, and `rows written` statement execution statistics, and this is now fixed. The bug has been present since before 23.2 version.
1 parent 156a7a4 commit b0dd81d

File tree

10 files changed

+248
-20
lines changed

10 files changed

+248
-20
lines changed

pkg/sql/apply_join.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type applyJoinNode struct {
3535
// The data source with no outer columns.
3636
singleInputPlanNode
3737

38+
// forwarder allows propagating the ProducerMetadata towards the
39+
// DistSQLReceiver.
40+
forwarder metadataForwarder
41+
3842
// pred represents the join predicate.
3943
pred *joinPredicate
4044

@@ -248,12 +252,13 @@ func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree
248252
}
249253
plan := p.(*planComponents)
250254
rowResultWriter := NewRowResultWriter(&a.run.rightRows)
251-
if err := runPlanInsidePlan(
252-
ctx, params, plan, rowResultWriter,
253-
nil /* deferredRoutineSender */, "", /* stmtForDistSQLDiagram */
254-
); err != nil {
255+
queryStats, err := runPlanInsidePlan(
256+
ctx, params, plan, rowResultWriter, nil /* deferredRoutineSender */, "", /* stmtForDistSQLDiagram */
257+
)
258+
if err != nil {
255259
return err
256260
}
261+
forwardInnerQueryStats(a.forwarder, queryStats)
257262
a.run.rightRowsIterator = newRowContainerIterator(ctx, a.run.rightRows)
258263
return nil
259264
}
@@ -267,7 +272,7 @@ func runPlanInsidePlan(
267272
resultWriter rowResultWriter,
268273
deferredRoutineSender eval.DeferredRoutineSender,
269274
stmtForDistSQLDiagram string,
270-
) error {
275+
) (topLevelQueryStats, error) {
271276
defer plan.close(ctx)
272277
execCfg := params.ExecCfg()
273278
recv := MakeDistSQLReceiver(
@@ -286,6 +291,11 @@ func runPlanInsidePlan(
286291
// before we can produce any "outer" rows to be returned to the client, so
287292
// we make sure to unset pausablePortal field on the planner.
288293
plannerCopy.pausablePortal = nil
294+
// Avoid any possible metadata confusion by unsetting the
295+
// routineMetadataForwarder (if there is a routine in the inner plan that
296+
// needs it, then the plannerCopy will be updated during the inner plan
297+
// setup).
298+
plannerCopy.routineMetadataForwarder = nil
289299

290300
// planner object embeds the extended eval context, so we will modify that
291301
// (which won't affect the outer planner's extended eval context), and we'll
@@ -301,6 +311,8 @@ func runPlanInsidePlan(
301311
// return from this method (after the main query is executed).
302312
subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
303313
defer subqueryResultMemAcc.Close(ctx)
314+
// Note that planAndRunSubquery updates recv.stats with top-level
315+
// subquery stats.
304316
if !execCfg.DistSQLPlanner.PlanAndRunSubqueries(
305317
ctx,
306318
&plannerCopy,
@@ -311,7 +323,7 @@ func runPlanInsidePlan(
311323
false, /* skipDistSQLDiagramGeneration */
312324
params.p.mustUseLeafTxn(),
313325
) {
314-
return resultWriter.Err()
326+
return recv.stats, resultWriter.Err()
315327
}
316328
}
317329

@@ -338,10 +350,10 @@ func runPlanInsidePlan(
338350

339351
// Check if there was an error interacting with the resultWriter.
340352
if recv.commErr != nil {
341-
return recv.commErr
353+
return recv.stats, recv.commErr
342354
}
343355
if resultWriter.Err() != nil {
344-
return resultWriter.Err()
356+
return recv.stats, resultWriter.Err()
345357
}
346358

347359
plannerCopy.autoCommit = false
@@ -358,10 +370,10 @@ func runPlanInsidePlan(
358370
// need to update the plan for cleanup purposes before proceeding.
359371
*plan = plannerCopy.curPlan.planComponents
360372
if recv.commErr != nil {
361-
return recv.commErr
373+
return recv.stats, recv.commErr
362374
}
363375

364-
return resultWriter.Err()
376+
return recv.stats, resultWriter.Err()
365377
}
366378

367379
func (a *applyJoinNode) Values() tree.Datums {

pkg/sql/conn_executor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3957,6 +3957,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
39573957
p.noticeSender = nil
39583958
p.preparedStatements = ex.getPrepStmtsAccessor()
39593959
p.sqlCursors = ex.getCursorAccessor()
3960+
p.routineMetadataForwarder = nil
39603961
p.storedProcTxnState = ex.getStoredProcTxnStateAccessor()
39613962
p.createdSequences = ex.getCreatedSequencesAccessor()
39623963

pkg/sql/conn_executor_exec.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3496,6 +3496,8 @@ type topLevelQueryStats struct {
34963496
// client receiving the PGWire protocol messages (as well as construcing
34973497
// those messages).
34983498
clientTime time.Duration
3499+
// NB: when adding another field here, consider whether
3500+
// forwardInnerQueryStats method needs an adjustment.
34993501
}
35003502

35013503
func (s *topLevelQueryStats) add(other *topLevelQueryStats) {

pkg/sql/distsql_physical_planner.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4371,7 +4371,7 @@ func (dsp *DistSQLPlanner) wrapPlan(
43714371
// expecting is in fact RowsAffected. RowsAffected statements return a single
43724372
// row with the number of rows affected by the statement, and are the only
43734373
// types of statement where it's valid to invoke a plan's fast path.
4374-
wrapper := newPlanNodeToRowSource(
4374+
wrapper, err := newPlanNodeToRowSource(
43754375
n,
43764376
runParams{
43774377
extendedEvalCtx: &evalCtx,
@@ -4380,6 +4380,9 @@ func (dsp *DistSQLPlanner) wrapPlan(
43804380
useFastPath,
43814381
firstNotWrapped,
43824382
)
4383+
if err != nil {
4384+
return nil, err
4385+
}
43834386

43844387
localProcIdx := p.AddLocalProcessor(wrapper)
43854388
var input []execinfrapb.InputSyncSpec

pkg/sql/distsql_running.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,15 @@ func (dsp *DistSQLPlanner) Run(
10051005
return
10061006
}
10071007

1008+
if len(flows) == 1 && evalCtx.Txn != nil && evalCtx.Txn.Type() == kv.RootTxn {
1009+
// If we have a fully local plan and a RootTxn, we don't expect any
1010+
// concurrency, so it's safe to use the DistSQLReceiver to push the
1011+
// metadata into directly from routines.
1012+
if planCtx.planner != nil {
1013+
planCtx.planner.routineMetadataForwarder = recv
1014+
}
1015+
}
1016+
10081017
if finishedSetupFn != nil {
10091018
finishedSetupFn(flow)
10101019
}
@@ -1131,6 +1140,8 @@ type DistSQLReceiver struct {
11311140
}
11321141
}
11331142

1143+
var _ metadataForwarder = &DistSQLReceiver{}
1144+
11341145
// rowResultWriter is a subset of CommandResult to be used with the
11351146
// DistSQLReceiver. It's implemented by RowResultWriter.
11361147
type rowResultWriter interface {
@@ -1503,6 +1514,35 @@ func (r *DistSQLReceiver) checkConcurrentError() {
15031514
}
15041515
}
15051516

1517+
type metadataForwarder interface {
1518+
forwardMetadata(metadata *execinfrapb.ProducerMetadata)
1519+
}
1520+
1521+
// forwardInnerQueryStats propagates the query stats of "inner" plans as
1522+
// metadata via the forwarder.
1523+
func forwardInnerQueryStats(f metadataForwarder, stats topLevelQueryStats) {
1524+
if !buildutil.CrdbTestBuild && f == nil {
1525+
// Safety measure in production builds in case the forwarder is nil for
1526+
// some reason.
1527+
return
1528+
}
1529+
meta := execinfrapb.GetProducerMeta()
1530+
meta.Metrics = execinfrapb.GetMetricsMeta()
1531+
meta.Metrics.BytesRead = stats.bytesRead
1532+
meta.Metrics.RowsRead = stats.rowsRead
1533+
meta.Metrics.RowsWritten = stats.rowsWritten
1534+
// stats.networkEgressEstimate and stats.clientTime are ignored since they
1535+
// only matter at the "true" top-level query (and actually should be zero
1536+
// here anyway).
1537+
f.forwardMetadata(meta)
1538+
}
1539+
1540+
func (r *DistSQLReceiver) forwardMetadata(metadata *execinfrapb.ProducerMetadata) {
1541+
// Note that we don't use pushMeta method directly in order to go through
1542+
// the testing callback path.
1543+
r.Push(nil /* row */, metadata)
1544+
}
1545+
15061546
// pushMeta takes in non-empty metadata object and pushes it to the result
15071547
// writer. Possibly updated status is returned.
15081548
func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus {

pkg/sql/distsql_running_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,3 +1214,127 @@ SELECT id, details FROM jobs AS j INNER JOIN cte1 ON id = job_id WHERE id = 1;
12141214
require.Equal(t, 1, id)
12151215
require.Equal(t, 1, details)
12161216
}
1217+
1218+
// TestTopLevelQueryStats verifies that top-level query stats are collected
1219+
// correctly, including when the query executes "plans inside plans".
1220+
func TestTopLevelQueryStats(t *testing.T) {
1221+
defer leaktest.AfterTest(t)()
1222+
defer log.Scope(t).Close(t)
1223+
1224+
// testQuery will be updated throughout the test to the current target.
1225+
var testQuery atomic.Value
1226+
// The callback will send number of rows read and rows written (for each
1227+
// ProducerMetadata.Metrics object) on these channels, respectively.
1228+
rowsReadCh, rowsWrittenCh := make(chan int64), make(chan int64)
1229+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
1230+
Knobs: base.TestingKnobs{
1231+
SQLExecutor: &ExecutorTestingKnobs{
1232+
DistSQLReceiverPushCallbackFactory: func(_ context.Context, query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
1233+
if target := testQuery.Load(); target == nil || target.(string) != query {
1234+
return nil
1235+
}
1236+
return func(row rowenc.EncDatumRow, batch coldata.Batch, meta *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
1237+
if meta != nil && meta.Metrics != nil {
1238+
rowsReadCh <- meta.Metrics.RowsRead
1239+
rowsWrittenCh <- meta.Metrics.RowsWritten
1240+
}
1241+
return row, batch, meta
1242+
}
1243+
},
1244+
},
1245+
},
1246+
})
1247+
defer s.Stopper().Stop(context.Background())
1248+
1249+
if _, err := sqlDB.Exec(`
1250+
CREATE TABLE t (k INT PRIMARY KEY);
1251+
INSERT INTO t SELECT generate_series(1, 10);
1252+
CREATE FUNCTION no_reads() RETURNS INT AS 'SELECT 1' LANGUAGE SQL;
1253+
CREATE FUNCTION reads() RETURNS INT AS 'SELECT count(*) FROM t' LANGUAGE SQL;
1254+
CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x' LANGUAGE SQL;
1255+
`); err != nil {
1256+
t.Fatal(err)
1257+
}
1258+
1259+
for _, tc := range []struct {
1260+
name string
1261+
query string
1262+
expRowsRead int64
1263+
expRowsWritten int64
1264+
}{
1265+
{
1266+
name: "simple read",
1267+
query: "SELECT k FROM t",
1268+
expRowsRead: 10,
1269+
expRowsWritten: 0,
1270+
},
1271+
{
1272+
name: "simple write",
1273+
query: "INSERT INTO t SELECT generate_series(11, 42)",
1274+
expRowsRead: 0,
1275+
expRowsWritten: 32,
1276+
},
1277+
{
1278+
name: "read with apply join",
1279+
query: `SELECT (
1280+
WITH foo AS MATERIALIZED (SELECT k FROM t AS x WHERE x.k = y.k)
1281+
SELECT * FROM foo
1282+
) FROM t AS y`,
1283+
expRowsRead: 84, // scanning the table twice
1284+
expRowsWritten: 0,
1285+
},
1286+
{
1287+
name: "routine, no reads",
1288+
query: "SELECT no_reads()",
1289+
expRowsRead: 0,
1290+
expRowsWritten: 0,
1291+
},
1292+
{
1293+
name: "routine, reads",
1294+
query: "SELECT reads()",
1295+
expRowsRead: 42,
1296+
expRowsWritten: 0,
1297+
},
1298+
{
1299+
name: "routine, write",
1300+
query: "SELECT write(43)",
1301+
expRowsRead: 0,
1302+
expRowsWritten: 1,
1303+
},
1304+
{
1305+
name: "routine, multiple reads and writes",
1306+
query: "SELECT reads(), write(44), reads(), write(45), write(46), reads()",
1307+
expRowsRead: 133, // first read is 43 rows, second is 44, third is 46
1308+
expRowsWritten: 3,
1309+
},
1310+
} {
1311+
t.Run(tc.name, func(t *testing.T) {
1312+
testQuery.Store(tc.query)
1313+
errCh := make(chan error)
1314+
// Spin up the worker goroutine which will actually execute the
1315+
// query.
1316+
go func() {
1317+
defer close(errCh)
1318+
_, err := sqlDB.Exec(tc.query)
1319+
errCh <- err
1320+
}()
1321+
// In the main goroutine, loop until the query is completed while
1322+
// accumulating the top-level query stats.
1323+
var rowsRead, rowsWritten int64
1324+
LOOP:
1325+
for {
1326+
select {
1327+
case read := <-rowsReadCh:
1328+
rowsRead += read
1329+
case written := <-rowsWrittenCh:
1330+
rowsWritten += written
1331+
case err := <-errCh:
1332+
require.NoError(t, err)
1333+
break LOOP
1334+
}
1335+
}
1336+
require.Equal(t, tc.expRowsRead, rowsRead)
1337+
require.Equal(t, tc.expRowsWritten, rowsWritten)
1338+
})
1339+
}
1340+
}

pkg/sql/plan_node_to_row_source.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,6 @@ import (
2323
"github.com/cockroachdb/errors"
2424
)
2525

26-
type metadataForwarder interface {
27-
forwardMetadata(metadata *execinfrapb.ProducerMetadata)
28-
}
29-
3026
type planNodeToRowSource struct {
3127
execinfra.ProcessorBase
3228

@@ -50,6 +46,7 @@ type planNodeToRowSource struct {
5046
var _ execinfra.LocalProcessor = &planNodeToRowSource{}
5147
var _ execreleasable.Releasable = &planNodeToRowSource{}
5248
var _ execopnode.OpNode = &planNodeToRowSource{}
49+
var _ metadataForwarder = &planNodeToRowSource{}
5350

5451
var planNodeToRowSourcePool = sync.Pool{
5552
New: func() interface{} {
@@ -59,7 +56,7 @@ var planNodeToRowSourcePool = sync.Pool{
5956

6057
func newPlanNodeToRowSource(
6158
source planNode, params runParams, fastPath bool, firstNotWrapped planNode,
62-
) *planNodeToRowSource {
59+
) (*planNodeToRowSource, error) {
6360
p := planNodeToRowSourcePool.Get().(*planNodeToRowSource)
6461
*p = planNodeToRowSource{
6562
ProcessorBase: p.ProcessorBase,
@@ -85,7 +82,39 @@ func newPlanNodeToRowSource(
8582
} else {
8683
p.row = make(rowenc.EncDatumRow, len(p.outputTypes))
8784
}
88-
return p
85+
// Find any planNodes that need a way to propagate metadata before we get to
86+
// the firstNotWrapped - this planNodeToRowSource adapter will be the
87+
// forwarder for all of them.
88+
var setForwarder func(parent planNode) error
89+
setForwarder = func(parent planNode) error {
90+
switch t := parent.(type) {
91+
case *applyJoinNode:
92+
t.forwarder = p
93+
case *recursiveCTENode:
94+
t.forwarder = p
95+
}
96+
for i, n := 0, parent.InputCount(); i < n; i++ {
97+
child, err := parent.Input(i)
98+
if err != nil {
99+
return err
100+
}
101+
if child == p.firstNotWrapped {
102+
// Once we get to the firstNotWrapped, we stop the recursion
103+
// since all remaining planNodes aren't our responsibility.
104+
return nil
105+
}
106+
if err = setForwarder(child); err != nil {
107+
return err
108+
}
109+
}
110+
return nil
111+
}
112+
err := setForwarder(p.node)
113+
if err != nil {
114+
p.Release()
115+
return nil, err
116+
}
117+
return p, nil
89118
}
90119

91120
// MustBeStreaming implements the execinfra.Processor interface.

0 commit comments

Comments
 (0)