Skip to content

Commit fc23fe3

Browse files
author
ffffwh
committed
shutdown job on src changed in incr stage #680-2
1 parent 5d0b92b commit fc23fe3

File tree

4 files changed

+32
-14
lines changed

4 files changed

+32
-14
lines changed

drivers/mysql/mysql/applier_incr.go

+3-10
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ func NewApplierIncr(subject string, mysqlContext *common.MySQLDriverConfig,
9191
}
9292

9393
func (a *ApplierIncr) Run() (err error) {
94-
a.logger.Debug("beging connetion mysql 4 validate serverid")
95-
if err := a.validateServerUUID(); err != nil {
94+
a.logger.Debug("Run. GetServerUUID. before")
95+
a.MySQLServerUuid, err = sql.GetServerUUID(a.db)
96+
if err != nil {
9697
return err
9798
}
9899

@@ -600,11 +601,3 @@ func (a *ApplierIncr) setTableItemForBinlogEntry(binlogEntry *common.BinlogEntry
600601
}
601602
return nil
602603
}
603-
604-
func (a *ApplierIncr) validateServerUUID() error {
605-
query := `SELECT @@SERVER_UUID`
606-
if err := a.db.QueryRow(query).Scan(&a.MySQLServerUuid); err != nil {
607-
return err
608-
}
609-
return nil
610-
}

drivers/mysql/mysql/binlog/binlog_reader.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ package binlog
88

99
import (
1010
"bytes"
11+
gosql "database/sql"
1112
"github.com/actiontech/dtle/drivers/mysql/common"
13+
"github.com/actiontech/dtle/drivers/mysql/mysql/sql"
1214
"github.com/pingcap/parser/format"
1315
"github.com/pkg/errors"
1416
"github.com/shirou/gopsutil/mem"
@@ -91,6 +93,8 @@ type BinlogReader struct {
9193
maybeSqleContext *sqle.Context
9294
memory *int64
9395
extractedTxCount uint32
96+
db *gosql.DB
97+
serverUUID string
9498
}
9599

96100
type SqlFilter struct {
@@ -154,9 +158,7 @@ func parseSqlFilter(strs []string) (*SqlFilter, error) {
154158
return s, nil
155159
}
156160

157-
func NewMySQLReader(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger hclog.Logger,
158-
replicateDoDb []*common.DataSource, sqleContext *sqle.Context, memory *int64,
159-
) (binlogReader *BinlogReader, err error) {
161+
func NewMySQLReader(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger hclog.Logger, replicateDoDb []*common.DataSource, sqleContext *sqle.Context, memory *int64, db *gosql.DB) (binlogReader *BinlogReader, err error) {
160162

161163
sqlFilter, err := parseSqlFilter(cfg.SqlFilter)
162164
if err != nil {
@@ -176,6 +178,12 @@ func NewMySQLReader(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig,
176178
sqlFilter: sqlFilter,
177179
maybeSqleContext: sqleContext,
178180
memory: memory,
181+
db: db,
182+
}
183+
184+
binlogReader.serverUUID, err = sql.GetServerUUID(db)
185+
if err != nil {
186+
return nil, err
179187
}
180188

181189
for _, db := range replicateDoDb {
@@ -1041,6 +1049,15 @@ func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *common.BinlogEntr
10411049
b.currentCoordMutex.Unlock()
10421050

10431051
if ev.Header.EventType == replication.ROTATE_EVENT {
1052+
serverUUID, err := sql.GetServerUUID(b.db)
1053+
if err != nil {
1054+
return errors.Wrap(err, "on rotate_event. GetServerUUID")
1055+
}
1056+
if serverUUID != b.serverUUID {
1057+
return fmt.Errorf("serverUUID changed from %v to %v. job should restart",
1058+
b.serverUUID, serverUUID)
1059+
}
1060+
10441061
rotateEvent := ev.Event.(*replication.RotateEvent)
10451062
nextLogName := string(rotateEvent.NextLogName)
10461063
b.currentCoordMutex.Lock()

drivers/mysql/mysql/extractor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ func (e *Extractor) getSchemaTablesAndMeta() error {
669669
// Cooperate with `initiateStreaming()` using `e.streamerReadyCh`. Any err will be sent thru the chan.
670670
func (e *Extractor) initBinlogReader(binlogCoordinates *base.BinlogCoordinatesX) {
671671
binlogReader, err := binlog.NewMySQLReader(e.execCtx, e.mysqlContext, e.logger.ResetNamed("reader"),
672-
e.replicateDoDb, e.context, e.memory2)
672+
e.replicateDoDb, e.context, e.memory2, e.db)
673673
if err != nil {
674674
e.logger.Error("err at initBinlogReader: NewMySQLReader", "err", err)
675675
e.streamerReadyCh <- err

drivers/mysql/mysql/sql/sqlutils.go

+8
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,14 @@ type QueryAble interface {
242242
QueryRow(query string, args ...interface{}) *gosql.Row
243243
}
244244

245+
func GetServerUUID(db QueryAble) (result string, err error) {
246+
err = db.QueryRow(`SELECT @@SERVER_UUID /*dtle*/`).Scan(&result)
247+
if err != nil {
248+
return "", err
249+
}
250+
return result, nil
251+
}
252+
245253
// queryResultData returns a raw array of rows for a given query, optionally reading and returning column names
246254
func queryResultData(db *gosql.DB, query string, retrieveColumns bool, args ...interface{}) (ResultData, []string, error) {
247255
var err error

0 commit comments

Comments
 (0)