Skip to content

Commit 2c25e89

Browse files
authored
disttask: refactor metrics, collect metrics in scheduler manager (pingcap#50634)
close pingcap#49615
1 parent acd4999 commit 2c25e89

15 files changed

+598
-540
lines changed

pkg/disttask/framework/mock/scheduler_mock.go

+15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/disttask/framework/proto/subtask.go

+10
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ const (
5151
SubtaskStatePaused SubtaskState = "paused"
5252
)
5353

54+
// AllSubtaskStates is all subtask state.
55+
var AllSubtaskStates = []SubtaskState{
56+
SubtaskStatePending,
57+
SubtaskStateRunning,
58+
SubtaskStateSucceed,
59+
SubtaskStateFailed,
60+
SubtaskStateCanceled,
61+
SubtaskStatePaused,
62+
}
63+
5464
type (
5565
// SubtaskState is the state of subtask.
5666
SubtaskState string

pkg/disttask/framework/scheduler/BUILD.bazel

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "scheduler",
55
srcs = [
66
"balancer.go",
7+
"collector.go",
78
"interface.go",
89
"nodes.go",
910
"scheduler.go",
@@ -32,6 +33,7 @@ go_library(
3233
"@com_github_pingcap_errors//:errors",
3334
"@com_github_pingcap_failpoint//:failpoint",
3435
"@com_github_pingcap_log//:log",
36+
"@com_github_prometheus_client_golang//prometheus",
3537
"@org_uber_go_zap//:zap",
3638
],
3739
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package scheduler
16+
17+
import (
18+
"strconv"
19+
"sync/atomic"
20+
"time"
21+
22+
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
23+
"github.com/prometheus/client_golang/prometheus"
24+
)
25+
26+
var subtaskCollector = newCollector()
27+
28+
func init() {
29+
prometheus.MustRegister(subtaskCollector)
30+
}
31+
32+
// Because the exec_id of a subtask may change, after all tasks
33+
// are successful, subtasks will be migrated from tidb_subtask_background
34+
// to tidb_subtask_background_history. In the above situation,
35+
// the built-in collector of Prometheus needs to delete the previously
36+
// added metrics, which is quite troublesome.
37+
// Therefore, a custom collector is used.
38+
type collector struct {
39+
subtaskInfo atomic.Pointer[[]*proto.Subtask]
40+
41+
subtasks *prometheus.Desc
42+
subtaskDuration *prometheus.Desc
43+
}
44+
45+
func newCollector() *collector {
46+
return &collector{
47+
subtasks: prometheus.NewDesc(
48+
"tidb_disttask_subtasks",
49+
"Number of subtasks.",
50+
[]string{"task_type", "task_id", "status", "exec_id"}, nil,
51+
),
52+
subtaskDuration: prometheus.NewDesc(
53+
"tidb_disttask_subtask_duration",
54+
"Duration of subtasks in different states.",
55+
[]string{"task_type", "task_id", "status", "subtask_id", "exec_id"}, nil,
56+
),
57+
}
58+
}
59+
60+
// Describe implements the prometheus.Collector interface.
61+
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
62+
ch <- c.subtasks
63+
ch <- c.subtaskDuration
64+
}
65+
66+
// Collect implements the prometheus.Collector interface.
67+
func (c *collector) Collect(ch chan<- prometheus.Metric) {
68+
p := c.subtaskInfo.Load()
69+
if p == nil {
70+
return
71+
}
72+
subtasks := *p
73+
74+
// taskID => execID => state => cnt
75+
subtaskCnt := make(map[int64]map[string]map[proto.SubtaskState]int)
76+
taskType := make(map[int64]proto.TaskType)
77+
for _, subtask := range subtasks {
78+
if _, ok := subtaskCnt[subtask.TaskID]; !ok {
79+
subtaskCnt[subtask.TaskID] = make(map[string]map[proto.SubtaskState]int)
80+
}
81+
if _, ok := subtaskCnt[subtask.TaskID][subtask.ExecID]; !ok {
82+
subtaskCnt[subtask.TaskID][subtask.ExecID] = make(map[proto.SubtaskState]int)
83+
}
84+
85+
subtaskCnt[subtask.TaskID][subtask.ExecID][subtask.State]++
86+
taskType[subtask.TaskID] = subtask.Type
87+
88+
c.setDistSubtaskDuration(ch, subtask)
89+
}
90+
for taskID, execIDMap := range subtaskCnt {
91+
for execID, stateMap := range execIDMap {
92+
for state, cnt := range stateMap {
93+
ch <- prometheus.MustNewConstMetric(c.subtasks, prometheus.GaugeValue,
94+
float64(cnt),
95+
taskType[taskID].String(),
96+
strconv.Itoa(int(taskID)),
97+
state.String(),
98+
execID,
99+
)
100+
}
101+
}
102+
}
103+
}
104+
105+
func (c *collector) setDistSubtaskDuration(ch chan<- prometheus.Metric, subtask *proto.Subtask) {
106+
switch subtask.State {
107+
case proto.SubtaskStatePending:
108+
ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue,
109+
time.Since(subtask.CreateTime).Seconds(),
110+
subtask.Type.String(),
111+
strconv.Itoa(int(subtask.TaskID)),
112+
subtask.State.String(),
113+
strconv.Itoa(int(subtask.ID)),
114+
subtask.ExecID,
115+
)
116+
case proto.SubtaskStateRunning:
117+
ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue,
118+
time.Since(subtask.StartTime).Seconds(),
119+
subtask.Type.String(),
120+
strconv.Itoa(int(subtask.TaskID)),
121+
subtask.State.String(),
122+
strconv.Itoa(int(subtask.ID)),
123+
subtask.ExecID,
124+
)
125+
}
126+
}

