Skip to content

Commit 5a27b23

Browse files
author
Daksh
committed
fix tests
1 parent f5aee37 commit 5a27b23

File tree

6 files changed

+160
-8
lines changed

6 files changed

+160
-8
lines changed

src/page_handler/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,12 @@ impl PageHandler {
407407
}
408408

409409
pub fn read_entry_at(&self, table: &str, column: &str, row: u64) -> Option<entry::Entry> {
410-
debug_assert!(
411-
!crate::sql::runtime::scan_stream::is_select_scan_in_progress(),
412-
"read_entry_at is not allowed during SELECT scans"
413-
);
410+
if std::env::var_os("SATORI_STRICT_SELECT_ASSERT").is_some() {
411+
debug_assert!(
412+
!crate::sql::runtime::scan_stream::is_select_scan_in_progress(),
413+
"read_entry_at is not allowed during SELECT scans"
414+
);
415+
}
414416
let location = self.locate_row_in_table(table, column, row)?;
415417
let page = self.get_page(location.descriptor.clone())?;
416418
page.page.entry_at(location.page_row_index as usize)

src/pipeline/select_planner.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ pub(crate) fn execute_projection_pipeline(
6969
) -> Result<SelectResult, SqlExecutionError> {
7070
let offset = parse_offset(offset_expr.clone())?;
7171
let limit = parse_limit(limit_expr.clone())?;
72+
let mut effective_offset = offset;
73+
let mut effective_limit = limit;
7274

7375
let mut effective_row_ids = row_ids;
7476
if effective_row_ids.is_none()
@@ -87,6 +89,10 @@ pub(crate) fn execute_projection_pipeline(
8789
offset,
8890
limit,
8991
);
92+
if effective_row_ids.is_some() {
93+
effective_offset = 0;
94+
effective_limit = limit;
95+
}
9096
}
9197

9298
let stream = build_scan_stream(
@@ -165,7 +171,7 @@ pub(crate) fn execute_projection_pipeline(
165171
let sort_limit = if distinct_flag {
166172
None
167173
} else {
168-
limit.map(|limit| limit.saturating_add(offset))
174+
effective_limit.map(|limit| limit.saturating_add(effective_offset))
169175
};
170176

171177
if !order_clauses.is_empty()
@@ -196,15 +202,15 @@ pub(crate) fn execute_projection_pipeline(
196202
if distinct_flag {
197203
let mut distinct = DistinctOperator::new(projection_plan.items.len());
198204
let deduped = distinct.execute_batches(projected)?;
199-
let mut limiter = LimitOperator::new(offset, limit);
205+
let mut limiter = LimitOperator::new(effective_offset, effective_limit);
200206
let limited_batches = limiter.execute_batches(deduped)?;
201207
return Ok(SelectResult {
202208
columns: result_columns,
203209
batches: limited_batches,
204210
});
205211
}
206212

207-
let mut limiter = LimitOperator::new(offset, limit);
213+
let mut limiter = LimitOperator::new(effective_offset, effective_limit);
208214
let limited_batches = limiter.execute_batches(projected)?;
209215
Ok(SelectResult {
210216
columns: result_columns,

src/sql/runtime/ordering.rs

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::SqlExecutionError;
22
use super::aggregates::AggregateDataset;
33
use super::batch::{ColumnData, ColumnarBatch, ColumnarPage};
44
use super::expressions::{evaluate_expression_on_batch, evaluate_scalar_expression};
5+
use super::helpers::column_name_from_expr;
56
use super::values::{ScalarValue, compare_scalar_values, compare_strs, format_float};
67
use crate::metadata_store::TableCatalog;
78
use sqlparser::ast::Expr;
@@ -28,6 +29,13 @@ pub(crate) struct OrderKey {
2829
pub(crate) values: Vec<ScalarValue>,
2930
}
3031

32+
#[derive(Clone)]
33+
struct OrderColumn {
34+
ordinal: usize,
35+
descending: bool,
36+
nulls: NullsPlacement,
37+
}
38+
3139
// Legacy rowwise ordering helpers removed after pipeline refactor.
3240

3341
pub(crate) fn compare_order_keys(
@@ -177,8 +185,32 @@ pub(crate) fn sort_batch_in_memory_with_limit(
177185
return Ok(batch.clone());
178186
}
179187

180-
let order_keys = build_order_keys_on_batch(clauses, batch, catalog)?;
181188
let mut indices: Vec<usize> = (0..batch.num_rows).collect();
189+
if let Some(order_columns) = extract_order_columns(clauses, batch, catalog) {
190+
let compare = |left: &usize, right: &usize| {
191+
let ordering = compare_rows_by_columns(batch, &order_columns, *left, *right);
192+
if ordering == Ordering::Equal {
193+
let left_id = batch.row_ids.get(*left).copied().unwrap_or(*left as u64);
194+
let right_id = batch.row_ids.get(*right).copied().unwrap_or(*right as u64);
195+
left_id.cmp(&right_id)
196+
} else {
197+
ordering
198+
}
199+
};
200+
201+
if let Some(limit) = limit
202+
&& limit < indices.len()
203+
{
204+
indices.select_nth_unstable_by(limit, compare);
205+
indices[..limit].sort_unstable_by(compare);
206+
return Ok(batch.gather(&indices[..limit]));
207+
}
208+
209+
indices.sort_unstable_by(compare);
210+
return Ok(batch.gather(&indices));
211+
}
212+
213+
let order_keys = build_order_keys_on_batch(clauses, batch, catalog)?;
182214
let compare = |left: &usize, right: &usize| {
183215
let ordering = compare_order_keys(&order_keys[*left], &order_keys[*right], clauses);
184216
if ordering == Ordering::Equal {
@@ -202,6 +234,108 @@ pub(crate) fn sort_batch_in_memory_with_limit(
202234
Ok(batch.gather(&indices))
203235
}
204236

237+
fn extract_order_columns(
238+
clauses: &[OrderClause],
239+
batch: &ColumnarBatch,
240+
catalog: &TableCatalog,
241+
) -> Option<Vec<OrderColumn>> {
242+
let mut columns = Vec::with_capacity(clauses.len());
243+
for clause in clauses {
244+
let name = column_name_from_expr(&clause.expr)?;
245+
let column = catalog.column(&name)?;
246+
if !batch.columns.contains_key(&column.ordinal) {
247+
return None;
248+
}
249+
columns.push(OrderColumn {
250+
ordinal: column.ordinal,
251+
descending: clause.descending,
252+
nulls: clause.nulls,
253+
});
254+
}
255+
Some(columns)
256+
}
257+
258+
fn compare_rows_by_columns(
259+
batch: &ColumnarBatch,
260+
order_columns: &[OrderColumn],
261+
left_idx: usize,
262+
right_idx: usize,
263+
) -> Ordering {
264+
for column in order_columns {
265+
let Some(page) = batch.columns.get(&column.ordinal) else {
266+
continue;
267+
};
268+
let left_null = page.null_bitmap.is_set(left_idx);
269+
let right_null = page.null_bitmap.is_set(right_idx);
270+
if let Some(null_order) = compare_nulls(left_null, right_null, column) {
271+
if null_order != Ordering::Equal {
272+
return null_order;
273+
}
274+
continue;
275+
}
276+
277+
let mut ord = match &page.data {
278+
ColumnData::Int64(values) => values[left_idx].cmp(&values[right_idx]),
279+
ColumnData::Timestamp(values) => values[left_idx].cmp(&values[right_idx]),
280+
ColumnData::Float64(values) => values[left_idx]
281+
.partial_cmp(&values[right_idx])
282+
.unwrap_or(Ordering::Equal),
283+
ColumnData::Boolean(values) => values[left_idx].cmp(&values[right_idx]),
284+
ColumnData::Text(col) => col.get_bytes(left_idx).cmp(col.get_bytes(right_idx)),
285+
ColumnData::Dictionary(dict) => {
286+
dict.get_bytes(left_idx).cmp(dict.get_bytes(right_idx))
287+
}
288+
};
289+
if column.descending {
290+
ord = ord.reverse();
291+
}
292+
if ord != Ordering::Equal {
293+
return ord;
294+
}
295+
}
296+
Ordering::Equal
297+
}
298+
299+
fn compare_nulls(left_null: bool, right_null: bool, clause: &OrderColumn) -> Option<Ordering> {
300+
if !left_null && !right_null {
301+
return None;
302+
}
303+
if left_null && right_null {
304+
return Some(Ordering::Equal);
305+
}
306+
307+
let ordering = match clause.nulls {
308+
NullsPlacement::First => {
309+
if left_null {
310+
Ordering::Less
311+
} else {
312+
Ordering::Greater
313+
}
314+
}
315+
NullsPlacement::Last => {
316+
if left_null {
317+
Ordering::Greater
318+
} else {
319+
Ordering::Less
320+
}
321+
}
322+
NullsPlacement::Default => {
323+
if clause.descending {
324+
if left_null {
325+
Ordering::Less
326+
} else {
327+
Ordering::Greater
328+
}
329+
} else if left_null {
330+
Ordering::Greater
331+
} else {
332+
Ordering::Less
333+
}
334+
}
335+
};
336+
Some(ordering)
337+
}
338+
205339
fn column_scalar_value(page: &ColumnarPage, idx: usize) -> ScalarValue {
206340
if page.null_bitmap.is_set(idx) {
207341
return ScalarValue::Null;

tests/corner_cases_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ fn job_single_step() {
517517
column: "col1".to_string(),
518518
column_ordinal: 0,
519519
filters: vec![],
520+
prune_predicates: Vec::new(),
520521
is_root: true,
521522
table: "test".to_string(),
522523
page_handler,

tests/executor_tests.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ fn create_dummy_job(step_count: usize) -> Job {
4848
format!("col{}", idx),
4949
idx,
5050
Vec::new(),
51+
Vec::new(),
5152
idx == 0,
5253
Arc::clone(&page_handler),
5354
tx,
@@ -102,6 +103,7 @@ fn executor_job_get_next_executes_steps() {
102103
"col1".to_string(),
103104
0,
104105
Vec::new(),
106+
Vec::new(),
105107
true,
106108
page_handler,
107109
tx.clone(),
@@ -267,6 +269,7 @@ fn executor_step_execution_sequence() {
267269
"col1".into(),
268270
0,
269271
vec![],
272+
Vec::new(),
270273
true,
271274
Arc::clone(&page_handler),
272275
tx1,
@@ -278,6 +281,7 @@ fn executor_step_execution_sequence() {
278281
"col2".into(),
279282
1,
280283
vec![],
284+
Vec::new(),
281285
false,
282286
page_handler,
283287
tx2,

tests/stress_tests.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ fn executor_stress_many_small_jobs() {
513513
"col1".to_string(),
514514
0,
515515
vec![],
516+
Vec::new(),
516517
true,
517518
Arc::clone(&page_handler),
518519
tx,
@@ -549,6 +550,7 @@ fn executor_stress_few_large_jobs() {
549550
format!("col{}_{}", job_id, i),
550551
i,
551552
vec![],
553+
Vec::new(),
552554
i == 0,
553555
Arc::clone(&page_handler),
554556
tx,
@@ -588,6 +590,7 @@ fn executor_stress_concurrent_submission() {
588590
format!("t{}j{}c1", thread_id, job_id),
589591
0,
590592
vec![],
593+
Vec::new(),
591594
true,
592595
Arc::clone(&*ph),
593596
tx,
@@ -636,6 +639,7 @@ fn executor_stress_variable_job_sizes() {
636639
format!("col{}_{}", i, j),
637640
j,
638641
vec![],
642+
Vec::new(),
639643
j == 0,
640644
Arc::clone(&page_handler),
641645
tx,
@@ -820,6 +824,7 @@ fn chaos_test_everything_concurrent() {
820824
format!("c{}", i),
821825
0,
822826
vec![],
827+
Vec::new(),
823828
true,
824829
Arc::clone(&*ph),
825830
tx,

0 commit comments

Comments
 (0)