Skip to content

Commit eb82c46

Browse files
committed
Fix migrations.
1 parent 0bb4f28 commit eb82c46

File tree

1 file changed

+114
-17
lines changed

1 file changed

+114
-17
lines changed

crates/core/src/view_admin.rs

+114-17
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,7 @@ create_sqlite_text_fn!(
111111
"powersync_external_table_name"
112112
);
113113

114-
fn powersync_init_impl(
115-
ctx: *mut sqlite::context,
116-
_args: &[*mut sqlite::value],
117-
) -> Result<String, SQLiteError> {
114+
fn powersync_migrate(ctx: *mut sqlite::context, target_version: i32) -> Result<(), SQLiteError> {
118115
let local_db = ctx.db_handle();
119116

120117
// language=SQLite
@@ -131,18 +128,16 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations
131128
return Err(SQLiteError::from(ResultCode::ABORT));
132129
}
133130

134-
const CODE_VERSION: i32 = 5;
135-
136131
let mut current_version = current_version_stmt.column_int(0)?;
137132

138-
while current_version > CODE_VERSION {
133+
while current_version > target_version {
139134
// Run down migrations.
140135
// This is rare, we don't worry about optimizing this.
141136

142137
current_version_stmt.reset()?;
143138

144139
let down_migrations_stmt = local_db.prepare_v2("select e.value ->> 'sql' as sql from (select id, down_migrations from ps_migration where id > ?1 order by id desc limit 1) m, json_each(m.down_migrations) e")?;
145-
down_migrations_stmt.bind_int(1, CODE_VERSION)?;
140+
down_migrations_stmt.bind_int(1, target_version)?;
146141

147142
let mut down_sql: Vec<String> = alloc::vec![];
148143

@@ -151,14 +146,71 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations
151146
down_sql.push(sql.to_string());
152147
}
153148

149+
if current_version == 5 {
150+
down_sql.push(
151+
"\
152+
ALTER TABLE ps_buckets RENAME TO ps_buckets_5;
153+
ALTER TABLE ps_oplog RENAME TO ps_oplog_5;
154+
155+
CREATE TABLE ps_buckets(
156+
name TEXT PRIMARY KEY,
157+
last_applied_op INTEGER NOT NULL DEFAULT 0,
158+
last_op INTEGER NOT NULL DEFAULT 0,
159+
target_op INTEGER NOT NULL DEFAULT 0,
160+
add_checksum INTEGER NOT NULL DEFAULT 0,
161+
pending_delete INTEGER NOT NULL DEFAULT 0
162+
, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0);
163+
164+
INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)
165+
SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5;
166+
167+
CREATE TABLE ps_oplog(
168+
bucket TEXT NOT NULL,
169+
op_id INTEGER NOT NULL,
170+
op INTEGER NOT NULL,
171+
row_type TEXT,
172+
row_id TEXT,
173+
key TEXT,
174+
data TEXT,
175+
hash INTEGER NOT NULL,
176+
superseded INTEGER NOT NULL);
177+
178+
CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0;
179+
CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id);
180+
CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0;
181+
182+
INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)
183+
SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0
184+
FROM ps_oplog_5 oplog
185+
JOIN ps_buckets_5
186+
ON ps_buckets_5.id = oplog.bucket;
187+
188+
DROP TABLE ps_oplog_5;
189+
DROP TABLE ps_buckets_5;
190+
191+
INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)
192+
SELECT '$local', 1, 4, r.row_type, r.row_id, 0, 0
193+
FROM ps_updated_rows r;
194+
INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES('$local', 1, 0, 9223372036854775807);
195+
196+
DROP TABLE ps_updated_rows
197+
"
198+
.to_string(),
199+
);
200+
}
201+
154202
for sql in down_sql {
155203
let rs = local_db.exec_safe(&sql);
156204
if let Err(code) = rs {
157205
return Err(SQLiteError(
158206
code,
159207
Some(format!(
160-
"Down migration failed for {:} {:}",
161-
current_version, sql
208+
"Down migration failed for {:} {:} {:}",
209+
current_version,
210+
sql,
211+
local_db
212+
.errmsg()
213+
.unwrap_or(String::from("Conversion error"))
162214
)),
163215
));
164216
}
@@ -188,8 +240,6 @@ CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations
188240
}
189241
current_version_stmt.reset()?;
190242

