Skip to content

Commit 6e67993

Browse files
author
ffffwh
committed
add job config RetryTxLimit
1 parent b8d4ebb commit 6e67993

File tree

8 files changed

+19
-5
lines changed

8 files changed

+19
-5
lines changed

api/docs/docs.go

+3
Original file line numberDiff line numberDiff line change
@@ -3017,6 +3017,9 @@ var doc = `{
30173017
"bulk_insert2": {
30183018
"type": "integer"
30193019
},
3020+
"retry_tx_limit": {
3021+
"type": "integer"
3022+
},
30203023
"dependency_history_size": {
30213024
"type": "integer"
30223025
},

api/docs/swagger.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -3001,6 +3001,9 @@
30013001
"bulk_insert2": {
30023002
"type": "integer"
30033003
},
3004+
"retry_tx_limit": {
3005+
"type": "integer"
3006+
},
30043007
"dependency_history_size": {
30053008
"type": "integer"
30063009
},
@@ -3756,4 +3759,4 @@
37563759
"in": "header"
37573760
}
37583761
}
3759-
}
3762+
}

api/docs/swagger.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,8 @@ definitions:
591591
type: integer
592592
bulk_insert2:
593593
type: integer
594+
retry_tx_limit:
595+
type: integer
594596
dependency_history_size:
595597
type: integer
596598
parallel_workers:

api/handler/v2/job.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func createOrUpdateJob(logger g.LoggerType, jobParam *models.CreateOrUpdateMysql
331331
if jobParam.SrcTask.GroupTimeout == 0 {
332332
jobParam.SrcTask.GroupTimeout = common.DefaultSrcGroupTimeout
333333
}
334-
logger.Info("MysqlDestTaskConfig", jobParam.DestTask.MysqlDestTaskConfig)
334+
logger.Info("MysqlDestTaskConfig", "config", jobParam.DestTask.MysqlDestTaskConfig)
335335
if jobParam.DestTask.MysqlDestTaskConfig != nil {
336336
if jobParam.DestTask.MysqlDestTaskConfig.ParallelWorkers == 0 {
337337
jobParam.DestTask.MysqlDestTaskConfig.ParallelWorkers = common.DefaultNumWorkers
@@ -613,6 +613,7 @@ func buildDatabaseSrcTaskConfigMap(config *models.SrcTaskConfig, destConfig *mod
613613
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.DependencyHistorySize, "DependencyHistorySize")
614614
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.BulkInsert1, "BulkInsert1")
615615
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.BulkInsert2, "BulkInsert2")
616+
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.RetryTxLimit, "RetryTxLimit")
616617
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.SetGtidNext, "SetGtidNext")
617618
taskConfigInNomadFormat["DestConnectionConfig"] = buildMysqlConnectionConfigMap(destConfig.ConnectionConfig)
618619
} else if kafkaConfig != nil {
@@ -883,6 +884,7 @@ func buildBasicTaskProfile(logger g.LoggerType, jobId string, srcTaskDetail *mod
883884
DependencyHistorySize: destMySqlTaskDetail.TaskConfig.MysqlDestTaskConfig.DependencyHistorySize,
884885
BulkInsert1: destMySqlTaskDetail.TaskConfig.MysqlDestTaskConfig.BulkInsert1,
885886
BulkInsert2: destMySqlTaskDetail.TaskConfig.MysqlDestTaskConfig.BulkInsert2,
887+
RetryTxLimit: destMySqlTaskDetail.TaskConfig.MysqlDestTaskConfig.RetryTxLimit,
886888
SetGtidNext: destMySqlTaskDetail.TaskConfig.MysqlDestTaskConfig.SetGtidNext,
887889
}
888890
basicTaskProfile.Configuration.DstConfig = models.DstConfig{MysqlDestTaskConfig: mysqlDstConfig}
@@ -1060,6 +1062,7 @@ func buildMysqlDestTaskDetail(taskName string, internalTaskConfig common.DtleTas
10601062
DependencyHistorySize: internalTaskConfig.DependencyHistorySize,
10611063
BulkInsert1: internalTaskConfig.BulkInsert1,
10621064
BulkInsert2: internalTaskConfig.BulkInsert2,
1065+
RetryTxLimit: internalTaskConfig.RetryTxLimit,
10631066
SetGtidNext: internalTaskConfig.SetGtidNext,
10641067
}
10651068
destTaskDetail.TaskConfig = models.DestTaskConfig{

api/models/job_v2.go

+1
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ type MysqlDestTaskConfig struct {
182182
DependencyHistorySize int `json:"dependency_history_size"`
183183
BulkInsert1 int `json:"bulk_insert1"`
184184
BulkInsert2 int `json:"bulk_insert2"`
185+
RetryTxLimit int `json:"retry_tx_limit"`
185186
SetGtidNext bool `json:"set_gtid_next"`
186187
}
187188

driver/common/taskconfig.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type DtleTaskConfig struct {
5757
BulkInsert1 int `codec:"BulkInsert1"`
5858
BulkInsert2 int `codec:"BulkInsert2"`
5959
BulkInsert3 int `codec:"BulkInsert3"`
60+
RetryTxLimit int `codec:"RetryTxLimit"`
6061
SlaveNetWriteTimeout int `codec:"SlaveNetWriteTimeout"`
6162
BigTxSrcQueue int32 `codec:"BigTxSrcQueue"`
6263
TwoWaySync bool `codec:"TwoWaySync"`
@@ -116,7 +117,7 @@ type MySQLDriverConfig struct {
116117
RowCopyStartTime time.Time
117118
RowCopyEndTime time.Time
118119

119-
Stage string
120+
Stage string
120121
}
121122

122123
// ElapsedRowCopyTime returns time since starting to copy chunks of rows

driver/driver.go

+2
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ var (
162162
hclspec.NewLiteral(`8`)),
163163
"BulkInsert3": hclspec.NewDefault(hclspec.NewAttr("BulkInsert3", "number", false),
164164
hclspec.NewLiteral(`128`)),
165+
"RetryTxLimit": hclspec.NewDefault(hclspec.NewAttr("RetryTxLimit", "number", false),
166+
hclspec.NewLiteral(`3`)),
165167
"BigTxSrcQueue": hclspec.NewDefault(hclspec.NewAttr("BigTxSrcQueue", "number", false),
166168
hclspec.NewLiteral(`3`)),
167169
"KafkaConfig": hclspec.NewBlock("KafkaConfig", false, hclspec.NewObject(map[string]*hclspec.Spec{

driver/mysql/applier_incr.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -234,12 +234,11 @@ func (a *ApplierIncr) MtsWorker(workerIndex int) {
234234
case entryContext := <-a.applyBinlogMtsTxQueue:
235235
hasEntry = true
236236
logger.Debug("a binlogEntry MTS dequeue", "gno", entryContext.Entry.Coordinates.GetGNO())
237-
const deadlockTryLimit = 3
238237
for iTry := 0; ; iTry++ {
239238
err := a.ApplyBinlogEvent(workerIndex, entryContext)
240239
if err != nil {
241240
if merr, isME := err.(*mysqldriver.MySQLError); isME {
242-
if merr.Number == sql.ErrLockDeadlock && iTry < deadlockTryLimit {
241+
if merr.Number == sql.ErrLockDeadlock && iTry < a.mysqlContext.RetryTxLimit {
243242
logger.Info("found deadlock. will retry tx", "gno", entryContext.Entry.Coordinates.GetGNO(),
244243
"iTry", iTry)
245244
continue

0 commit comments

Comments
 (0)