Benchmark results posted on the PR (each with a summary and a detailed results table):

- PolarSignals Profiling
- TPC-H SF=1 on NVMe
- FineWeb NVMe
- TPC-H SF=1 on S3
- TPC-DS SF=1 on NVMe
- TPC-H SF=10 on NVMe
- FineWeb S3
- Statistical and Population Genetics
- TPC-H SF=10 on S3
- ClickBench on NVMe
- Random Access

Polar Signals profiling results: latest run plus 7 previous runs (powered by Polar Signals Cloud).
Shall we make a second PR with the CI flag switched?
Yeah, I have lots of follow-ups before we even think about cutting over. Some queries are 15x slower due to lack of stats, for example.
```rust
// Vortex handles parallelism internally — always use a single partition.
let mut this = self.clone();
this.num_partitions = NonZero::new(target_partitions)
    .ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?;
Ok(Some(Arc::new(this)))
}
```

I want to store the updated `num_partitions` and use that as a concurrency hint for ourselves.
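A std-only sketch of what that request could look like: keep the repartitioned count around and read it back later as a scan-concurrency hint. The `VortexTableSketch` type and the `repartitioned`/`concurrency_hint` method names are hypothetical stand-ins, and a plain `String` error replaces `DataFusionError`:

```rust
use std::num::NonZero;
use std::sync::Arc;

// Hypothetical sketch: store the engine-requested partition count so the
// scan can later read it back as a concurrency hint.
#[derive(Clone)]
struct VortexTableSketch {
    num_partitions: NonZero<usize>,
}

impl VortexTableSketch {
    fn repartitioned(&self, target_partitions: usize) -> Result<Arc<Self>, String> {
        let mut this = self.clone();
        this.num_partitions = NonZero::new(target_partitions)
            .ok_or_else(|| "non-zero partitions".to_string())?;
        Ok(Arc::new(this))
    }

    // The stored value doubles as our own scan-concurrency hint.
    fn concurrency_hint(&self) -> usize {
        self.num_partitions.get()
    }
}
```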
```rust
/// Convert a Vortex [`Option<Precision>`] to a DataFusion [`Precision`](DFPrecision).
fn estimate_to_df_precision(est: &Option<Precision<u64>>) -> DFPrecision<usize> {
```

We have the `PrecisionExt` trait for this.
```rust
_state: &dyn Session,
// Unlike filters and limit, we _do_ apply the projection at this stage since DataFusion's
// physical projection expression push-down is still in its early stages. In theory, we
// could also wait to apply the projection until we can push down over the physical plan.
projection: Option<&Vec<usize>>,
```

You can; not sure what you mean here.
```rust
if let Some(size) = file.size {
    options = options.with_file_size(size);
}
if let Some(footer) = session.multi_file().get_footer(&cache_key) {
```

Storing this on the session will cause the same issue we had with metrics, where memory usage can grow indefinitely. I see it's capped, though.
```rust
impl Default for MultiFileSession {
    fn default() -> Self {
        Self {
            footer_cache: moka::sync::Cache::builder().max_capacity(10_000).build(),
```

10k footers is ~640MB; why not use the approximated size?
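The sizing point can be sketched std-only: cap by approximated byte size rather than entry count, evicting old entries once over budget. (moka supports this natively via a `weigher` closure, which turns `max_capacity` into a total-weight budget.) All names below are hypothetical, and the FIFO eviction is a simplification of a real cache policy:

```rust
use std::collections::VecDeque;

// Hypothetical sketch: bound the footer cache by approximated byte size
// rather than entry count, evicting the oldest entries once over budget.
struct SizeBoundedCache {
    entries: VecDeque<(String, Vec<u8>)>, // (cache key, approximated footer bytes)
    total_bytes: usize,
    max_bytes: usize,
}

impl SizeBoundedCache {
    fn new(max_bytes: usize) -> Self {
        Self { entries: VecDeque::new(), total_bytes: 0, max_bytes }
    }

    fn insert(&mut self, key: String, footer_bytes: Vec<u8>) {
        self.total_bytes += footer_bytes.len();
        self.entries.push_back((key, footer_bytes));
        // Evict oldest entries until we are back under the byte budget.
        while self.total_bytes > self.max_bytes {
            match self.entries.pop_front() {
                Some((_, old)) => self.total_bytes -= old.len(),
                None => break,
            }
        }
    }

    fn len(&self) -> usize {
        self.entries.len()
    }
}
```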
```rust
//! # Future Work
//!
//! - **Schema union**: Allow missing columns (filled with nulls) and compatible type upcasts
//!   across sources instead of requiring exact dtype matches.
//! - **Hive-style partitioning**: Extract partition values from file paths (e.g. `year=2024/month=01/`)
//!   and expose them as virtual columns.
//! - **Virtual columns**: `filename`, `file_row_number`, `file_index`.
//! - **Per-file statistics**: Merge column statistics across sources for planner hints.
//! - **Error resilience**: Skip failed sources instead of aborting the entire scan.
```

nit: seems like this should be an issue and not docs?
```rust
for (i, child) in children.iter().enumerate() {
    match child {
        MultiChild::Opened(ds) => ready.push_back(ds.clone()),
        MultiChild::Deferred(factory) => deferred.push_back((i, factory.clone())),
    }
}
drop(children);
```

Can't you `std::mem::take` the children and then `children.into_iter()`?
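The `std::mem::take` suggestion in miniature, with a hypothetical `Child` enum standing in for `MultiChild`: taking the `Vec` out of the mutex guard hands us ownership, so `into_iter()` can move each child out and the per-element `clone()` calls disappear:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical stand-ins for the MultiChild variants in the review.
enum Child {
    Opened(String),
    Deferred(String),
}

// mem::take swaps an empty Vec into the guard, handing us ownership so
// into_iter() can move each child out instead of cloning it.
fn partition_children(
    children: &Mutex<Vec<Child>>,
) -> (VecDeque<String>, VecDeque<(usize, String)>) {
    let taken = std::mem::take(&mut *children.lock().unwrap());
    let mut ready = VecDeque::new();
    let mut deferred = VecDeque::new();
    for (i, child) in taken.into_iter().enumerate() {
        match child {
            Child::Opened(ds) => ready.push_back(ds),
            Child::Deferred(factory) => deferred.push_back((i, factory)),
        }
    }
    (ready, deferred)
}
```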
```rust
use vortex::file::multi::MultiFileDataSource;
use vortex::io::object_store::ObjectStoreFileSystem;
use vortex::io::session::RuntimeSessionExt;
use vortex::scan::api::DataSource as _;
use vortex_datafusion::v2::VortexTable;
```

Move to top? Also, let's log here so it's visible which API is used.
```rust
/// in an array is guaranteed to be less than or equal to the inexact value, but equal to the exact
/// value.
///
// TODO(ngates): should we model Unknown as a variant of Precision? Or have Option<Precision<T>>?
}
```
```rust
/// Approximate the in-memory size of a footer.
fn estimate_footer_size(footer: &Footer) -> usize {
```
```rust
&self,
target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
```

If output ordering is provided, we might need to do something else (potentially keep a single overall partition?).
```rust
let fs = self
    .fs
    .map(Ok)
    .unwrap_or_else(|| create_local_filesystem(&self.session))?;
```

Suggested change:

```rust
let fs = match self.fs {
    Some(fs) => fs,
    None => create_local_filesystem(&self.session)?,
};
```
```rust
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
```

I thought it's already allowed in tests?
```rust
/// Serialize a split from this data source.
fn serialize_split(&self, split: &dyn Split) -> VortexResult<Vec<u8>>;

/// Serialize the [`DataSource`] to pass to a remote worker.
fn serialize(&self) -> VortexResult<Option<Vec<u8>>> {
```

Do we need to include serialization in this PR? Is it used anywhere?
```rust
fn row_count_estimate(&self) -> Option<Precision<u64>> {
    None
}

/// Returns a scan over the source.
async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef>;

/// Returns an estimate of the byte size of the un-filtered source.
fn byte_size_estimate(&self) -> Option<Precision<u64>> {
    None
```

nit: "estimate" in the name is redundant, as the type itself communicates that.
```rust
/// Sets the number of deferred sources to open concurrently during scanning.
///
/// Higher values overlap more file-opening I/O with split execution but use more memory
/// for in-flight metadata. Defaults to 8.
```

nit: make the const public and point to it here, so docs don't go stale.
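That nit could be sketched like this (all names hypothetical): export the default as a `pub const` and let the doc comment link to it, so the documented default can never drift from the value:

```rust
/// Default number of deferred sources opened concurrently during scanning.
/// (Hypothetical name; the real constant would live in the multi-file module.)
pub const DEFAULT_OPEN_CONCURRENCY: usize = 8;

struct MultiDataSourceBuilder {
    open_concurrency: usize,
}

impl Default for MultiDataSourceBuilder {
    fn default() -> Self {
        Self { open_concurrency: DEFAULT_OPEN_CONCURRENCY }
    }
}

impl MultiDataSourceBuilder {
    /// Sets the number of deferred sources to open concurrently during scanning.
    ///
    /// Defaults to [`DEFAULT_OPEN_CONCURRENCY`], so the documented default
    /// tracks the constant instead of hard-coding the number.
    fn with_open_concurrency(mut self, n: usize) -> Self {
        self.open_concurrency = n;
        self
    }
}
```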
```rust
/// opened source without re-opening.
pub struct MultiDataSource {
    dtype: DType,
    children: Arc<Mutex<Vec<MultiChild>>>,
```

I don't think the Mutex is needed. If you remove it here and make `MultiChild` `Clone`, everything works; cloning the vec a couple of times seems really minor and way less dangerous than running into an exclusive lock.
```rust
    found
}

fn is_decimal(dt: &DataType) -> bool {
```

nit: this is not needed anymore 🥳 there should be `DataType::is_decimal`.
I don't quite know how to review this. My preference is to start with something fast whose API can then be adjusted, rather than starting with a desired API that then cannot be made fast.
```rust
// expression (i.e. we require that coercion semantics have already been performed by the
// engine). Instead, DataFusion will push down filters through the physical plan via
// the VortexScanSource DataSource.
_filters: &[Expr],
```

We can extract column references from the filter here, which will tell us which statistics are worth fetching.
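The idea of mining filters for column references, shown against a hypothetical miniature expression tree rather than DataFusion's real `Expr` (DataFusion has its own helpers along these lines):

```rust
use std::collections::BTreeSet;

// Hypothetical miniature expression tree standing in for DataFusion's Expr.
enum FilterExpr {
    Column(String),
    Literal(i64),
    BinaryOp(Box<FilterExpr>, Box<FilterExpr>),
}

// Walk the pushed-down filters and collect every referenced column, so only
// those columns' statistics need to be fetched.
fn referenced_columns(filters: &[FilterExpr]) -> BTreeSet<String> {
    fn walk(e: &FilterExpr, out: &mut BTreeSet<String>) {
        match e {
            FilterExpr::Column(name) => {
                out.insert(name.clone());
            }
            FilterExpr::Literal(_) => {}
            FilterExpr::BinaryOp(lhs, rhs) => {
                walk(lhs, out);
                walk(rhs, out);
            }
        }
    }
    let mut out = BTreeSet::new();
    for f in filters {
        walk(f, &mut out);
    }
    out
}
```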
```rust
// Similarly for limit, we wait until we can push down over the physical plan.
_limit: Option<usize>,
```

I think we can use limit if there are no expressions.
```rust
pub trait DataSourceFactory: 'static + Send + Sync {
    /// Opens the data source, or returns `None` if it should be skipped.
    async fn open(&self) -> VortexResult<Option<DataSourceRef>>;
}
```

I might be missing some detail here, but `DataSourceFactory` and `DataSourceOpener` seem really similar; not sure why we want both.
```rust
}

fn splits(self: Box<Self>) -> SplitStream {
    stream::unfold(
```

`unfold` is so useful, but I really hate it 😢
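For readers unfamiliar with it, `stream::unfold` threads explicit state through a closure that returns the next item together with the successor state. The same shape can be written std-only over iterators (hypothetical helper name):

```rust
// std-only analogue of futures' stream::unfold: thread explicit state through
// a closure that yields the next item together with the successor state.
fn unfold_iter<St, T, F>(init: St, mut step: F) -> impl Iterator<Item = T>
where
    F: FnMut(St) -> Option<(T, St)>,
{
    let mut state = Some(init);
    std::iter::from_fn(move || {
        let (item, next) = step(state.take()?)?;
        state = Some(next);
        Some(item)
    })
}
```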
Is there maybe a way to set these up as an additional benchmark variant for one of the benchmarks, so we can see the constant comparison?
|
+1 for @robert3005's idea to benchmark both variants
Experiment to use the Scan API from the DuckDB and DataFusion integrations.

This PR introduces:

- `vortex::file::MultiFileDataSource`: a data source builder that takes a listing of files and (in the future) unifies schemas, adds support for hive-style partitioning columns, etc.
- `vortex::scan::api`: improvements to the Scan API that have become apparent while implementing support in two engines. I'd like to implement more engines before we decide on a path to stability.