Skip to content

Commit 238ac34

Browse files
authored
ddl: fix incorrect query in async ddl check (#3331)
close #3329
1 parent 5ae6d33 commit 238ac34

File tree

6 files changed

+102
-66
lines changed

6 files changed

+102
-66
lines changed

pkg/sink/mysql/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
cerror "github.com/pingcap/ticdc/pkg/errors"
3333
"github.com/pingcap/ticdc/pkg/security"
3434
"github.com/pingcap/ticdc/pkg/util"
35+
"github.com/pingcap/tidb/br/pkg/version"
3536
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
3637
"go.uber.org/zap"
3738
)
@@ -138,6 +139,9 @@ type Config struct {
138139
DryRunBlockInterval time.Duration
139140
// SlowQuery is the threshold time above which the query will be logged.
140141
SlowQuery time.Duration
142+
143+
// ServerInfo is the version info of the downstream
144+
ServerInfo version.ServerInfo
141145
}
142146

143147
// New returns the default mysql backend config.
@@ -294,7 +298,8 @@ func NewMysqlConfigAndDB(
294298
return nil, nil, err
295299
}
296300

297-
cfg.HasVectorType = ShouldFormatVectorType(db, cfg)
301+
cfg.ServerInfo = getTiDBVersion(db)
302+
cfg.HasVectorType = shouldFormatVectorType(cfg)
298303

299304
// By default, cache-prep-stmts=true, an LRU cache is used for prepared statements,
300305
// two connections are required to process a transaction.

pkg/sink/mysql/helper.go

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,29 @@ import (
3434
timodel "github.com/pingcap/tidb/pkg/meta/model"
3535
"github.com/pingcap/tidb/pkg/parser/charset"
3636
"github.com/pingcap/tidb/pkg/parser/mysql"
37-
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
3837
"go.uber.org/zap"
3938
)
4039

4140
const checkRunningAddIndexSQL = `
42-
SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, STATE
41+
SELECT *
4342
FROM information_schema.ddl_jobs
44-
WHERE TABLE_ID = "%d"
43+
WHERE DB_NAME = "%s"
44+
AND TABLE_NAME = "%s"
45+
AND JOB_TYPE LIKE "add index%%"
46+
AND (STATE = "running" OR STATE = "queueing")
47+
LIMIT 1;
48+
`
49+
50+
// Ref: https://github.com/pingcap/tidb/issues/55725
51+
const checkRunningAddIndexSQLForOldVersion = `
52+
ADMIN SHOW DDL JOBS 1
53+
WHERE DB_NAME = "%s"
54+
AND TABLE_NAME = "%s"
4555
AND JOB_TYPE LIKE "add index%%"
4656
AND (STATE = "running" OR STATE = "queueing");
4757
`
4858

