Skip to content

Commit 33196db

Browse files
author
ffffwh
committed
add BulkInsert3 (default 128)
1 parent a59cdc9 commit 33196db

File tree

4 files changed

+11
-1
lines changed

4 files changed

+11
-1
lines changed

driver/common/binlog.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ type ApplierTableItem struct {
162162
PsInsert0 []*sql.Stmt
163163
PsInsert1 []*sql.Stmt
164164
PsInsert2 []*sql.Stmt
165+
PsInsert3 []*sql.Stmt
165166
PsDelete []*sql.Stmt
166167
PsUpdate []*sql.Stmt
167168
ColumnMapTo []string
@@ -173,6 +174,7 @@ func NewApplierTableItem(parallelWorkers int) *ApplierTableItem {
173174
PsInsert0: make([]*sql.Stmt, parallelWorkers),
174175
PsInsert1: make([]*sql.Stmt, parallelWorkers),
175176
PsInsert2: make([]*sql.Stmt, parallelWorkers),
177+
PsInsert3: make([]*sql.Stmt, parallelWorkers),
176178
PsDelete: make([]*sql.Stmt, parallelWorkers),
177179
PsUpdate: make([]*sql.Stmt, parallelWorkers),
178180
}
@@ -190,6 +192,7 @@ func (ait *ApplierTableItem) Reset() {
190192
closeStmts(ait.PsInsert0)
191193
closeStmts(ait.PsInsert1)
192194
closeStmts(ait.PsInsert2)
195+
closeStmts(ait.PsInsert3)
193196
closeStmts(ait.PsDelete)
194197
closeStmts(ait.PsUpdate)
195198

driver/common/taskconfig.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type DtleTaskConfig struct {
5656
WaitOnJob string `codec:"WaitOnJob"`
5757
BulkInsert1 int `codec:"BulkInsert1"`
5858
BulkInsert2 int `codec:"BulkInsert2"`
59+
BulkInsert3 int `codec:"BulkInsert3"`
5960
SlaveNetWriteTimeout int `codec:"SlaveNetWriteTimeout"`
6061
BigTxSrcQueue int32 `codec:"BigTxSrcQueue"`
6162

driver/driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ var (
144144
hclspec.NewLiteral(`4`)),
145145
"BulkInsert2": hclspec.NewDefault(hclspec.NewAttr("BulkInsert2", "number", false),
146146
hclspec.NewLiteral(`8`)),
147+
"BulkInsert3": hclspec.NewDefault(hclspec.NewAttr("BulkInsert3", "number", false),
148+
hclspec.NewLiteral(`128`)),
147149
"BigTxSrcQueue": hclspec.NewDefault(hclspec.NewAttr("BigTxSrcQueue", "number", false),
148150
hclspec.NewLiteral(`3`)),
149151
"KafkaConfig": hclspec.NewBlock("KafkaConfig", false, hclspec.NewObject(map[string]*hclspec.Spec{

driver/mysql/applier_incr.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,11 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
597597
for i := 0; i < nRows; {
598598
var pstmt **gosql.Stmt
599599
var rows [][]interface{}
600-
if nRows-i >= a.mysqlContext.BulkInsert2 {
600+
if nRows-i >= a.mysqlContext.BulkInsert3 {
601+
pstmt = &tableItem.PsInsert3[workerIdx]
602+
rows = event.Rows[i : i+a.mysqlContext.BulkInsert3]
603+
i += a.mysqlContext.BulkInsert3
604+
} else if nRows-i >= a.mysqlContext.BulkInsert2 {
601605
pstmt = &tableItem.PsInsert2[workerIdx]
602606
rows = event.Rows[i : i+a.mysqlContext.BulkInsert2]
603607
i += a.mysqlContext.BulkInsert2

0 commit comments

Comments
 (0)