βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β "SELECT name FROM users WHERE age > 25 AND city = 'NYC'" β
β β
β β β
β β sql/parser.rs (sqlparser crate) β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β AST β β
β β β β
β β Select { β β
β β projection: [Column("name")], β β
β β from: Table("users"), β β
β β where: BinaryOp( β β
β β And, β β
β β BinaryOp(Gt, Column("age"), Literal(25)), β β
β β BinaryOp(Eq, Column("city"), Literal("NYC")) β β
β β ) β β
β β } β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β β sql/planner.rs β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β QueryPlan β β
β β β β
β β QueryPlan { β β
β β table: "users", β β
β β columns: [ β β
β β ColumnSpec { name: "age", ordinal: 2, filters: [Gt(25)] }, β β
β β ColumnSpec { name: "city", ordinal: 3, filters: [Eq("NYC")] }, β β
β β ColumnSpec { name: "name", ordinal: 1, filters: [] }, β β
β β ], β β
β β projections: ["name"], β β
β β } β β
β β β β
β β Note: Columns with filters are processed first (predicate pushdown) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β β pipeline/builder.rs β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Job β β
β β β β
β β βββββββββββββββ βββββββββββββββ βββββββββββββββ β β
β β β Step 0 β β Step 1 β β Step 2 β β β
β β β col: age βββββββΆβ col: city βββββββΆβ col: name βββββββΆ output β β
β β β filter: >25β β filter: = β β filter: - β β β
β β β is_root β β β β β β β
β β βββββββββββββββ βββββββββββββββ βββββββββββββββ β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Predicates form a tree that gets evaluated per-batch:
WHERE age > 25 AND (city = 'NYC' OR city = 'LA')
           ┌─────────┐
           │   AND   │
           └────┬────┘
                │
     ┌──────────┴──────────┐
     │                     │
┌────┴────┐           ┌────┴────┐
│  Leaf   │           │   OR    │
│age > 25 │           └────┬────┘
└─────────┘                │
                ┌──────────┴──────────┐
                │                     │
           ┌────┴────┐           ┌────┴────┐
           │  Leaf   │           │  Leaf   │
           │city=NYC │           │ city=LA │
           └─────────┘           └─────────┘
FilterExpr::And(
FilterExpr::Leaf { col: "age", op: Gt, val: 25 },
FilterExpr::Or(
FilterExpr::Leaf { col: "city", op: Eq, val: "NYC" },
FilterExpr::Leaf { col: "city", op: Eq, val: "LA" },
)
)
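As a rough illustration of per-batch evaluation, the sketch below walks a `FilterExpr` tree and produces one match flag per row. It is a minimal sketch rather than the engine's implementation: `Value`, `CmpOp`, the `Batch` alias, and `evaluate` are hypothetical names, the children are boxed because a recursive Rust enum needs indirection, and the result is a plain `Vec<bool>` instead of a packed bitmap.

```rust
use std::collections::HashMap;

/// Comparison operators for leaf predicates (hypothetical subset).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CmpOp {
    Gt,
    Eq,
}

/// A single cell value (hypothetical; the real engine works on typed columns).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Int(i64),
    Text(String),
}

/// Predicate tree. Children are boxed so the enum has a finite size.
#[derive(Debug, Clone)]
pub enum FilterExpr {
    Leaf { col: String, op: CmpOp, val: Value },
    And(Box<FilterExpr>, Box<FilterExpr>),
    Or(Box<FilterExpr>, Box<FilterExpr>),
}

/// Simplified batch: column name -> one value per row.
pub type Batch = HashMap<String, Vec<Value>>;

/// Evaluates `expr` for every row and returns one match flag per row.
pub fn evaluate(expr: &FilterExpr, batch: &Batch, num_rows: usize) -> Vec<bool> {
    match expr {
        FilterExpr::Leaf { col, op, val } => match batch.get(col) {
            // A column that is not loaded matches nothing in this sketch.
            None => vec![false; num_rows],
            Some(column) => column
                .iter()
                .map(|cell| match (op, cell, val) {
                    (CmpOp::Gt, Value::Int(a), Value::Int(b)) => a > b,
                    (CmpOp::Eq, a, b) => a == b,
                    _ => false, // type mismatch: treat as no match
                })
                .collect(),
        },
        FilterExpr::And(lhs, rhs) => {
            let l = evaluate(lhs, batch, num_rows);
            let r = evaluate(rhs, batch, num_rows);
            l.iter().zip(&r).map(|(a, b)| *a && *b).collect()
        }
        FilterExpr::Or(lhs, rhs) => {
            let l = evaluate(lhs, batch, num_rows);
            let r = evaluate(rhs, batch, num_rows);
            l.iter().zip(&r).map(|(a, b)| *a || *b).collect()
        }
    }
}
```

Both children of `And` and `Or` are evaluated over the whole batch, which keeps the combination step branch-free at the cost of some redundant work.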
Each step is a self-contained operator:
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β PipelineStep β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Configuration β β
β β β β
β β table: String "users" β β
β β column: String "age" β β
β β column_ordinal: usize 2 β β
β β filters: Vec<FilterExpr> [Gt(25)] β β
β β is_root: bool true (first step) / false (subsequent) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Communication β β
β β β β
β β previous_receiver: Receiver<ColumnarBatch> βββ from upstream step β β
β β current_producer: Sender<ColumnarBatch> βββΆ to downstream step β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Resources β β
β β β β
β β page_handler: Arc<PageHandler> shared access to cache/disk β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
The root step scans pages and initiates the data flow:
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β execute_root() β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 1. Get all pages for this column β β
β β β β
β β descriptors = page_handler.list_pages("users", "age") β β
β β β β
β β ββββββββββ ββββββββββ ββββββββββ ββββββββββ β β
β β βPage 0 β βPage 1 β βPage 2 β βPage 3 β ... β β
β β ββββββββββ ββββββββββ ββββββββββ ββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 2. For each page (morsel): β β
β β β β
β β page = page_handler.get_page(descriptor) // cache or disk β β
β β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β ColumnarPage β β β
β β β β β β
β β β data: ColumnData::Int64([23, 45, 19, 67, 31, ...]) β β β
β β β null_bitmap: Bitmap { bits: [...], len: 50000 } β β β
β β β num_rows: 50000 β β β
β β β β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 3. Create batch and evaluate filters β β
β β β β
β β batch = ColumnarBatch { β β
β β columns: { 2: page }, // ordinal β data β β
β β row_ids: [0, 1, 2, ...], // global row identifiers β β
β β num_rows: 50000, β β
β β } β β
β β β β
β β bitmap = evaluate_filters([age > 25], batch) β β
β β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β Bitmap evaluation (vectorized): β β β
β β β β β β
β β β int_data: [23, 45, 19, 67, 31, 28, 15, 52, ...] β β β
β β β × ✓ × ✓ ✓ ✓ × ✓ β β β
β β β β β β
β β β bitmap: [ 0, 1, 0, 1, 1, 1, 0, 1, ...] β β β
β β β β β β
β β β Packed as u64 words: [0b...10111010, ...] β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 4. Filter batch and PUSH downstream β β
β β β β
β β filtered = batch.filter_by_bitmap(bitmap) β β
β β β β
β β // Gather only matching rows (compact representation) β β
β β filtered.num_rows: 31247 (was 50000) β β
β β filtered.row_ids: [1, 3, 4, 5, 7, ...] // original positions β β
β β β β
β β β β β
β β current_producer.send(filtered) β β
β β β β β
β β βΌ β β
β β ββββββββββββββ β β
β β Channel β β
β β ββββββββββββββ β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 5. Send termination signal β β
β β β β
β β current_producer.send(ColumnarBatch { num_rows: 0 }) // empty = done β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Subsequent steps receive batches, enrich them with their own column, filter, and push downstream:
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β execute_non_root() β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β while let Ok(batch) = previous_receiver.recv() β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 1. Check for termination β β
β β β β
β β if batch.num_rows == 0 { β β
β β current_producer.send(empty_batch); // propagate termination β β
β β break; β β
β β } β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 2. Materialize this step's column (lazy loading) β β
β β β β
β β Incoming batch has: { columns: {2: age_data}, row_ids: [1,3,4,5,7,...] } β β
β β β β
β β This step needs column 3 (city). Load only the rows we need: β β
β β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β For each row_id in batch.row_ids: β β β
β β β page_group = row_id / rows_per_page_group β β β
β β β page = page_handler.get_page("users.city.{page_group}") β β β
β β β offset = row_id % rows_per_page_group β β β
β β β value = page.data.get_string(offset) β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β β
β β batch.columns.insert(3, city_data) β β
β β β β
β β Batch now has: { columns: {2: age, 3: city}, row_ids: [...] } β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 3. Evaluate filters for this column β β
β β β β
β β bitmap = evaluate_filters([city = 'NYC'], batch) β β
β β filtered = batch.filter_by_bitmap(bitmap) β β
β β β β
β β // Further reduces row count β β
β β filtered.num_rows: 8234 (was 31247) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β 4. PUSH to next step β β
β β β β
β β current_producer.send(filtered) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
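The row lookup in step 2 is plain integer arithmetic, sketched below. `RowLocation`, `locate_row`, `group_by_page`, and `page_id` are hypothetical names. Grouping row ids by page group so each page is requested once is an assumption about how the per-row loop could be batched, and the zero-padded id format is only inferred from the `users.age.00042` example.

```rust
use std::collections::BTreeMap;

/// Where a global row id lives inside a column's page groups.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowLocation {
    pub page_group: u64,
    pub offset: usize,
}

/// page_group = row_id / rows_per_page_group, offset = row_id % rows_per_page_group.
pub fn locate_row(row_id: u64, rows_per_page_group: u64) -> RowLocation {
    assert!(rows_per_page_group > 0, "page groups must hold at least one row");
    RowLocation {
        page_group: row_id / rows_per_page_group,
        offset: (row_id % rows_per_page_group) as usize,
    }
}

/// Groups the in-page offsets by page group, so each page is fetched once
/// instead of once per row (assumed optimisation, not confirmed by the source).
pub fn group_by_page(row_ids: &[u64], rows_per_page_group: u64) -> BTreeMap<u64, Vec<usize>> {
    let mut groups: BTreeMap<u64, Vec<usize>> = BTreeMap::new();
    for &row_id in row_ids {
        let loc = locate_row(row_id, rows_per_page_group);
        groups.entry(loc.page_group).or_insert_with(Vec::new).push(loc.offset);
    }
    groups
}

/// Builds a page id such as "users.age.00042" (format assumed from the example id).
pub fn page_id(table: &str, column: &str, page_group: u64) -> String {
    format!("{}.{}.{:05}", table, column, page_group)
}
```

Because the upstream filter has already shrunk `row_ids`, only the page groups that still contain surviving rows are touched at all.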
An empty batch signals end-of-data and propagates through the entire pipeline:
Root Step              Step 1                 Step 2               Output
    │                     │                      │                    │
    │  [batch 0]          │                      │                    │
    ├────────────────────▶│                      │                    │
    │                     │  [batch 0]           │                    │
    │                     ├─────────────────────▶│                    │
    │                     │                      │  [batch 0]         │
    │                     │                      ├───────────────────▶│
    │  [batch 1]          │                      │                    │
    ├────────────────────▶│                      │                    │
    │                     │  [batch 1]           │                    │
    │                     ├─────────────────────▶│                    │
    │                     │                      │  [batch 1]         │
    │                     │                      ├───────────────────▶│
    │                     │                      │                    │
    │  [EMPTY]  ◀─── termination signal ──────────────────────────────│
    ├────────────────────▶│                      │                    │
    │                     │  [EMPTY]             │                    │
    │                     ├─────────────────────▶│                    │
    │                     │                      │  [EMPTY]           │
    │                     │                      ├───────────────────▶│
    │                     │                      │                    │
   done                  done                   done              all done
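The termination protocol can be sketched with standard-library channels. This is a simplified stand-in, assuming a `Batch` that carries only `row_ids` and steps that filter on the row id itself; `run_step` and `run_pipeline` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Simplified batch: only the surviving row ids (hypothetical stand-in).
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub row_ids: Vec<u64>,
}

impl Batch {
    /// An empty batch is the end-of-data sentinel.
    pub fn is_terminator(&self) -> bool {
        self.row_ids.is_empty()
    }
}

/// Non-root step: filter each batch, push it on, and forward the sentinel.
pub fn run_step(rx: Receiver<Batch>, tx: Sender<Batch>, keep: fn(u64) -> bool) {
    while let Ok(batch) = rx.recv() {
        if batch.is_terminator() {
            let _ = tx.send(batch); // propagate termination downstream
            break;
        }
        let row_ids: Vec<u64> = batch.row_ids.into_iter().filter(|id| keep(*id)).collect();
        // A fully filtered batch would look like the sentinel, so it is dropped.
        if !row_ids.is_empty() {
            let _ = tx.send(Batch { row_ids });
        }
    }
}

/// Root -> step 1 -> step 2 -> output, each step on its own thread.
pub fn run_pipeline(batches: Vec<Vec<u64>>) -> Vec<u64> {
    let (tx0, rx0) = channel();
    let (tx1, rx1) = channel();
    let (tx2, rx2) = channel();
    let step1 = thread::spawn(move || run_step(rx0, tx1, |id: u64| id % 2 == 0));
    let step2 = thread::spawn(move || run_step(rx1, tx2, |id: u64| id % 3 == 0));

    for row_ids in batches {
        if !row_ids.is_empty() {
            tx0.send(Batch { row_ids }).expect("step 1 is alive");
        }
    }
    tx0.send(Batch { row_ids: Vec::new() }).expect("step 1 is alive"); // [EMPTY]

    let mut output = Vec::new();
    while let Ok(batch) = rx2.recv() {
        if batch.is_terminator() {
            break;
        }
        output.extend(batch.row_ids);
    }
    step1.join().expect("step 1 panicked");
    step2.join().expect("step 2 panicked");
    output
}
```

One consequence of using an empty batch as the sentinel is that a batch whose rows are all filtered out looks identical to the termination signal, so this sketch drops such batches instead of forwarding them. Whether the real pipeline does the same is not stated above.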
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ColumnarBatch β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β columns: HashMap<usize, ColumnarPage> β β
β β β β
β β ordinal 2 (age) βββΆ ColumnarPage { data: Int64([...]), ... } β β
β β ordinal 3 (city) βββΆ ColumnarPage { data: Text({...}), ... } β β
β β ordinal 1 (name) βββΆ ColumnarPage { data: Text({...}), ... } β β
β β β β
β β Sparse: only columns that have been loaded are present β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β row_ids: Vec<u64> β β
β β β β
β β [1, 3, 4, 5, 7, 12, 15, ...] β β
β β β β
β β Global row identifiers - tracks which original rows are in this batch β β
β β Used for: joining columns, tracking lineage, late materialization β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β num_rows: usize β β
β β β β
β β 8234 β β
β β β β
β β Current row count (decreases as filters are applied) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ColumnarPage β
β β
β struct ColumnarPage { β
β page_metadata: String, // page identifier β
β data: ColumnData, // type-specific data (see enum below) β
β null_bitmap: Bitmap, // which rows are null β
β num_rows: usize, // row count in this page β
β } β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β ColumnData Enum - One Variant Active Per Page β β
β β β β
β β enum ColumnData { β β
β β Boolean(Vec<bool>), β β
β β Int64(Vec<i64>), β β
β β Float64(Vec<f64>), β β
β β Timestamp(Vec<i64>), // epoch millis β β
β β Text(BytesColumn), // Arrow-style variable-length β β
β β Dictionary(DictionaryColumn), // dictionary-encoded strings β β
β β } β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Example: Int64 Column β β
β β β β
β β data: ColumnData::Int64(vec![23, 45, 19, 67, 31, 28, 15, 52, ...]) β β
β β βββββββ¬ββββββ¬ββββββ¬ββββββ¬ββββββ¬ββββββ¬ββββββ¬ββββββ β β
β β β 23 β 45 β 19 β 67 β 31 β 28 β 15 β 52 β ... β β
β β βββββββ΄ββββββ΄ββββββ΄ββββββ΄ββββββ΄ββββββ΄ββββββ΄ββββββ β β
β β β β
β β Direct array access: O(1) per element β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Example: Text Column (Arrow-style BytesColumn) β β
β β β β
β β data: ColumnData::Text(BytesColumn { offsets, data }) β β
β β β β
β β offsets: Vec<u32> β β
β β βββββββ¬ββββββ¬ββββββ¬ββββββ¬ββββββ¬ββββββ¬ββββββ β β
β β β 0 β 5 β 8 β 17 β 22 β 28 β 35 β ... β β
β β βββββββ΄ββββββ΄ββββββ΄ββββββ΄ββββββ΄ββββββ΄ββββββ β β
β β β β
β β data: Vec<u8> β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β A l i c e B o b C h a r l o t t e D a v i d ... β β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β βββββββ€βββββ€βββββββββββββββββ€βββββββββ€ β β
β β [0] [1] [2] [3] β β
β β β β
β β Row i = data[offsets[i]..offsets[i+1]] β β
β β Length of row i = offsets[i+1] - offsets[i] (no strlen needed) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Null Bitmap β β
β β β β
β β null_bitmap: Bitmap { bits: Vec<u64>, len: usize } β β
β β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β 1111111011111110111111111111111111111111111111111111111111111110... β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β β β
β β βββ 0 = NULL, 1 = present β β
β β β β
β β Packed: 64 rows per u64 word β β
β β Memory: 50,000 rows = 782 u64s = 6,256 bytes (~6.1 KiB) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
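The offsets-plus-bytes layout of the text column can be sketched in a few lines. This is an illustrative version under simple assumptions: values are valid UTF-8, the total payload fits in `u32`, and `from_strs`, `len`, and `get` are hypothetical helper names.

```rust
/// Arrow-style variable-length column: `offsets` has one more entry than
/// there are rows, and row i occupies data[offsets[i]..offsets[i + 1]].
#[derive(Debug, Clone, PartialEq)]
pub struct BytesColumn {
    pub offsets: Vec<u32>,
    pub data: Vec<u8>,
}

impl BytesColumn {
    /// Packs the strings back to back and records where each one ends.
    pub fn from_strs(values: &[&str]) -> Self {
        let mut offsets = Vec::with_capacity(values.len() + 1);
        let mut data = Vec::new();
        offsets.push(0u32);
        for value in values {
            data.extend_from_slice(value.as_bytes());
            offsets.push(data.len() as u32); // assumes the payload fits in u32
        }
        Self { offsets, data }
    }

    /// Number of rows stored in the column.
    pub fn len(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns row `i`, or None when `i` is out of range or not valid UTF-8.
    pub fn get(&self, i: usize) -> Option<&str> {
        if i >= self.len() {
            return None;
        }
        let start = self.offsets[i] as usize;
        let end = self.offsets[i + 1] as usize;
        std::str::from_utf8(&self.data[start..end]).ok()
    }
}
```

The length of a row falls out of two adjacent offsets, so no terminator byte or scan is needed to find where a value ends.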
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Bitmap Operations β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Predicate: age > 25 β β
β β β β
β β ages: [23, 45, 19, 67, 31, 28, 15, 52] β β
β β result: [ 0, 1, 0, 1, 1, 1, 0, 1] β β
β β β β
β β Packed: 0b10111010 (little-endian bit order: row 0 is the lowest bit) β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Combining predicates: bitmap.and(&other) β β
β β β β
β β age > 25: [0, 1, 0, 1, 1, 1, 0, 1] β β
β β city = NYC: [1, 1, 0, 0, 1, 0, 1, 1] β β
β β βββββββββββββββββββββββββ β β
β β AND result: [0, 1, 0, 0, 1, 0, 0, 1] β β
β β β β
β β Single CPU instruction per 64 rows: word1 &= word2 β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Filtering: batch.filter_by_bitmap(bitmap) β β
β β β β
β β Before: β β
β β row_ids: [0, 1, 2, 3, 4, 5, 6, 7] β β
β β ages: [23, 45, 19, 67, 31, 28, 15, 52] β β
β β bitmap: [ 0, 1, 0, 0, 1, 0, 0, 1] β β
β β β β
β β After (gather selected rows): β β
β β row_ids: [1, 4, 7] β β
β β ages: [45, 31, 52] β β
β β num_rows: 3 β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
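A packed bitmap with these operations might look like the sketch below. It assumes row `i` is stored in bit `i % 64` of word `i / 64` (row 0 in the lowest bit) and that `and` combines in place; `gt_bitmap` and `selected` are hypothetical helper names.

```rust
/// Packed selection bitmap: 64 rows per u64 word.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Bitmap {
    pub bits: Vec<u64>,
    pub len: usize,
}

impl Bitmap {
    /// All-zero bitmap covering `len` rows.
    pub fn new(len: usize) -> Self {
        Self { bits: vec![0; (len + 63) / 64], len }
    }

    /// Marks row `i` as selected.
    pub fn set(&mut self, i: usize) {
        assert!(i < self.len, "row index out of range");
        self.bits[i / 64] |= 1u64 << (i % 64);
    }

    /// Returns whether row `i` is selected (false when out of range).
    pub fn get(&self, i: usize) -> bool {
        i < self.len && (self.bits[i / 64] >> (i % 64)) & 1 == 1
    }

    /// In-place intersection: one `&=` per 64 rows.
    pub fn and(&mut self, other: &Bitmap) {
        assert_eq!(self.len, other.len, "bitmaps must cover the same rows");
        for (word, other_word) in self.bits.iter_mut().zip(&other.bits) {
            *word &= *other_word;
        }
    }

    /// Positions of the selected rows, in ascending order.
    pub fn selected(&self) -> Vec<usize> {
        (0..self.len).filter(|&i| self.get(i)).collect()
    }
}

/// Builds the bitmap for `value > threshold` over an Int64 column.
pub fn gt_bitmap(values: &[i64], threshold: i64) -> Bitmap {
    let mut bitmap = Bitmap::new(values.len());
    for (i, value) in values.iter().enumerate() {
        if *value > threshold {
            bitmap.set(i);
        }
    }
    bitmap
}
```

Gathering the surviving rows then amounts to indexing each loaded column with the positions returned by `selected`.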
TableCatalog {
name: "users",
columns: [
ColumnDef { name: "id", ordinal: 0, data_type: Int64, nullable: false },
ColumnDef { name: "name", ordinal: 1, data_type: String, nullable: false },
ColumnDef { name: "age", ordinal: 2, data_type: Int64, nullable: true },
ColumnDef { name: "city", ordinal: 3, data_type: String, nullable: true },
ColumnDef { name: "email", ordinal: 4, data_type: String, nullable: true },
],
sort_keys: ["id"], // data sorted by this column
rows_per_page_group: 50000, // rows per physical page
}
PageDescriptor {
id: "users.age.00042", // unique identifier
disk_path: "data/users/age.dat", // file location
offset: 2097152, // byte offset in file (page 42 * ~50KB)
alloc_len: 65536, // space allocated
actual_len: 48234, // actual compressed size
entry_count: 50000, // rows in this page
data_type: Int64, // column type
stats: Some(ColumnStats {
min: 18,
max: 95,
null_count: 127,
}),
}
The writer uses sharded parallelism: each table maps to exactly one shard via an FNV-1a hash, so writes to the same table are serialized while tables on different shards can be written in parallel.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SHARDED WRITER β
β β
β writer.submit(UpdateJob { table: "users", ... }) β
β β β
β β shard_index = fnv_hash("users") % num_shards β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β Shard 0 Shard 1 Shard 2 Shard N β β
β β ββββββββββ ββββββββββ ββββββββββ ββββββββββ β β
β β β Worker β β Worker β β Worker β β Worker β β β
β β β Thread β β Thread β β Thread β β Thread β β β
β β βββββ¬βββββ βββββ¬βββββ βββββ¬βββββ βββββ¬βββββ β β
β β β β β β β β
β β βΌ βΌ βΌ βΌ β β
β β ββββββββββ ββββββββββ ββββββββββ ββββββββββ β β
β β βChannel β βChannel β βChannel β βChannel β β β
β β ββββββββββ ββββββββββ ββββββββββ ββββββββββ β β
β β β β
β β Tables routed by hash: β β
β β "users" → Shard 2 β β
β β "orders" → Shard 0 β β
β β "products" → Shard 1 β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Benefits: β
β • Same-table writes serialized (no races) β
β • Different tables write in parallel β
β • Work distributed evenly via hash β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
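Shard routing can be sketched as below. The text does not say which FNV-1a width the writer uses; this sketch assumes the 64-bit variant with its standard offset basis and prime, and `shard_index` is a hypothetical name.

```rust
/// 64-bit FNV-1a: XOR each byte into the hash, then multiply by the prime.
pub fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for &byte in bytes {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Maps a table name to a shard. The same table always lands on the same
/// shard, which is what serializes its writes.
pub fn shard_index(table: &str, num_shards: usize) -> usize {
    assert!(num_shards > 0, "at least one shard is required");
    (fnv1a_64(table.as_bytes()) % num_shards as u64) as usize
}
```

Two different tables can still hash to the same shard, in which case their writes share one worker and are serialized with each other as well.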
Every write operation follows a strict three-phase protocol to ensure durability and consistency:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β THREE-PHASE COMMIT β
β β
β UpdateJob arrives at worker shard β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β PHASE 1: PERSIST DATA TO DISK β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ£ β
β β β β
β β for each column in staged_columns: β β
β β allocation = allocator.allocate(serialized.len()) β β
β β page_io.write_to_path(allocation.path, allocation.offset, serialized) β β
β β fsync() βββ Data is durable on disk β β
β β β β
β β If ANY persist fails β abort entire commit (no partial writes) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β β All data safely on disk β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β PHASE 2: COMMIT METADATA (Atomic Switch) β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ£ β
β β β β
β β metadata_client.commit(table, updates) β β
β β β β β
β β βββ Reserve descriptor IDs β β
β β βββ Append to MetaJournal (durability) β β
β β βββ Register in PageDirectory (visibility) β β
β β β β
β β After this point: new pages are visible to readers β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β β Metadata committed β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β PHASE 3: UPDATE IN-MEMORY CACHE β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ£ β
β β β β
β β for each (page, descriptor) in staged: β β
β β page.page_metadata = descriptor.id β β
β β page_handler.write_back_uncompressed(descriptor.id, page) β β
β β β β
β β Hot cache now has latest data (avoids re-reading from disk) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β ACK WAL entry (mark as processed) β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
For bulk inserts, rows are buffered until a full page group (50k rows) accumulates:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ROW BUFFERING FLOW β
β β
β BufferRow operations accumulate in worker: β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β buffered_rows: HashMap<String, Vec<Vec<String>>> β β
β β β β
β β "users" β [ β β
β β ["1", "Alice", "30"], β β
β β ["2", "Bob", "25"], β β
β β ["3", "Carol", "35"], β β
β β ... β β
β β ] β β
β β β β
β β When len >= 50,000 (ROWS_PER_PAGE_GROUP): β β
β β flush_page_group(table, rows) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β flush_page_group(): β β
β β β β
β β 1. SORT BY SORT KEY β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β rows.sort_by(|a, b| compare_by_sort_key_ordinals(a, b)) β β β
β β β β β β
β β β Before: [("3","Carol"), ("1","Alice"), ("2","Bob")] β β β
β β β After: [("1","Alice"), ("2","Bob"), ("3","Carol")] β β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β β
β β 2. EXTEND PARTIAL TAIL (if last page not full) β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β Last page has 45,000 rows, can fit 5,000 more β β β
β β β Take first 5,000 sorted rows β append to existing page β β β
β β β Replace page with updated version β β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β β
β β 3. CREATE NEW PAGE GROUPS (for remaining full chunks) β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β while rows.len() >= 50,000: β β β
β β β chunk = rows.drain(..50,000) β β β
β β β stage_rows_as_new_group(table, columns, chunk) β β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β β
β β 4. HANDLE REMAINDER (partial page group) β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β if rows.len() > 0: β β β
β β β stage_rows_as_new_group(table, columns, rows) β β β
β β β // Will become partial tail for next flush β β β
β β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
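The four flush steps can be sketched as a pure planning function. This is a simplified illustration: it sorts on a single sort-key ordinal using string comparison (so "10" sorts before "2"), and `FlushPlan` and `plan_flush` are hypothetical names that only decide where rows go, without staging or writing anything.

```rust
/// Where the buffered rows end up after a flush (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub struct FlushPlan {
    /// Rows appended to the existing partial tail page group.
    pub extend_tail: Vec<Vec<String>>,
    /// New page groups; only the last one may be partial.
    pub new_groups: Vec<Vec<Vec<String>>>,
}

/// Plans a flush: sort, top up the partial tail, then cut full groups,
/// leaving any remainder as a new partial group.
pub fn plan_flush(
    mut rows: Vec<Vec<String>>,
    sort_key_ordinal: usize,
    tail_len: usize,
    group_size: usize,
) -> FlushPlan {
    assert!(group_size > 0 && tail_len <= group_size);

    // 1. Sort by the sort key (string comparison in this sketch).
    rows.sort_by(|a, b| a[sort_key_ordinal].cmp(&b[sort_key_ordinal]));

    // 2. Extend the partial tail, if there is one with free space.
    let free = if tail_len == 0 { 0 } else { group_size - tail_len };
    let take = free.min(rows.len());
    let extend_tail: Vec<Vec<String>> = rows.drain(..take).collect();

    // 3 + 4. Full chunks become new groups; the remainder is a partial group.
    let new_groups: Vec<Vec<Vec<String>>> =
        rows.chunks(group_size).map(|chunk| chunk.to_vec()).collect();

    FlushPlan { extend_tail, new_groups }
}
```

With the figures from the diagram, a tail of 45,000 rows and a group size of 50,000 would take the first 5,000 sorted rows into `extend_tail`.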
The writer integrates with WAL for crash recovery:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β WAL INTEGRATION β
β β
β WRITE PATH: β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β writer.submit(job) β β
β β β β β
β β βββ 1. Serialize job with rkyv β β
β β β bytes = rkyv::to_bytes::<_, 512>(job) β β
β β β β β
β β βββ 2. Append to WAL (per-table topic) β β
β β β wal.append_for_topic(&job.table, &bytes) β β
β β β β β
β β βββ 3. Send to shard worker β β
β β shards[shard_index].tx.send(Job(job)) β β
β β β β
β β After successful commit: β β
β β wal.read_next(table, true) // ACK and advance β β
β β wal.mark_topic_clean(table) // Mark for cleanup β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β RECOVERY PATH (on startup): β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β Writer::new() calls replay_pending_jobs() β β
β β β β β
β β βββ for each table in metadata.table_names(): β β
β β β if !wal.topic_is_clean(table): β β
β β β loop: β β
β β β entry = wal.read_next(table, true) β β
β β β job = rkyv::deserialize(entry.data) β β
β β β worker_context.handle_job(job) // Re-apply β β
β β β flush_pending(table) β β
β β β wal.mark_topic_clean(table) β β
β β β β β
β β Guarantees: β β
β β • Jobs in WAL but not committed → will be replayed β β
β β • Jobs committed but not ACKed → safe to replay (idempotent) β β
β β • Clean topics → no replay needed β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
The writer supports multiple operation types:
enum UpdateOp {
Overwrite { row: u64, entry: Entry }, // Replace existing row
Append { entry: Entry }, // Add to end
InsertAt { row: u64, entry: Entry }, // Insert at position
BufferRow { row: Vec<String> }, // Buffer for batch flush
}
On Linux, all disk I/O uses io_uring with O_DIRECT for maximum throughput:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β IO_URING BATCHED READS β
β β
β read_batch_from_path(path, offsets: [0, 65536, 131072]) β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β PHASE 1: Read First 4KB Block (contains metadata + start of data) β β
β β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β io_uring Ring ββ β
β β β ββ β
β β β Submission Queue: ββ β
β β β βββββββββββ¬ββββββββββ¬ββββββββββ ββ β
β β β β Read @0 βRead @64KβRead @128Kβ βββ 3 reads submitted in parallel ββ β
β β β βββββββββββ΄ββββββββββ΄ββββββββββ ββ β
β β β ββ β
β β β ring.submit_and_wait(3) βββ Single syscall for all 3 reads ββ β
β β β ββ β
β β β Completion Queue: ββ β
β β β βββββββββββ¬ββββββββββ¬ββββββββββ ββ β
β β β β Done: 0 β Done: 1 β Done: 2 β ββ β
β β β βββββββββββ΄ββββββββββ΄ββββββββββ ββ β
β β β ββ β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β Each 4KB block: [64B metadata][data...] β β
β β Extract read_size from metadata β know if more data needed β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β PHASE 2: Read Remaining Data (if page > 4KB) β β
β β β β
β β Page 0: total_len = 64 + 3500 = 3564 β fits in first 4KB (no extra read) β β
β β Page 1: total_len = 64 + 8000 = 8064 β need 4KB more at offset 65536+4096 β β
β β Page 2: total_len = 64 + 12000 = 12064 β need 8KB more at offset 131072+4096β β
β β β β
β β Submit only the extra reads needed (2 in this example) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β
β βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Assemble Results β β
β β β β
β β For each page: β β
β β data = first_block[64..] + extra_buffer[..] β β
β β results.push(PageCacheEntryCompressed { page: data }) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
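The decision made between the two read phases is simple arithmetic, sketched here without any actual io_uring calls. It assumes the 64-byte header and 4KB first block shown above; `ExtraRead` and `plan_extra_read` are hypothetical names.

```rust
/// Size of the first read and the O_DIRECT alignment unit.
pub const BLOCK: u64 = 4096;
/// Metadata header at the start of every page.
pub const HEADER: u64 = 64;

/// A follow-up read for the part of a page beyond its first block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExtraRead {
    pub offset: u64,
    pub len: u64,
}

/// Given a page's file offset and the payload length found in its header,
/// returns the phase-2 read, or None when the first 4KB block covers it all.
pub fn plan_extra_read(page_offset: u64, payload_len: u64) -> Option<ExtraRead> {
    let total_len = HEADER + payload_len;
    if total_len <= BLOCK {
        return None;
    }
    let remaining = total_len - BLOCK;
    // Round up to whole blocks so the read length stays 4KB aligned.
    let len = (remaining + BLOCK - 1) / BLOCK * BLOCK;
    Some(ExtraRead { offset: page_offset + BLOCK, len })
}
```

Running this over the three pages in the example yields no extra read for page 0, one 4KB read for page 1, and one 8KB read for page 2, which is why only two follow-up reads are submitted.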
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β O_DIRECT ALIGNMENT β
β β
β O_DIRECT bypasses OS page cache, reading directly to user buffers. β
β Requirements: β
β β
β • Buffer address must be 4KB aligned β
β • Read/write offset must be 4KB aligned β
β • Read/write length must be 4KB aligned β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β AlignedBuffer Implementation: β β
β β β β
β β fn new(capacity: usize) -> Self { β β
β β let aligned_capacity = capacity.div_ceil(4096) * 4096; β β
β β let layout = Layout::from_size_align(aligned_capacity, 4096); β β
β β let ptr = alloc(layout); // 4KB-aligned allocation β β
β β ptr::write_bytes(ptr, 0, aligned_capacity); // Zero-init β β
β β } β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Benefits: β
β • No double-buffering (OS cache → user buffer) β
β • Predictable memory usage (cache is ours to manage) β
β • No page cache pollution from scan workloads β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
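A complete version of such a buffer needs to keep the layout around and free the memory on drop. The sketch below is one way to do that with `std::alloc`; it is an assumption about the shape of the type, not the project's `AlignedBuffer`. It uses `alloc_zeroed` in place of `alloc` followed by `write_bytes`, and rounds a zero capacity up to one block so the layout is never zero-sized.

```rust
use std::alloc::{alloc_zeroed, dealloc, handle_alloc_error, Layout};
use std::ptr::NonNull;

pub const ALIGN_4K: usize = 4096;

/// Heap buffer whose address and capacity are both multiples of 4KB.
pub struct AlignedBuffer {
    ptr: NonNull<u8>,
    layout: Layout,
}

impl AlignedBuffer {
    /// Allocates a zeroed buffer of at least `capacity` bytes.
    pub fn new(capacity: usize) -> Self {
        let aligned_capacity = (capacity.max(1) + ALIGN_4K - 1) / ALIGN_4K * ALIGN_4K;
        let layout = Layout::from_size_align(aligned_capacity, ALIGN_4K)
            .expect("size and alignment form a valid layout");
        // SAFETY: the layout has a non-zero size.
        let raw = unsafe { alloc_zeroed(layout) };
        let ptr = match NonNull::new(raw) {
            Some(ptr) => ptr,
            None => handle_alloc_error(layout),
        };
        Self { ptr, layout }
    }

    /// Capacity in bytes after rounding up to 4KB.
    pub fn capacity(&self) -> usize {
        self.layout.size()
    }

    pub fn as_slice(&self) -> &[u8] {
        // SAFETY: ptr is valid and initialised for layout.size() bytes.
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.layout.size()) }
    }

    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        // SAFETY: as above, and `&mut self` guarantees exclusive access.
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.layout.size()) }
    }
}

impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        // SAFETY: ptr was allocated with exactly this layout.
        unsafe { dealloc(self.ptr.as_ptr(), self.layout) }
    }
}
```

Storing the `Layout` alongside the pointer matters because `dealloc` must be called with the same size and alignment that were used to allocate.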
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β DIRECT BLOCK ALLOCATOR β
β β
β Constants: β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β ALIGN_4K = 4096 // O_DIRECT alignment requirement β β
β β BLOCK_SIZE = 256 * 1024 // 256KB allocation unit β β
β β FILE_MAX = 4 * 1024^3 // 4GB per data file β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Allocation Strategy: β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β compute_alloc_len(actual_len): β β
β β β β
β β if actual_len <= 256KB: β β
β β return round_up_4k(actual_len) // Small pages: 4KB aligned β β
β β β β
β β else: β β
β β full_blocks = actual_len / 256KB β β
β β tail = actual_len % 256KB β β
β β return (full_blocks * 256KB) + round_up_4k(tail) β β
β β β β
β β Examples: β β
β β actual_len = 1000 → alloc_len = 4096 (4KB) β β
β β actual_len = 50000 → alloc_len = 53248 (13 × 4KB) β β
β β actual_len = 300000 → alloc_len = 262144+40960 (256KB + 10×4KB) β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β File Layout: β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β storage/ β β
β β βββ data.00000 βββ First 4GB of allocations β β
β β βββ data.00001 βββ Next 4GB (auto-rotated) β β
β β βββ data.00002 β β
β β βββ ... β β
β β β β
β β Within each file: β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β offset 0 β offset 4096 β offset 8192 β ... β β β
β β β [Page A: 4KB] β [Page B: 8KB ββββββββββββββββββ [Page C: 4KB] β ... β β β
β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β β
β β β β
β β Allocator tracks: { file_id, current_offset } β β
β β When current_offset + alloc_len > 4GB: rotate to next file β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
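The allocation-length rule translates directly into code. The sketch below follows the pseudocode above, with `round_up_4k` written out; the constant and function names mirror the diagram, but the exact signatures are assumptions.

```rust
pub const ALIGN_4K: u64 = 4096;
pub const BLOCK_SIZE: u64 = 256 * 1024;

/// Rounds `len` up to the next multiple of 4KB.
pub fn round_up_4k(len: u64) -> u64 {
    (len + ALIGN_4K - 1) / ALIGN_4K * ALIGN_4K
}

/// Space reserved for a page whose serialized size is `actual_len` bytes.
pub fn compute_alloc_len(actual_len: u64) -> u64 {
    if actual_len <= BLOCK_SIZE {
        // Small pages: just pad to the O_DIRECT alignment.
        return round_up_4k(actual_len);
    }
    // Large pages: whole 256KB blocks plus a 4KB-aligned tail.
    let full_blocks = actual_len / BLOCK_SIZE;
    let tail = actual_len % BLOCK_SIZE;
    full_blocks * BLOCK_SIZE + round_up_4k(tail)
}
```

For example, `compute_alloc_len(1000)` returns 4096 and `compute_alloc_len(50000)` returns 53248. Since 256KB is itself a multiple of 4KB, every result is 4KB aligned, so consecutive allocations keep their offsets aligned too.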
The PageHandler spawns a background thread for speculative page loading:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β PREFETCH ARCHITECTURE β
β β
β PageHandler::new() spawns prefetch thread: β
β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β Main Thread Prefetch Thread β β
β β ββββββββββββ βββββββββββββββ β β
β β β β
β β get_pages_with_prefetch( β β
β β page_ids: ["p0","p1","p2","p3","p4"], β β
β β k: 2 // fetch first 2 immediately β β
β β ) β β
β β β β β
β β βββ prefetch_tx.send(["p2","p3","p4"]) βββββββββββββΆ prefetch_rx β β
β β β (non-blocking) β β β
β β β βΌ β β
β β βββ get_pages(["p0","p1"]) process_prefetch() β β
β β (blocks only for the first k pages) β β β
β β β β β
β β βββββββββββββ΄βββββββββββ β β
β β β 1. Dedupe IDs β β β
β β β 2. Filter cached β β β
β β β 3. Batch fetch disk β β β
β β β 4. Decompress all β β β
β β β 5. Store in UPC β β β
β β ββββββββββββββββββββββββ β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Prefetch Loop: β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β β β
β β loop { β β
β β match rx.recv_timeout(1ms) { β β
β β Ok(batch) => { β β
β β // Drain channel to batch pending requests β β
β β while let Ok(more) = rx.try_recv() { β β
β β batch.extend(more); β β
β β } β β
β β process_prefetch_batch(batch); β β
β β } β β
β β Err(Timeout) => continue, β β
β β Err(Disconnected) => break, β β
β β } β β
β β } β β
β β β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β Benefits: β
β β’ Query gets first K pages immediately β
β β’ Remaining pages loaded in background β β
β β’ By the time query needs page K+1, it's likely cached β
β β’ Batched I/O reduces syscall overhead β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
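The hand-off between the query thread and the prefetch thread can be sketched with a standard-library channel. This is a simplified illustration, not the PageHandler implementation: the `Cache` alias, `load_into_cache`, and `start_prefetcher` are hypothetical, and disk reads and decompression are reduced to recording the page ID as loaded.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Stand-in for the uncompressed page cache: only tracks which IDs are loaded.
pub type Cache = Arc<Mutex<HashSet<String>>>;

/// Placeholder for "fetch from disk, decompress, store in cache".
fn load_into_cache(ids: impl IntoIterator<Item = String>, cache: &Cache) {
    let mut loaded = cache.lock().unwrap();
    for id in ids {
        loaded.insert(id); // inserting an already cached ID is a no-op
    }
}

fn process_prefetch_batch(batch: Vec<String>, cache: &Cache) {
    // Dedupe the drained batch before touching the cache.
    let unique: HashSet<String> = batch.into_iter().collect();
    load_into_cache(unique, cache);
}

fn prefetch_loop(rx: Receiver<Vec<String>>, cache: Cache) {
    loop {
        match rx.recv_timeout(Duration::from_millis(1)) {
            Ok(mut batch) => {
                // Drain the channel so pending requests are handled as one batch.
                while let Ok(more) = rx.try_recv() {
                    batch.extend(more);
                }
                process_prefetch_batch(batch, &cache);
            }
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break, // all senders dropped
        }
    }
}

/// Spawns the prefetch thread and returns the sending half of its channel.
pub fn start_prefetcher(cache: Cache) -> (Sender<Vec<String>>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || prefetch_loop(rx, cache));
    (tx, handle)
}

/// Loads the first `k` pages on the calling thread and queues the rest.
pub fn get_pages_with_prefetch(
    page_ids: &[String],
    k: usize,
    tx: &Sender<Vec<String>>,
    cache: &Cache,
) -> Vec<String> {
    let (now, later) = page_ids.split_at(k.min(page_ids.len()));
    if !later.is_empty() {
        // Sending on an unbounded channel does not block; an error only
        // means the prefetch thread has already exited.
        let _ = tx.send(later.to_vec());
    }
    load_into_cache(now.iter().cloned(), cache);
    now.to_vec()
}
```

Dropping the last `Sender` is what ends the loop: once the channel is empty and disconnected, `recv_timeout` reports `Disconnected` and the thread exits, so shutdown needs no separate stop flag.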
For columns with few unique values (e.g., country codes, status flags), dictionary encoding cuts memory use substantially (about 67% in the example below):
┌──────────────────────────────────────────────────────────────────────────────┐
│  DICTIONARY ENCODING                                                         │
│                                                                              │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │   struct DictionaryColumn {                                              │ │
│ │       keys: Vec<u16>,       // Integer references (2 bytes each)         │ │
│ │       values: BytesColumn,  // Unique string dictionary                  │ │
│ │   }                                                                      │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
│  Example: 1 million rows with values ["US", "EU", "AP"]                      │
│                                                                              │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │   WITHOUT Dictionary (BytesColumn):                                      │ │
│ │                                                                          │ │
│ │   offsets: [0, 2, 4, 6, 8, ...]     (4 bytes × 1M = 4MB)                 │ │
│ │   data:    "USEUAPUSEUUS..."        (2 bytes × 1M = 2MB)                 │ │
│ │                                                                          │ │
│ │   Total: ~6MB + overhead                                                 │ │
│ │   Filtering: strcmp() per row                                            │ │
│ │                                                                          │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │   WITH Dictionary:                                                       │ │
│ │                                                                          │ │
│ │   values (dictionary):                                                   │ │
│ │     offsets: [0, 2, 4, 6]           (16 bytes)                           │ │
│ │     data:    "USEUAP"               (6 bytes)                            │ │
│ │                                                                          │ │
│ │   keys: [0, 1, 2, 0, 1, 0, ...]     (2 bytes × 1M = 2MB)                 │ │
│ │          │  │  │                                                         │ │
│ │          │  │  └── "AP"                                                  │ │
│ │          │  └───── "EU"                                                  │ │
│ │          └──────── "US"                                                  │ │
│ │                                                                          │ │
│ │   Total: ~2MB                                                            │ │
│ │   Filtering: integer comparison (much faster)                            │ │
│ │                                                                          │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
│  Memory Savings: ~67% reduction                                              │
│  Filter Speedup: Integer compare vs string compare                           │
│                                                                              │
│  Auto-Detection:                                                             │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │                                                                          │ │
│ │   During page creation from disk:                                        │ │
│ │                                                                          │ │
│ │   let unique_ratio = unique_values.len() as f64 / total_rows as f64;     │ │
│ │                                                                          │ │
│ │   if unique_ratio < 0.5 && unique_values.len() <= u16::MAX as usize {    │ │
│ │       ColumnData::Dictionary(...)   // Use dictionary                    │ │
│ │   } else {                                                               │ │
│ │       ColumnData::Text(...)         // Use BytesColumn                   │ │
│ │   }                                                                      │ │
│ │                                                                          │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘
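The auto-detection rule and the integer-based filter can be sketched as follows. The sketch makes several assumptions: `ColumnData` here is a simplified two-variant enum, the dictionary is a plain `Vec<String>` rather than a `BytesColumn`, and `encode_column` and `filter_eq` are hypothetical helper names.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the column representation.
#[derive(Debug, PartialEq)]
pub enum ColumnData {
    Dictionary { keys: Vec<u16>, values: Vec<String> },
    Text(Vec<String>),
}

/// Picks dictionary encoding when fewer than half of the rows are distinct
/// and every distinct value fits in a u16 key.
pub fn encode_column(rows: Vec<String>) -> ColumnData {
    let mut index: HashMap<String, u16> = HashMap::new();
    let mut values: Vec<String> = Vec::new();
    let mut keys: Vec<u16> = Vec::with_capacity(rows.len());
    let mut overflow = false;

    for row in &rows {
        if let Some(&key) = index.get(row.as_str()) {
            keys.push(key);
            continue;
        }
        // A new distinct value needs the next free key; stop if none is left.
        if values.len() > u16::MAX as usize {
            overflow = true;
            break;
        }
        let key = values.len() as u16;
        index.insert(row.clone(), key);
        values.push(row.clone());
        keys.push(key);
    }

    if overflow || rows.is_empty() {
        return ColumnData::Text(rows);
    }
    // Float division: integer division would always yield 0 here.
    let unique_ratio = values.len() as f64 / rows.len() as f64;
    if unique_ratio < 0.5 {
        ColumnData::Dictionary { keys, values }
    } else {
        ColumnData::Text(rows)
    }
}

/// Returns the row indices whose value equals `needle`.
pub fn filter_eq(column: &ColumnData, needle: &str) -> Vec<usize> {
    match column {
        ColumnData::Dictionary { keys, values } => {
            // One string search in the small dictionary, then integer compares.
            match values.iter().position(|v| v.as_str() == needle) {
                Some(pos) => {
                    let target = pos as u16;
                    keys.iter()
                        .enumerate()
                        .filter(|(_, k)| **k == target)
                        .map(|(row, _)| row)
                        .collect()
                }
                None => Vec::new(),
            }
        }
        ColumnData::Text(rows) => rows
            .iter()
            .enumerate()
            .filter(|(_, v)| v.as_str() == needle)
            .map(|(row, _)| row)
            .collect(),
    }
}
```

On the dictionary path the string comparison happens only against the distinct values, so the per-row work is a single `u16` comparison.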
Each PageDescriptor stores statistics for query optimization:
┌──────────────────────────────────────────────────────────────────────────────┐
│  COLUMN STATISTICS                                                           │
│                                                                              │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │   struct ColumnStats {                                                   │ │
│ │       min_value: Option<String>,  // Minimum value in page               │ │
│ │       max_value: Option<String>,  // Maximum value in page               │ │
│ │       null_count: u64,            // Number of NULL values               │ │
│ │       kind: ColumnStatsKind,      // Int64 | Float64 | Text              │ │
│ │   }                                                                      │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
│  Stats Derivation (during write):                                            │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │                                                                          │ │
│ │   derive_column_stats_from_page(page):                                   │ │
│ │                                                                          │ │
│ │     For Int64:                                                           │ │
│ │       min_val = values.iter().filter(not_null).min()                     │ │
│ │       max_val = values.iter().filter(not_null).max()                     │ │
│ │                                                                          │ │
│ │     For Text/Dictionary:                                                 │ │
│ │       min_val = values.iter().filter(not_null).min_by(bytes_compare)     │ │
│ │       max_val = values.iter().filter(not_null).max_by(bytes_compare)     │ │
│ │                                                                          │ │
│ │     null_count = null_bitmap.count_ones()                                │ │
│ │                                                                          │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
│  Potential Use (predicate pruning):                                          │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │                                                                          │ │
│ │   Query: SELECT * FROM users WHERE age > 50                              │ │
│ │                                                                          │ │
│ │   Page 0: stats.max = 45           → SKIP (max <= 50, no rows can match) │ │
│ │   Page 1: stats.min = 30, max = 60 → SCAN (might have matches)           │ │
│ │   Page 2: stats.min = 55           → SCAN (all non-null rows match)      │ │
│ │                                                                          │ │
│ │   Result: Skip entire pages without reading data                         │ │
│ │                                                                          │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘
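The pruning decision for a `column > threshold` predicate can be sketched from these statistics. This is one possible shape, since the diagram describes pruning only as a potential use: `PageDecision`, `classify_greater_than`, and `int_stats` are hypothetical names, only `Int64` columns are handled, and min/max are assumed to be stored as decimal strings.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnStatsKind {
    Int64,
    Float64,
    Text,
}

#[derive(Debug, Clone)]
pub struct ColumnStats {
    pub min_value: Option<String>,
    pub max_value: Option<String>,
    pub null_count: u64,
    pub kind: ColumnStatsKind,
}

/// Outcome of checking one page against `column > threshold`.
#[derive(Debug, PartialEq, Eq)]
pub enum PageDecision {
    /// No row can match; the page is never read.
    Skip,
    /// Some rows may match; the filter must run per row.
    Scan,
    /// Every row matches; the page is read but needs no per-row filter.
    ScanAllMatch,
}

fn parse_i64(value: &Option<String>) -> Option<i64> {
    value.as_deref().and_then(|s| s.parse().ok())
}

pub fn classify_greater_than(stats: &ColumnStats, threshold: i64) -> PageDecision {
    if stats.kind != ColumnStatsKind::Int64 {
        return PageDecision::Scan; // stay conservative for other kinds
    }
    let min = parse_i64(&stats.min_value);
    let max = parse_i64(&stats.max_value);
    match (min, max) {
        // Largest value is not above the threshold: nothing can match.
        (_, Some(max)) if max <= threshold => PageDecision::Skip,
        // Smallest value is above the threshold and there are no NULLs.
        (Some(min), _) if min > threshold && stats.null_count == 0 => {
            PageDecision::ScanAllMatch
        }
        // Missing or unparseable stats fall through to a normal scan.
        _ => PageDecision::Scan,
    }
}

/// Test helper: builds Int64 stats with no NULLs.
pub fn int_stats(min: Option<i64>, max: Option<i64>) -> ColumnStats {
    ColumnStats {
        min_value: min.map(|v| v.to_string()),
        max_value: max.map(|v| v.to_string()),
        null_count: 0,
        kind: ColumnStatsKind::Int64,
    }
}
```

Any page whose statistics are absent or cannot be parsed is scanned, so incomplete stats cost performance but never correctness.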
┌──────────────────────────────────────────────────────────────────────────────┐
│  LRU Cache with Lifecycle                                                    │
│                                                                              │
│  trait CacheLifecycle<T> {                                                   │
│      fn on_evict(&self, id: &str, data: Arc<T>);                             │
│  }                                                                           │
│                                                                              │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │   UncompressedCache                                                      │ │
│ │   on_evict: compress → store in CompressedCache                          │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                      │                                       │
│                                      ▼                                       │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │   CompressedCache                                                        │ │
│ │   on_evict: write to disk (if dirty)                                     │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                      │                                       │
│                                      ▼                                       │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │   Disk                                                                   │ │
│ │   Final resting place                                                    │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
│  This creates an automatic data flow: hot data stays uncompressed,           │
│  warm data is compressed, and cold data goes to disk.                        │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘
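The eviction cascade can be sketched with two small LRU tiers chained through `CacheLifecycle`. Only the trait comes from the diagram; everything else is an assumption for illustration: `LruCache`, `SpillToCompressed`, and `SpillToDisk` are hypothetical, "compression" is a plain byte copy, "disk" is an in-memory map, and dirty tracking is omitted.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};

pub trait CacheLifecycle<T> {
    fn on_evict(&self, id: &str, data: Arc<T>);
}

/// Minimal LRU: `order` holds IDs from least to most recently used.
pub struct LruCache<T, L: CacheLifecycle<T>> {
    capacity: usize,
    order: VecDeque<String>,
    entries: HashMap<String, Arc<T>>,
    lifecycle: L,
}

impl<T, L: CacheLifecycle<T>> LruCache<T, L> {
    pub fn new(capacity: usize, lifecycle: L) -> Self {
        LruCache { capacity, order: VecDeque::new(), entries: HashMap::new(), lifecycle }
    }

    pub fn get(&mut self, id: &str) -> Option<Arc<T>> {
        let data = self.entries.get(id).cloned()?;
        self.touch(id);
        Some(data)
    }

    pub fn put(&mut self, id: &str, data: Arc<T>) {
        self.entries.insert(id.to_string(), data);
        self.touch(id);
        // Evict least recently used entries until the cache fits again.
        while self.entries.len() > self.capacity {
            let victim = match self.order.pop_front() {
                Some(victim) => victim,
                None => break,
            };
            if let Some(evicted) = self.entries.remove(&victim) {
                self.lifecycle.on_evict(&victim, evicted);
            }
        }
    }

    /// Marks `id` as most recently used (O(n); fine for a sketch).
    fn touch(&mut self, id: &str) {
        self.order.retain(|k| k != id);
        self.order.push_back(id.to_string());
    }
}

/// Last tier: evicted compressed pages land on "disk" (an in-memory map here).
pub struct SpillToDisk {
    pub disk: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl CacheLifecycle<Vec<u8>> for SpillToDisk {
    fn on_evict(&self, id: &str, data: Arc<Vec<u8>>) {
        self.disk.lock().unwrap().insert(id.to_string(), data.as_ref().clone());
    }
}

/// First tier: evicted pages are "compressed" and pushed into the next cache.
pub struct SpillToCompressed {
    pub compressed: Arc<Mutex<LruCache<Vec<u8>, SpillToDisk>>>,
}

impl CacheLifecycle<String> for SpillToCompressed {
    fn on_evict(&self, id: &str, data: Arc<String>) {
        let bytes = data.as_bytes().to_vec(); // stand-in for real compression
        self.compressed.lock().unwrap().put(id, Arc::new(bytes));
    }
}
```

Because each tier only knows its own `on_evict` hook, inserting into the hot cache can ripple all the way to disk without the caller naming the lower tiers.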
External API for database operations:
// DDL
create_table_from_plan(plan: &QueryPlan) -> Result<()>
// Point operations
read_row(table: &str, row_id: u64) -> Result<Row>
insert_sorted_row(table: &str, row: Row) -> Result<()>
delete_row(table: &str, row_id: u64) -> Result<()>
// Column operations
upsert_data_into_column(table: &str, col: &str, data: Vec<Value>) -> Result<()>
update_column_entry(table: &str, col: &str, row_id: u64, val: Value) -> Result<()>
// Range operations
range_scan_table_column_entry(
table: &str,
column: &str,
start: Bound<Value>,
end: Bound<Value>,
) -> Result<Vec<Entry>>
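The `Bound`-based signature of the range operation maps naturally onto an ordered index. The sketch below shows that idea only and does not reflect how the engine stores columns: `SortedColumn`, its `BTreeMap` index, the `i64` value type, and this `Entry` layout are all assumptions.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Hypothetical result row: which row holds which value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    pub row_id: u64,
    pub value: i64,
}

/// Hypothetical sorted column: value -> row IDs holding that value.
pub struct SortedColumn {
    index: BTreeMap<i64, Vec<u64>>,
}

/// True when no value can satisfy both bounds. `BTreeMap::range` panics on
/// inverted ranges, so such requests are answered before calling it.
fn is_empty_range(start: &Bound<i64>, end: &Bound<i64>) -> bool {
    use Bound::{Excluded, Included};
    match (start, end) {
        (Included(s), Included(e)) => s > e,
        (Included(s), Excluded(e))
        | (Excluded(s), Included(e))
        | (Excluded(s), Excluded(e)) => s >= e,
        _ => false, // at least one side is unbounded
    }
}

impl SortedColumn {
    pub fn from_rows(rows: &[(u64, i64)]) -> Self {
        let mut index: BTreeMap<i64, Vec<u64>> = BTreeMap::new();
        for &(row_id, value) in rows {
            index.entry(value).or_default().push(row_id);
        }
        SortedColumn { index }
    }

    /// Returns entries whose value lies within the bounds, ordered by value.
    pub fn range_scan(&self, start: Bound<i64>, end: Bound<i64>) -> Vec<Entry> {
        if is_empty_range(&start, &end) {
            return Vec::new();
        }
        self.index
            .range((start, end))
            .flat_map(|(value, row_ids)| {
                row_ids.iter().map(move |&row_id| Entry { row_id, value: *value })
            })
            .collect()
    }
}
```

Using `Bound` lets a single call express `age > 30`, `age >= 30`, and open-ended scans without separate entry points.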