pkg/disttask/framework/scheduler/interface.go

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type TaskManager interface {
3030
// The returned tasks are sorted by task order, see proto.Task, and only contains
3131
// some fields, see row2TaskBasic.
3232
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.Task, error)
33+
// GetAllSubtasks gets all subtasks with basic columns.
34+
GetAllSubtasks(ctx context.Context) ([]*proto.Subtask, error)
3335
GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
3436
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
3537
GCSubtasks(ctx context.Context) error

pkg/disttask/framework/scheduler/scheduler_manager.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ var (
3737
// defaultHistorySubtaskTableGcInterval is the interval of gc history subtask table.
3838
defaultHistorySubtaskTableGcInterval = 24 * time.Hour
3939
// DefaultCleanUpInterval is the interval of cleanup routine.
40-
DefaultCleanUpInterval = 10 * time.Minute
40+
DefaultCleanUpInterval = 10 * time.Minute
41+
defaultCollectMetricsInterval = 5 * time.Second
4142
)
4243

4344
// WaitTaskFinished is used to sync the test.
@@ -162,6 +163,7 @@ func (sm *Manager) Start() {
162163
sm.wg.Run(sm.scheduleTaskLoop)
163164
sm.wg.Run(sm.gcSubtaskHistoryTableLoop)
164165
sm.wg.Run(sm.cleanupTaskLoop)
166+
sm.wg.Run(sm.collectLoop)
165167
sm.wg.Run(func() {
166168
sm.nodeMgr.maintainLiveNodesLoop(sm.ctx, sm.taskMgr)
167169
})
@@ -419,3 +421,28 @@ func (sm *Manager) MockScheduler(task *proto.Task) *BaseScheduler {
419421
serverID: sm.serverID,
420422
})
421423
}
424+
425+
func (sm *Manager) collectLoop() {
426+
sm.logger.Info("collect loop start")
427+
ticker := time.NewTicker(defaultCollectMetricsInterval)
428+
defer ticker.Stop()
429+
for {
430+
select {
431+
case <-sm.ctx.Done():
432+
sm.logger.Info("collect loop exits")
433+
return
434+
case <-ticker.C:
435+
sm.collect()
436+
}
437+
}
438+
}
439+
440+
func (sm *Manager) collect() {
441+
subtasks, err := sm.taskMgr.GetAllSubtasks(sm.ctx)
442+
if err != nil {
443+
sm.logger.Warn("get all subtasks failed", zap.Error(err))
444+
return
445+
}
446+
447+
subtaskCollector.subtaskInfo.Store(&subtasks)
448+
}

