Skip to content

Commit 6e7925b

Browse files
committed
support specify job starrt debug job
1 parent 4af9fb5 commit 6e7925b

File tree

5 files changed

+67
-35
lines changed

5 files changed

+67
-35
lines changed

api/handler/v2/log.go

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
// @accept application/x-www-form-urlencoded
1717
// @Security ApiKeyAuth
1818
// @Param dtle_log_level formData string true "dtle log level" Enums(TRACE, DEBUG, INFO, WARN, ERROR)
19+
// @Param dtle_debug_job formData string false "dtle debug job"
1920
// @Success 200 {object} models.UpdataLogLevelRespV2
2021
// @router /v2/log/level [post]
2122
func UpdateLogLevelV2(c echo.Context) error {
@@ -32,8 +33,11 @@ func UpdateLogLevelV2(c echo.Context) error {
3233
reqParam.DtleLogLevel, err))
3334
}
3435

36+
debugJob := handler.DtleDriver.ResetDebugJob(reqParam.DebugJob)
37+
3538
return c.JSON(http.StatusOK, &models.UpdataLogLevelRespV2{
3639
DtleLogLevel: strings.ToUpper(reqParam.DtleLogLevel),
40+
DebugJob: debugJob,
3741
BaseResp: models.BuildBaseResp(nil),
3842
})
3943
}

api/models/log_v2.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package models
22

33
type UpdataLogLevelReqV2 struct {
4-
DtleLogLevel string `form:"dtle_log_level" validate:"required"`
4+
DtleLogLevel string `form:"dtle_log_level" validate:"required"`
5+
DebugJob *string `form:"debug_job"`
56
}
67

78
type UpdataLogLevelRespV2 struct {
89
DtleLogLevel string `json:"dtle_log_level"`
10+
DebugJob string `json:"debug_job"`
911
BaseResp
1012
}
1113

driver/driver.go