191-
current_version_stmt.reset()?;
192-
193243
if current_version < 1 {
194244
// language=SQLite
195245
local_db
@@ -229,7 +279,7 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(1, NULL);
229279
.into_db_result(local_db)?;
230280
}
231281

232-
if current_version < 2 {
282+
if current_version < 2 && target_version >= 2 {
233283
// language=SQLite
234284
local_db.exec_safe("\
235285
CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER);
@@ -241,7 +291,7 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(2, json_array(json_object('
241291
").into_db_result(local_db)?;
242292
}
243293

244-
if current_version < 3 {
294+
if current_version < 3 && target_version >= 3 {
245295
// language=SQLite
246296
local_db.exec_safe("\
247297
CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB);
@@ -251,7 +301,7 @@ INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object('
251301
").into_db_result(local_db)?;
252302
}
253303

254-
if current_version < 4 {
304+
if current_version < 4 && target_version >= 4 {
255305
// language=SQLite
256306
local_db.exec_safe("\
257307
ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0;
@@ -271,7 +321,7 @@ INSERT INTO ps_migration(id, down_migrations)
271321
").into_db_result(local_db)?;
272322
}
273323

274-
if current_version < 5 {
324+
if current_version < 5 && target_version >= 5 {
275325
// language=SQLite
276326
local_db
277327
.exec_safe(
@@ -321,7 +371,8 @@ INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash)
321371
FROM ps_oplog_old oplog
322372
JOIN ps_buckets
323373
ON ps_buckets.name = oplog.bucket
324-
WHERE oplog.superseded = 0 AND oplog.op = 3;
374+
WHERE oplog.superseded = 0 AND oplog.op = 3
375+
ORDER BY oplog.bucket, oplog.op_id;
325376
326377
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
327378
SELECT row_type, row_id
@@ -335,6 +386,13 @@ UPDATE ps_buckets SET add_checksum = 0xffffffff & (add_checksum + (
335386
AND (oplog.superseded = 1 OR oplog.op != 3)
336387
));
337388
389+
UPDATE ps_buckets SET op_checksum = 0xffffffff & (op_checksum - (
390+
SELECT IFNULL(SUM(oplog.hash), 0)
391+
FROM ps_oplog_old oplog
392+
WHERE oplog.bucket = ps_buckets.name
393+
AND (oplog.superseded = 1 OR oplog.op != 3)
394+
));
395+
338396
DROP TABLE ps_oplog_old;
339397
340398
INSERT INTO ps_migration(id, down_migrations)
@@ -347,6 +405,17 @@ INSERT INTO ps_migration(id, down_migrations)
347405
.into_db_result(local_db)?;
348406
}
349407

408+
Ok(())
409+
}
410+
411+
fn powersync_init_impl(
412+
ctx: *mut sqlite::context,
413+
_args: &[*mut sqlite::value],
414+
) -> Result<String, SQLiteError> {
415+
let local_db = ctx.db_handle();
416+
417+
powersync_migrate(ctx, 5)?;
418+
350419
setup_internal_views(local_db)?;
351420

352421
Ok(String::from(""))
@@ -355,6 +424,23 @@ INSERT INTO ps_migration(id, down_migrations)
355424
create_auto_tx_function!(powersync_init_tx, powersync_init_impl);
356425
create_sqlite_text_fn!(powersync_init, powersync_init_tx, "powersync_init");
357426

427+
fn powersync_test_migration_impl(
428+
ctx: *mut sqlite::context,
429+
args: &[*mut sqlite::value],
430+
) -> Result<String, SQLiteError> {
431+
let target_version = args[0].int();
432+
powersync_migrate(ctx, target_version)?;
433+
434+
Ok(String::from(""))
435+
}
436+
437+
create_auto_tx_function!(powersync_test_migration_tx, powersync_test_migration_impl);
438+
create_sqlite_text_fn!(
439+
powersync_test_migration,
440+
powersync_test_migration_tx,
441+
"powersync_test_migration"
442+
);
443+
358444
fn powersync_clear_impl(
359445
ctx: *mut sqlite::context,
360446
args: &[*mut sqlite::value],
@@ -537,6 +623,17 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
537623
None,
538624
)?;
539625

626+
db.create_function_v2(
627+
"powersync_test_migration",
628+
1,
629+
sqlite::UTF8,
630+
None,
631+
Some(powersync_test_migration),
632+
None,
633+
None,
634+
None,
635+
)?;
636+
540637
// Initialize the extension internal tables.
541638
db.create_function_v2(
542639
"powersync_clear",

0 commit comments

Comments
 (0)