Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer(dm): option for explicit auto-id-cache handling in create table DDLs #12040

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
@@ -404,6 +404,8 @@ type SyncerConfig struct {
Compact bool `yaml:"compact" toml:"compact" json:"compact"`
MultipleRows bool `yaml:"multiple-rows" toml:"multiple-rows" json:"multiple-rows"`

AutoIDCacheSize uint64 `yaml:"auto-id-cache-size" toml:"auto-id-cache-size" json:"auto-id-cache-size"`

// deprecated
MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"`

@@ -1175,6 +1177,7 @@ type SyncerConfigForDowngrade struct {
SafeModeDuration string `yaml:"safe-mode-duration,omitempty"`
Compact bool `yaml:"compact,omitempty"`
MultipleRows bool `yaml:"multipleRows,omitempty"`
AutoIDCacheSize uint64 `yaml:"auto-id-cache-size,omitempty"`
}

// NewSyncerConfigsForDowngrade converts SyncerConfig to SyncerConfigForDowngrade.
@@ -1195,6 +1198,7 @@ func NewSyncerConfigsForDowngrade(syncerConfigs map[string]*SyncerConfig) map[st
EnableANSIQuotes: syncerConfig.EnableANSIQuotes,
Compact: syncerConfig.Compact,
MultipleRows: syncerConfig.MultipleRows,
AutoIDCacheSize: syncerConfig.AutoIDCacheSize,
}
syncerConfigsForDowngrade[configName] = newSyncerConfig
}
8 changes: 6 additions & 2 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
@@ -270,6 +270,9 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *dbconfig.DBConfig,
if incrCfg.ReplBatch != nil {
subTaskCfg.SyncerConfig.Batch = *incrCfg.ReplBatch
}
if incrCfg.AutoIdCacheSize != nil {
subTaskCfg.SyncerConfig.AutoIDCacheSize = *incrCfg.AutoIdCacheSize
}
}
subTaskCfg.ValidatorCfg = defaultValidatorConfig()
// set route,blockAllowList,filter config
@@ -569,8 +572,9 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
taskSourceConfig.FullMigrateConf.Consistency = &consistency
}
taskSourceConfig.IncrMigrateConf = &openapi.TaskIncrMigrateConf{
ReplBatch: &oneSubtaskConfig.SyncerConfig.Batch,
ReplThreads: &oneSubtaskConfig.SyncerConfig.WorkerCount,
ReplBatch: &oneSubtaskConfig.SyncerConfig.Batch,
ReplThreads: &oneSubtaskConfig.SyncerConfig.WorkerCount,
AutoIdCacheSize: &oneSubtaskConfig.SyncerConfig.AutoIDCacheSize,
}
if oneSubtaskConfig.LoaderConfig.Security != nil {
var certAllowedCN []string
27 changes: 27 additions & 0 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
@@ -1196,3 +1196,30 @@ func TestTaskYamlForDowngrade(t *testing.T) {
require.NoError(t, err)
require.Equal(t, originCfg.TargetDB.Password, decryptedPass)
}

func TestSyncerYamlForDowngrade(t *testing.T) {
syncerConfigName := "syncer-1"
originCfg := TaskConfig{
Name: "test",
TaskMode: ModeFull,
MySQLInstances: []*MySQLInstance{
{
SourceID: "mysql-3306",
SyncerConfigName: syncerConfigName,
},
},
TargetDB: &dbconfig.DBConfig{
Password: "123456",
},
Syncers: map[string]*SyncerConfig{
syncerConfigName: {
AutoIDCacheSize: 100,
},
},
}
content, err := originCfg.YamlForDowngrade()
require.NoError(t, err)
newCfg := &TaskConfig{}
require.NoError(t, newCfg.FromYaml(content))
require.Equal(t, originCfg.Syncers[syncerConfigName].AutoIDCacheSize, newCfg.Syncers[syncerConfigName].AutoIDCacheSize)
}
219 changes: 110 additions & 109 deletions dm/openapi/gen.server.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions dm/openapi/gen.types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions dm/openapi/spec/dm.yaml
Original file line number Diff line number Diff line change
@@ -1779,6 +1779,11 @@ components:
type: integer
description: incremental synchronization of batch execution sql quantities
default: 100
auto_id_cache_size:
type: integer
format: uint64
description: "AUTO_ID_CACHE size to use for incremental replication of new tables"
default: 0
TaskSourceConf:
type: object
properties:
37 changes: 37 additions & 0 deletions dm/syncer/ddl.go
Original file line number Diff line number Diff line change
@@ -77,6 +77,7 @@ type DDLWorker struct {
tableRouter *regexprrouter.RouteTable
sourceTableNamesFlavor conn.LowerCaseTableNamesFlavor
collationCompatible string
autoIDCacheSize uint64
charsetAndDefaultCollation map[string]string
idAndCollationMap map[int]string
baList *filter.Filter
@@ -106,6 +107,7 @@ func NewDDLWorker(pLogger *log.Logger, syncer *Syncer) *DDLWorker {
tableRouter: syncer.tableRouter,
sourceTableNamesFlavor: syncer.SourceTableNamesFlavor,
collationCompatible: syncer.cfg.CollationCompatible,
autoIDCacheSize: syncer.cfg.AutoIDCacheSize,
charsetAndDefaultCollation: syncer.charsetAndDefaultCollation,
idAndCollationMap: syncer.idAndCollationMap,
baList: syncer.baList,
@@ -1322,6 +1324,8 @@ func (ddl *DDLWorker) genDDLInfo(qec *queryEventContext, sql string) (*ddlInfo,
ddl.adjustCollation(ddlInfo, qec.eventStatusVars, ddl.charsetAndDefaultCollation, ddl.idAndCollationMap)
}

ddl.adjustAutoIDCache(ddlInfo)

routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.stmtCache, ddlInfo.targetTables)
ddlInfo.routedDDL = routedDDL
return ddlInfo, err
@@ -1487,6 +1491,39 @@ ColumnLoop:
}
}

