@@ -420,11 +420,12 @@ func ToColumnValuesV2(abstractValues []interface{}) *common.ColumnValues {
420
420
421
421
// If isDDL, a sql correspond to a table item, aka len(tables) == len(sqls).
422
422
type parseDDLResult struct {
423
- isDDL bool
424
- table common.SchemaTable
423
+ isDDL bool
424
+ table common.SchemaTable
425
425
extraTables []common.SchemaTable
426
- sql string
427
- ast ast.StmtNode
426
+ sql string
427
+ ast ast.StmtNode
428
+ isExpand bool
428
429
}
429
430
430
431
// StreamEvents
@@ -474,15 +475,15 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
474
475
b .hasBeginQuery = true
475
476
} else {
476
477
if strings .ToUpper (query ) == "COMMIT" || ! b .hasBeginQuery {
478
+ ddlInfo , err := resolveDDLSQL (currentSchema , query , b .skipQueryDDL )
479
+
477
480
var skipExpandSyntax bool
478
481
if b .mysqlContext .ExpandSyntaxSupport {
479
482
skipExpandSyntax = false
480
483
} else {
481
- skipExpandSyntax = isExpandSyntaxQuery (query )
484
+ skipExpandSyntax = isExpandSyntaxQuery (query ) || ddlInfo . isExpand
482
485
}
483
486
484
- ddlInfo , err := resolveDDLSQL (currentSchema , query , b .skipQueryDDL )
485
-
486
487
schema := b .findSchema (currentSchema )
487
488
currentSchemaRename := currentSchema
488
489
if schema .TableSchemaRename != "" {
@@ -1026,6 +1027,15 @@ func resolveDDLSQL(currentSchema string, sql string,
1026
1027
setTable (v .Table .Schema .O , v .Table .Name .O )
1027
1028
case * ast.AlterTableStmt :
1028
1029
setTable (v .Table .Schema .O , v .Table .Name .O )
1030
+ case * ast.RevokeStmt , * ast.RevokeRoleStmt :
1031
+ result .isExpand = true
1032
+ case * ast.SetPwdStmt :
1033
+ result .isExpand = true
1034
+ case * ast.FlushStmt :
1035
+ switch v .Tp {
1036
+ case ast .FlushPrivileges :
1037
+ result .isExpand = true
1038
+ }
1029
1039
case * ast.DropTableStmt :
1030
1040
var newTables []* ast.TableName
1031
1041
for i , t := range v .Tables {
@@ -1060,6 +1070,7 @@ func resolveDDLSQL(currentSchema string, sql string,
1060
1070
}
1061
1071
case * ast.CreateUserStmt , * ast.GrantStmt , * ast.DropUserStmt , * ast.AlterUserStmt :
1062
1072
setTable ("mysql" , "user" )
1073
+ result .isExpand = true
1063
1074
case * ast.RenameTableStmt :
1064
1075
setTable (v .OldTable .Schema .O , v .OldTable .Name .O )
1065
1076
// TODO handle extra tables in v.TableToTables[1:]
@@ -1094,39 +1105,17 @@ func (b *BinlogReader) skipQueryDDL(schema string, tableName string) bool {
1094
1105
func isExpandSyntaxQuery (sql string ) bool {
1095
1106
sql = strings .ToLower (sql )
1096
1107
1097
- if strings .HasPrefix (sql , "flush privileges" ) {
1098
- return true
1099
- }
1100
- if strings .HasPrefix (sql , "alter user" ) {
1101
- return true
1102
- }
1103
- if strings .HasPrefix (sql , "create user" ) {
1104
- return true
1105
- }
1108
+ // TODO mod pingcap/parser to support these DDLs
1109
+ // and use ast instead of string-matching
1106
1110
if strings .HasPrefix (sql , "create function" ) {
1107
1111
return true
1108
1112
}
1109
1113
if strings .HasPrefix (sql , "create procedure" ) {
1110
1114
return true
1111
1115
}
1112
- if strings .HasPrefix (sql , "drop user" ) {
1113
- return true
1114
- }
1115
- if strings .HasPrefix (sql , "delete from mysql.user" ) {
1116
- return true
1117
- }
1118
- if strings .HasPrefix (sql , "grant" ) {
1119
- return true
1120
- }
1121
1116
if strings .HasPrefix (sql , "rename user" ) {
1122
1117
return true
1123
1118
}
1124
- if strings .HasPrefix (sql , "revoke" ) {
1125
- return true
1126
- }
1127
- if strings .HasPrefix (sql , "set password" ) {
1128
- return true
1129
- }
1130
1119
1131
1120
return false
1132
1121
}
0 commit comments