Skip to content

Commit be0a3a2

Browse files
author
ffffwh
committed
api: job/detail: add JobStage
1 parent 7ef1543 commit be0a3a2

File tree

5 files changed

+96
-160
lines changed

5 files changed

+96
-160
lines changed

api/handler/v2/job.go

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -710,12 +710,15 @@ func GetJobDetail(c echo.Context, logger g.LoggerType, jobType DtleJobType) erro
710710
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(err))
711711
}
712712

713-
progress, delay := getTaskProgress(logger, c.Request().Header, resp.TaskLogs)
713+
progress, delay, stage := getTaskProgress(logger, c.Request().Header, resp.TaskLogs, reqParam.JobId)
714+
714715
resp.BasicTaskProfile.JobBaseInfo.DumpProgress = progress
715716
resp.BasicTaskProfile.JobBaseInfo.Delay = delay
716717
resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Password = "*"
717718
resp.BasicTaskProfile.ConnectionInfo.DstDataBase.Password = "*"
718719

720+
resp.BasicTaskProfile.JobBaseInfo.JobStage = stage
721+
719722
return c.JSON(http.StatusOK, resp)
720723
}
721724

@@ -743,31 +746,30 @@ func getJobDetail(logger g.LoggerType, jobId string, jobType DtleJobType) (*mode
743746
}, nil
744747
}
745748

746-
func getTaskProgress(logger g.LoggerType, header http.Header, taskLogs []models.TaskLog) (*models.DumpProgress, int64) {
747-
url := fmt.Sprintf("http://%v/v2/monitor/task", handler.ApiAddr)
748-
for i := range taskLogs {
749-
taskLog := taskLogs[i]
750-
allocationId := taskLogs[i].AllocationId
751-
if taskLog.Status == nomadApi.AllocDesiredStatusRun && taskLog.Target == common.TaskTypeDest {
752-
res := models.GetTaskProgressRespV2{
753-
BaseResp: models.BuildBaseResp(nil),
754-
}
755-
args := map[string]string{
756-
"allocation_id": allocationId,
757-
"task_name": common.TaskTypeDest,
758-
}
759-
if err := handler.InvokeApiWithKvData(http.MethodGet, url, args, &res, header); nil != err {
760-
logger.Warn("forward api failed", "url", url, "err", err)
761-
return nil, 0
762-
}
763-
return &models.DumpProgress{
764-
ExecRowCount: res.TaskStatus.ExecMasterRowCount,
765-
TotalRowCount: res.TaskStatus.ReadMasterRowCount,
766-
}, res.TaskStatus.DelayCount.Time
767-
}
749+
// return: progress, delay, stage
750+
func getTaskProgress(logger g.LoggerType, header http.Header, taskLogs []models.TaskLog, jobId string) (*models.DumpProgress, int64, string) {
751+
storeManager, err := common.NewStoreManager([]string{handler.ConsulAddr}, logger)
752+
if err != nil {
753+
logger.Warn("getTaskProgress: failed to connect to consul", "ConsulAddr", handler.ConsulAddr, "err", err)
754+
return nil, 0, ""
755+
}
756+
757+
execRowCount, totalRowCount, err := storeManager.GetFullProgress(jobId)
758+
progress := &models.DumpProgress{
759+
ExecRowCount: execRowCount,
760+
TotalRowCount: totalRowCount,
761+
}
762+
if err != nil {
763+
logger.Warn("getTaskProgress: failed to GetJobStatus", "ConsulAddr", handler.ConsulAddr, "err", err)
764+
return nil, 0, ""
765+
}
766+
767+
stage, err := storeManager.GetJobStage(jobId)
768+
if err != nil {
769+
logger.Warn("getTaskProgress: failed to GetJobStatus", "ConsulAddr", handler.ConsulAddr, "err", err)
770+
return nil, 0, ""
768771
}
769-
logger.Warn("Unable to get delayed data")
770-
return nil, 0
772+
return progress, 0, stage
771773
}
772774

773775
func buildBasicTaskProfile(logger g.LoggerType, jobId string, srcTaskDetail *models.SrcTaskDetail,
@@ -1377,9 +1379,10 @@ func GetSubscriptionJobDetailV2(c echo.Context) error {
13771379
basicTaskProfile.Configuration.FailOver = failover
13781380
basicTaskProfile.ConnectionInfo.SrcDataBase.Password = "*"
13791381

1380-
progress, delay := getTaskProgress(logger, c.Request().Header, taskLog)
1382+
progress, delay, stage := getTaskProgress(logger, c.Request().Header, taskLog, reqParam.JobId)
13811383
basicTaskProfile.JobBaseInfo.DumpProgress = progress
13821384
basicTaskProfile.JobBaseInfo.Delay = delay
1385+
basicTaskProfile.JobBaseInfo.JobStage = stage
13831386
if len(nomadJob.TaskGroups) != 0 {
13841387
basicTaskProfile.Configuration.RetryTimes = *nomadJob.TaskGroups[0].RestartPolicy.Attempts
13851388
}

api/handler/v2/monitor.go

Lines changed: 34 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99

1010
"github.com/actiontech/dtle/api/handler"
1111
"github.com/actiontech/dtle/api/models"
12-
mysql "github.com/actiontech/dtle/driver"
13-
nomadApi "github.com/hashicorp/nomad/api"
1412
"github.com/labstack/echo/v4"
1513
)
1614

@@ -31,143 +29,45 @@ func GetTaskProgressV2(c echo.Context) error {
3129
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(err))
3230
}
3331

34-
targetNomadAddr := reqParam.NomadHttpAddress
35-
if targetNomadAddr == "" {
36-
// find out the node that the task is running
37-
logger.Info("find out the node that the task is running")
38-
url := handler.BuildUrl("/v1/allocations")
39-
logger.Info("invoke nomad api begin", "url", url)
40-
nomadAllocs := []nomadApi.Allocation{}
41-
if err := handler.InvokeApiWithKvData(http.MethodGet, url, nil, &nomadAllocs); nil != err {
42-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("invoke nomad api %v failed: %v", url, err)))
43-
}
44-
logger.Info("invoke nomad api finished")
45-
nodeId := ""
46-
for _, alloc := range nomadAllocs {
47-
if alloc.ID == reqParam.AllocationId {
48-
nodeId = alloc.NodeID
49-
break
50-
}
51-
}
52-
if nodeId == "" {
53-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("cannot find out which node the allocation is running on")))
54-
}
55-
url = handler.BuildUrl(fmt.Sprintf("/v1/node/%v", nodeId))
56-
logger.Info("invoke nomad api begin", "url", url)
57-
nomadNode := nomadApi.Node{}
58-
if err := handler.InvokeApiWithKvData(http.MethodGet, url, nil, &nomadNode); nil != err {
59-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("invoke nomad api %v failed: %v", url, err)))
60-
}
61-
logger.Info("invoke nomad api finished")
62-
targetNomadAddr = nomadNode.HTTPAddr
63-
}
32+
logger.Warn("/v2/monitor/task is unimplemented, returning dummy data", "AllocationId", reqParam.AllocationId)
6433

