Skip to content

Commit cc14fce

Browse files
author
ffffwh
committed
optimize the big tx DML pipeline #1018.
Do not re-creating goroutine & channels.
1 parent a76ff19 commit cc14fce

File tree

3 files changed

+95
-81
lines changed

3 files changed

+95
-81
lines changed

driver/mysql/applier.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -1075,9 +1075,8 @@ func (a *Applier) Shutdown() error {
10751075
close(a.shutdownCh)
10761076

10771077
if a.ai != nil {
1078-
a.ai.wg.Wait()
1078+
a.ai.Shutdown()
10791079
}
1080-
a.logger.Debug("Shutdown. a.ai.wg.Wait. after")
10811080
a.wg.Wait()
10821081
a.logger.Debug("Shutdown. a.wg.Wait. after")
10831082

driver/mysql/applier_incr.go

+93-79
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
sql "github.com/actiontech/dtle/driver/mysql/sql"
1717
"github.com/actiontech/dtle/g"
1818
gomysql "github.com/go-mysql-org/go-mysql/mysql"
19-
"github.com/hashicorp/go-hclog"
2019
"github.com/pkg/errors"
2120
uuid "github.com/satori/go.uuid"
2221
)
@@ -62,6 +61,8 @@ type ApplierIncr struct {
6261

6362
wg sync.WaitGroup
6463
SkipGtidExecutedTable bool
64+
logTxCommit bool
65+
noBigTxDMLPipe bool
6566

6667
mtsManager *MtsManager
6768
wsManager *WritesetManager
@@ -71,8 +72,9 @@ type ApplierIncr struct {
7172
sourceType string
7273
tableSpecs []*common.TableSpec
7374

74-
inBigTx bool
75-
logTxCommit bool
75+
inBigTx bool
76+
bigTxEventQueue chan *dmlExecItem
77+
bigTxEventWg sync.WaitGroup
7678
}
7779

7880
func NewApplierIncr(applier *Applier, sourcetype string) (*ApplierIncr, error) {
@@ -94,6 +96,7 @@ func NewApplierIncr(applier *Applier, sourcetype string) (*ApplierIncr, error) {
9496
gtidSetLock: applier.gtidSetLock,
9597
tableItems: make(mapSchemaTableItems),
9698
sourceType: sourcetype,
99+
bigTxEventQueue: make(chan *dmlExecItem, 16),
97100
}
98101

99102
if g.EnvIsTrue(g.ENV_SKIP_GTID_EXECUTED_TABLE) {
@@ -103,6 +106,10 @@ func NewApplierIncr(applier *Applier, sourcetype string) (*ApplierIncr, error) {
103106
if g.EnvIsTrue(g.ENV_DTLE_LOG_TX_COMMIT) {
104107
a.logTxCommit = true
105108
}
109+
if g.EnvIsTrue(g.ENV_DTLE_NO_BIG_TX_DML_PIPE) {
110+
a.logger.Info("found DTLE_NO_BIG_TX_DML_PIPE")
111+
a.noBigTxDMLPipe = true
112+
}
106113

107114
a.timestampCtx = NewTimestampContext(a.shutdownCh, a.logger, func() bool {
108115
return len(a.binlogEntryQueue) == 0 && len(a.applyBinlogMtsTxQueue) == 0
@@ -186,11 +193,32 @@ func (a *ApplierIncr) Run() (err error) {
186193
return nil
187194
}
188195

196+
func (a *ApplierIncr) bigTxQueueExecutor() {
197+
for {
198+
item := <-a.bigTxEventQueue
199+
if item == nil {
200+
break
201+
}
202+
203+
if !a.HasShutdown() {
204+
err := a.prepareIfNilAndExecute(item, 0)
205+
if err != nil {
206+
a.OnError(common.TaskStateDead, err)
207+
}
208+
}
209+
a.bigTxEventWg.Done()
210+
}
211+
}
212+
189213
func (a *ApplierIncr) MtsWorker(workerIndex int) {
190214
keepLoop := true
191215

192216
logger := a.logger.With("worker", workerIndex)
193217

218+
if workerIndex == 0 {
219+
go a.bigTxQueueExecutor()
220+
}
221+
194222
t := time.NewTicker(pingInterval)
195223
defer t.Stop()
196224
hasEntry := false
@@ -445,6 +473,38 @@ func (a *ApplierIncr) HasShutdown() bool {
445473
return false
446474
}
447475
}
476+
func (a *ApplierIncr) prepareIfNilAndExecute(item *dmlExecItem, workerIdx int) (err error) {
477+
// hasUK bool, pstmt **gosql.Stmt, query string, args []interface{}
478+
var r gosql.Result
479+
480+
if item.hasUK {
481+
if *item.pstmt == nil {
482+
a.logger.Debug("buildDMLEventQuery prepare query", "query", item.query)
483+
*item.pstmt, err = a.dbs[workerIdx].Db.PrepareContext(a.ctx, item.query)
484+
if err != nil {
485+
a.logger.Error("buildDMLEventQuery prepare query", "query", item.query, "err", err)
486+
return err
487+
}
488+
}
489+
490+
r, err = (*item.pstmt).ExecContext(a.ctx, item.args...)
491+
} else {
492+
r, err = a.dbs[workerIdx].Db.ExecContext(a.ctx, item.query, item.args...)
493+
}
494+
495+
if err != nil {
496+
a.logger.Error("error at exec", "gno", item.gno, "err", err)
497+
return err
498+
}
499+
500+
nr, err := r.RowsAffected()
501+
if err != nil {
502+
a.logger.Error("RowsAffected error", "gno", item.gno, "event", 0, "err", err)
503+
} else {
504+
a.logger.Debug("RowsAffected.after", "gno", item.gno, "event", 0, "nr", nr)
505+
}
506+
return nil
507+
}
448508

449509
// ApplyEventQueries applies multiple DML queries onto the dest table
450510
func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.EntryContext) error {
@@ -455,7 +515,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
455515

456516
var err error
457517
var timestamp uint32
458-
txSid := binlogEntry.Coordinates.GetSid()
518+
gno := binlogEntry.Coordinates.GetGNO()
459519

460520
dbApplier.DbMutex.Lock()
461521
if dbApplier.Tx == nil {
@@ -469,79 +529,17 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
469529
atomic.AddInt64(a.memory2, -int64(binlogEntry.Size()))
470530
}()
471531

472-
type execItem struct {
473-
hasUK bool
474-
pstmt **gosql.Stmt
475-
query string
476-
args []interface{}
477-
}
478-
prepareIfNilAndExecute := func(hasUK bool, pstmt **gosql.Stmt, query string, args []interface{}) (err error) {
479-
var r gosql.Result
480-
481-
if hasUK {
482-
if *pstmt == nil {
483-
a.logger.Debug("buildDMLEventQuery prepare query", "query", query)
484-
*pstmt, err = a.dbs[workerIdx].Db.PrepareContext(a.ctx, query)
485-
if err != nil {
486-
a.logger.Error("buildDMLEventQuery prepare query", "query", query, "err", err)
487-
return err
488-
}
489-
}
490-
491-
r, err = (*pstmt).ExecContext(a.ctx, args...)
492-
} else {
493-
r, err = a.dbs[workerIdx].Db.ExecContext(a.ctx, query, args...)
494-
}
495-
496-
if err != nil {
497-
logger.Error("error at exec", "gtid", hclog.Fmt("%s:%d", txSid, binlogEntry.Coordinates.GetGNO()),
498-
"err", err)
499-
return err
500-
}
501-
502-
nr, err := r.RowsAffected()
503-
if err != nil {
504-
logger.Error("RowsAffected error", "gno", binlogEntry.Coordinates.GetGNO(), "event", 0, "err", err)
505-
} else {
506-
logger.Debug("RowsAffected.after", "gno", binlogEntry.Coordinates.GetGNO(), "event", 0, "nr", nr)
507-
}
508-
return nil
509-
}
510-
511-
somequeue := make(chan *execItem, 16)
512-
queueOrExec := func(item *execItem) error {
513-
if a.inBigTx {
514-
somequeue <- item
532+
queueOrExec := func(item *dmlExecItem) error {
533+
// TODO check if shutdown?
534+
if !a.noBigTxDMLPipe && a.inBigTx {
535+
a.bigTxEventWg.Add(1)
536+
a.bigTxEventQueue <- item
515537
return nil
516538
} else {
517-
return prepareIfNilAndExecute(item.hasUK, item.pstmt, item.query, item.args)
539+
return a.prepareIfNilAndExecute(item, workerIdx)
518540
}
519541
}
520542

521-
executionLoopExitedCh := make(chan struct{})
522-
if a.inBigTx {
523-
go func() {
524-
defer close(executionLoopExitedCh)
525-
for {
526-
select {
527-
case item := <-somequeue:
528-
if item == nil {
529-
return
530-
}
531-
532-
err := prepareIfNilAndExecute(item.hasUK, item.pstmt, item.query, item.args)
533-
if err != nil {
534-
a.OnError(common.TaskStateDead, err)
535-
}
536-
case <-a.shutdownCh:
537-
return
538-
}
539-
}
540-
}()
541-
} else {
542-
close(executionLoopExitedCh)
543-
}
544-
545543
for i, event := range binlogEntry.Events {
546544
if a.HasShutdown() {
547545
break
@@ -680,7 +678,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
680678
}
681679
a.logger.Debug("BuildDMLInsertQuery", "query", query)
682680

683-
err = queueOrExec(&execItem{true, pstmt, query, sharedArgs})
681+
err = queueOrExec(&dmlExecItem{true, pstmt, query, sharedArgs, gno})
684682
if err != nil {
685683
return err
686684
}
@@ -695,7 +693,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
695693
}
696694
a.logger.Debug("BuildDMLDeleteQuery", "query", query)
697695

698-
err = queueOrExec(&execItem{hasUK, pstmt, query, uniqueKeyArgs})
696+
err = queueOrExec(&dmlExecItem{hasUK, pstmt, query, uniqueKeyArgs, gno})
699697
if err != nil {
700698
return err
701699
}
@@ -722,7 +720,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
722720
return err
723721
}
724722

725-
err = queueOrExec(&execItem{true, pstmt, query, sharedArgs})
723+
err = queueOrExec(&dmlExecItem{true, pstmt, query, sharedArgs, gno})
726724
if err != nil {
727725
return err
728726
}
@@ -735,7 +733,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
735733
}
736734
a.logger.Debug("BuildDMLDeleteQuery", "query", query)
737735

738-
err = queueOrExec(&execItem{hasUK, pstmt, query, uniqueKeyArgs})
736+
err = queueOrExec(&dmlExecItem{hasUK, pstmt, query, uniqueKeyArgs, gno})
739737
if err != nil {
740738
return err
741739
}
@@ -750,7 +748,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
750748
args = append(args, sharedArgs...)
751749
args = append(args, uniqueKeyArgs...)
752750

753-
err = queueOrExec(&execItem{hasUK, pstmt, query, args})
751+
err = queueOrExec(&dmlExecItem{hasUK, pstmt, query, args, gno})
754752
if err != nil {
755753
return err
756754
}
@@ -769,8 +767,10 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
769767
timestamp = event.Timestamp
770768
atomic.AddUint64(&a.appliedQueryCount, uint64(1))
771769
}
772-
close(somequeue)
773-
<-executionLoopExitedCh
770+
a.bigTxEventWg.Wait()
771+
if a.HasShutdown() {
772+
return fmt.Errorf("ApplyBinlogEvent: applier has been shutdown. gno %v", gno)
773+
}
774774

775775
if binlogEntry.Final {
776776
if !a.SkipGtidExecutedTable && a.sourceType == "mysql" {
@@ -887,3 +887,17 @@ func (a *ApplierIncr) handleEntryOracle(entryCtx *common.EntryContext) (err erro
887887
}
888888
return nil
889889
}
890+
891+
func (a *ApplierIncr) Shutdown() {
892+
close(a.bigTxEventQueue)
893+
a.wg.Wait()
894+
a.logger.Debug("Shutdown. ApplierIncr.wg.Wait. after")
895+
}
896+
897+
type dmlExecItem struct {
898+
hasUK bool
899+
pstmt **gosql.Stmt
900+
query string
901+
args []interface{}
902+
gno int64 // for log only
903+
}

g/g.go

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const (
4444
ENV_SKIP_GTID_EXECUTED_TABLE = "DTLE_SKIP_GTID_EXECUTED_TABLE"
4545
ENV_FORCE_MTS = "DTLE_FORCE_MTS"
4646
ENV_DTLE_LOG_TX_COMMIT = "DTLE_LOG_TX_COMMIT"
47+
ENV_DTLE_NO_BIG_TX_DML_PIPE = "DTLE_NO_BIG_TX_DML_PIPE"
4748

4849
NatsMaxPayload = 64 * 1024 * 1024
4950

0 commit comments

Comments
 (0)