Skip to content

Commit 64e1011

Browse files
author
ffffwh
committed
refactor around replicateDoDb and move GetCandidateUniqueKeys()
1 parent f5835a1 commit 64e1011

File tree

4 files changed

+122
-139
lines changed

4 files changed

+122
-139
lines changed

drivers/mysql/common/tabletypes.go

+7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ func NewSchemaContext(name string) *SchemaContext {
2222
TableMap: map[string]*TableContext{},
2323
}
2424
}
25+
func (sc *SchemaContext) AddTable(table *Table) (err error) {
26+
sc.TableMap[table.TableName], err = NewTableContext(table)
27+
if err != nil {
28+
return err
29+
}
30+
return nil
31+
}
2532
func (sc *SchemaContext) AddTables(tables []*Table) (err error) {
2633
for _, table := range tables {
2734
sc.TableMap[table.TableName], err = NewTableContext(table)

drivers/mysql/mysql/base/utils.go

+94
Original file line numberDiff line numberDiff line change
@@ -529,3 +529,97 @@ func GetTableColumnsSqle(sqleContext *sqle.Context, schema string,
529529
//r.SetCharset() // TODO
530530
return r, fkParents, nil
531531
}
532+
533+
func GetCandidateUniqueKeys(logger g.LoggerType, db usql.QueryAble, databaseName, tableName string) (uniqueKeys [](*common.UniqueKey), err error) {
534+
query := `
535+
SELECT UNIQUES.INDEX_NAME, UNIQUES.COLUMN_NAMES, LOCATE('auto_increment', EXTRA) > 0 as is_auto_increment, has_nullable
536+
FROM INFORMATION_SCHEMA.COLUMNS
537+
INNER JOIN
538+
(SELECT TABLE_SCHEMA, TABLE_NAME, INDEX_NAME,
539+
GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC) AS COLUMN_NAMES,
540+
SUBSTRING_INDEX(GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC), ',', 1) AS FIRST_COLUMN_NAME,
541+
SUM(NULLABLE='YES') > 0 AS has_nullable
542+
FROM INFORMATION_SCHEMA.STATISTICS
543+
WHERE NON_UNIQUE=0 AND TABLE_SCHEMA = ? AND TABLE_NAME = ?
544+
GROUP BY TABLE_SCHEMA,TABLE_NAME,INDEX_NAME) AS UNIQUES
545+
ON (COLUMNS.TABLE_SCHEMA = UNIQUES.TABLE_SCHEMA
546+
AND COLUMNS.TABLE_NAME = UNIQUES.TABLE_NAME
547+
AND COLUMNS.COLUMN_NAME = UNIQUES.FIRST_COLUMN_NAME)
548+
WHERE COLUMNS.TABLE_SCHEMA = ? AND COLUMNS.TABLE_NAME = ?`
549+
/*query := `
550+
SELECT
551+
COLUMNS.TABLE_SCHEMA,
552+
COLUMNS.TABLE_NAME,
553+
COLUMNS.COLUMN_NAME,
554+
UNIQUES.INDEX_NAME,
555+
UNIQUES.COLUMN_NAMES,
556+
UNIQUES.COUNT_COLUMN_IN_INDEX,
557+
COLUMNS.DATA_TYPE,
558+
COLUMNS.CHARACTER_SET_NAME,
559+
LOCATE('auto_increment', EXTRA) > 0 as is_auto_increment,
560+
has_nullable
561+
FROM INFORMATION_SCHEMA.COLUMNS INNER JOIN (
562+
SELECT
563+
TABLE_SCHEMA,
564+
TABLE_NAME,
565+
INDEX_NAME,
566+
COUNT(*) AS COUNT_COLUMN_IN_INDEX,
567+
GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC) AS COLUMN_NAMES,
568+
SUBSTRING_INDEX(GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC), ',', 1) AS FIRST_COLUMN_NAME,
569+
SUM(NULLABLE='YES') > 0 AS has_nullable
570+
FROM INFORMATION_SCHEMA.STATISTICS
571+
WHERE
572+
NON_UNIQUE=0
573+
AND TABLE_SCHEMA = ?
574+
AND TABLE_NAME = ?
575+
GROUP BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME
576+
) AS UNIQUES
577+
ON (
578+
COLUMNS.TABLE_SCHEMA = UNIQUES.TABLE_SCHEMA AND
579+
COLUMNS.TABLE_NAME = UNIQUES.TABLE_NAME AND
580+
COLUMNS.COLUMN_NAME = UNIQUES.FIRST_COLUMN_NAME
581+
)
582+
WHERE
583+
COLUMNS.TABLE_SCHEMA = ?
584+
AND COLUMNS.TABLE_NAME = ?
585+
ORDER BY
586+
COLUMNS.TABLE_SCHEMA, COLUMNS.TABLE_NAME,
587+
CASE UNIQUES.INDEX_NAME
588+
WHEN 'PRIMARY' THEN 0
589+
ELSE 1
590+
END,
591+
CASE has_nullable
592+
WHEN 0 THEN 0
593+
ELSE 1
594+
END,
595+
CASE IFNULL(CHARACTER_SET_NAME, '')
596+
WHEN '' THEN 0
597+
ELSE 1
598+
END,
599+
CASE DATA_TYPE
600+
WHEN 'tinyint' THEN 0
601+
WHEN 'smallint' THEN 1
602+
WHEN 'int' THEN 2
603+
WHEN 'bigint' THEN 3
604+
ELSE 100
605+
END,
606+
COUNT_COLUMN_IN_INDEX
607+
`*/
608+
err = usql.QueryRowsMap(db, query, func(m usql.RowMap) error {
609+
columns := common.ParseColumnList(m.GetString("COLUMN_NAMES"))
610+
uniqueKey := &common.UniqueKey{
611+
Name: m.GetString("INDEX_NAME"),
612+
Columns: *columns,
613+
HasNullable: m.GetBool("has_nullable"),
614+
IsAutoIncrement: m.GetBool("is_auto_increment"),
615+
LastMaxVals: make([]string, len(columns.Columns)),
616+
}
617+
uniqueKeys = append(uniqueKeys, uniqueKey)
618+
return nil
619+
}, databaseName, tableName, databaseName, tableName)
620+
if err != nil {
621+
return uniqueKeys, err
622+
}
623+
logger.Debug("Potential unique keys.", "schema", databaseName, "table", tableName, "uniqueKeys", uniqueKeys)
624+
return uniqueKeys, nil
625+
}

drivers/mysql/mysql/extractor.go

+17-39
Original file line numberDiff line numberDiff line change
@@ -400,9 +400,8 @@ func (e *Extractor) inspectTables() (err error) {
400400
var doDbs []*common.DataSource
401401
// Get all db from TableSchemaRegex regex and get all tableSchemaRename
402402
for _, doDb := range e.mysqlContext.ReplicateDoDb {
403-
var regex string
404403
if doDb.TableSchemaRegex != "" && doDb.TableSchemaRename != "" {
405-
regex = doDb.TableSchemaRegex
404+
regex := doDb.TableSchemaRegex
406405
schemaRenameRegex := doDb.TableSchemaRename
407406
for _, db := range dbsFiltered {
408407
newdb := &common.DataSource{}
@@ -433,19 +432,17 @@ func (e *Extractor) inspectTables() (err error) {
433432
}
434433
}
435434
for _, doDb := range doDbs {
436-
db := &common.DataSource{
437-
TableSchema: doDb.TableSchema,
438-
TableSchemaRegex: doDb.TableSchemaRegex,
439-
TableSchemaRename: doDb.TableSchemaRename,
440-
}
435+
schemaCtx := common.NewSchemaContext(doDb.TableSchema)
436+
e.replicateDoDb[doDb.TableSchema] = schemaCtx
441437

442438
existedTables, err := sql.ShowTables(e.db, doDb.TableSchema, e.mysqlContext.ExpandSyntaxSupport)
443439
if err != nil {
444440
return err
445441
}
446442
tbsFiltered := []*common.Table{}
447443
for _, tb := range existedTables {
448-
if len(e.mysqlContext.ReplicateIgnoreDb) > 0 && common.IgnoreTbByReplicateIgnoreDb(e.mysqlContext.ReplicateIgnoreDb, db.TableSchema, tb.TableName) {
444+
if len(e.mysqlContext.ReplicateIgnoreDb) > 0 &&
445+
common.IgnoreTbByReplicateIgnoreDb(e.mysqlContext.ReplicateIgnoreDb, doDb.TableSchema, tb.TableName) {
449446
continue
450447
}
451448
tbsFiltered = append(tbsFiltered, tb)
@@ -460,7 +457,10 @@ func (e *Extractor) inspectTables() (err error) {
460457
"schema", doDb.TableSchema, "table", doTb.TableName)
461458
continue
462459
}
463-
db.Tables = append(db.Tables, doTb)
460+
err = schemaCtx.AddTable(doTb)
461+
if err != nil {
462+
return err
463+
}
464464
}
465465
} else { // replicate selected tables
466466
for _, doTb := range doDb.Tables {
@@ -488,7 +488,10 @@ func (e *Extractor) inspectTables() (err error) {
488488
e.logger.Warn("ValidateOriginalTable error", "TableSchema", doDb.TableSchema, "TableName", doTb.TableName, "err", err)
489489
continue
490490
}
491-
db.Tables = append(db.Tables, newTable)
491+
err = schemaCtx.AddTable(newTable)
492+
if err != nil {
493+
return err
494+
}
492495
}
493496
//if db.Tables == nil {
494497
// return fmt.Errorf("src table was nil")
@@ -505,22 +508,17 @@ func (e *Extractor) inspectTables() (err error) {
505508
}
506509
newTable := &common.Table{}
507510
*newTable = *doTb
508-
db.Tables = append(db.Tables, newTable)
511+
err = schemaCtx.AddTable(newTable)
512+
if err != nil {
513+
return err
514+
}
509515
}
510516
} else {
511517
return fmt.Errorf("table configuration error")
512518
}
513-
514519
}
515520
}
516-
schemaCtx := common.NewSchemaContext(db.TableSchema)
517-
err = schemaCtx.AddTables(db.Tables)
518-
if err != nil {
519-
return err
520-
}
521-
e.replicateDoDb[db.TableSchema] = schemaCtx
522521
}
523-
// e.mysqlContext.ReplicateDoDb = e.replicateDoDb
524522
} else { // empty DoDB. replicate all db/tb
525523
for _, dbName := range dbsFiltered {
526524
ds := &common.DataSource{
@@ -551,26 +549,6 @@ func (e *Extractor) inspectTables() (err error) {
551549
e.replicateDoDb[ds.TableSchema] = schemaCtx
552550
}
553551
}
554-
/*if e.mysqlContext.ExpandSyntaxSupport {
555-
db_mysql := &config.DataSource{
556-
TableSchema: "mysql",
557-
}
558-
db_mysql.Tables = append(db_mysql.Tables,
559-
&config.Table{
560-
TableSchema: "mysql",
561-
TableName: "user",
562-
},
563-
&config.Table{
564-
TableSchema: "mysql",
565-
TableName: "proc",
566-
},
567-
&config.Table{
568-
TableSchema: "mysql",
569-
TableName: "func",
570-
},
571-
)
572-
e.replicateDoDb = append(e.replicateDoDb, db_mysql)
573-
}*/
574552

575553
return nil
576554
}

drivers/mysql/mysql/inspect.go

+4-100
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,10 @@ func (i *Inspector) ValidateOriginalTable(databaseName, tableName string, table
174174
return nil
175175
}
176176

177-
func (i *Inspector) InspectTableColumnsAndUniqueKeys(databaseName, tableName string) (columns *common.ColumnList, uniqueKeys [](*common.UniqueKey), err error) {
178-
uniqueKeys, err = i.getCandidateUniqueKeys(databaseName, tableName)
177+
func (i *Inspector) InspectTableColumnsAndUniqueKeys(databaseName, tableName string) (
178+
columns *common.ColumnList, uniqueKeys []*common.UniqueKey, err error) {
179+
180+
uniqueKeys, err = ubase.GetCandidateUniqueKeys(i.logger, i.db, databaseName, tableName)
179181
if err != nil {
180182
return columns, uniqueKeys, err
181183
}
@@ -368,101 +370,3 @@ func (i *Inspector) validateTableTriggers(databaseName, tableName string) error
368370
}
369371
return nil
370372
}
371-
372-
// getCandidateUniqueKeys investigates a table and returns the list of unique keys
373-
// candidate for chunking
374-
func (i *Inspector) getCandidateUniqueKeys(databaseName, tableName string) (uniqueKeys [](*common.UniqueKey), err error) {
375-
query := `SELECT
376-
UNIQUES.INDEX_NAME,UNIQUES.COLUMN_NAMES,LOCATE('auto_increment', EXTRA) > 0 as is_auto_increment,has_nullable
377-
FROM INFORMATION_SCHEMA.COLUMNS INNER JOIN (
378-
SELECT
379-
TABLE_SCHEMA,TABLE_NAME,INDEX_NAME,GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC) AS COLUMN_NAMES,
380-
SUBSTRING_INDEX(GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC), ',', 1) AS FIRST_COLUMN_NAME,
381-
SUM(NULLABLE='YES') > 0 AS has_nullable
382-
FROM INFORMATION_SCHEMA.STATISTICS
383-
WHERE
384-
NON_UNIQUE=0 AND TABLE_SCHEMA = ? AND TABLE_NAME = ?
385-
GROUP BY TABLE_SCHEMA,TABLE_NAME,INDEX_NAME
386-
) AS UNIQUES
387-
ON (
388-
COLUMNS.TABLE_SCHEMA = UNIQUES.TABLE_SCHEMA AND COLUMNS.TABLE_NAME = UNIQUES.TABLE_NAME AND COLUMNS.COLUMN_NAME = UNIQUES.FIRST_COLUMN_NAME
389-
)
390-
WHERE
391-
COLUMNS.TABLE_SCHEMA = ? AND COLUMNS.TABLE_NAME = ?`
392-
/*query := `
393-
SELECT
394-
COLUMNS.TABLE_SCHEMA,
395-
COLUMNS.TABLE_NAME,
396-
COLUMNS.COLUMN_NAME,
397-
UNIQUES.INDEX_NAME,
398-
UNIQUES.COLUMN_NAMES,
399-
UNIQUES.COUNT_COLUMN_IN_INDEX,
400-
COLUMNS.DATA_TYPE,
401-
COLUMNS.CHARACTER_SET_NAME,
402-
LOCATE('auto_increment', EXTRA) > 0 as is_auto_increment,
403-
has_nullable
404-
FROM INFORMATION_SCHEMA.COLUMNS INNER JOIN (
405-
SELECT
406-
TABLE_SCHEMA,
407-
TABLE_NAME,
408-
INDEX_NAME,
409-
COUNT(*) AS COUNT_COLUMN_IN_INDEX,
410-
GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC) AS COLUMN_NAMES,
411-
SUBSTRING_INDEX(GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC), ',', 1) AS FIRST_COLUMN_NAME,
412-
SUM(NULLABLE='YES') > 0 AS has_nullable
413-
FROM INFORMATION_SCHEMA.STATISTICS
414-
WHERE
415-
NON_UNIQUE=0
416-
AND TABLE_SCHEMA = ?
417-
AND TABLE_NAME = ?
418-
GROUP BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME
419-
) AS UNIQUES
420-
ON (
421-
COLUMNS.TABLE_SCHEMA = UNIQUES.TABLE_SCHEMA AND
422-
COLUMNS.TABLE_NAME = UNIQUES.TABLE_NAME AND
423-
COLUMNS.COLUMN_NAME = UNIQUES.FIRST_COLUMN_NAME
424-
)
425-
WHERE
426-
COLUMNS.TABLE_SCHEMA = ?
427-
AND COLUMNS.TABLE_NAME = ?
428-
ORDER BY
429-
COLUMNS.TABLE_SCHEMA, COLUMNS.TABLE_NAME,
430-
CASE UNIQUES.INDEX_NAME
431-
WHEN 'PRIMARY' THEN 0
432-
ELSE 1
433-
END,
434-
CASE has_nullable
435-
WHEN 0 THEN 0
436-
ELSE 1
437-
END,
438-
CASE IFNULL(CHARACTER_SET_NAME, '')
439-
WHEN '' THEN 0
440-
ELSE 1
441-
END,
442-
CASE DATA_TYPE
443-
WHEN 'tinyint' THEN 0
444-
WHEN 'smallint' THEN 1
445-
WHEN 'int' THEN 2
446-
WHEN 'bigint' THEN 3
447-
ELSE 100
448-
END,
449-
COUNT_COLUMN_IN_INDEX
450-
`*/
451-
err = usql.QueryRowsMap(i.db, query, func(m usql.RowMap) error {
452-
columns := common.ParseColumnList(m.GetString("COLUMN_NAMES"))
453-
uniqueKey := &common.UniqueKey{
454-
Name: m.GetString("INDEX_NAME"),
455-
Columns: *columns,
456-
HasNullable: m.GetBool("has_nullable"),
457-
IsAutoIncrement: m.GetBool("is_auto_increment"),
458-
LastMaxVals: make([]string, len(columns.Columns)),
459-
}
460-
uniqueKeys = append(uniqueKeys, uniqueKey)
461-
return nil
462-
}, databaseName, tableName, databaseName, tableName)
463-
if err != nil {
464-
return uniqueKeys, err
465-
}
466-
i.logger.Debug("Potential unique keys.", "schema", databaseName, "table", tableName, "uniqueKeys", uniqueKeys)
467-
return uniqueKeys, nil
468-
}

0 commit comments

Comments
 (0)