Skip to content

Commit af77db2

Browse files
author
ffffwh
committed
applier: big tx: add a level of pipeline #1018
1 parent 33196db commit af77db2

File tree

1 file changed

+93
-38
lines changed

1 file changed

+93
-38
lines changed

driver/mysql/applier_incr.go

Lines changed: 93 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,15 @@ func (a *ApplierIncr) heterogeneousReplay() {
432432
}
433433
}
434434

435+
func (a *ApplierIncr) HasShutdown() bool {
436+
select {
437+
case <-a.shutdownCh:
438+
return true
439+
default:
440+
return false
441+
}
442+
}
443+
435444
// ApplyEventQueries applies multiple DML queries onto the dest table
436445
func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.EntryContext) error {
437446
logger := a.logger.Named("ApplyBinlogEvent")
@@ -454,7 +463,84 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
454463
dbApplier.DbMutex.Unlock()
455464
atomic.AddInt64(a.memory2, -int64(binlogEntry.Size()))
456465
}()
466+
467+
type execItem struct {
468+
hasUK bool
469+
pstmt **gosql.Stmt
470+
query string
471+
args []interface{}
472+
}
473+
prepareIfNilAndExecute := func(hasUK bool, pstmt **gosql.Stmt, query string, args []interface{}) (err error) {
474+
var r gosql.Result
475+
476+
if hasUK {
477+
if *pstmt == nil {
478+
a.logger.Debug("buildDMLEventQuery prepare query", "query", query)
479+
*pstmt, err = a.dbs[workerIdx].Db.PrepareContext(a.ctx, query)
480+
if err != nil {
481+
a.logger.Error("buildDMLEventQuery prepare query", "query", query, "err", err)
482+
return err
483+
}
484+
}
485+
486+
r, err = (*pstmt).ExecContext(a.ctx, args...)
487+
} else {
488+
r, err = a.dbs[workerIdx].Db.ExecContext(a.ctx, query, args...)
489+
}
490+
491+
if err != nil {
492+
logger.Error("error at exec", "gtid", hclog.Fmt("%s:%d", txSid, binlogEntry.Coordinates.GetGNO()),
493+
"err", err)
494+
return err
495+
}
496+
497+
nr, err := r.RowsAffected()
498+
if err != nil {
499+
logger.Error("RowsAffected error", "gno", binlogEntry.Coordinates.GetGNO(), "event", 0, "err", err)
500+
} else {
501+
logger.Debug("RowsAffected.after", "gno", binlogEntry.Coordinates.GetGNO(), "event", 0, "nr", nr)
502+
}
503+
return nil
504+
}
505+
506+
somequeue := make(chan *execItem, 16)
507+
queueOrExec := func(item *execItem) error {
508+
if a.inBigTx {
509+
somequeue <- item
510+
return nil
511+
} else {
512+
return prepareIfNilAndExecute(item.hasUK, item.pstmt, item.query, item.args)
513+
}
514+
}
515+
516+
executionLoopExitedCh := make(chan struct{})
517+
if a.inBigTx {
518+
go func() {
519+
defer close(executionLoopExitedCh)
520+
for {
521+
select {
522+
case item := <-somequeue:
523+
if item == nil {
524+
return
525+
}
526+
527+
err := prepareIfNilAndExecute(item.hasUK, item.pstmt, item.query, item.args)
528+
if err != nil {
529+
a.OnError(common.TaskStateDead, err)
530+
}
531+
case <-a.shutdownCh:
532+
return
533+
}
534+
}
535+
}()
536+
} else {
537+
close(executionLoopExitedCh)
538+
}
539+
457540
for i, event := range binlogEntry.Events {
541+
if a.HasShutdown() {
542+
break
543+
}
458544
logger.Debug("binlogEntry.Events", "gno", binlogEntry.Coordinates.GetGNO(), "event", i)
459545

460546
if event.DML == common.NotDML {
@@ -558,39 +644,6 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
558644

559645
tableItem := binlogEntryCtx.TableItems[i]
560646

561-
prepareIfNilAndExecute := func(hasUK bool, pstmt **gosql.Stmt, query string, args []interface{}) (err error) {
562-
var r gosql.Result
563-
564-
if hasUK {
565-
if *pstmt == nil {
566-
a.logger.Debug("buildDMLEventQuery prepare query", "query", query)
567-
*pstmt, err = a.dbs[workerIdx].Db.PrepareContext(a.ctx, query)
568-
if err != nil {
569-
a.logger.Error("buildDMLEventQuery prepare query", "query", query, "err", err)
570-
return err
571-
}
572-
}
573-
574-
r, err = (*pstmt).ExecContext(a.ctx, args...)
575-
} else {
576-
r, err = a.dbs[workerIdx].Db.ExecContext(a.ctx, query, args...)
577-
}
578-
579-
if err != nil {
580-
logger.Error("error at exec", "gtid", hclog.Fmt("%s:%d", txSid, binlogEntry.Coordinates.GetGNO()),
581-
"err", err)
582-
return err
583-
}
584-
585-
nr, err := r.RowsAffected()
586-
if err != nil {
587-
logger.Error("RowsAffected error", "gno", binlogEntry.Coordinates.GetGNO(), "event", i, "err", err)
588-
} else {
589-
logger.Debug("RowsAffected.after", "gno", binlogEntry.Coordinates.GetGNO(), "event", i, "nr", nr)
590-
}
591-
return nil
592-
}
593-
594647
switch event.DML {
595648
case common.InsertDML:
596649
nRows := len(event.Rows)
@@ -622,7 +675,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
622675
}
623676
a.logger.Debug("BuildDMLInsertQuery", "query", query)
624677

625-
err = prepareIfNilAndExecute(true, pstmt, query, sharedArgs)
678+
err = queueOrExec(&execItem{true, pstmt, query, sharedArgs})
626679
if err != nil {
627680
return err
628681
}
@@ -637,7 +690,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
637690
}
638691
a.logger.Debug("BuildDMLDeleteQuery", "query", query)
639692

640-
err = prepareIfNilAndExecute(hasUK, pstmt, query, uniqueKeyArgs)
693+
err = queueOrExec(&execItem{hasUK, pstmt, query, uniqueKeyArgs})
641694
if err != nil {
642695
return err
643696
}
@@ -664,7 +717,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
664717
return err
665718
}
666719

