datafusion-contrib/datafusion-index-provider

DataFusion Index Provider

A library that extends Apache DataFusion with index-based query acceleration for TableProvider implementations.

Overview

This crate implements a two-phase execution model for index-accelerated queries:

  1. Index Phase: Scan one or more secondary indexes to identify primary key values matching filter predicates
  2. Fetch Phase: Use primary key values to fetch complete records from underlying storage

This approach reduces I/O by limiting data retrieval to rows that satisfy the query predicates. It is most effective for selective queries (typically those matching fewer than 10% of rows).
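The two phases can be illustrated without DataFusion at all. The following is a minimal, standard-library-only sketch, not this crate's API: `Employee`, `AgeIndex`, `index_phase`, and `fetch_phase` are hypothetical names, the secondary index is modeled as a `BTreeMap` from age to primary keys, and the underlying storage is modeled as a `HashMap` keyed by primary key.

```rust
use std::collections::{BTreeMap, HashMap};
use std::ops::Bound;

/// Hypothetical row type used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct Employee {
    pub id: u64,
    pub age: u32,
    pub department: String,
}

/// Hypothetical secondary index: indexed value (age) -> primary keys.
pub type AgeIndex = BTreeMap<u32, Vec<u64>>;

/// Builds the toy secondary index from the stored rows.
pub fn build_age_index(storage: &HashMap<u64, Employee>) -> AgeIndex {
    let mut index = AgeIndex::new();
    for row in storage.values() {
        index.entry(row.age).or_default().push(row.id);
    }
    index
}

/// Index phase: collect primary keys of rows with `age > min_age`,
/// touching only the index, never the full rows.
pub fn index_phase(index: &AgeIndex, min_age: u32) -> Vec<u64> {
    let mut keys: Vec<u64> = index
        .range((Bound::Excluded(min_age), Bound::Unbounded))
        .flat_map(|(_, ids)| ids.iter().copied())
        .collect();
    keys.sort_unstable();
    keys
}

/// Fetch phase: look up complete rows only for the selected primary keys.
pub fn fetch_phase(storage: &HashMap<u64, Employee>, keys: &[u64]) -> Vec<Employee> {
    keys.iter().filter_map(|k| storage.get(k).cloned()).collect()
}

/// Small in-memory "table" for trying the two phases out.
pub fn sample_storage() -> HashMap<u64, Employee> {
    [(1, 24, "Sales"), (2, 31, "Engineering"), (3, 45, "Engineering")]
        .into_iter()
        .map(|(id, age, dept)| (id, Employee { id, age, department: dept.to_string() }))
        .collect()
}
```

With `sample_storage()`, calling `index_phase(&index, 30)` selects keys 2 and 3, and `fetch_phase` then reads only those two rows. The row with `age = 24` is never fetched, which is where the I/O saving comes from.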

Installation

Add to your Cargo.toml:

[dependencies]
datafusion-index-provider = "0.1.0"

Core Concepts

Index Trait

Implement the Index trait to define how your index scans for matching primary key values:

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{Result, Statistics};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::Expr;
use datafusion_index_provider::physical_plan::Index;

impl Index for MyIndex {
    fn name(&self) -> &str { "my_index" }
    fn column_name(&self) -> &str { "indexed_column" }
    fn table_name(&self) -> &str { "my_table" }

    fn index_schema(&self) -> SchemaRef {
        // Schema whose columns form the (possibly composite) primary key,
        // e.g. create_index_schema([Field::new("id", DataType::UInt64, false)])
        todo!()
    }

    fn scan(&self, filters: &[Expr], limit: Option<usize>)
        -> Result<SendableRecordBatchStream> {
        // Return a RecordBatch stream of the primary key columns for rows matching `filters`
        todo!()
    }

    fn statistics(&self) -> Statistics {
        // Index statistics
        todo!()
    }
}

RecordFetcher Trait

Implement RecordFetcher to define how complete records are retrieved using primary key values:

use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion_index_provider::physical_plan::fetcher::RecordFetcher;

#[async_trait]
impl RecordFetcher for MyTable {
    async fn fetch(&self, index_batch: RecordBatch) -> Result<RecordBatch> {
        // index_batch contains the primary key columns defined by index_schema()
        // Return the complete records for those primary keys
        todo!()
    }
}

IndexedTableProvider Trait

Implement IndexedTableProvider on your TableProvider to expose available indexes:

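use std::sync::Arc;

use datafusion::common::Result;
use datafusion_index_provider::physical_plan::Index;
use datafusion_index_provider::provider::IndexedTableProvider;

impl IndexedTableProvider for MyTable {
    // Expose every secondary index available on this table
    fn indexes(&self) -> Result<Vec<Arc<dyn Index>>> {
        Ok(vec![
            Arc::new(MyAgeIndex::new()),
            Arc::new(MyDepartmentIndex::new()),
        ])
    }
}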

Integration with TableProvider

Use the provided analysis and execution plan methods in your TableProvider::scan:

#[async_trait]
impl TableProvider for MyTable {
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Analyze which filters can use indexes
        let (indexable, remaining) = self.analyze_and_optimize_filters(filters)?;

        if !indexable.is_empty() {
            // Create index-accelerated execution plan
            self.create_execution_plan_with_indexes(
                &indexable,
                projection,
                &remaining,
                limit,
                self.schema(),
                Arc::new(self.clone()) as Arc<dyn RecordFetcher>,
            ).await
        } else {
            // Fall back to regular table scan
            self.create_full_scan_plan(projection, filters, limit).await
        }
    }
}

Query Optimization

Filter Analysis

The analyze_and_optimize_filters method recursively analyzes filter expressions to build an IndexFilter tree:

  • Single predicates: Matched to indexes via Index::supports_predicate()
  • AND expressions: All sub-expressions must be indexable; creates IndexFilter::And
  • OR expressions: All sub-expressions must be indexable; creates IndexFilter::Or
  • Nested combinations: Supports arbitrary nesting depth

All-or-nothing policy: If any part of an AND/OR expression cannot be indexed, the entire expression is treated as non-indexable.
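The all-or-nothing rule can be sketched as a recursive function that returns `Option`. This is a simplified model, not the crate's implementation: `Filter` stands in for DataFusion's `Expr`, `IndexPlan` stands in for `IndexFilter`, and "supports the predicate" is reduced to "the column has an index". All of these names are assumptions made for illustration.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified filter expression (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
pub enum Filter {
    /// A single predicate on one column, e.g. `age > 25`.
    Predicate { column: String, text: String },
    And(Vec<Filter>),
    Or(Vec<Filter>),
}

/// Hypothetical stand-in for the crate's `IndexFilter` tree.
#[derive(Debug, Clone, PartialEq)]
pub enum IndexPlan {
    Single { column: String, text: String },
    And(Vec<IndexPlan>),
    Or(Vec<IndexPlan>),
}

/// Convenience constructor for a leaf predicate.
pub fn pred(column: &str, text: &str) -> Filter {
    Filter::Predicate { column: column.to_string(), text: text.to_string() }
}

/// Analyzes all children; yields `None` as soon as one child is not indexable.
fn analyze_all(parts: &[Filter], indexed: &HashSet<&str>) -> Option<Vec<IndexPlan>> {
    parts.iter().map(|p| analyze(p, indexed)).collect()
}

/// Returns `Some(plan)` only if every leaf under `filter` is on an indexed column.
pub fn analyze(filter: &Filter, indexed: &HashSet<&str>) -> Option<IndexPlan> {
    match filter {
        Filter::Predicate { column, text } => indexed
            .contains(column.as_str())
            .then(|| IndexPlan::Single { column: column.clone(), text: text.clone() }),
        Filter::And(parts) => analyze_all(parts, indexed).map(IndexPlan::And),
        Filter::Or(parts) => analyze_all(parts, indexed).map(IndexPlan::Or),
    }
}

/// Splits top-level filters into indexable plans and remaining filters.
pub fn split_filters(
    filters: &[Filter],
    indexed: &HashSet<&str>,
) -> (Vec<IndexPlan>, Vec<Filter>) {
    let mut indexable = Vec::new();
    let mut remaining = Vec::new();
    for f in filters {
        match analyze(f, indexed) {
            Some(plan) => indexable.push(plan),
            None => remaining.push(f.clone()),
        }
    }
    (indexable, remaining)
}
```

Collecting an iterator of `Option` values into `Option<Vec<_>>` short-circuits on the first `None`, so a single non-indexable leaf makes the whole enclosing AND/OR expression fall into the remaining filters.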

Supported Query Patterns

-- Single index scan
SELECT * FROM employees WHERE age = 30;

-- Multi-index intersection (AND)
SELECT * FROM employees WHERE age > 25 AND department = 'Engineering';

-- Multi-index union (OR) with automatic deduplication
SELECT * FROM employees WHERE age < 25 OR department = 'Sales';

-- Complex nested expressions
SELECT * FROM employees
WHERE (age > 30 AND department = 'Engineering')
   OR (age < 25 AND department = 'Sales');

Execution Plans

The library generates optimized physical plans based on filter structure:

Single Index

RecordFetchExec
└── IndexScanExec(age_index, [age = 30])

AND - Index Intersection

Uses Hash or SortMerge joins to intersect primary key values:

RecordFetchExec
└── Projection(PK columns)
    └── HashJoin(on: PK columns)
        ├── IndexScanExec(age_index, [age > 25])
        └── IndexScanExec(dept_index, [department = 'Engineering'])

Join Selection:

  • SortMergeJoin: When both indexes return ordered primary keys (Index::is_ordered())
  • HashJoin: When inputs are unordered (builds hash table from left side)
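The trade-off between the two strategies can be seen on plain slices of primary keys. The sketch below is an assumption-laden illustration rather than what `HashJoinExec` or `SortMergeJoinExec` actually do internally: `hash_intersect`, `merge_intersect`, and `intersect` are hypothetical helpers, keys are single `u64` values, and each input is assumed to contain every key at most once.

```rust
use std::cmp::Ordering;
use std::collections::HashSet;

/// Hash-based intersection: build a hash set from the left side, then probe
/// it with the right side. Works for unordered inputs.
pub fn hash_intersect(left: &[u64], right: &[u64]) -> Vec<u64> {
    let build: HashSet<u64> = left.iter().copied().collect();
    right.iter().copied().filter(|k| build.contains(k)).collect()
}

/// Merge-based intersection: both inputs must be sorted ascending.
/// Advances two cursors in lockstep and needs no hash table.
pub fn merge_intersect(left: &[u64], right: &[u64]) -> Vec<u64> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < left.len() && j < right.len() {
        match left[i].cmp(&right[j]) {
            Ordering::Less => i += 1,
            Ordering::Greater => j += 1,
            Ordering::Equal => {
                out.push(left[i]);
                i += 1;
                j += 1;
            }
        }
    }
    out
}

/// Picks the strategy the way the selection rule above describes:
/// merge when both sides are ordered, hash otherwise.
pub fn intersect(left: &[u64], right: &[u64], both_ordered: bool) -> Vec<u64> {
    if both_ordered {
        merge_intersect(left, right)
    } else {
        hash_intersect(left, right)
    }
}
```

The merge variant uses constant extra memory but is only correct on sorted inputs, while the hash variant accepts any order at the cost of materializing one side in memory.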

OR - Union with Deduplication

Uses UnionExec + AggregateExec to deduplicate overlapping primary key values:

RecordFetchExec
└── AggregateExec(GROUP BY PK columns)
    └── UnionExec
        ├── IndexScanExec(age_index, [age < 25])
        └── IndexScanExec(dept_index, [department = 'Sales'])

This prevents duplicate record fetches when primary key values appear in multiple index results.
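As a rough model of what the union plus group-by achieves, the following sketch concatenates several index results and keeps each primary key once. It is not the crate's code: `union_dedup` is a hypothetical helper, and keeping first-seen order is a choice made for this example only (a grouped aggregation does not necessarily preserve input order).

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Concatenates the key lists from several index scans and drops repeats,
/// so each primary key is handed to the fetch phase at most once.
/// `K` can be a single-column key or a tuple for a composite key.
pub fn union_dedup<K: Eq + Hash + Clone>(inputs: &[Vec<K>]) -> Vec<K> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for batch in inputs {
        for key in batch {
            // `insert` returns false when the key was already present.
            if seen.insert(key.clone()) {
                out.push(key.clone());
            }
        }
    }
    out
}
```

For `age < 25 OR department = 'Sales'`, an employee who is both under 25 and in Sales appears in both index results but is emitted once, so the row is fetched a single time.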

Reference Implementation

See tests/common/ for complete working examples:

Single-column primary key:

Composite primary key:

Integration tests in tests/integration_tests.rs and tests/composite_pk_tests.rs demonstrate query scenarios including deeply nested AND/OR combinations with both single and composite primary keys.

Limitations

  1. Projection pushdown: Not currently supported for indexed scans (all columns are fetched)
  2. Remaining filters: Non-indexed filters are applied after the record fetch, not during the index scan
  3. Partitioning: Index scans produce single-partition output
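The practical effect of the second limitation is that a row can be fetched and then discarded. The sketch below models this with plain tuples. `Row` and `fetch_then_filter` are hypothetical names, and the linear key lookup is only there to keep the example short.

```rust
/// Hypothetical fetched row: (primary key, age, salary).
pub type Row = (u64, u32, u32);

/// Fetches the rows for `keys`, then applies a non-indexed ("remaining")
/// predicate. Returns the surviving rows and the number of rows fetched.
pub fn fetch_then_filter<F>(table: &[Row], keys: &[u64], remaining: F) -> (Vec<Row>, usize)
where
    F: Fn(&Row) -> bool,
{
    // Fetch phase: every key selected by the index scan costs one row read.
    let fetched: Vec<Row> = table
        .iter()
        .filter(|r| keys.contains(&r.0))
        .copied()
        .collect();
    let fetched_count = fetched.len();
    // Remaining filters run only now, on rows that were already read.
    let kept = fetched.into_iter().filter(|r| remaining(r)).collect();
    (kept, fetched_count)
}
```

If the index scan selects three keys and the remaining predicate keeps two rows, three rows are still read from storage. A highly selective non-indexed filter therefore does not reduce fetch cost.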

Testing

Run the test suite:

# Run all tests
cargo test

# Run with logging
RUST_LOG=debug cargo test

# Run integration tests only
cargo test --test integration_tests

The test suite includes:

  • 27 unit tests covering execution plan generation and streaming
  • 36 integration tests covering query scenarios from simple to deeply nested, with both single and composite primary keys

Compatibility

  • DataFusion: 52.x
  • Rust: 1.75+ (MSRV)
  • Arrow: Compatible with DataFusion's Arrow version

Architecture Details

For in-depth technical documentation, see:

License

Licensed under the Apache License, Version 2.0. See LICENSE for details.
