Skip to content

Commit 394c1ac

Browse files
author
ffffwh
committed
rename currentSchema for unrecognized ddl (#557)
1 parent c012bca commit 394c1ac

File tree

1 file changed

+30
-18
lines changed

1 file changed

+30
-18
lines changed

drivers/mysql/mysql/binlog/binlog_reader.go

+30-18
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,12 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
478478

479479
ddlInfo, err := resolveDDLSQL(currentSchema, query, b.skipQueryDDL)
480480

481+
schema := b.findSchema(currentSchema)
482+
currentSchemaRename := currentSchema
483+
if schema.TableSchemaRename != "" {
484+
currentSchemaRename = schema.TableSchemaRename
485+
}
486+
481487
if skipExpandSyntax || err != nil || !ddlInfo.isDDL {
482488
if err != nil {
483489
b.logger.Warn("Parse query event failed. will execute", "query", query, "err", err, "gno", b.currentCoordinates.GNO)
@@ -488,7 +494,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
488494
b.logger.Warn("skip query", "query", query, "gno", b.currentCoordinates.GNO)
489495
} else {
490496
event := common.NewQueryEvent(
491-
currentSchema,
497+
currentSchemaRename,
492498
query,
493499
common.NotDML,
494500
ev.Header.Timestamp,
@@ -524,7 +530,11 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
524530
b.logger.Info("Skip QueryEvent", "currentSchema", currentSchema, "sql", sql,
525531
"realSchema", realSchema, "tableName", tableName, "gno", b.currentCoordinates.GNO)
526532
} else {
527-
schema, table := b.findSchemaTable(realSchema, tableName)
533+
if realSchema != currentSchema {
534+
schema = b.findSchema(realSchema)
535+
}
536+
table := b.findTable(schema, tableName)
537+
528538

529539
skipEvent = skipBySqlFilter(ddlInfo.ast, b.sqlFilter)
530540
switch realAst := ddlInfo.ast.(type) {
@@ -580,9 +590,6 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
580590
//sql = strings.Replace(sql, realSchema, schema.TableSchemaRename, 1)
581591
sql = loadMapping(sql, realSchema, schema.TableSchemaRename, "schemaRename", " ",
582592
ddlInfo.ast)
583-
if currentSchema == realSchema {
584-
currentSchema = schema.TableSchemaRename
585-
}
586593
realSchema = schema.TableSchemaRename
587594
} else {
588595
// schema == nil means it is not explicit in ReplicateDoDb, thus no renaming
@@ -592,7 +599,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
592599
if table != nil && table.TableRename != "" {
593600
ddlInfo.table.Table = table.TableRename
594601
//sql = strings.Replace(sql, tableName, table.TableRename, 1)
595-
sql = loadMapping(sql, tableName, table.TableRename, "", currentSchema,
602+
sql = loadMapping(sql, tableName, table.TableRename, "", currentSchemaRename,
596603
ddlInfo.ast)
597604
b.logger.Debug("ddl table mapping", "from", tableName, "to", table.TableRename)
598605
}
@@ -606,7 +613,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
606613
}
607614

608615
event := common.NewQueryEventAffectTable(
609-
currentSchema,
616+
currentSchemaRename,
610617
sql,
611618
common.NotDML,
612619
ddlInfo.table,
@@ -1455,20 +1462,25 @@ func (b *BinlogReader) sqleAfterCreateSchema(schema string) {
14551462
b.maybeSqleContext.LoadTables(schema, nil)
14561463
}
14571464
}
1458-
func (b *BinlogReader) findSchemaTable(realSchema string, tableName string) (schema *common.DataSource, table *common.Table) {
1459-
for i := range b.mysqlContext.ReplicateDoDb {
1460-
if b.mysqlContext.ReplicateDoDb[i].TableSchema == realSchema {
1461-
schema = b.mysqlContext.ReplicateDoDb[i]
1462-
for j := range b.mysqlContext.ReplicateDoDb[i].Tables {
1463-
if b.mysqlContext.ReplicateDoDb[i].Tables[j].TableName == tableName {
1464-
table = b.mysqlContext.ReplicateDoDb[i].Tables[j]
1465-
return schema, table
1466-
}
1465+
func (b *BinlogReader) findSchema(schemaName string) *common.DataSource {
1466+
if schemaName != "" {
1467+
for i := range b.mysqlContext.ReplicateDoDb {
1468+
if b.mysqlContext.ReplicateDoDb[i].TableSchema == schemaName {
1469+
return b.mysqlContext.ReplicateDoDb[i]
14671470
}
1468-
return schema, nil
14691471
}
14701472
}
1471-
return nil, nil
1473+
return nil
1474+
}
1475+
func (b *BinlogReader) findTable(maybeSchema *common.DataSource, tableName string) *common.Table {
1476+
if maybeSchema != nil {
1477+
for j := range maybeSchema.Tables {
1478+
if maybeSchema.Tables[j].TableName == tableName {
1479+
return maybeSchema.Tables[j]
1480+
}
1481+
}
1482+
}
1483+
return nil
14721484
}
14731485

14741486
func skipBySqlFilter(ddlAst ast.StmtNode, sqlFilter *SqlFilter) bool {

0 commit comments

Comments
 (0)