Skip to content

Commit d222046

Browse files
authored
fix(migrate): improve migration status detection and prevent checksum conflicts (#54)
1 parent ef2c555 commit d222046

2 files changed

Lines changed: 58 additions & 28 deletions

File tree

dtc_test.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ import (
1010
)
1111

1212
func TestDTCWithDB(t *testing.T) {
13-
os.Remove("dtc_db.db")
13+
os.Remove("dtc_1.db")
1414

1515
d, err := sql.Open("sqlite3", "file:dtc_1.db?cache=shared&mode=rwc")
1616
require.NoError(t, err)
1717

18+
_, err = d.Exec("DROP TABLE IF EXISTS `dtc_1`")
19+
require.NoError(t, err)
20+
1821
_, err = d.Exec("CREATE TABLE `dtc_1` (`id` int , `email` varchar(50),`created_at` DATETIME, PRIMARY KEY (`id`))")
1922
require.NoError(t, err)
2023

@@ -30,19 +33,19 @@ func TestDTCWithDB(t *testing.T) {
3033
setup: func() *DTC {
3134
dtc := NewDTC(context.Background(), nil)
3235

33-
dtc.Prepare(db.dbs[0], func(ctx context.Context, conn Connector) error {
36+
dtc.Prepare(db.dbs[0], func(_ context.Context, conn Connector) error {
3437
_, err := conn.Exec("INSERT INTO `dtc_1`(`id`,`email`) VALUES(?,?)", 1, "1@mail.com")
3538

3639
return err
3740
}, nil)
3841

39-
dtc.Prepare(db.dbs[0], func(ctx context.Context, conn Connector) error {
42+
dtc.Prepare(db.dbs[0], func(_ context.Context, conn Connector) error {
4043
_, err := conn.Exec("INSERT INTO `dtc_1`(`id`,`email`) VALUES(?,?)", 2, "2@mail.com")
4144

4245
return err
4346
}, nil)
4447

45-
dtc.Prepare(db.dbs[0], func(ctx context.Context, conn Connector) error {
48+
dtc.Prepare(db.dbs[0], func(_ context.Context, conn Connector) error {
4649
_, err := conn.Exec("INSERT INTO `dtc_1`(`id`,`email`) VALUES(?,?)", 3, "3@mail.com")
4750

4851
return err
@@ -69,19 +72,19 @@ func TestDTCWithDB(t *testing.T) {
6972
name: "multiple_txs_rollback_should_work",
7073
setup: func() *DTC {
7174
dtc := NewDTC(context.Background(), nil)
72-
dtc.Prepare(db.dbs[0], func(ctx context.Context, conn Connector) error {
75+
dtc.Prepare(db.dbs[0], func(_ context.Context, conn Connector) error {
7376
_, err := conn.Exec("INSERT INTO `dtc_1`(`id`,`email`) VALUES(?,?)", 11, "1@mail.com")
7477

7578
return err
7679
}, nil)
7780

78-
dtc.Prepare(db.dbs[0], func(ctx context.Context, conn Connector) error {
81+
dtc.Prepare(db.dbs[0], func(_ context.Context, conn Connector) error {
7982
_, err := conn.Exec("INSERT INTO `dtc_1`(`id`,`email`) VALUES(?,?)", 12, "2@mail.com")
8083

8184
return err
8285
}, nil)
8386

84-
dtc.Prepare(db.dbs[0], func(ctx context.Context, conn Connector) error {
87+
dtc.Prepare(db.dbs[0], func(_ context.Context, conn Connector) error {
8588
_, err := conn.Exec("INSERT INTO `dtc_1`(`id`,`email`) VALUES(?,?)", 13)
8689

8790
return err
@@ -154,19 +157,19 @@ func TestDTCWithDBs(t *testing.T) {
154157
setup: func() *DTC {
155158
dtc := NewDTC(context.Background(), nil)
156159

157-
dtc.Prepare(db1.dbs[0], func(ctx context.Context, conn Connector) error {
160+
dtc.Prepare(db1.dbs[0], func(_ context.Context, conn Connector) error {
158161
_, err := conn.Exec("INSERT INTO `dtc_1` (`id`,`email`) VALUES(?,?)", 1, "1@mail.com")
159162

160163
return err
161164
}, nil)
162165

163-
dtc.Prepare(db2.dbs[0], func(ctx context.Context, conn Connector) error {
166+
dtc.Prepare(db2.dbs[0], func(_ context.Context, conn Connector) error {
164167
_, err := conn.Exec("INSERT INTO `dtc_2` (`id`,`email`) VALUES(?,?)", 2, "2@mail.com")
165168

166169
return err
167170
}, nil)
168171

169-
dtc.Prepare(db3.dbs[0], func(ctx context.Context, conn Connector) error {
172+
dtc.Prepare(db3.dbs[0], func(_ context.Context, conn Connector) error {
170173
_, err := conn.Exec("INSERT INTO `dtc_3` (`id`,`email`) VALUES(?,?)", 3, "3@mail.com")
171174

172175
return err
@@ -193,19 +196,19 @@ func TestDTCWithDBs(t *testing.T) {
193196
name: "multiple_txs_rollback_should_work",
194197
setup: func() *DTC {
195198
dtc := NewDTC(context.Background(), nil)
196-
dtc.Prepare(db1.dbs[0], func(ctx context.Context, conn Connector) error {
199+
dtc.Prepare(db1.dbs[0], func(_ context.Context, conn Connector) error {
197200
_, err := conn.Exec("INSERT INTO `dtc_1`(`id`,`email`) VALUES(?,?)", 11, "1@mail.com")
198201

199202
return err
200203
}, nil)
201204

202-
dtc.Prepare(db2.dbs[0], func(ctx context.Context, conn Connector) error {
205+
dtc.Prepare(db2.dbs[0], func(_ context.Context, conn Connector) error {
203206
_, err := conn.Exec("INSERT INTO `dtc_2`(`id`,`email`) VALUES(?,?)", 12, "2@mail.com")
204207

205208
return err
206209
}, nil)
207210

208-
dtc.Prepare(db3.dbs[0], func(ctx context.Context, conn Connector) error {
211+
dtc.Prepare(db3.dbs[0], func(_ context.Context, conn Connector) error {
209212
_, err := conn.Exec("INSERT INTO `dtc_3`(`id`,`email`) VALUES(?,?)", 13)
210213

211214
return err
@@ -270,28 +273,28 @@ func TestDTCRevert(t *testing.T) {
270273

271274
dtc := NewDTC(context.Background(), nil)
272275

273-
dtc.Prepare(db1.dbs[0], func(ctx context.Context, conn Connector) error {
276+
dtc.Prepare(db1.dbs[0], func(_ context.Context, conn Connector) error {
274277
_, err := conn.Exec("INSERT INTO `dtc_1` (`id`,`email`) VALUES(?,?)", 1, "1@mail.com")
275278
return err
276-
}, func(ctx context.Context, c Connector) error {
279+
}, func(_ context.Context, c Connector) error {
277280
_, err := c.Exec("DELETE FROM `dtc_1` WHERE id=?", 1)
278281
return err
279282
})
280283

281-
dtc.Prepare(db2.dbs[0], func(ctx context.Context, conn Connector) error {
284+
dtc.Prepare(db2.dbs[0], func(_ context.Context, conn Connector) error {
282285
_, err := conn.Exec("INSERT INTO `dtc_2` (`id`,`email`) VALUES(?,?)", 2, "2@mail.com")
283286

284287
return err
285-
}, func(ctx context.Context, c Connector) error {
288+
}, func(_ context.Context, c Connector) error {
286289
_, err := c.Exec("DELETE FROM `dtc_2` WHERE id=?", 2)
287290
return err
288291
})
289292

290-
dtc.Prepare(db3.dbs[0], func(ctx context.Context, conn Connector) error {
293+
dtc.Prepare(db3.dbs[0], func(_ context.Context, conn Connector) error {
291294
_, err := conn.Exec("INSERT INTO `dtc_3` (`id`,`email`) VALUES(?,?)", 3, "3@mail.com")
292295

293296
return err
294-
}, func(ctx context.Context, c Connector) error {
297+
}, func(_ context.Context, c Connector) error {
295298
_, err := c.Exec("DELETE FROM `dtc_3` WHERE id=?", 3)
296299
return err
297300
})

migrate/migrator.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ const TABLE_ROTATIONS = "CREATE TABLE IF NOT EXISTS sqle_rotations(" +
4141
"execution_time varchar(25) NOT NULL," +
4242
"PRIMARY KEY (checksum, rotated_name));"
4343

44+
type MigrationStatus int
45+
46+
const (
47+
MigrationStatusNew MigrationStatus = iota // new migration, needs to be executed
48+
MigrationStatusExecuted // already executed, checksum matches
49+
MigrationStatusModified // already executed but checksum changed
50+
)
51+
4452
type Migrator struct {
4553
dbs []*sqle.DB
4654
suffix string
@@ -238,16 +246,21 @@ func (m *Migrator) startMigrate(ctx context.Context, db *sqle.DB) error {
238246
err = db.Transaction(ctx, nil, func(ctx context.Context, tx *sqle.Tx) error {
239247

240248
for i, s := range v.Migrations {
241-
yes, err := m.isMigrated(tx, s)
249+
status, err := m.getMigrationStatus(tx, v.Name, s)
242250
if err != nil {
243251
return err
244252
}
245253

246-
if yes {
254+
if status == MigrationStatusExecuted {
247255
log.Printf("│ »[%*d/%d] %-35s %-10s [✔]", w, i+1, n, s.Name, "")
248256
continue
249257
}
250258

259+
if status == MigrationStatusModified {
260+
log.Printf("│ »[%*d/%d] %-35s %-10s [!]", w, i+1, n, s.Name, "")
261+
continue
262+
}
263+
251264
rotations := m.buildRotations(s.Rotate, s.RotateBegin, s.RotateEnd)
252265

253266
now := time.Now()
@@ -309,18 +322,32 @@ func (m *Migrator) startMigrate(ctx context.Context, db *sqle.DB) error {
309322
return nil
310323
}
311324

312-
func (*Migrator) isMigrated(tx *sqle.Tx, s Migration) (bool, error) {
325+
func (m *Migrator) getMigrationStatus(tx *sqle.Tx, version string, s Migration) (MigrationStatus, error) {
326+
// First check if checksum already exists (most common case: script already executed)
313327
var checksum string
314-
err := tx.QueryRow("SELECT checksum FROM sqle_migrations WHERE checksum = ?", s.Checksum).Scan(&checksum)
328+
err := tx.QueryRow("SELECT checksum FROM sqle_migrations WHERE checksum = ?",
329+
s.Checksum).Scan(&checksum)
330+
if err == nil {
331+
// Checksum exists, meaning a script with same content was already executed
332+
return MigrationStatusExecuted, nil
333+
}
334+
if !errors.Is(err, sql.ErrNoRows) {
335+
return MigrationStatusNew, err
336+
}
337+
338+
// Checksum doesn't exist, check if a script with same name and rank was modified
339+
err = tx.QueryRow("SELECT checksum FROM sqle_migrations WHERE module = ? AND version = ? AND name = ? AND rank = ?",
340+
m.module, version, s.Name, s.Rank).Scan(&checksum)
315341
if err != nil {
316342
if errors.Is(err, sql.ErrNoRows) {
317-
return false, nil
343+
// No record with same name and rank exists, it's a new script
344+
return MigrationStatusNew, nil
318345
}
319-
320-
return false, err
321-
346+
return MigrationStatusNew, err
322347
}
323-
return checksum != "", nil
348+
349+
// Record with same name and rank exists but checksum is different, script content was modified
350+
return MigrationStatusModified, nil
324351
}
325352

326353
func (*Migrator) buildRotations(r shardid.TableRotate, begin, end time.Time) []string {

0 commit comments

Comments
 (0)