667-
err = prepareIfNilAndExecute(true, pstmt, query, sharedArgs)
720+
err = queueOrExec(&execItem{true, pstmt, query, sharedArgs})
668721
if err != nil {
669722
return err
670723
}
@@ -677,7 +730,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
677730
}
678731
a.logger.Debug("BuildDMLDeleteQuery", "query", query)
679732

680-
err = prepareIfNilAndExecute(hasUK, pstmt, query, uniqueKeyArgs)
733+
err = queueOrExec(&execItem{hasUK, pstmt, query, uniqueKeyArgs})
681734
if err != nil {
682735
return err
683736
}
@@ -692,7 +745,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
692745
args = append(args, sharedArgs...)
693746
args = append(args, uniqueKeyArgs...)
694747

695-
err = prepareIfNilAndExecute(hasUK, pstmt, query, args)
748+
err = queueOrExec(&execItem{hasUK, pstmt, query, args})
696749
if err != nil {
697750
return err
698751
}
@@ -711,6 +764,8 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
711764
timestamp = event.Timestamp
712765
atomic.AddUint64(&a.appliedQueryCount, uint64(1))
713766
}
767+
close(somequeue)
768+
<-executionLoopExitedCh
714769

715770
if binlogEntry.Final {
716771
if !a.SkipGtidExecutedTable && a.sourceType == "mysql" {

0 commit comments

Comments
 (0)