Commit a7a8f8e

Merge pull request #27 from powersync-ja/restructure
More efficient storage format

2 parents 4b4cd99 + c11d5ed, commit a7a8f8e
16 files changed: +1654 -379 lines

.github/workflows/tests.yml (+19 -4)
@@ -9,19 +9,25 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - os: ubuntu-latest
+          - os: ubuntu-24.04
           - os: macos-latest
     steps:
       - uses: actions/checkout@v3
         with:
           submodules: true
+      - uses: dart-lang/setup-dart@v1

-      - name: Build lib
+      - name: Ubuntu setup
+        if: matrix.os == 'ubuntu-24.04'
         run: |
-          cargo build -p powersync_loadable --release
+          sudo apt install libreadline-dev

-      - name: Build sqlite
+      - name: Build
         run: |
+          # Need a debug build for the dart tests
+          cargo build -p powersync_loadable
+
+          cargo build -p powersync_loadable --release
           cargo build -p powersync_core --release --features static
           cargo build -p powersync_sqlite --release
           cargo build -p sqlite3 --release
@@ -37,3 +43,12 @@ jobs:
       - name: Check loadable extension
         run: |
           ./target/release/sqlite3 ":memory:" ".load ./target/release/libpowersync" "select powersync_rs_version()"
+
+      - name: Run dart-based tests
+        # Extension loading fails on macos currently
+        if: matrix.os == 'ubuntu-24.04'
+        run: |
+          cd dart
+          dart pub get
+          dart test
+          dart analyze
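
The "Check loadable extension" step drives the bundled sqlite3 shell against the release artifacts. A minimal sketch of running the same check locally from Rust, using only the standard library, is shown below; the helper itself is hypothetical and not part of this commit, while the paths and the query are taken verbatim from the workflow step above.

// Hypothetical local helper mirroring the CI "Check loadable extension" step.
// Assumes `cargo build -p powersync_loadable --release` and
// `cargo build -p sqlite3 --release` have already been run from the repo root.
use std::process::Command;

fn main() {
    let status = Command::new("./target/release/sqlite3")
        .args([
            ":memory:",
            ".load ./target/release/libpowersync",
            "select powersync_rs_version()",
        ])
        .status()
        .expect("failed to start the bundled sqlite3 shell");
    assert!(status.success(), "loadable extension check failed");
}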

crates/core/src/lib.rs (+1 -0)
@@ -19,6 +19,7 @@ mod error;
 mod ext;
 mod kv;
 mod macros;
+mod migrations;
 mod operations;
 mod operations_vtab;
 mod schema_management;

crates/core/src/migrations.rs (+288 -0)
@@ -0,0 +1,288 @@
extern crate alloc;

use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec::Vec;

use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::error::{PSResult, SQLiteError};

pub fn powersync_migrate(
    ctx: *mut sqlite::context,
    target_version: i32,
) -> Result<(), SQLiteError> {
    let local_db = ctx.db_handle();

    // language=SQLite
    local_db.exec_safe(
        "\
CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)",
    )?;

    // language=SQLite
    let current_version_stmt =
        local_db.prepare_v2("SELECT ifnull(max(id), 0) as version FROM ps_migration")?;
    let rc = current_version_stmt.step()?;
    if rc != ResultCode::ROW {
        return Err(SQLiteError::from(ResultCode::ABORT));
    }

    let mut current_version = current_version_stmt.column_int(0)?;

    while current_version > target_version {
        // Run down migrations.
        // This is rare, we don't worry about optimizing this.

        current_version_stmt.reset()?;

        let down_migrations_stmt = local_db.prepare_v2("select e.value ->> 'sql' as sql from (select id, down_migrations from ps_migration where id > ?1 order by id desc limit 1) m, json_each(m.down_migrations) e")?;
        down_migrations_stmt.bind_int(1, target_version)?;

        let mut down_sql: Vec<String> = alloc::vec![];

        while down_migrations_stmt.step()? == ResultCode::ROW {
            let sql = down_migrations_stmt.column_text(0)?;
            down_sql.push(sql.to_string());
        }

        for sql in down_sql {
            let rs = local_db.exec_safe(&sql);
            if let Err(code) = rs {
                return Err(SQLiteError(
                    code,
                    Some(format!(
                        "Down migration failed for {:} {:} {:}",
                        current_version,
                        sql,
                        local_db
                            .errmsg()
                            .unwrap_or(String::from("Conversion error"))
                    )),
                ));
            }
        }

        // Refresh the version
        current_version_stmt.reset()?;
        let rc = current_version_stmt.step()?;
        if rc != ResultCode::ROW {
            return Err(SQLiteError(
                rc,
                Some("Down migration failed - could not get version".to_string()),
            ));
        }
        let new_version = current_version_stmt.column_int(0)?;
        if new_version >= current_version {
            // Database down from version $currentVersion to $version failed - version not updated after down migration
            return Err(SQLiteError(
                ResultCode::ABORT,
                Some(format!(
                    "Down migration failed - version not updated from {:}",
                    current_version
                )),
            ));
        }
        current_version = new_version;
    }
    current_version_stmt.reset()?;

    if current_version < 1 {
        // language=SQLite
        local_db
            .exec_safe(
                "
CREATE TABLE ps_oplog(
  bucket TEXT NOT NULL,
  op_id INTEGER NOT NULL,
  op INTEGER NOT NULL,
  row_type TEXT,
  row_id TEXT,
  key TEXT,
  data TEXT,
  hash INTEGER NOT NULL,
  superseded INTEGER NOT NULL);

CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0;
CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id);
CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0;

CREATE TABLE ps_buckets(
  name TEXT PRIMARY KEY,
  last_applied_op INTEGER NOT NULL DEFAULT 0,
  last_op INTEGER NOT NULL DEFAULT 0,
  target_op INTEGER NOT NULL DEFAULT 0,
  add_checksum INTEGER NOT NULL DEFAULT 0,
  pending_delete INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id));

CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT);

INSERT INTO ps_migration(id, down_migrations) VALUES(1, NULL);
",
            )
            .into_db_result(local_db)?;
    }

    if current_version < 2 && target_version >= 2 {
        // language=SQLite
        local_db.exec_safe("\
CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER);
INSERT INTO ps_tx(id, current_tx, next_tx) VALUES(1, NULL, 1);

ALTER TABLE ps_crud ADD COLUMN tx_id INTEGER;

INSERT INTO ps_migration(id, down_migrations) VALUES(2, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 2', 'params', json_array()), json_object('sql', 'DROP TABLE ps_tx', 'params', json_array()), json_object('sql', 'ALTER TABLE ps_crud DROP COLUMN tx_id', 'params', json_array())));
").into_db_result(local_db)?;
    }

    if current_version < 3 && target_version >= 3 {
        // language=SQLite
        local_db.exec_safe("\
CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB);
INSERT INTO ps_kv(key, value) values('client_id', uuid());

INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 3'), json_object('sql', 'DROP TABLE ps_kv')));
").into_db_result(local_db)?;
    }

    if current_version < 4 && target_version >= 4 {
        // language=SQLite
        local_db.exec_safe("\
ALTER TABLE ps_buckets ADD COLUMN op_checksum INTEGER NOT NULL DEFAULT 0;
ALTER TABLE ps_buckets ADD COLUMN remove_operations INTEGER NOT NULL DEFAULT 0;

UPDATE ps_buckets SET op_checksum = (
  SELECT IFNULL(SUM(ps_oplog.hash), 0) & 0xffffffff FROM ps_oplog WHERE ps_oplog.bucket = ps_buckets.name
);

INSERT INTO ps_migration(id, down_migrations)
VALUES(4,
  json_array(
    json_object('sql', 'DELETE FROM ps_migration WHERE id >= 4'),
    json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN op_checksum'),
    json_object('sql', 'ALTER TABLE ps_buckets DROP COLUMN remove_operations')
  ));
").into_db_result(local_db)?;
    }

    if current_version < 5 && target_version >= 5 {
        // Start by dropping all existing views and triggers (but not tables).
        // This is because the triggers are restructured in this version, and
        // need to be re-created from scratch. Not dropping them can leave them
        // referring to tables or columns that no longer exist, which can cause
        // issues later on.
        // The same applies for the down migration.

        // language=SQLite
        local_db
            .exec_safe(
                "\
SELECT powersync_drop_view(view.name)
FROM sqlite_master view
WHERE view.type = 'view'
  AND view.sql GLOB '*-- powersync-auto-generated';

ALTER TABLE ps_buckets RENAME TO ps_buckets_old;
ALTER TABLE ps_oplog RENAME TO ps_oplog_old;

CREATE TABLE ps_buckets(
  id INTEGER PRIMARY KEY,
  name TEXT NOT NULL,
  last_applied_op INTEGER NOT NULL DEFAULT 0,
  last_op INTEGER NOT NULL DEFAULT 0,
  target_op INTEGER NOT NULL DEFAULT 0,
  add_checksum INTEGER NOT NULL DEFAULT 0,
  op_checksum INTEGER NOT NULL DEFAULT 0,
  pending_delete INTEGER NOT NULL DEFAULT 0
) STRICT;

CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name);

CREATE TABLE ps_oplog(
  bucket INTEGER NOT NULL,
  op_id INTEGER NOT NULL,
  row_type TEXT,
  row_id TEXT,
  key TEXT,
  data TEXT,
  hash INTEGER NOT NULL) STRICT;

CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id);
CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id);
CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key);

CREATE TABLE ps_updated_rows(
  row_type TEXT,
  row_id TEXT) STRICT;

CREATE UNIQUE INDEX ps_updated_rows_row ON ps_updated_rows (row_type, row_id);

INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)
  SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_old;

DROP TABLE ps_buckets_old;

INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash)
  SELECT ps_buckets.id, oplog.op_id, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash
  FROM ps_oplog_old oplog
  JOIN ps_buckets
    ON ps_buckets.name = oplog.bucket
  WHERE oplog.superseded = 0 AND oplog.op = 3
  ORDER BY oplog.bucket, oplog.op_id;

INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
  SELECT row_type, row_id
  FROM ps_oplog_old oplog
  WHERE oplog.op != 3;

UPDATE ps_buckets SET add_checksum = 0xffffffff & (add_checksum + (
  SELECT IFNULL(SUM(oplog.hash), 0)
  FROM ps_oplog_old oplog
  WHERE oplog.bucket = ps_buckets.name
    AND (oplog.superseded = 1 OR oplog.op != 3)
));

UPDATE ps_buckets SET op_checksum = 0xffffffff & (op_checksum - (
  SELECT IFNULL(SUM(oplog.hash), 0)
  FROM ps_oplog_old oplog
  WHERE oplog.bucket = ps_buckets.name
    AND (oplog.superseded = 1 OR oplog.op != 3)
));

DROP TABLE ps_oplog_old;

INSERT INTO ps_migration(id, down_migrations)
VALUES(5,
  json_array(
    -- Drop existing views and triggers if any
    json_object('sql', 'SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated'''),

    json_object('sql', 'ALTER TABLE ps_buckets RENAME TO ps_buckets_5'),
    json_object('sql', 'ALTER TABLE ps_oplog RENAME TO ps_oplog_5'),
    json_object('sql', 'CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)'),
    json_object('sql', 'INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5'),
    json_object('sql', 'CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)'),
    json_object('sql', 'CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0'),
    json_object('sql', 'CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)'),
    json_object('sql', 'CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0'),
    json_object('sql', 'INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket'),
    json_object('sql', 'DROP TABLE ps_oplog_5'),
    json_object('sql', 'DROP TABLE ps_buckets_5'),
    json_object('sql', 'INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r'),
    json_object('sql', 'INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)'),
    json_object('sql', 'DROP TABLE ps_updated_rows'),

    json_object('sql', 'DELETE FROM ps_migration WHERE id >= 5')
  ));
",
            )
            .into_db_result(local_db)?;
    }

    Ok(())
}
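
For context on the down-migration machinery above: each ps_migration row stores its reversal as a JSON array of statements, and powersync_migrate unpacks it with json_each before executing the statements one by one. The following is a minimal sketch, not part of the commit, of that extraction in isolation; it reuses the sqlite_nostd calls already seen in the file, the function name is hypothetical, and error handling is assumed to reduce to ResultCode.

// Sketch: list the SQL statements that would undo the newest migration above
// `target_version`. The JSON shape matches what the migrations above insert,
// e.g. for version 2:
//   [{"sql": "DELETE FROM ps_migration WHERE id >= 2", "params": []},
//    {"sql": "DROP TABLE ps_tx", "params": []},
//    {"sql": "ALTER TABLE ps_crud DROP COLUMN tx_id", "params": []}]
extern crate alloc;

use alloc::string::{String, ToString};
use alloc::vec::Vec;

use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, ResultCode};

fn pending_down_migrations(
    db: *mut sqlite::sqlite3,
    target_version: i32,
) -> Result<Vec<String>, ResultCode> {
    // Same query as powersync_migrate: pick the newest migration above the
    // target and expand its down_migrations array into one row per statement.
    let stmt = db.prepare_v2(
        "SELECT e.value ->> 'sql' AS sql
           FROM (SELECT id, down_migrations FROM ps_migration
                  WHERE id > ?1 ORDER BY id DESC LIMIT 1) m,
                json_each(m.down_migrations) e",
    )?;
    stmt.bind_int(1, target_version)?;

    let mut statements: Vec<String> = Vec::new();
    while stmt.step()? == ResultCode::ROW {
        statements.push(stmt.column_text(0)?.to_string());
    }
    Ok(statements)
}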

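The version-5 restructure is the "more efficient storage format" from the commit title: ps_oplog now stores the integer ps_buckets.id instead of repeating the bucket name on every row, only op = 3 rows keep their data, and the remaining operations collapse into ps_updated_rows with their hashes folded into the bucket checksums. One consequence is that name-based lookups go through ps_buckets. The sketch below is hypothetical and not part of the commit; it assumes the same sqlite_nostd API used above and simply illustrates a query against the new schema.

// Sketch: count oplog rows per bucket under the version-5 schema, where
// ps_oplog.bucket is the integer ps_buckets.id rather than the bucket name.
extern crate alloc;

use alloc::string::{String, ToString};
use alloc::vec::Vec;

use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, ResultCode};

fn oplog_rows_per_bucket(db: *mut sqlite::sqlite3) -> Result<Vec<(String, i32)>, ResultCode> {
    let stmt = db.prepare_v2(
        "SELECT b.name, count(o.op_id)
           FROM ps_buckets b
           LEFT JOIN ps_oplog o ON o.bucket = b.id
          GROUP BY b.id
          ORDER BY b.name",
    )?;

    let mut counts: Vec<(String, i32)> = Vec::new();
    while stmt.step()? == ResultCode::ROW {
        counts.push((stmt.column_text(0)?.to_string(), stmt.column_int(1)?));
    }
    Ok(counts)
}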