+38-23
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ var (
7474
hclspec.NewLiteral(`"Info"`)),
7575
"ui_dir": hclspec.NewDefault(hclspec.NewAttr("ui_dir", "string", false),
7676
hclspec.NewLiteral(`""`)),
77+
"debug_job": hclspec.NewDefault(hclspec.NewAttr("debug_job", "string", false),
78+
hclspec.NewLiteral(`""`)),
7779
"rsa_private_key_path": hclspec.NewDefault(hclspec.NewAttr("rsa_private_key_path", "string", false),
7880
hclspec.NewLiteral(`""`)),
7981
"cert_file_path": hclspec.NewDefault(hclspec.NewAttr("cert_file_path", "string", false),
@@ -151,9 +153,9 @@ var (
151153
"SkipIncrementalCopy": hclspec.NewAttr("SkipIncrementalCopy", "bool", false),
152154
"SlaveNetWriteTimeout": hclspec.NewDefault(hclspec.NewAttr("SlaveNetWriteTimeout", "number", false),
153155
hclspec.NewLiteral(`28800`)), // 8 hours
154-
"SrcConnectionConfig": hclspec.NewBlock("SrcConnectionConfig", false, connectionConfigSpec),
156+
"SrcConnectionConfig": hclspec.NewBlock("SrcConnectionConfig", false, connectionConfigSpec),
155157
"DestConnectionConfig": hclspec.NewBlock("DestConnectionConfig", false, connectionConfigSpec),
156-
"WaitOnJob": hclspec.NewAttr("WaitOnJob", "string", false),
158+
"WaitOnJob": hclspec.NewAttr("WaitOnJob", "string", false),
157159
"TwoWaySync": hclspec.NewDefault(hclspec.NewAttr("TwoWaySync", "bool", false),
158160
hclspec.NewLiteral(`false`)),
159161
"BulkInsert1": hclspec.NewDefault(hclspec.NewAttr("BulkInsert1", "number", false),
@@ -192,7 +194,7 @@ var (
192194
hclspec.NewLiteral(`67108864`)),
193195
"SetGtidNext": hclspec.NewDefault(hclspec.NewAttr("SetGtidNext", "bool", false),
194196
hclspec.NewLiteral(`false`)),
195-
"DestType": hclspec.NewAttr("DestType", "string", false),
197+
"DestType": hclspec.NewAttr("DestType", "string", false),
196198
"SrcOracleConfig": hclspec.NewBlock("SrcOracleConfig", false, oracleConfigSpec),
197199
})
198200

@@ -320,6 +322,7 @@ type DriverConfig struct {
320322
StatsCollectionInterval int `codec:"stats_collection_interval"`
321323
PublishMetrics bool `codec:"publish_metrics"`
322324
LogLevel string `codec:"log_level"`
325+
DebugJob string `codec:"debug_job"`
323326
LogFile string `codec:"log_file"`
324327
UiDir string `codec:"ui_dir"`
325328
RsaPrivateKeyPath string `codec:"rsa_private_key_path"`
@@ -338,29 +341,13 @@ func (d *Driver) setupLogger() (err error) {
338341
if d.config.LogFile == "" {
339342
d.logger.Info("use nomad logger", "level", d.config.LogLevel)
340343
} else {
341-
err = os.MkdirAll(filepath.Dir(d.config.LogFile), 0755)
344+
err := os.MkdirAll(filepath.Dir(d.config.LogFile), 0755)
342345
if err != nil {
343346
return err
344347
}
345-
346-
logFileName := d.config.LogFile
347-
if strings.HasSuffix(logFileName, "/") {
348-
logFileName += "dtle.log"
349-
}
350-
351-
rotateFile := &lumberjack.Logger{
352-
Filename: logFileName,
353-
MaxSize: 512, // MB
354-
Compress: true,
355-
}
348+
d.logger = setupLogger(d.config.LogFile, "dtle.log")
356349

357350
d.logger.Info("switching to dtle logger", "file", d.config.LogFile, "level", d.config.LogLevel)
358-
359-
d.logger = hclog.New(&hclog.LoggerOptions{
360-
Name: "",
361-
Level: hclog.Info,
362-
Output: rotateFile,
363-
})
364351
g.Logger = d.logger
365352

366353
err = d.SetLogLevel(d.config.LogLevel)
@@ -372,6 +359,26 @@ func (d *Driver) setupLogger() (err error) {
372359
return nil
373360
}
374361

362+
func setupLogger(logFilePath, fileName string) hclog.Logger {
363+
364+
logFileName := logFilePath
365+
if strings.HasSuffix(logFileName, "/") {
366+
logFileName += fileName
367+
}
368+
369+
rotateFile := &lumberjack.Logger{
370+
Filename: logFileName,
371+
MaxSize: 512, // MB
372+
Compress: true,
373+
}
374+
375+
return hclog.New(&hclog.LoggerOptions{
376+
Name: "",
377+
Level: hclog.Info,
378+
Output: rotateFile,
379+
})
380+
}
381+
375382
func (d *Driver) SetConfig(c *base.Config) (err error) {
376383
if c != nil && c.AgentConfig != nil {
377384
d.nomadConfig = c.AgentConfig.Driver
@@ -497,7 +504,7 @@ func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint,
497504
return ch, nil
498505
}
499506

500-
//It allows the driver to indicate its health to the client.
507+
// It allows the driver to indicate its health to the client.
501508
// The channel returned should immediately send an initial Fingerprint,
502509
// then send periodic updates at an interval that is appropriate for the driver until the context is canceled.
503510
func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
@@ -515,7 +522,7 @@ func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerp
515522
}
516523
}
517524

518-
//get the driver status
525+
// get the driver status
519526
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
520527

521528
var health drivers.HealthState
@@ -873,6 +880,14 @@ func (d *Driver) SetLogLevel(level string) error {
873880
return nil
874881
}
875882

883+
func (d *Driver) ResetDebugJob(job *string) string {
884+
if job != nil {
885+
d.config.DebugJob = *job
886+
}
887+
888+
return d.config.DebugJob
889+
}
890+
876891
func (d *Driver) SetSetupApiServerFn(fn func(logger g.LoggerType, driverConfig *DriverConfig) (err error)) {
877892
d.setupApiServerFn = fn
878893
}

driver/handle.go

+19-9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/armon/go-metrics"
1818
"github.com/pkg/errors"
1919

20+
"github.com/hashicorp/go-hclog"
2021
"github.com/hashicorp/nomad/plugins/drivers"
2122
)
2223

@@ -36,8 +37,8 @@ type taskHandle struct {
3637

3738
ctx context.Context
3839
cancelFunc context.CancelFunc
39-
waitCh chan *drivers.ExitResult
40-
stats *common.TaskStatistics
40+
waitCh chan *drivers.ExitResult
41+
stats *common.TaskStatistics
4142

4243
driverConfig *common.MySQLDriverConfig
4344
shutdown bool
@@ -153,7 +154,9 @@ func (h *taskHandle) run(d *Driver) {
153154
for {
154155
select {
155156
case <-h.ctx.Done():
156-
if !t.Stop() { <-t.C }
157+
if !t.Stop() {
158+
<-t.C
159+
}
157160
return
158161
case <-t.C:
159162
if h.runner != nil {
@@ -180,16 +183,23 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
180183
StateDir: d.config.DataDir,
181184
}
182185

186+
logger := h.logger
187+
// create a new log file for debug job
188+
if d.config.DebugJob == h.taskConfig.JobName && hclog.LevelFromString(d.config.LogLevel) != hclog.Debug {
189+
logger = setupLogger(d.config.LogFile, "dtle_debug_job.log")
190+
logger.SetLevel(hclog.Debug)
191+
}
192+
183193
switch common.TaskTypeFromString(h.taskConfig.Name) {
184194
case common.TaskTypeSrc:
185195
if h.driverConfig.SrcOracleConfig != nil {
186196
h.logger.Debug("found oracle src", "SrcOracleConfig", h.driverConfig.SrcOracleConfig)
187-
runner, err = extractor.NewExtractorOracle(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, h.ctx)
197+
runner, err = extractor.NewExtractorOracle(ctx, h.driverConfig, logger, d.storeManager, h.waitCh, h.ctx)
188198
if err != nil {
189199
return nil, errors.Wrap(err, "NewExtractor")
190200
}
191201
} else {
192-
e, err := mysql.NewExtractor(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, h.ctx)
202+
e, err := mysql.NewExtractor(ctx, h.driverConfig, logger, d.storeManager, h.waitCh, h.ctx)
193203
if err != nil {
194204
return nil, errors.Wrap(err, "NewOracleExtractor")
195205
}
@@ -200,11 +210,11 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
200210
StateDir: d.config.DataDir,
201211
}
202212
cfg2 := &common.MySQLDriverConfig{
203-
DtleTaskConfig: common.DtleTaskConfig{
213+
DtleTaskConfig: common.DtleTaskConfig{
204214
DestType: "mysql",
205215
},
206216
}
207-
e.RevApplier, err = mysql.NewApplier(ctx2, cfg2, h.logger, d.storeManager, d.config.NatsAdvertise,
217+
e.RevApplier, err = mysql.NewApplier(ctx2, cfg2, logger, d.storeManager, d.config.NatsAdvertise,
208218
h.waitCh, d.eventer, h.taskConfig, h.ctx)
209219
}
210220

@@ -213,13 +223,13 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
213223
h.logger.Debug("found dest", "allConfig", h.driverConfig)
214224
switch strings.ToLower(h.driverConfig.DestType) {
215225
case "kafka":
216-
runner, err = kafka.NewKafkaRunner(ctx, h.logger,
226+
runner, err = kafka.NewKafkaRunner(ctx, logger,
217227
d.storeManager, d.config.NatsAdvertise, h.waitCh, h.ctx)
218228
if err != nil {
219229
return nil, errors.Wrap(err, "NewKafkaRunner")
220230
}
221231
case "mysql":
222-
runner, err = mysql.NewApplier(ctx, h.driverConfig, h.logger, d.storeManager,
232+
runner, err = mysql.NewApplier(ctx, h.driverConfig, logger, d.storeManager,
223233
d.config.NatsAdvertise, h.waitCh, d.eventer, h.taskConfig, h.ctx)
224234
if err != nil {
225235
return nil, errors.Wrap(err, "NewApplier")

driver/mysql/applier.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ import (
1010
"bytes"
1111
gosql "database/sql"
1212
"fmt"
13-
mysqldriver "github.com/go-sql-driver/mysql"
1413
"math"
1514
"strconv"
1615
"strings"
1716
"sync"
1817
"sync/atomic"
1918
"time"
2019

20+
mysqldriver "github.com/go-sql-driver/mysql"
21+
2122
"github.com/actiontech/dtle/driver/common"
2223
"github.com/actiontech/dtle/driver/mysql/base"
2324
"github.com/actiontech/dtle/driver/mysql/sql"
@@ -1231,7 +1232,7 @@ func errIsMysqlDeadlock(err error) bool {
12311232
if err != nil {
12321233
merr, isME := err.(*mysqldriver.MySQLError)
12331234
if isME {
1234-
return merr.Number ==sql.ErrLockDeadlock
1235+
return merr.Number == sql.ErrLockDeadlock
12351236
} else {
12361237
return false
12371238
}

0 commit comments

Comments
 (0)