65-
targetHost, _, err := net.SplitHostPort(targetNomadAddr)
66-
if err != nil {
67-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("get target host failed: %v", err)))
68-
}
69-
logger.Info("got target host", "targetHost", targetHost)
70-
selfNomadHost, _, err := net.SplitHostPort(handler.ApiAddr)
71-
if err != nil {
72-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("get self nomad host failed: %v", err)))
73-
}
34+
//storeManager, err := common.NewStoreManager([]string{handler.ConsulAddr}, logger)
35+
//if err != nil {
36+
// return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("consul_addr=%v; connect to consul: failed: %v", handler.ConsulAddr, err)))
37+
//}
38+
39+
//storeManager.GetGtidForJob(reqParam.)
7440

7541
res := models.GetTaskProgressRespV2{
7642
BaseResp: models.BuildBaseResp(nil),
7743
}
78-
if targetHost != selfNomadHost {
79-
logger.Info("forwarding...", "targetHost", targetHost)
80-
// forward
81-
// invoke http://%v/v1/agent/self to get api_addr
82-
url := fmt.Sprintf("http://%v/v1/agent/self", targetNomadAddr)
83-
nomadAgentSelf := nomadApi.AgentSelf{}
84-
if err := handler.InvokeApiWithKvData(http.MethodGet, url, nil, &nomadAgentSelf); err != nil {
85-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("invoke nomad api %v failed: %v", url, err)))
86-
}
87-
88-
_, targetPort, err := getApiAddrFromAgentConfig(nomadAgentSelf.Config)
89-
if err != nil {
90-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("get target host failed: %v", err)))
91-
}
92-
forwardAddr := fmt.Sprintf("%s:%s", targetHost, targetPort)
93-
logger.Info("forwarding...", "forwardAddr", forwardAddr)
94-
95-
url = fmt.Sprintf("http://%v/v2/monitor/task", forwardAddr)
96-
args := map[string]string{
97-
"allocation_id": reqParam.AllocationId,
98-
"task_name": reqParam.TaskName,
99-
"nomad_address": targetNomadAddr,
100-
}
101-
logger.Info("forwarding... invoke target dtle api begin", "url", url)
102-
if err := handler.InvokeApiWithKvData(http.MethodGet, url, args, &res, c.Request().Header); nil != err {
103-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("forward api %v failed: %v", url, err)))
104-
}
105-
logger.Info("forwarding... invoke target dtle api finished")
106-
} else {
107-
taskStatus, ok, err := mysql.AllocIdTaskNameToTaskHandler.GetTaskStatistics(reqParam.AllocationId, reqParam.TaskName)
108-
if nil != err {
109-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("get task stats failed: %v. allocation_id=%v task_name=%v", err, reqParam.AllocationId, reqParam.TaskName)))
110-
} else if !ok {
111-
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("cannot find the task. allocation_id=%v task_name=%v", reqParam.AllocationId, reqParam.TaskName)))
112-
}
113-
114-
// build response struct
115-
var currentCoordinates *models.CurrentCoordinates
116-
var delayCount *models.DelayCount
117-
var throughputStat *models.ThroughputStat
118-
if taskStatus.CurrentCoordinates != nil {
119-
currentCoordinates = &models.CurrentCoordinates{
120-
File: taskStatus.CurrentCoordinates.File,
121-
Position: taskStatus.CurrentCoordinates.Position,
122-
GtidSet: taskStatus.CurrentCoordinates.GtidSet,
123-
RelayMasterLogFile: taskStatus.CurrentCoordinates.RelayMasterLogFile,
124-
ReadMasterLogPos: taskStatus.CurrentCoordinates.ReadMasterLogPos,
125-
RetrievedGtidSet: taskStatus.CurrentCoordinates.RetrievedGtidSet,
126-
}
127-
}
128-
129-
if taskStatus.DelayCount != nil {
130-
delayCount = &models.DelayCount{
131-
Num: taskStatus.DelayCount.Num,
132-
Time: taskStatus.DelayCount.Time,
133-
}
134-
}
135-
136-
if taskStatus.ThroughputStat != nil {
137-
throughputStat = &models.ThroughputStat{
138-
Num: taskStatus.ThroughputStat.Num,
139-
Time: taskStatus.ThroughputStat.Time,
140-
}
141-
}
142-
143-
res.TaskStatus = &models.TaskProgress{
144-
CurrentCoordinates: currentCoordinates,
145-
DelayCount: delayCount,
146-
ProgressPct: taskStatus.ProgressPct,
147-
ExecMasterRowCount: taskStatus.ExecMasterRowCount,
148-
ExecMasterTxCount: taskStatus.ExecMasterTxCount,
149-
ReadMasterRowCount: taskStatus.ReadMasterRowCount,
150-
ReadMasterTxCount: taskStatus.ReadMasterTxCount,
151-
ETA: taskStatus.ETA,
152-
Backlog: taskStatus.Backlog,
153-
ThroughputStat: throughputStat,
154-
NatsMsgStat: &models.NatsMessageStatistics{
155-
InMsgs: taskStatus.MsgStat.InMsgs,
156-
OutMsgs: taskStatus.MsgStat.OutMsgs,
157-
InBytes: taskStatus.MsgStat.InBytes,
158-
OutBytes: taskStatus.MsgStat.OutBytes,
159-
Reconnects: taskStatus.MsgStat.Reconnects,
160-
},
161-
BufferStat: &models.BufferStat{
162-
BinlogEventQueueSize: taskStatus.BufferStat.BinlogEventQueueSize,
163-
ExtractorTxQueueSize: taskStatus.BufferStat.ExtractorTxQueueSize,
164-
ApplierTxQueueSize: taskStatus.BufferStat.ApplierTxQueueSize,
165-
SendByTimeout: taskStatus.BufferStat.SendByTimeout,
166-
SendBySizeFull: taskStatus.BufferStat.SendBySizeFull,
167-
},
168-
Stage: taskStatus.Stage,
169-
Timestamp: taskStatus.Timestamp,
170-
}
44+
res.TaskStatus = &models.TaskProgress{
45+
CurrentCoordinates: &models.CurrentCoordinates{},
46+
DelayCount: &models.DelayCount{},
47+
//ProgressPct: taskStatus.ProgressPct,
48+
ExecMasterRowCount: 0,
49+
ExecMasterTxCount: 0,
50+
ReadMasterRowCount: 0,
51+
ReadMasterTxCount: 0,
52+
//ETA: taskStatus.ETA,
53+
//Backlog: taskStatus.Backlog,
54+
//ThroughputStat: throughputStat,
55+
//NatsMsgStat: &models.NatsMessageStatistics{
56+
// InMsgs: taskStatus.MsgStat.InMsgs,
57+
// OutMsgs: taskStatus.MsgStat.OutMsgs,
58+
// InBytes: taskStatus.MsgStat.InBytes,
59+
// OutBytes: taskStatus.MsgStat.OutBytes,
60+
// Reconnects: taskStatus.MsgStat.Reconnects,
61+
//},
62+
//BufferStat: &models.BufferStat{
63+
// BinlogEventQueueSize: taskStatus.BufferStat.BinlogEventQueueSize,
64+
// ExtractorTxQueueSize: taskStatus.BufferStat.ExtractorTxQueueSize,
65+
// ApplierTxQueueSize: taskStatus.BufferStat.ApplierTxQueueSize,
66+
// SendByTimeout: taskStatus.BufferStat.SendByTimeout,
67+
// SendBySizeFull: taskStatus.BufferStat.SendBySizeFull,
68+
//},
69+
Stage: "TODO",
70+
//Timestamp: taskStatus.Timestamp,
17171
}
17272