func (ddl *DDLWorker) adjustAutoIDCache(ddlInfo *ddlInfo) {
if ddl.autoIDCacheSize <= 0 {
return
}

if createStmt, ok := ddlInfo.stmtCache.(*ast.CreateTableStmt); ok {
addAutoIDCache := false
for _, col := range createStmt.Cols {
for _, opt := range col.Options {
if opt.Tp == ast.ColumnOptionAutoIncrement {
addAutoIDCache = true
}
}
}

for _, opt := range createStmt.Options {
switch opt.Tp {
case ast.TableOptionAutoIncrement:
addAutoIDCache = true
case ast.TableOptionAutoIdCache:
return
}
}

if addAutoIDCache {
createStmt.Options = append(createStmt.Options, &ast.TableOption{
Tp: ast.TableOptionAutoIdCache,
UintValue: ddl.autoIDCacheSize,
})
}
}
}

type ddlInfo struct {
originDDL string
routedDDL string
60 changes: 60 additions & 0 deletions dm/syncer/ddl_test.go
Original file line number Diff line number Diff line change
@@ -789,6 +789,66 @@ func TestAdjustCollation(t *testing.T) {
}
}

func TestAdjustAutoIDCache(t *testing.T) {
cases := []struct {
sql string
expectedSQL string
cacheSize uint64
}{
{
"CREATE TABLE `test`.`t1` (`id` INT)",
"CREATE TABLE `test`.`t1` (`id` INT)",
0,
},
{
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT)",
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT)",
0,
},
{
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT)",
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT) /*T![auto_id_cache] AUTO_ID_CACHE = 1 */",
1,
},
{
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT)",
"CREATE TABLE `test`.`t1` (`id` INT AUTO_INCREMENT) /*T![auto_id_cache] AUTO_ID_CACHE = 30 */",
30,
},
}

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestAdjustAutoIDCache")))
p := parser.New()
tab := &filter.Table{
Schema: "test",
Name: "t1",
}

for _, cse := range cases {
syncer := NewSyncer(&config.SubTaskConfig{
SyncerConfig: config.SyncerConfig{
AutoIDCacheSize: cse.cacheSize,
},
}, nil, nil)
syncer.tctx = tctx
ddlWorker := NewDDLWorker(&tctx.Logger, syncer)
ddlInfo := &ddlInfo{
originDDL: cse.sql,
routedDDL: cse.sql,
sourceTables: []*filter.Table{tab},
targetTables: []*filter.Table{tab},
}
stmt, err := p.ParseOneStmt(cse.sql, "", "")
require.NoError(t, err)
require.NotNil(t, stmt)
ddlInfo.stmtCache = stmt
ddlWorker.adjustAutoIDCache(ddlInfo)
routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.stmtCache, ddlInfo.targetTables)
require.NoError(t, err)
require.Equal(t, cse.expectedSQL, routedDDL)
}
}

type mockOnlinePlugin struct {
toFinish map[string]struct{}
}
1 change: 1 addition & 0 deletions dm/tests/dmctl_basic/conf/get_task.yaml
Original file line number Diff line number Diff line change
@@ -141,6 +141,7 @@ syncers:
checkpoint-flush-interval: 1
compact: true
multiple-rows: true
auto-id-cache-size: 0
max-retry: 0
auto-fix-gtid: false
enable-gtid: false
2 changes: 2 additions & 0 deletions dm/tests/import_v10x/conf/task.yaml
Original file line number Diff line number Diff line change
@@ -110,6 +110,7 @@ syncers:
checkpoint-flush-interval: 30
compact: false
multiple-rows: false
auto-id-cache-size: 0
max-retry: 0
auto-fix-gtid: false
enable-gtid: false
@@ -125,6 +126,7 @@ syncers:
checkpoint-flush-interval: 30
compact: false
multiple-rows: false
auto-id-cache-size: 0
max-retry: 0
auto-fix-gtid: false
enable-gtid: true