|
| 1 | +extern crate alloc; |
| 2 | + |
| 3 | +use alloc::format; |
| 4 | +use alloc::string::{String, ToString}; |
| 5 | +use alloc::vec::Vec; |
| 6 | + |
| 7 | +use sqlite::ResultCode; |
| 8 | +use sqlite_nostd as sqlite; |
| 9 | +use sqlite_nostd::{Connection, Context}; |
| 10 | + |
| 11 | +use crate::error::{PSResult, SQLiteError}; |
| 12 | + |
| 13 | +pub fn powersync_migrate( |
| 14 | + ctx: *mut sqlite::context, |
| 15 | + target_version: i32, |
| 16 | +) -> Result<(), SQLiteError> { |
| 17 | + let local_db = ctx.db_handle(); |
| 18 | + |
| 19 | + // language=SQLite |
| 20 | + local_db.exec_safe( |
| 21 | + "\ |
| 22 | +CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)", |
| 23 | + )?; |
| 24 | + |
| 25 | + // language=SQLite |
| 26 | + let current_version_stmt = |
| 27 | + local_db.prepare_v2("SELECT ifnull(max(id), 0) as version FROM ps_migration")?; |
| 28 | + let rc = current_version_stmt.step()?; |
| 29 | + if rc != ResultCode::ROW { |
| 30 | + return Err(SQLiteError::from(ResultCode::ABORT)); |
| 31 | + } |
| 32 | + |
| 33 | + let mut current_version = current_version_stmt.column_int(0)?; |
| 34 | + |
| 35 | + while current_version > target_version { |
| 36 | + // Run down migrations. |
| 37 | + // This is rare, we don't worry about optimizing this. |
| 38 | + |
| 39 | + current_version_stmt.reset()?; |
| 40 | + |
| 41 | + let down_migrations_stmt = local_db.prepare_v2("select e.value ->> 'sql' as sql from (select id, down_migrations from ps_migration where id > ?1 order by id desc limit 1) m, json_each(m.down_migrations) e")?; |
| 42 | + down_migrations_stmt.bind_int(1, target_version)?; |
| 43 | + |
| 44 | + let mut down_sql: Vec<String> = alloc::vec![]; |
| 45 | + |
| 46 | + while down_migrations_stmt.step()? == ResultCode::ROW { |
| 47 | + let sql = down_migrations_stmt.column_text(0)?; |
| 48 | + down_sql.push(sql.to_string()); |
| 49 | + } |
| 50 | + |
| 51 | + for sql in down_sql { |
| 52 | + let rs = local_db.exec_safe(&sql); |
| 53 | + if let Err(code) = rs { |
| 54 | + return Err(SQLiteError( |
| 55 | + code, |
| 56 | + Some(format!( |
| 57 | + "Down migration failed for {:} {:} {:}", |
| 58 | + current_version, |
| 59 | + sql, |
| 60 | + local_db |
| 61 | + .errmsg() |
| 62 | + .unwrap_or(String::from("Conversion error")) |
| 63 | + )), |
| 64 | + )); |
| 65 | + } |
| 66 | + } |
| 67 | + |
| 68 | + // Refresh the version |
| 69 | + current_version_stmt.reset()?; |
| 70 | + let rc = current_version_stmt.step()?; |
| 71 | + if rc != ResultCode::ROW { |
| 72 | + return Err(SQLiteError( |
| 73 | + rc, |
| 74 | + Some("Down migration failed - could not get version".to_string()), |
| 75 | + )); |
| 76 | + } |
| 77 | + let new_version = current_version_stmt.column_int(0)?; |
| 78 | + if new_version >= current_version { |
| 79 | + // Database down from version $currentVersion to $version failed - version not updated after dow migration |
| 80 | + return Err(SQLiteError( |
| 81 | + ResultCode::ABORT, |
| 82 | + Some(format!( |
| 83 | + "Down migration failed - version not updated from {:}", |
| 84 | + current_version |
| 85 | + )), |
| 86 | + )); |
| 87 | + } |
| 88 | + current_version = new_version; |
| 89 | + } |
| 90 | + current_version_stmt.reset()?; |
| 91 | + |
| 92 | + if current_version < 1 { |
| 93 | + // language=SQLite |
| 94 | + local_db |
| 95 | + .exec_safe( |
| 96 | + " |
| 97 | +CREATE TABLE ps_oplog( |
| 98 | +bucket TEXT NOT NULL, |
| 99 | +op_id INTEGER NOT NULL, |
| 100 | +op INTEGER NOT NULL, |
| 101 | +row_type TEXT, |
| 102 | +row_id TEXT, |
| 103 | +key TEXT, |
| 104 | +data TEXT, |
| 105 | +hash INTEGER NOT NULL, |
| 106 | +superseded INTEGER NOT NULL); |
| 107 | +
|
| 108 | +CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0; |
| 109 | +CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id); |
| 110 | +CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0; |
| 111 | +
|
| 112 | +CREATE TABLE ps_buckets( |
| 113 | +name TEXT PRIMARY KEY, |
| 114 | +last_applied_op INTEGER NOT NULL DEFAULT 0, |
| 115 | +last_op INTEGER NOT NULL DEFAULT 0, |
| 116 | +target_op INTEGER NOT NULL DEFAULT 0, |
| 117 | +add_checksum INTEGER NOT NULL DEFAULT 0, |
| 118 | +pending_delete INTEGER NOT NULL DEFAULT 0 |
| 119 | +); |
| 120 | +
|
| 121 | +CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)); |
| 122 | +
|
| 123 | +CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT); |
| 124 | +
|
| 125 | +INSERT INTO ps_migration(id, down_migrations) VALUES(1, NULL); |
| 126 | +", |
| 127 | + ) |
| 128 | + .into_db_result(local_db)?; |
| 129 | + } |
| 130 | + |
| 131 | + if current_version < 2 && target_version >= 2 { |
| 132 | + // language=SQLite |
| 133 | + local_db.exec_safe("\ |
| 134 | +CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER); |
| 135 | +INSERT INTO ps_tx(id, current_tx, next_tx) VALUES(1, NULL, 1); |
| 136 | +
|
| 137 | +ALTER TABLE ps_crud ADD COLUMN tx_id INTEGER; |
| 138 | +
|
| 139 | +INSERT INTO ps_migration(id, down_migrations) VALUES(2, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 2', 'params', json_array()), json_object('sql', 'DROP TABLE ps_tx', 'params', json_array()), json_object('sql', 'ALTER TABLE ps_crud DROP COLUMN tx_id', 'params', json_array()))); |
| 140 | +").into_db_result(local_db)?; |
| 141 | + } |
| 142 | + |
| 143 | + if current_version < 3 && target_version >= 3 { |
| 144 | + // language=SQLite |
| 145 | + local_db.exec_safe("\ |
| 146 | +CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB); |
| 147 | +INSERT INTO ps_kv(key, value) values('client_id', uuid()); |
| 148 | +
|
| 149 | +INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 3'), json_object('sql', 'DROP TABLE ps_kv'))); |
| 150 | + ").into_db_result(local_db)?; |
| 151 | + } |
| 152 | + |
| 153 | + if current_version < 4 && target_version >= 4 { |
| 154 | + // language=SQLite |
| 155 | + local_db.exec_safe("\ |
| 156 | +ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0; |
| 157 | +ALTER TABLE ps_buckets ADD COLUMN remove_operations INTEGER NOT NULL DEFAULT 0; |
| 158 | +
|
| 159 | +UPDATE ps_buckets SET op_checksum = ( |
| 160 | +SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name |
| 161 | +); |
| 162 | +
|
| 163 | +INSERT INTO ps_migration(id, down_migrations) |
| 164 | +VALUES(4, |
| 165 | + json_array( |
| 166 | + json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'), |
| 167 | + json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'), |
| 168 | + json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN remove_operations') |
| 169 | + )); |
| 170 | + ").into_db_result(local_db)?; |
| 171 | + } |
| 172 | + |
| 173 | + if current_version < 5 && target_version >= 5 { |
| 174 | + // Start by dropping all existing views and triggers (but not tables). |
| 175 | + // This is because the triggers are restructured in this version, and |
| 176 | + // need to be re-created from scratch. Not dropping them can make it |
| 177 | + // refer to tables or columns not existing anymore, which can case |
| 178 | + // issues later on. |
| 179 | + // The same applies for the down migration. |
| 180 | + |
| 181 | + // language=SQLite |
| 182 | + local_db |
| 183 | + .exec_safe( |
| 184 | + "\ |
| 185 | +SELECT powersync_drop_view(view.name) |
| 186 | +FROM sqlite_master view |
| 187 | +WHERE view.type = 'view' |
| 188 | + AND view.sql GLOB '*-- powersync-auto-generated'; |
| 189 | +
|
| 190 | +ALTER TABLE ps_buckets RENAME TO ps_buckets_old; |
| 191 | +ALTER TABLE ps_oplog RENAME TO ps_oplog_old; |
| 192 | +
|
| 193 | +CREATE TABLE ps_buckets( |
| 194 | + id INTEGER PRIMARY KEY, |
| 195 | + name TEXT NOT NULL, |
| 196 | + last_applied_op INTEGER NOT NULL DEFAULT 0, |
| 197 | + last_op INTEGER NOT NULL DEFAULT 0, |
| 198 | + target_op INTEGER NOT NULL DEFAULT 0, |
| 199 | + add_checksum INTEGER NOT NULL DEFAULT 0, |
| 200 | + op_checksum INTEGER NOT NULL DEFAULT 0, |
| 201 | + pending_delete INTEGER NOT NULL DEFAULT 0 |
| 202 | +) STRICT; |
| 203 | +
|
| 204 | +CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name); |
| 205 | +
|
| 206 | +CREATE TABLE ps_oplog( |
| 207 | + bucket INTEGER NOT NULL, |
| 208 | + op_id INTEGER NOT NULL, |
| 209 | + row_type TEXT, |
| 210 | + row_id TEXT, |
| 211 | + key TEXT, |
| 212 | + data TEXT, |
| 213 | + hash INTEGER NOT NULL) STRICT; |
| 214 | +
|
| 215 | +CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id); |
| 216 | +CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id); |
| 217 | +CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key); |
| 218 | +
|
| 219 | +CREATE TABLE ps_updated_rows( |
| 220 | + row_type TEXT, |
| 221 | + row_id TEXT) STRICT; |
| 222 | +
|
| 223 | +CREATE UNIQUE INDEX ps_updated_rows_row ON ps_updated_rows (row_type, row_id); |
| 224 | +
|
| 225 | +INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) |
| 226 | +SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_old; |
| 227 | +
|
| 228 | +DROP TABLE ps_buckets_old; |
| 229 | +
|
| 230 | +INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) |
| 231 | +SELECT ps_buckets.id, oplog.op_id, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash |
| 232 | + FROM ps_oplog_old oplog |
| 233 | + JOIN ps_buckets |
| 234 | + ON ps_buckets.name = oplog.bucket |
| 235 | + WHERE oplog.superseded = 0 AND oplog.op = 3 |
| 236 | + ORDER BY oplog.bucket, oplog.op_id; |
| 237 | +
|
| 238 | +INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) |
| 239 | +SELECT row_type, row_id |
| 240 | + FROM ps_oplog_old oplog |
| 241 | + WHERE oplog.op != 3; |
| 242 | +
|
| 243 | +UPDATE ps_buckets SET add_checksum = 0xffffffff & (add_checksum + ( |
| 244 | +SELECT IFNULL(SUM(oplog.hash), 0) |
| 245 | + FROM ps_oplog_old oplog |
| 246 | + WHERE oplog.bucket = ps_buckets.name |
| 247 | + AND (oplog.superseded = 1 OR oplog.op != 3) |
| 248 | +)); |
| 249 | +
|
| 250 | +UPDATE ps_buckets SET op_checksum = 0xffffffff & (op_checksum - ( |
| 251 | + SELECT IFNULL(SUM(oplog.hash), 0) |
| 252 | + FROM ps_oplog_old oplog |
| 253 | + WHERE oplog.bucket = ps_buckets.name |
| 254 | + AND (oplog.superseded = 1 OR oplog.op != 3) |
| 255 | +)); |
| 256 | +
|
| 257 | +DROP TABLE ps_oplog_old; |
| 258 | +
|
| 259 | +INSERT INTO ps_migration(id, down_migrations) |
| 260 | +VALUES(5, |
| 261 | + json_array( |
| 262 | + -- Drop existing views and triggers if any |
| 263 | + json_object('sql', 'SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated'''), |
| 264 | +
|
| 265 | + json_object('sql', 'ALTER TABLE ps_buckets RENAME TO ps_buckets_5'), |
| 266 | + json_object('sql', 'ALTER TABLE ps_oplog RENAME TO ps_oplog_5'), |
| 267 | + json_object('sql', 'CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)'), |
| 268 | + json_object('sql', 'INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5'), |
| 269 | + json_object('sql', 'CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)'), |
| 270 | + json_object('sql', 'CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0'), |
| 271 | + json_object('sql', 'CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)'), |
| 272 | + json_object('sql', 'CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0'), |
| 273 | + json_object('sql', 'INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket'), |
| 274 | + json_object('sql', 'DROP TABLE ps_oplog_5'), |
| 275 | + json_object('sql', 'DROP TABLE ps_buckets_5'), |
| 276 | + json_object('sql', 'INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r'), |
| 277 | + json_object('sql', 'INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)'), |
| 278 | + json_object('sql', 'DROP TABLE ps_updated_rows'), |
| 279 | +
|
| 280 | + json_object('sql', 'DELETE FROM ps_migration WHERE id >= 5') |
| 281 | + )); |
| 282 | + ", |
| 283 | + ) |
| 284 | + .into_db_result(local_db)?; |
| 285 | + } |
| 286 | + |
| 287 | + Ok(()) |
| 288 | +} |
0 commit comments