Skip to content

Commit 93c6870

Browse files
authored
Merge pull request #155538 from cockroachdb/blathers/backport-release-25.4-155273
release-25.4: changefeedccl: fix progress skew metrics computation
2 parents d05d689 + e57d1d9 commit 93c6870

File tree

5 files changed

+92
-44
lines changed

5 files changed

+92
-44
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ go_library(
145145
"//pkg/util/httputil",
146146
"//pkg/util/humanizeutil",
147147
"//pkg/util/intsets",
148+
"//pkg/util/iterutil",
148149
"//pkg/util/json",
149150
"//pkg/util/log",
150151
"//pkg/util/log/channel",

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2211,16 +2211,29 @@ func (cf *changeFrontier) maybeEmitResolved(ctx context.Context, newResolved hlc
22112211

22122212
// updateProgressSkewMetrics updates the progress skew metrics.
22132213
func (cf *changeFrontier) updateProgressSkewMetrics() {
2214-
maxSpanTS := cf.frontier.LatestTS()
2215-
maxTableTS := cf.frontier.Frontier()
2216-
for _, f := range cf.frontier.Frontiers() {
2217-
tableTS := f.Frontier()
2218-
if tableTS.After(maxTableTS) {
2219-
maxTableTS = tableTS
2214+
fastestSpanTS := cf.frontier.LatestTS()
2215+
fastestTableTS := func() hlc.Timestamp {
2216+
var maxTS hlc.Timestamp
2217+
for _, f := range cf.frontier.Frontiers() {
2218+
if f.Frontier().After(maxTS) {
2219+
maxTS = f.Frontier()
2220+
}
2221+
}
2222+
return maxTS
2223+
}()
2224+
2225+
slowestTS := cf.frontier.Frontier()
2226+
var spanSkew, tableSkew int64
2227+
if slowestTS.IsSet() {
2228+
if fastestSpanTS.IsSet() {
2229+
spanSkew = fastestSpanTS.WallTime - slowestTS.WallTime
2230+
}
2231+
if fastestTableTS.IsSet() {
2232+
tableSkew = fastestTableTS.WallTime - slowestTS.WallTime
22202233
}
22212234
}
22222235

2223-
cf.sliMetrics.setFastestTS(cf.sliMetricsID, maxSpanTS, maxTableTS)
2236+
cf.sliMetrics.setProgressSkew(cf.sliMetricsID, spanSkew, tableSkew)
22242237
}
22252238

22262239
func frontierIsBehind(frontier hlc.Timestamp, sv *settings.Values) bool {

pkg/ccl/changefeedccl/metrics.go

Lines changed: 27 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
package changefeedccl
77

88
import (
9+
"cmp"
910
"context"
11+
"maps"
1012
"slices"
1113
"strings"
1214
"sync/atomic"
@@ -25,6 +27,7 @@ import (
2527
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
2628
"github.com/cockroachdb/cockroach/pkg/util/cidr"
2729
"github.com/cockroachdb/cockroach/pkg/util/hlc"
30+
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
2831
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
2932
"github.com/cockroachdb/cockroach/pkg/util/metric"
3033
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
@@ -184,11 +187,11 @@ type sliMetrics struct {
184187

185188
mu struct {
186189
syncutil.Mutex
187-
id int64
188-
resolved map[int64]hlc.Timestamp
189-
checkpoint map[int64]hlc.Timestamp
190-
fastestSpan map[int64]hlc.Timestamp
191-
fastestTable map[int64]hlc.Timestamp
190+
id int64
191+
resolved map[int64]hlc.Timestamp
192+
checkpoint map[int64]hlc.Timestamp
193+
spanSkew map[int64]int64
194+
tableSkew map[int64]int64
192195
}
193196
NetMetrics *cidr.NetMetrics
194197

@@ -202,8 +205,8 @@ func (m *sliMetrics) closeId(id int64) {
202205
defer m.mu.Unlock()
203206
delete(m.mu.checkpoint, id)
204207
delete(m.mu.resolved, id)
205-
delete(m.mu.fastestSpan, id)
206-
delete(m.mu.fastestTable, id)
208+
delete(m.mu.spanSkew, id)
209+
delete(m.mu.tableSkew, id)
207210
}
208211

209212
// setResolved writes a resolved timestamp entry for the given id.
@@ -224,15 +227,15 @@ func (m *sliMetrics) setCheckpoint(id int64, ts hlc.Timestamp) {
224227
}
225228
}
226229

227-
// setFastestTS saves the fastest span/table timestamps for a given id.
228-
func (m *sliMetrics) setFastestTS(id int64, spanTS hlc.Timestamp, tableTS hlc.Timestamp) {
230+
// setProgressSkew saves the span skew/table skew for a given ID.
231+
func (m *sliMetrics) setProgressSkew(id int64, spanSkew int64, tableSkew int64) {
229232
m.mu.Lock()
230233
defer m.mu.Unlock()
231-
if _, ok := m.mu.fastestSpan[id]; ok {
232-
m.mu.fastestSpan[id] = spanTS
234+
if _, ok := m.mu.spanSkew[id]; ok {
235+
m.mu.spanSkew[id] = spanSkew
233236
}
234-
if _, ok := m.mu.fastestTable[id]; ok {
235-
m.mu.fastestTable[id] = tableTS
237+
if _, ok := m.mu.tableSkew[id]; ok {
238+
m.mu.tableSkew[id] = tableSkew
236239
}
237240
}
238241

@@ -245,8 +248,8 @@ func (m *sliMetrics) claimId() int64 {
245248
// ignored until a nonzero timestamp is written.
246249
m.mu.checkpoint[id] = hlc.Timestamp{}
247250
m.mu.resolved[id] = hlc.Timestamp{}
248-
m.mu.fastestSpan[id] = hlc.Timestamp{}
249-
m.mu.fastestTable[id] = hlc.Timestamp{}
251+
m.mu.spanSkew[id] = 0
252+
m.mu.tableSkew[id] = 0
250253
m.mu.id++
251254
return id
252255
}
@@ -1272,8 +1275,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
12721275
}
12731276
sm.mu.resolved = make(map[int64]hlc.Timestamp)
12741277
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
1275-
sm.mu.fastestSpan = make(map[int64]hlc.Timestamp)
1276-
sm.mu.fastestTable = make(map[int64]hlc.Timestamp)
1278+
sm.mu.spanSkew = make(map[int64]int64)
1279+
sm.mu.tableSkew = make(map[int64]int64)
12771280
sm.mu.id = 1 // start the first id at 1 so we can detect initialization
12781281

12791282
minTimestampGetter := func(m map[int64]hlc.Timestamp) func() int64 {
@@ -1304,34 +1307,21 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
13041307
}
13051308
}
13061309

1307-
maxTimestampSkewGetter := func(
1308-
base map[int64]hlc.Timestamp, ahead map[int64]hlc.Timestamp,
1309-
) func() int64 {
1310+
maxTimestampSkewGetter := func(m map[int64]int64) func() int64 {
13101311
return func() int64 {
13111312
sm.mu.Lock()
13121313
defer sm.mu.Unlock()
1313-
var maxSkew int64
1314-
for id, b := range base {
1315-
a := ahead[id]
1316-
if a.IsEmpty() || b.IsEmpty() {
1317-
continue
1318-
}
1319-
skew := a.WallTime - b.WallTime
1320-
if skew > maxSkew {
1321-
maxSkew = skew
1322-
}
1323-
}
1324-
return maxSkew
1314+
return iterutil.MaxFunc(maps.Values(m), cmp.Compare)
13251315
}
13261316
}
13271317

13281318
sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
13291319
sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
13301320
sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)
13311321
sm.SpanProgressSkew = a.SpanProgressSkew.AddFunctionalChild(
1332-
maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestSpan), scope)
1322+
maxTimestampSkewGetter(sm.mu.spanSkew), scope)
13331323
sm.TableProgressSkew = a.TableProgressSkew.AddFunctionalChild(
1334-
maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestTable), scope)
1324+
maxTimestampSkewGetter(sm.mu.tableSkew), scope)
13351325

13361326
a.mu.sliMetrics[scope] = sm
13371327
return sm, nil
@@ -1340,7 +1330,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
13401330
// getLaggingRangesCallback returns a function which can be called to update the
13411331
// lagging ranges metric. It should be called with the current number of lagging
13421332
// ranges.
1343-
func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
1333+
func (m *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
13441334
// Because this gauge is shared between changefeeds in the same metrics scope,
13451335
// we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to
13461336
// ensure values written by others are not overwritten. The code below is used
@@ -1364,10 +1354,10 @@ func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64)
13641354
last.Lock()
13651355
defer last.Unlock()
13661356

1367-
s.LaggingRanges.Dec(last.lagging - lagging)
1357+
m.LaggingRanges.Dec(last.lagging - lagging)
13681358
last.lagging = lagging
13691359

1370-
s.TotalRanges.Dec(last.total - total)
1360+
m.TotalRanges.Dec(last.total - total)
13711361
last.total = total
13721362
}
13731363
}

pkg/util/iterutil/iterutil.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,15 @@ func MinFunc[E any](seq iter.Seq[E], cmp func(E, E) int) E {
7878
}
7979
return m
8080
}
81+
82+
// MaxFunc returns the maximum element in seq, using cmp to compare elements.
83+
// If seq has no values, the zero value is returned.
84+
func MaxFunc[E any](seq iter.Seq[E], cmp func(E, E) int) E {
85+
var m E
86+
for i, v := range Enumerate(seq) {
87+
if i == 0 || cmp(v, m) > 0 {
88+
m = v
89+
}
90+
}
91+
return m
92+
}

pkg/util/iterutil/iterutil_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,35 @@ func TestMinFunc(t *testing.T) {
9595
})
9696
}
9797
}
98+
99+
func TestMaxFunc(t *testing.T) {
100+
intCmp := func(a, b int) int {
101+
return a - b
102+
}
103+
104+
for name, tc := range map[string]struct {
105+
input []int
106+
}{
107+
"empty": {
108+
input: nil,
109+
},
110+
"one element": {
111+
input: []int{1},
112+
},
113+
"multiple elements": {
114+
input: []int{1, 3, 2},
115+
},
116+
"multiple elements with zero value": {
117+
input: []int{1, 0, 3, 2},
118+
},
119+
} {
120+
t.Run(name, func(t *testing.T) {
121+
m := iterutil.MaxFunc(slices.Values(tc.input), intCmp)
122+
if len(tc.input) == 0 {
123+
require.Equal(t, 0, m)
124+
} else {
125+
require.Equal(t, slices.MaxFunc(tc.input, intCmp), m)
126+
}
127+
})
128+
}
129+
}

0 commit comments

Comments
 (0)