Skip to content

Commit 4292240

Browse files
author
ffffwh
committed
event filter (#360, #392-1)
Having considered the code structure, we opt for a blacklist mode. New job task config field: "SqlFilter" - Type array of string - E.g. `SqlFilter`: ["NoDMLDelete", "NoDMLUpdate"] - Current valid items - NoDML, NoDMLDelete, NoDMLInsert, NoDMLUpdate, NoDDL - case insensitive
1 parent a3163bf commit 4292240

File tree

2 files changed

+65
-2
lines changed

2 files changed

+65
-2
lines changed

internal/client/driver/mysql/binlog/binlog_reader.go

+64-2
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,50 @@ type BinlogReader struct {
6868
shutdown bool
6969
shutdownCh chan struct{}
7070
shutdownLock sync.Mutex
71+
72+
sqlFilter *SqlFilter
73+
}
74+
75+
type SqlFilter struct {
76+
NoDML bool
77+
NoDMLDelete bool
78+
NoDMLInsert bool
79+
NoDMLUpdate bool
80+
NoDDL bool
81+
//NoDDLCreateTable bool
82+
//NoDDLAlterTable bool
83+
}
84+
func parseSqlFilter(strs []string) (*SqlFilter, error) {
85+
s := &SqlFilter{}
86+
for i := range strs {
87+
switch strings.ToLower(strs[i]) {
88+
case "nodml":
89+
s.NoDML = true
90+
case "nodmldelete":
91+
s.NoDMLDelete = true
92+
case "nodmlinsert":
93+
s.NoDMLInsert = true
94+
case "nodmlupdate":
95+
s.NoDMLUpdate = true
96+
case "noddl":
97+
s.NoDDL = true
98+
//case "noddlcreatetable":
99+
// s.NoDDLCreateTable = true
100+
//case "noddlaltertable":
101+
// s.NoDDLAlterTable = true
102+
default:
103+
return nil, fmt.Errorf("unknown sql filter item: %v", strs[i])
104+
}
105+
}
106+
return s, nil
71107
}
72108

73109
func NewMySQLReader(cfg *config.MySQLDriverConfig, logger *log.Entry, replicateDoDb []*config.DataSource) (binlogReader *BinlogReader, err error) {
110+
sqlFilter, err := parseSqlFilter(cfg.SqlFilter)
111+
if err != nil {
112+
return nil, err
113+
}
114+
74115
binlogReader = &BinlogReader{
75116
logger: logger,
76117
currentCoordinates: base.BinlogCoordinateTx{},
@@ -80,6 +121,7 @@ func NewMySQLReader(cfg *config.MySQLDriverConfig, logger *log.Entry, replicateD
80121
ReMap: make(map[string]*regexp.Regexp),
81122
shutdownCh: make(chan struct{}),
82123
tables: make(map[string](map[string]*config.TableContext)),
124+
sqlFilter: sqlFilter,
83125
}
84126

85127
for _, db := range replicateDoDb {
@@ -297,6 +339,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
297339
return nil
298340
}
299341
}
342+
300343
if !ddlInfo.isDDL {
301344
event := NewQueryEvent(
302345
currentSchema,
@@ -307,6 +350,13 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
307350
entriesChannel <- b.currentBinlogEntry
308351
b.LastAppliedRowsEventHint = b.currentCoordinates
309352
return nil
353+
} else {
354+
// it is a ddl
355+
}
356+
357+
if b.sqlFilter.NoDDL {
358+
b.logger.Debugf("mysql.reader. skipped_a_query_event. query: %v", query)
359+
return nil
310360
}
311361

312362
for i, sql := range ddlInfo.sqls {
@@ -381,12 +431,24 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c
381431
return nil
382432
}
383433

434+
schemaName := string(rowsEvent.Table.Schema)
435+
tableName := string(rowsEvent.Table.Table)
436+
437+
if b.sqlFilter.NoDML ||
438+
(b.sqlFilter.NoDMLDelete && dml == DeleteDML) ||
439+
(b.sqlFilter.NoDMLInsert && dml == InsertDML) ||
440+
(b.sqlFilter.NoDMLUpdate && dml == UpdateDML) {
441+
442+
b.logger.Debugf("mysql.reader. skipped_a_dml_event. type: %v, table: %v.%v", dml, schemaName, tableName)
443+
return nil
444+
}
445+
384446
if dml == NotDML {
385447
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
386448
}
387449
dmlEvent := NewDataEvent(
388-
string(rowsEvent.Table.Schema),
389-
string(rowsEvent.Table.Table),
450+
schemaName,
451+
tableName,
390452
dml,
391453
int(rowsEvent.ColumnCount),
392454
)

internal/config/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ type MySQLDriverConfig struct {
136136
SkipRenamedColumns bool
137137
MaxRetries int64
138138
ChunkSize int64
139+
SqlFilter []string
139140
niceRatio float64
140141
MaxLagMillisecondsThrottleThreshold int64
141142
maxLoad umconf.LoadMap

0 commit comments

Comments
 (0)