Skip to content

Commit 96ac92e

Browse files
committed
sql: fix top-level query stats when "inner" plans are present
Previously, whenever we ran an "inner" plan (via `runPlanInsidePlan` helper), we ignored the top-level query stats for the "inner" plan. As a result, reads and writes performed by the inner plan weren't reflected in the "outer" top-level query stats. This is now fixed by adjusting the routines, apply joins, and recursive CTEs to propagate their metrics as ProducerMetadata objects. Note that for routines the access to the DistSQL infra is rather difficult, so we plumbed the access via the planner straight into the DistSQLReceiver, and even though it's ugly, it should work in practice. The only alternative I see is adding the necessary reference into the `eval.Context`, but then it gets tricky to actually set that and ensure the right copy of the eval context is observed by all routines (plus we'd need to make the copy or restore the original state somehow), so I chose to not pursue it. Release note (bug fix): Previously, CockroachDB didn't include reads and writes performed by routines (user-defined functions and stored procedures) as well as apply joins into `bytes read`, `rows read`, and `rows written` statement execution statistics, and this is now fixed. The bug has been present since before 23.2 version.
1 parent 37e26c6 commit 96ac92e

File tree

10 files changed

+248
-20
lines changed

10 files changed

+248
-20
lines changed

pkg/sql/apply_join.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type applyJoinNode struct {
3535
// The data source with no outer columns.
3636
singleInputPlanNode
3737

38+
// forwarder allows propagating the ProducerMetadata towards the
39+
// DistSQLReceiver.
40+
forwarder metadataForwarder
41+
3842
// pred represents the join predicate.
3943
pred *joinPredicate
4044

@@ -248,12 +252,13 @@ func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree
248252
}
249253
plan := p.(*planComponents)
250254
rowResultWriter := NewRowResultWriter(&a.run.rightRows)
251-
if err := runPlanInsidePlan(
252-
ctx, params, plan, rowResultWriter,
253-
nil /* deferredRoutineSender */, "", /* stmtForDistSQLDiagram */
254-
); err != nil {
255+
queryStats, err := runPlanInsidePlan(
256+
ctx, params, plan, rowResultWriter, nil /* deferredRoutineSender */, "", /* stmtForDistSQLDiagram */
257+
)
258+
if err != nil {
255259
return err
256260
}
261+
forwardInnerQueryStats(a.forwarder, queryStats)
257262
a.run.rightRowsIterator = newRowContainerIterator(ctx, a.run.rightRows)
258263
return nil
259264
}
@@ -267,7 +272,7 @@ func runPlanInsidePlan(
267272
resultWriter rowResultWriter,
268273
deferredRoutineSender eval.DeferredRoutineSender,
269274
stmtForDistSQLDiagram string,
270-
) error {
275+
) (topLevelQueryStats, error) {
271276
defer plan.close(ctx)
272277
execCfg := params.ExecCfg()
273278
recv := MakeDistSQLReceiver(
@@ -286,6 +291,11 @@ func runPlanInsidePlan(
286291
// before we can produce any "outer" rows to be returned to the client, so
287292
// we make sure to unset pausablePortal field on the planner.
288293
plannerCopy.pausablePortal = nil
294+
// Avoid any possible metadata confusion by unsetting the
295+
// routineMetadataForwarder (if there is a routine in the inner plan that
296+
// needs it, then the plannerCopy will be updated during the inner plan
297+
// setup).
298+
plannerCopy.routineMetadataForwarder = nil
289299

290300
// planner object embeds the extended eval context, so we will modify that
291301
// (which won't affect the outer planner's extended eval context), and we'll
@@ -301,6 +311,8 @@ func runPlanInsidePlan(
301311
// return from this method (after the main query is executed).
302312
subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
303313
defer subqueryResultMemAcc.Close(ctx)
314+
// Note that planAndRunSubquery updates recv.stats with top-level
315+
// subquery stats.
304316
if !execCfg.DistSQLPlanner.PlanAndRunSubqueries(
305317
ctx,
306318
&plannerCopy,
@@ -311,7 +323,7 @@ func runPlanInsidePlan(
311323
false, /* skipDistSQLDiagramGeneration */
312324
params.p.mustUseLeafTxn(),
313325
) {
314-
return resultWriter.Err()
326+
return recv.stats, resultWriter.Err()
315327
}
316328
}
317329

@@ -338,10 +350,10 @@ func runPlanInsidePlan(
338350

339351
// Check if there was an error interacting with the resultWriter.
340352
if recv.commErr != nil {
341-
return recv.commErr
353+
return recv.stats, recv.commErr
342354
}
343355
if resultWriter.Err() != nil {
344-
return resultWriter.Err()
356+
return recv.stats, resultWriter.Err()
345357
}
346358

347359
plannerCopy.autoCommit = false
@@ -358,10 +370,10 @@ func runPlanInsidePlan(
358370
// need to update the plan for cleanup purposes before proceeding.
359371
*plan = plannerCopy.curPlan.planComponents
360372
if recv.commErr != nil {
361-
return recv.commErr
373+
return recv.stats, recv.commErr
362374
}
363375

364-
return resultWriter.Err()
376+
return recv.stats, resultWriter.Err()
365377
}
366378

367379
func (a *applyJoinNode) Values() tree.Datums {

pkg/sql/conn_executor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3955,6 +3955,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
39553955
p.noticeSender = nil
39563956
p.preparedStatements = ex.getPrepStmtsAccessor()
39573957
p.sqlCursors = ex.getCursorAccessor()
3958+
p.routineMetadataForwarder = nil
39583959
p.storedProcTxnState = ex.getStoredProcTxnStateAccessor()
39593960
p.createdSequences = ex.getCreatedSequencesAccessor()
39603961

pkg/sql/conn_executor_exec.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3292,6 +3292,8 @@ type topLevelQueryStats struct {
32923292
// client receiving the PGWire protocol messages (as well as construcing
32933293
// those messages).
32943294
clientTime time.Duration
3295+
// NB: when adding another field here, consider whether
3296+
// forwardInnerQueryStats method needs an adjustment.
32953297
}
32963298

32973299
func (s *topLevelQueryStats) add(other *topLevelQueryStats) {

pkg/sql/distsql_physical_planner.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4375,7 +4375,7 @@ func (dsp *DistSQLPlanner) wrapPlan(
43754375
// expecting is in fact RowsAffected. RowsAffected statements return a single
43764376
// row with the number of rows affected by the statement, and are the only
43774377
// types of statement where it's valid to invoke a plan's fast path.
4378-
wrapper := newPlanNodeToRowSource(
4378+
wrapper, err := newPlanNodeToRowSource(
43794379
n,
43804380
runParams{
43814381
extendedEvalCtx: &evalCtx,
@@ -4384,6 +4384,9 @@ func (dsp *DistSQLPlanner) wrapPlan(
43844384
useFastPath,
43854385
firstNotWrapped,
43864386
)
4387+
if err != nil {
4388+
return nil, err
4389+
}
43874390

43884391
localProcIdx := p.AddLocalProcessor(wrapper)
43894392
var input []execinfrapb.InputSyncSpec

pkg/sql/distsql_running.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,15 @@ func (dsp *DistSQLPlanner) Run(
10141014
return
10151015
}
10161016

1017+
if len(flows) == 1 && evalCtx.Txn != nil && evalCtx.Txn.Type() == kv.RootTxn {
1018+
// If we have a fully local plan and a RootTxn, we don't expect any
1019+
// concurrency, so it's safe to use the DistSQLReceiver to push the
1020+
// metadata into directly from routines.
1021+
if planCtx.planner != nil {
1022+
planCtx.planner.routineMetadataForwarder = recv
1023+
}
1024+
}
1025+
10171026
if finishedSetupFn != nil {
10181027
finishedSetupFn(flow)
10191028
}
@@ -1144,6 +1153,8 @@ type DistSQLReceiver struct {
11441153
}
11451154
}
11461155

1156+
var _ metadataForwarder = &DistSQLReceiver{}
1157+
11471158
// rowResultWriter is a subset of CommandResult to be used with the
11481159
// DistSQLReceiver. It's implemented by RowResultWriter.
11491160
type rowResultWriter interface {
@@ -1570,6 +1581,35 @@ func (r *DistSQLReceiver) checkConcurrentError() {
15701581
}
15711582
}
15721583

1584+
type metadataForwarder interface {
1585+
forwardMetadata(metadata *execinfrapb.ProducerMetadata)
1586+
}
1587+
1588+
// forwardInnerQueryStats propagates the query stats of "inner" plans as
1589+
// metadata via the forwarder.
1590+
func forwardInnerQueryStats(f metadataForwarder, stats topLevelQueryStats) {
1591+
if !buildutil.CrdbTestBuild && f == nil {
1592+
// Safety measure in production builds in case the forwarder is nil for
1593+
// some reason.
1594+
return
1595+
}
1596+
meta := execinfrapb.GetProducerMeta()
1597+
meta.Metrics = execinfrapb.GetMetricsMeta()
1598+
meta.Metrics.BytesRead = stats.bytesRead
1599+
meta.Metrics.RowsRead = stats.rowsRead
1600+
meta.Metrics.RowsWritten = stats.rowsWritten
1601+
// stats.networkEgressEstimate and stats.clientTime are ignored since they
1602+
// only matter at the "true" top-level query (and actually should be zero
1603+
// here anyway).
1604+
f.forwardMetadata(meta)
1605+
}
1606+
1607+
func (r *DistSQLReceiver) forwardMetadata(metadata *execinfrapb.ProducerMetadata) {
1608+
// Note that we don't use pushMeta method directly in order to go through
1609+
// the testing callback path.
1610+
r.Push(nil /* row */, metadata)
1611+
}
1612+
15731613
// pushMeta takes in non-empty metadata object and pushes it to the result
15741614
// writer. Possibly updated status is returned.
15751615
func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus {

pkg/sql/distsql_running_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,3 +1214,127 @@ SELECT id, details FROM jobs AS j INNER JOIN cte1 ON id = job_id WHERE id = 1;
12141214
require.Equal(t, 1, id)
12151215
require.Equal(t, 1, details)
12161216
}
1217+
1218+
// TestTopLevelQueryStats verifies that top-level query stats are collected
1219+
// correctly, including when the query executes "plans inside plans".
1220+
func TestTopLevelQueryStats(t *testing.T) {
1221+
defer leaktest.AfterTest(t)()
1222+
defer log.Scope(t).Close(t)
1223+
1224+
// testQuery will be updated throughout the test to the current target.
1225+
var testQuery atomic.Value
1226+
// The callback will send number of rows read and rows written (for each
1227+
// ProducerMetadata.Metrics object) on these channels, respectively.
1228+
rowsReadCh, rowsWrittenCh := make(chan int64), make(chan int64)
1229+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
1230+
Knobs: base.TestingKnobs{
1231+
SQLExecutor: &ExecutorTestingKnobs{
1232+
DistSQLReceiverPushCallbackFactory: func(_ context.Context, query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
1233+
if target := testQuery.Load(); target == nil || target.(string) != query {
1234+
return nil
1235+
}
1236+
return func(row rowenc.EncDatumRow, batch coldata.Batch, meta *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
1237+
if meta != nil && meta.Metrics != nil {
1238+
rowsReadCh <- meta.Metrics.RowsRead
1239+
rowsWrittenCh <- meta.Metrics.RowsWritten
1240+
}
1241+
return row, batch, meta
1242+
}
1243+
},
1244+
},
1245+
},
1246+
})
1247+
defer s.Stopper().Stop(context.Background())
1248+
1249+
if _, err := sqlDB.Exec(`
1250+
CREATE TABLE t (k INT PRIMARY KEY);
1251+
INSERT INTO t SELECT generate_series(1, 10);
1252+
CREATE FUNCTION no_reads() RETURNS INT AS 'SELECT 1' LANGUAGE SQL;
1253+
CREATE FUNCTION reads() RETURNS INT AS 'SELECT count(*) FROM t' LANGUAGE SQL;
1254+
CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x' LANGUAGE SQL;
1255+
`); err != nil {
1256+
t.Fatal(err)
1257+
}
1258+
1259+
for _, tc := range []struct {
1260+
name string
1261+
query string
1262+
expRowsRead int64
1263+
expRowsWritten int64
1264+
}{
1265+
{
1266+
name: "simple read",
1267+
query: "SELECT k FROM t",
1268+
expRowsRead: 10,
1269+
expRowsWritten: 0,
1270+
},
1271+
{
1272+
name: "simple write",
1273+
query: "INSERT INTO t SELECT generate_series(11, 42)",
1274+
expRowsRead: 0,
1275+
expRowsWritten: 32,
1276+
},
1277+
{
1278+
name: "read with apply join",
1279+
query: `SELECT (
1280+
WITH foo AS MATERIALIZED (SELECT k FROM t AS x WHERE x.k = y.k)
1281+
SELECT * FROM foo
1282+
) FROM t AS y`,
1283+
expRowsRead: 84, // scanning the table twice
1284+
expRowsWritten: 0,
1285+
},
1286+
{
1287+
name: "routine, no reads",
1288+
query: "SELECT no_reads()",
1289+
expRowsRead: 0,
1290+
expRowsWritten: 0,
1291+
},
1292+
{
1293+
name: "routine, reads",
1294+
query: "SELECT reads()",
1295+
expRowsRead: 42,
1296+
expRowsWritten: 0,
1297+
},
1298+
{
1299+
name: "routine, write",
1300+
query: "SELECT write(43)",
1301+
expRowsRead: 0,
1302+
expRowsWritten: 1,
1303+
},
1304+
{
1305+
name: "routine, multiple reads and writes",
1306+
query: "SELECT reads(), write(44), reads(), write(45), write(46), reads()",
1307+
expRowsRead: 133, // first read is 43 rows, second is 44, third is 46
1308+
expRowsWritten: 3,
1309+
},
1310+
} {
1311+
t.Run(tc.name, func(t *testing.T) {
1312+
testQuery.Store(tc.query)
1313+
errCh := make(chan error)
1314+
// Spin up the worker goroutine which will actually execute the
1315+
// query.
1316+
go func() {
1317+
defer close(errCh)
1318+
_, err := sqlDB.Exec(tc.query)
1319+
errCh <- err
1320+
}()
1321+
// In the main goroutine, loop until the query is completed while
1322+
// accumulating the top-level query stats.
1323+
var rowsRead, rowsWritten int64
1324+
LOOP:
1325+
for {
1326+
select {
1327+
case read := <-rowsReadCh:
1328+
rowsRead += read
1329+
case written := <-rowsWrittenCh:
1330+
rowsWritten += written
1331+
case err := <-errCh:
1332+
require.NoError(t, err)
1333+
break LOOP
1334+
}
1335+
}
1336+
require.Equal(t, tc.expRowsRead, rowsRead)
1337+
require.Equal(t, tc.expRowsWritten, rowsWritten)
1338+
})
1339+
}
1340+
}

pkg/sql/plan_node_to_row_source.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,6 @@ import (
2323
"github.com/cockroachdb/errors"
2424
)
2525

26-
type metadataForwarder interface {
27-
forwardMetadata(metadata *execinfrapb.ProducerMetadata)
28-
}
29-
3026
type planNodeToRowSource struct {
3127
execinfra.ProcessorBase
3228

@@ -50,6 +46,7 @@ type planNodeToRowSource struct {
5046
var _ execinfra.LocalProcessor = &planNodeToRowSource{}
5147
var _ execreleasable.Releasable = &planNodeToRowSource{}
5248
var _ execopnode.OpNode = &planNodeToRowSource{}
49+
var _ metadataForwarder = &planNodeToRowSource{}
5350

5451
var planNodeToRowSourcePool = sync.Pool{
5552
New: func() interface{} {
@@ -59,7 +56,7 @@ var planNodeToRowSourcePool = sync.Pool{
5956

6057
func newPlanNodeToRowSource(
6158
source planNode, params runParams, fastPath bool, firstNotWrapped planNode,
62-
) *planNodeToRowSource {
59+
) (*planNodeToRowSource, error) {
6360
p := planNodeToRowSourcePool.Get().(*planNodeToRowSource)
6461
*p = planNodeToRowSource{
6562
ProcessorBase: p.ProcessorBase,
@@ -85,7 +82,39 @@ func newPlanNodeToRowSource(
8582
} else {
8683
p.row = make(rowenc.EncDatumRow, len(p.outputTypes))
8784
}
88-
return p
85+
// Find any planNodes that need a way to propagate metadata before we get to
86+
// the firstNotWrapped - this planNodeToRowSource adapter will be the
87+
// forwarder for all of them.
88+
var setForwarder func(parent planNode) error
89+
setForwarder = func(parent planNode) error {
90+
switch t := parent.(type) {
91+
case *applyJoinNode:
92+
t.forwarder = p
93+
case *recursiveCTENode:
94+
t.forwarder = p
95+
}
96+
for i, n := 0, parent.InputCount(); i < n; i++ {
97+
child, err := parent.Input(i)
98+
if err != nil {
99+
return err
100+
}
101+
if child == p.firstNotWrapped {
102+
// Once we get to the firstNotWrapped, we stop the recursion
103+
// since all remaining planNodes aren't our responsibility.
104+
return nil
105+
}
106+
if err = setForwarder(child); err != nil {
107+
return err
108+
}
109+
}
110+
return nil
111+
}
112+
err := setForwarder(p.node)
113+
if err != nil {
114+
p.Release()
115+
return nil, err
116+
}
117+
return p, nil
89118
}
90119

91120
// MustBeStreaming implements the execinfra.Processor interface.

0 commit comments

Comments
 (0)