Skip to content

Commit f405a0e

Browse files
author
ffffwh
committed
Merge branch 'master' into 3.21.01.0
2 parents af771be + 79effde commit f405a0e

File tree

6 files changed

+51
-53
lines changed

6 files changed

+51
-53
lines changed

drivers/mysql/common/binlog.go

+11
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ func NewBinlogEntryAt(coordinates BinlogCoordinateTx) *BinlogEntry {
4949
return binlogEntry
5050
}
5151

52+
func (b *BinlogEntry) HasDDL() bool {
53+
for i := range b.Events {
54+
switch b.Events[i].DML {
55+
case NotDML:
56+
return true
57+
default:
58+
}
59+
}
60+
return false
61+
}
62+
5263
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
5364
func (b *BinlogEntry) String() string {
5465
return fmt.Sprintf("[BinlogEntry at %+v]", b.Coordinates)

drivers/mysql/kafka/kafka3.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,6 @@ func NewKafkaRunner(execCtx *common.ExecContext, cfg *common.KafkaConfig, logger
112112
return kr
113113
}
114114

115-
func (kr *KafkaRunner) updateGtidString() {
116-
kr.Gtid = kr.gtidSet.String()
117-
kr.logger.Debug("kafka. updateGtidString", "gtid", kr.Gtid)
118-
}
119115
func (kr *KafkaRunner) updateGtidLoop() {
120116
updateGtidInterval := 15 * time.Second
121117
for !kr.shutdown {
@@ -1056,7 +1052,8 @@ func (kr *KafkaRunner) kafkaTransformDMLEventQueries(dmlEntries []*common.Binlog
10561052
kr.BinlogPos = curDmlEntry.Coordinates.LogPos
10571053

10581054
common.UpdateGtidSet(kr.gtidSet, curDmlEntry.Coordinates.SID, curDmlEntry.Coordinates.GNO)
1059-
kr.updateGtidString()
1055+
kr.Gtid = kr.gtidSet.String()
1056+
kr.logger.Debug("kafka. updateGtidString", "gtid", kr.Gtid)
10601057

10611058
kr.logger.Debug("kafka: after kafkaTransformDMLEventQueries")
10621059
return nil

drivers/mysql/mysql/applier.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ type Applier struct {
8484
stubFullApplyDelay time.Duration
8585

8686
gtidSet *gomysql.MysqlGTIDSet
87+
gtidSetLock *sync.RWMutex
8788

8889
storeManager *common.StoreManager
8990
gtidCh chan *common.BinlogCoordinateTx
@@ -109,6 +110,7 @@ func NewApplier(
109110
rowCopyComplete: make(chan struct{}),
110111
copyRowsQueue: make(chan *common.DumpEntry, 24),
111112
waitCh: waitCh,
113+
gtidSetLock: &sync.RWMutex{},
112114
shutdownCh: make(chan struct{}),
113115
storeManager: storeManager,
114116
gtidCh: make(chan *common.BinlogCoordinateTx, 4096),
@@ -191,7 +193,9 @@ func (a *Applier) updateGtidLoop() {
191193
doUpdate()
192194
doUpload()
193195
} else {
196+
a.gtidSetLock.Lock()
194197
common.UpdateGtidSet(a.gtidSet, coord.SID, coord.GNO)
198+
a.gtidSetLock.Unlock()
195199
file = coord.LogFile
196200
pos = coord.LogPos
197201
}
@@ -241,7 +245,7 @@ func (a *Applier) Run() {
241245
}
242246

243247
a.ai, err = NewApplierIncr(a.subject, a.mysqlContext, a.logger, a.gtidSet, a.memory2,
244-
a.db, a.dbs, a.shutdownCh)
248+
a.db, a.dbs, a.shutdownCh, a.gtidSetLock)
245249
if err != nil {
246250
a.onError(TaskStateDead, errors.Wrap(err, "NewApplierIncr"))
247251
return

drivers/mysql/mysql/applier_gtid_executed.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,20 @@ func (a *GtidExecutedCreater) createTableGtidExecutedV4() error {
245245
}
246246
}
247247

248-
func (a *ApplierIncr) cleanGtidExecuted(sid uuid.UUID, intervalStr string) error {
248+
func (a *ApplierIncr) cleanGtidExecuted(sid uuid.UUID, txSid string) error {
249249
a.logger.Debug("incr. cleanup before WaitForExecution")
250250
if !a.mtsManager.WaitForAllCommitted() {
251251
return nil // shutdown
252252
}
253253
a.logger.Debug("incr. cleanup after WaitForExecution")
254254

255+
var intervalStr string
256+
{
257+
a.gtidSetLock.RLock()
258+
intervals := base.GetIntervals(a.gtidSet, txSid)
259+
intervalStr = base.StringInterval(intervals)
260+
a.gtidSetLock.RUnlock()
261+
}
255262
// The TX is unnecessary if we first insert and then delete.
256263
// However, consider `binlog_group_commit_sync_delay > 0`,
257264
// `begin; delete; insert; commit;` (1 TX) is faster than `insert; delete;` (2 TX)

drivers/mysql/mysql/applier_incr.go

+21-37
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/satori/go.uuid"
1616
gomysql "github.com/siddontang/go-mysql/mysql"
1717
"os"
18+
"sync"
1819
"sync/atomic"
1920
"time"
2021
)
@@ -43,6 +44,7 @@ type ApplierIncr struct {
4344
TotalDeltaCopied int64
4445

4546
gtidSet *gomysql.MysqlGTIDSet
47+
gtidSetLock *sync.RWMutex
4648
gtidItemMap base.GtidItemMap
4749
GtidUpdateHook func(*common.BinlogCoordinateTx)
4850

@@ -53,7 +55,8 @@ type ApplierIncr struct {
5355

5456
func NewApplierIncr(subject string, mysqlContext *common.MySQLDriverConfig,
5557
logger hclog.Logger, gtidSet *gomysql.MysqlGTIDSet, memory2 *int64,
56-
db *gosql.DB, dbs []*sql.Conn, shutdownCh chan struct{}) (*ApplierIncr, error) {
58+
db *gosql.DB, dbs []*sql.Conn, shutdownCh chan struct{},
59+
gtidSetLock *sync.RWMutex) (*ApplierIncr, error) {
5760

5861
a := &ApplierIncr{
5962
logger: logger,
@@ -67,6 +70,7 @@ func NewApplierIncr(subject string, mysqlContext *common.MySQLDriverConfig,
6770
memory2: memory2,
6871
printTps: os.Getenv(g.ENV_PRINT_TPS) != "",
6972
gtidSet: gtidSet,
73+
gtidSetLock: gtidSetLock,
7074
tableItems: make(mapSchemaTableItems),
7175
}
7276

@@ -206,11 +210,15 @@ func (a *ApplierIncr) heterogeneousReplay() {
206210
txSid := binlogEntry.Coordinates.GetSid()
207211

208212
gtidSetItem := a.gtidItemMap.GetItem(binlogEntry.Coordinates.SID)
209-
intervals := base.GetIntervals(a.gtidSet, txSid)
210-
if base.IntervalSlicesContainOne(intervals, binlogEntry.Coordinates.GNO) {
211-
// entry executed
212-
a.logger.Debug("skip an executed tx", "sid", txSid, "gno", binlogEntry.Coordinates.GNO)
213-
continue
213+
{
214+
a.gtidSetLock.RLock()
215+
intervals := base.GetIntervals(a.gtidSet, txSid)
216+
if base.IntervalSlicesContainOne(intervals, binlogEntry.Coordinates.GNO) {
217+
// entry executed
218+
a.logger.Info("skip an executed tx", "sid", txSid, "gno", binlogEntry.Coordinates.GNO)
219+
continue
220+
}
221+
a.gtidSetLock.RUnlock()
214222
}
215223
// endregion
216224
// this must be after duplication check
@@ -224,7 +232,7 @@ func (a *ApplierIncr) heterogeneousReplay() {
224232

225233
a.logger.Debug("gtidSetItem", "NRow", gtidSetItem.NRow)
226234
if gtidSetItem.NRow >= cleanupGtidExecutedLimit {
227-
err = a.cleanGtidExecuted(binlogEntry.Coordinates.SID, base.StringInterval(intervals))
235+
err = a.cleanGtidExecuted(binlogEntry.Coordinates.SID, txSid)
228236
if err != nil {
229237
a.OnError(TaskStateDead, err)
230238
return
@@ -256,32 +264,12 @@ func (a *ApplierIncr) heterogeneousReplay() {
256264
a.logger.Warn("DTLE_BUG: len(a.mtsManager.m) should be 0")
257265
}
258266
}
259-
if binlogEntry.Coordinates.SeqenceNumber > a.mtsManager.lastEnqueue + 1 {
260-
if a.mtsManager.lastEnqueue == 0 {
261-
a.logger.Debug("first TX", "seq_num", binlogEntry.Coordinates.SeqenceNumber)
262-
if binlogEntry.Coordinates.SeqenceNumber > 0 {
263-
a.mtsManager.lastEnqueue = binlogEntry.Coordinates.SeqenceNumber - 1
264-
a.mtsManager.lastCommitted = a.mtsManager.lastEnqueue
265-
}
266-
} else {
267-
err := fmt.Errorf("found non-continuous tx seq_num. last %v this %v",
268-
a.mtsManager.lastEnqueue, binlogEntry.Coordinates.SeqenceNumber)
269-
a.logger.Error(err.Error())
270-
a.OnError(TaskStateDead, err)
271-
return
272-
}
267+
// If there are TXs skipped by udup source-side
268+
for a.mtsManager.lastEnqueue+1 < binlogEntry.Coordinates.SeqenceNumber {
269+
a.mtsManager.lastEnqueue += 1
270+
a.mtsManager.chExecuted <- a.mtsManager.lastEnqueue
273271
}
274-
hasDDL := func() bool {
275-
for i := range binlogEntry.Events {
276-
dmlEvent := &binlogEntry.Events[i]
277-
switch dmlEvent.DML {
278-
case common.NotDML:
279-
return true
280-
default:
281-
}
282-
}
283-
return false
284-
}()
272+
hasDDL := binlogEntry.HasDDL()
285273
// DDL must be executed separatedly
286274
if hasDDL || prevDDL {
287275
a.logger.Debug("MTS found DDL. WaitForAllCommitted",
@@ -290,11 +278,7 @@ func (a *ApplierIncr) heterogeneousReplay() {
290278
return // shutdown
291279
}
292280
}
293-
if hasDDL {
294-
prevDDL = true
295-
} else {
296-
prevDDL = false
297-
}
281+
prevDDL = hasDDL
298282

299283
if !a.mtsManager.WaitForExecution(binlogEntry) {
300284
return // shutdown

drivers/mysql/mysql/binlog/binlog_reader.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -484,9 +484,9 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
484484
skipExpandSyntax = isExpandSyntaxQuery(query) || ddlInfo.isExpand
485485
}
486486

487-
schema := b.findSchema(currentSchema)
488487
currentSchemaRename := currentSchema
489-
if schema.TableSchemaRename != "" {
488+
schema := b.findSchema(currentSchema)
489+
if schema != nil && schema.TableSchemaRename != "" {
490490
currentSchemaRename = schema.TableSchemaRename
491491
}
492492

@@ -756,13 +756,8 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
756756
dmlEvent.TableName = table.Table.TableRename
757757
b.logger.Debug("dml table mapping", "from", dmlEvent.TableName, "to", table.Table.TableRename)
758758
}
759-
for _, schema := range b.mysqlContext.ReplicateDoDb {
760-
if schema.TableSchema != schemaName {
761-
continue
762-
}
763-
if schema.TableSchemaRename == "" {
764-
continue
765-
}
759+
schema := b.findSchema(schemaName)
760+
if schema != nil && schema.TableSchemaRename != "" {
766761
b.logger.Debug("dml schema mapping", "from", dmlEvent.DatabaseName, "to", schema.TableSchemaRename)
767762
dmlEvent.DatabaseName = schema.TableSchemaRename
768763
}

0 commit comments

Comments
 (0)