pkg/disttask/framework/storage/converter.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,15 @@ func row2BasicSubTask(r chunk.Row) *proto.Subtask {
7979
if !r.IsNull(8) {
8080
ordinal = int(r.GetInt64(8))
8181
}
82+
83+
// subtask defines start time as bigint; to ensure backward compatibility,
84+
// we keep it that way, and we convert it here.
85+
var startTime time.Time
86+
if !r.IsNull(9) {
87+
ts := r.GetInt64(9)
88+
startTime = time.Unix(ts, 0)
89+
}
90+
8291
subtask := &proto.Subtask{
8392
ID: r.GetInt64(0),
8493
Step: proto.Step(r.GetInt64(1)),
@@ -89,25 +98,23 @@ func row2BasicSubTask(r chunk.Row) *proto.Subtask {
8998
Concurrency: int(r.GetInt64(6)),
9099
CreateTime: createTime,
91100
Ordinal: ordinal,
101+
StartTime: startTime,
92102
}
93103
return subtask
94104
}
95105

96106
// Row2SubTask converts a row to a subtask.
97107
func Row2SubTask(r chunk.Row) *proto.Subtask {
98108
subtask := row2BasicSubTask(r)
99-
// subtask defines start/update time as bigint, to ensure backward compatible,
109+
110+
// subtask defines update time as bigint; to ensure backward compatibility,
100111
// we keep it that way, and we convert it here.
101-
var startTime, updateTime time.Time
102-
if !r.IsNull(9) {
103-
ts := r.GetInt64(9)
104-
startTime = time.Unix(ts, 0)
105-
}
112+
var updateTime time.Time
106113
if !r.IsNull(10) {
107114
ts := r.GetInt64(10)
108115
updateTime = time.Unix(ts, 0)
109116
}
110-
subtask.StartTime = startTime
117+
111118
subtask.UpdateTime = updateTime
112119
subtask.Meta = r.GetBytes(11)
113120
subtask.Summary = r.GetJSON(12).String()

pkg/disttask/framework/storage/subtask_state.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (mgr *TaskManager) RunningSubtasksBack2Pending(ctx context.Context, subtask
123123
for _, subtask := range subtasks {
124124
_, err := sqlexec.ExecSQL(ctx, se, `
125125
update mysql.tidb_background_subtask
126-
set state = %?, state_update_time = CURRENT_TIMESTAMP()
126+
set state = %?, state_update_time = unix_timestamp()
127127
where id = %? and exec_id = %? and state = %?`,
128128
proto.SubtaskStatePending, subtask.ID, subtask.ExecID, proto.SubtaskStateRunning)
129129
if err != nil {

pkg/disttask/framework/storage/task_table.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ const (
4242
TaskColumns = basicTaskColumns + `, t.start_time, t.state_update_time, t.meta, t.dispatcher_id, t.error`
4343
// InsertTaskColumns is the columns used in insert task.
4444
InsertTaskColumns = `task_key, type, state, priority, concurrency, step, meta, create_time`
45-
basicSubtaskColumns = `id, step, task_key, type, exec_id, state, concurrency, create_time, ordinal`
45+
basicSubtaskColumns = `id, step, task_key, type, exec_id, state, concurrency, create_time, ordinal, start_time`
4646
// SubtaskColumns is the columns for subtask.
47-
SubtaskColumns = basicSubtaskColumns + `, start_time, state_update_time, meta, summary`
47+
SubtaskColumns = basicSubtaskColumns + `, state_update_time, meta, summary`
4848
// InsertSubtaskColumns is the columns used in insert subtask.
4949
InsertSubtaskColumns = `step, task_key, exec_id, meta, state, type, concurrency, ordinal, create_time, checkpoint, summary`
5050
)
@@ -730,3 +730,19 @@ func (mgr *TaskManager) GetSubtasksWithHistory(ctx context.Context, taskID int64
730730
}
731731
return subtasks, nil
732732
}
733+
734+
// GetAllSubtasks gets all subtasks with basic columns.
735+
func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.Subtask, error) {
736+
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+basicSubtaskColumns+` from mysql.tidb_background_subtask`)
737+
if err != nil {
738+
return nil, err
739+
}
740+
if len(rs) == 0 {
741+
return nil, nil
742+
}
743+
subtasks := make([]*proto.Subtask, 0, len(rs))
744+
for _, r := range rs {
745+
subtasks = append(subtasks, row2BasicSubTask(r))
746+
}
747+
return subtasks, nil
748+
}

0 commit comments

Comments
 (0)