17373
return c.JSON(http.StatusOK, &res)

api/models/job_v2.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type JobBaseInfo struct {
3434
JobSteps []common.JobStep `json:"job_steps"`
3535
Delay int64 `json:"delay"`
3636
DumpProgress *DumpProgress `json:"dump_progress"`
37+
JobStage string `json:"job_stage"`
3738
}
3839

3940
type DumpProgress struct {

driver/common/store.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,29 @@ func (sm *StoreManager) WatchTree(dir string, stopCh <-chan struct{}) (<-chan []
621621
return sm.consulStore.WatchTree(dir, stopCh)
622622
}
623623

624+
// return: ExecRowCount, TotalRowCount
625+
func (sm *StoreManager) GetFullProgress(jobName string) (int64, int64, error) {
626+
// TODO
627+
return 42, 42, nil
628+
}
629+
630+
func (sm *StoreManager) PutJobStage(jobName string, stage string) error {
631+
key := fmt.Sprintf("dtle/%v/JobStage", jobName)
632+
return sm.consulStore.Put(key, []byte(stage), nil)
633+
}
634+
635+
func (sm *StoreManager) GetJobStage(jobName string) (string, error) {
636+
key := fmt.Sprintf("dtle/%v/JobStage", jobName)
637+
kv, err := sm.consulStore.Get(key)
638+
if err == store.ErrKeyNotFound {
639+
return "", nil
640+
} else if err != nil {
641+
return "", err
642+
}
643+
644+
return string(kv.Value), nil
645+
}
646+
624647
// consul store item
625648

626649
func NewDefaultRole(tenant string) *Role {

driver/mysql/applier.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ func (a *Applier) updateGtidLoop() {
189189
if a.mysqlContext.Gtid != "" {
190190
if a.stage != JobIncrCopy {
191191
a.stage = JobIncrCopy
192+
err := a.storeManager.PutJobStage(a.subject, JobIncrCopy)
193+
if err != nil {
194+
a.onError(common.TaskStateDead, errors.Wrap(err, "PutJobStage"))
195+
return
196+
}
192197
a.sendEvent(JobIncrCopy)
193198
}
194199
}
@@ -374,6 +379,10 @@ func (a *Applier) Run() {
374379
}
375380
if a.stage != JobFullCopy {
376381
a.stage = JobFullCopy
382+
err = a.storeManager.PutJobStage(a.subject, a.stage)
383+
if err != nil {
384+
a.onError(common.TaskStateDead, err)
385+
}
377386
a.sendEvent(JobFullCopy)
378387
}
379388

0 commit comments

Comments
 (0)