49-
const checkRunningSQL = `SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
59+
const checkRunningSQL = `SELECT * FROM information_schema.ddl_jobs
5060
WHERE CREATE_TIME >= "%s" AND QUERY = "%s";`
5161

5262
// CheckIfBDRModeIsSupported checks if the downstream supports set tidb_cdc_write_source variable
@@ -396,28 +406,42 @@ func needWaitAsyncExecDone(t timodel.ActionType) bool {
396406
}
397407
}
398408

399-
// ShouldFormatVectorType return true if vector type should be converted to longtext.
400-
func ShouldFormatVectorType(db *sql.DB, cfg *Config) bool {
409+
func getTiDBVersion(db *sql.DB) version.ServerInfo {
410+
versionInfo, err := export.SelectVersion(db)
411+
if err != nil {
412+
log.Warn("fail to get version", zap.Error(err))
413+
return version.ParseServerInfo("")
414+
}
415+
return version.ParseServerInfo(versionInfo)
416+
}
417+
418+
// shouldFormatVectorType returns true if the vector type should be converted to longtext.
419+
func shouldFormatVectorType(cfg *Config) bool {
401420
if !cfg.HasVectorType {
402421
log.Warn("please set `has-vector-type` to be true if a column is vector type when the downstream is not TiDB or TiDB version less than specify version",
403422
zap.Any("hasVectorType", cfg.HasVectorType), zap.Any("supportVectorVersion", defaultSupportVectorVersion))
404423
return false
405424
}
406-
versionInfo, err := export.SelectVersion(db)
407-
if err != nil {
408-
log.Warn("fail to get version", zap.Error(err), zap.Bool("isTiDB", cfg.IsTiDB))
409-
return false
410-
}
411-
serverInfo := version.ParseServerInfo(versionInfo)
412425
ver := semver.New(defaultSupportVectorVersion)
413-
if !cfg.IsTiDB || serverInfo.ServerVersion.LessThan(*ver) {
426+
if !cfg.IsTiDB || cfg.ServerInfo.ServerVersion.LessThan(*ver) {
414427
log.Error("downstream unsupport vector type. it will be converted to longtext",
415-
zap.String("version", serverInfo.ServerVersion.String()), zap.String("supportVectorVersion", defaultSupportVectorVersion), zap.Bool("isTiDB", cfg.IsTiDB))
428+
zap.String("version", cfg.ServerInfo.ServerVersion.String()), zap.String("supportVectorVersion", defaultSupportVectorVersion), zap.Bool("isTiDB", cfg.IsTiDB))
416429
return true
417430
}
418431
return false
419432
}
420433

434+
// getCheckRunningAddIndexSQL returns a different SQL statement depending on the TiDB version.
435+
func getCheckRunningAddIndexSQL(cfg *Config) string {
436+
ver := semver.New(defaultRunningAddIndexNewSQLVersion)
437+
if cfg.ServerInfo.ServerVersion.LessThan(*ver) {
438+
log.Info("it will check running AddIndex SQL with old version",
439+
zap.String("version", cfg.ServerInfo.ServerVersion.String()))
440+
return checkRunningAddIndexSQLForOldVersion
441+
}
442+
return checkRunningAddIndexSQL
443+
}
444+
421445
func isRetryableDMLError(err error) bool {
422446
if !cerror.IsRetryableError(err) {
423447
return false
@@ -538,15 +562,15 @@ func AdjustSQLModeCompatible(sqlModes string) (string, error) {
538562
disable := strings.Join(needDisable, ",")
539563
enable := strings.Join(needEnable, ",")
540564

541-
mode, err := tmysql.GetSQLMode(sqlModes)
565+
mode, err := mysql.GetSQLMode(sqlModes)
542566
if err != nil {
543567
return sqlModes, err
544568
}
545-
disableMode, err2 := tmysql.GetSQLMode(disable)
569+
disableMode, err2 := mysql.GetSQLMode(disable)
546570
if err2 != nil {
547571
return sqlModes, err2
548572
}
549-
enableMode, err3 := tmysql.GetSQLMode(enable)
573+
enableMode, err3 := mysql.GetSQLMode(enable)
550574
if err3 != nil {
551575
return sqlModes, err3
552576
}
@@ -558,9 +582,9 @@ func AdjustSQLModeCompatible(sqlModes string) (string, error) {
558582
}
559583

560584
// GetSQLModeStrBySQLMode gets the string representation of sql_mode from the given sql_mode value.
561-
func GetSQLModeStrBySQLMode(sqlMode tmysql.SQLMode) string {
585+
func GetSQLModeStrBySQLMode(sqlMode mysql.SQLMode) string {
562586
var sqlModeStr []string
563-
for str, SQLMode := range tmysql.Str2SQLMode {
587+
for str, SQLMode := range mysql.Str2SQLMode {
564588
if sqlMode&SQLMode != 0 {
565589
sqlModeStr = append(sqlModeStr, str)
566590
}

pkg/sink/mysql/mysql_writer.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ const (
3838

3939
defaultSupportVectorVersion = "8.4.0"
4040

41+
defaultRunningAddIndexNewSQLVersion = "8.5.0"
42+
4143
defaultErrorCausedSafeModeDuration = 5 * time.Second
4244
)
4345

@@ -108,10 +110,10 @@ func (w *Writer) SetTableSchemaStore(tableSchemaStore *commonEvent.TableSchemaSt
108110
}
109111

110112
func (w *Writer) FlushDDLEvent(event *commonEvent.DDLEvent) error {
111-
// if w.cfg.IsTiDB {
112-
// // first we check whether there is some async ddl executed now.
113-
// w.waitAsyncDDLDone(event)
114-
// }
113+
if w.cfg.IsTiDB {
114+
// first we check whether there is some async ddl executed now.
115+
w.waitAsyncDDLDone(event)
116+
}
115117
if w.cfg.IsTiDB || !event.TiDBOnly {
116118
// we write ddl ts before ddl first, and update the ddl ts item after ddl executed,
117119
// to ensure the atomic with ddl writing when server is restarted.

pkg/sink/mysql/mysql_writer_ddl.go

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
2626
"github.com/pingcap/ticdc/pkg/errors"
2727
"github.com/pingcap/ticdc/pkg/retry"
28+
"github.com/pingcap/tidb/dumpling/export"
2829
timodel "github.com/pingcap/tidb/pkg/meta/model"
2930
"go.uber.org/zap"
3031
)
@@ -189,23 +190,16 @@ func (w *Writer) waitAsyncDDLDone(event *commonEvent.DDLEvent) {
189190
return
190191
}
191192

192-
var relatedTableIDs []int64
193193
switch event.GetBlockedTables().InfluenceType {
194-
case commonEvent.InfluenceTypeNormal:
195-
relatedTableIDs = event.GetBlockedTables().TableIDs
196194
// db-class and all-class DDLs are not affected by async DDL, so just return
197195
case commonEvent.InfluenceTypeDB, commonEvent.InfluenceTypeAll:
198196
return
199197
}
200198

201-
for _, tableID := range relatedTableIDs {
202-
// tableID 0 means table trigger, which can't do async ddl
203-
if tableID == 0 {
204-
continue
205-
}
199+
for _, blockedTable := range event.GetBlockedTableNames() {
206200
// query the downstream,
207201
// if the ddl is still running, we should wait for it.
208-
err := w.checkAndWaitAsyncDDLDoneDownstream(tableID)
202+
err := w.checkAndWaitAsyncDDLDoneDownstream(blockedTable.SchemaName, blockedTable.TableName)
209203
if err != nil {
210204
log.Error("check previous asynchronous ddl failed",
211205
zap.String("keyspace", w.ChangefeedID.Keyspace()),
@@ -216,51 +210,45 @@ func (w *Writer) waitAsyncDDLDone(event *commonEvent.DDLEvent) {
216210
}
217211

218212
// true means the async ddl is still running, false means the async ddl is done.
219-
func (w *Writer) doQueryAsyncDDL(tableID int64, query string) (bool, error) {
213+
func (w *Writer) doQueryAsyncDDL(query string) (bool, error) {
220214
start := time.Now()
221215
rows, err := w.db.QueryContext(w.ctx, query)
222216
log.Debug("query duration", zap.Any("duration", time.Since(start)), zap.Any("query", query))
223217
if err != nil {
224218
return false, errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query)))
225219
}
226-
227-
defer rows.Close()
228-
var jobID int64
229-
var jobType string
230-
var schemaState string
231-
var state string
232-
233-
noRows := true
234-
for rows.Next() {
235-
noRows = false
236-
err := rows.Scan(&jobID, &jobType, &schemaState, &state)
237-
if err != nil {
238-
return false, errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query)))
239-
}
240-
241-
log.Info("async ddl is still running",
220+
rets, err := export.GetSpecifiedColumnValuesAndClose(rows, "JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "STATE", "QUERY")
221+
if err != nil {
222+
log.Error("check previous asynchronous ddl failed",
242223
zap.String("changefeed", w.ChangefeedID.String()),
243-
zap.Duration("checkDuration", time.Since(start)),
244-
zap.Any("tableID", tableID),
245-
zap.Any("jobID", jobID),
246-
zap.String("jobType", jobType),
247-
zap.String("schemaState", schemaState),
248-
zap.String("state", state))
249-
break
224+
zap.Error(err))
225+
return false, errors.Trace(err)
250226
}
251227

252-
if noRows {
228+
if len(rets) == 0 {
253229
return false, nil
254230
}
231+
ret := rets[0]
232+
jobID, jobType, schemaState, state, runningDDL := ret[0], ret[1], ret[2], ret[3], ret[4]
233+
log.Info("async ddl is still running",
234+
zap.String("changefeed", w.ChangefeedID.String()),
235+
zap.Duration("checkDuration", time.Since(start)),
236+
zap.String("runningDDL", runningDDL),
237+
zap.String("query", query),
238+
zap.Any("jobID", jobID),
239+
zap.String("jobType", jobType),
240+
zap.String("schemaState", schemaState),
241+
zap.String("state", state))
255242

256243
return true, nil
257244
}
258245

259246
// query the ddl jobs to find the state of the async ddl
260247
// if the ddl is still running, we should wait for it.
261-
func (w *Writer) checkAndWaitAsyncDDLDoneDownstream(tableID int64) error {
262-
query := fmt.Sprintf(checkRunningAddIndexSQL, tableID)
263-
running, err := w.doQueryAsyncDDL(tableID, query)
248+
func (w *Writer) checkAndWaitAsyncDDLDoneDownstream(schemaName, tableName string) error {
249+
checkSQL := getCheckRunningAddIndexSQL(w.cfg)
250+
query := fmt.Sprintf(checkSQL, schemaName, tableName)
251+
running, err := w.doQueryAsyncDDL(query)
264252
if err != nil {
265253
return err
266254
}
@@ -276,7 +264,7 @@ func (w *Writer) checkAndWaitAsyncDDLDoneDownstream(tableID int64) error {
276264
case <-w.ctx.Done():
277265
return nil
278266
case <-ticker.C:
279-
running, err = w.doQueryAsyncDDL(tableID, query)
267+
running, err = w.doQueryAsyncDDL(query)
280268
if err != nil {
281269
return err
282270
}

pkg/sink/mysql/mysql_writer_test.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,20 @@ import (
2121
"time"
2222

2323
"github.com/DATA-DOG/go-sqlmock"
24+
"github.com/go-sql-driver/mysql"
2425
lru "github.com/hashicorp/golang-lru"
2526
"github.com/pingcap/errors"
27+
"github.com/pingcap/log"
2628
"github.com/pingcap/ticdc/heartbeatpb"
2729
"github.com/pingcap/ticdc/pkg/common"
2830
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
2931
"github.com/pingcap/ticdc/pkg/config/kerneltype"
3032
cerror "github.com/pingcap/ticdc/pkg/errors"
3133
"github.com/pingcap/ticdc/pkg/metrics"
34+
"github.com/pingcap/tidb/br/pkg/version"
3235
ticonfig "github.com/pingcap/tidb/pkg/config"
3336
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
37+
timodel "github.com/pingcap/tidb/pkg/meta/model"
3438
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
3539
"github.com/stretchr/testify/require"
3640
)
@@ -65,6 +69,8 @@ func newTestMysqlWriterForTiDB(t *testing.T) (*Writer, *sql.DB, sqlmock.Sqlmock)
6569
cfg.SyncPointRetention = 100 * time.Second
6670
cfg.IsTiDB = true
6771
cfg.EnableDDLTs = defaultEnableDDLTs
72+
cfg.ServerInfo = version.ParseServerInfo(defaultRunningAddIndexNewSQLVersion)
73+
6874
changefeedID := common.NewChangefeedID4Test("test", "test")
6975
statistics := metrics.NewStatistics(changefeedID, "mysqlSink")
7076
writer := NewWriter(ctx, 0, db, cfg, changefeedID, statistics)
@@ -439,7 +445,6 @@ func TestMysqlWriter_RemoveDDLTsTable(t *testing.T) {
439445
require.NoError(t, err)
440446
}
441447

442-
/*
443448
// Test that the async ddl can be written successfully
444449
func TestMysqlWriter_AsyncDDL(t *testing.T) {
445450
writer, db, mock := newTestMysqlWriterForTiDB(t)
@@ -502,7 +507,7 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) {
502507
err = mock.ExpectationsWereMet()
503508
require.NoError(t, err)
504509

505-
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, 1)).WillReturnError(sqlmock.ErrCancelled)
510+
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "t")).WillReturnError(sqlmock.ErrCancelled)
506511
mock.ExpectBegin()
507512
mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, finished, is_syncpoint) VALUES ('default', 'test/test', '1', 1, 0, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), ddl_ts=VALUES(ddl_ts), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1))
508513
mock.ExpectCommit()
@@ -538,7 +543,7 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) {
538543
mock.ExpectCommit()
539544

540545
// for add column ddl for table t
541-
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, 1)).WillReturnError(sqlmock.ErrCancelled)
546+
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "t")).WillReturnError(sqlmock.ErrCancelled)
542547
mock.ExpectBegin()
543548
mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, finished, is_syncpoint) VALUES ('default', 'test/test', '3', 1, 0, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), ddl_ts=VALUES(ddl_ts), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1))
544549
mock.ExpectCommit()
@@ -567,6 +572,9 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) {
567572
InfluenceType: commonEvent.InfluenceTypeNormal,
568573
TableIDs: []int64{1},
569574
},
575+
BlockedTableNames: []commonEvent.SchemaTableName{
576+
{SchemaName: addIndexjob.SchemaName, TableName: addIndexjob.TableName},
577+
},
570578
}
571579

572580
err = writer.FlushDDLEvent(addIndexddlEvent)
@@ -598,6 +606,9 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) {
598606
InfluenceType: commonEvent.InfluenceTypeNormal,
599607
TableIDs: []int64{0},
600608
},
609+
BlockedTableNames: []commonEvent.SchemaTableName{
610+
{SchemaName: job2.SchemaName, TableName: job2.TableName},
611+
},
601612
NeedAddedTables: []commonEvent.Table{{TableID: 2, SchemaID: 1}},
602613
}
603614

@@ -619,6 +630,9 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) {
619630
InfluenceType: commonEvent.InfluenceTypeNormal,
620631
TableIDs: []int64{1},
621632
},
633+
BlockedTableNames: []commonEvent.SchemaTableName{
634+
{SchemaName: job.SchemaName, TableName: job.TableName},
635+
},
622636
}
623637

624638
err = writer.FlushDDLEvent(ddlEvent)
@@ -628,7 +642,6 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) {
628642
err = mock.ExpectationsWereMet()
629643
require.NoError(t, err)
630644
}
631-
*/
632645

633646
func TestCheckIsDuplicateEntryError(t *testing.T) {
634647
writer, db, _ := newTestMysqlWriter(t)

tests/integration_tests/ddl_wait/run.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ function run() {
3333

3434
run_sql "alter table test.t modify column col decimal(30,10);"
3535
run_sql "alter table test.t add index (col);"
36+
run_sql "alter table test.t add index (col);"
37+
sleep 3
38+
cleanup_process $CDC_BINARY
39+
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
3640
# make sure all tables are equal in upstream and downstream
3741
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
3842

0 commit comments

Comments
 (0)