Skip to content

Commit

Permalink
fix: Add streaming to reading VCFs (#101)
Browse files Browse the repository at this point in the history
* fix: Add streaming to reading VCFs

* fix: Table names obfuscation

* Fix test and bumping for release
  • Loading branch information
mwiewior authored Feb 28, 2025
1 parent 6f56266 commit defabb3
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars_bio"
version = "0.6.2"
version = "0.6.3"
edition = "2021"

[lib]
Expand Down
2 changes: 1 addition & 1 deletion polars_bio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
POLARS_BIO_MAX_THREADS = "datafusion.execution.target_partitions"


__version__ = "0.6.2"
__version__ = "0.6.3"
__all__ = [
"overlap",
"nearest",
Expand Down
29 changes: 23 additions & 6 deletions polars_bio/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
VcfReadOptions,
py_register_table,
py_scan_table,
py_stream_scan_table,
)

from .context import ctx
from .range_op_helpers import stream_wrapper


def read_bam(path: str) -> pl.LazyFrame:
Expand Down Expand Up @@ -53,19 +55,26 @@ def read_bam(path: str) -> pl.LazyFrame:


def read_vcf(
path: str, info_fields: Union[list[str], None] = None, thread_num: int = 1
) -> pl.LazyFrame:
path: str,
info_fields: Union[list[str], None] = None,
thread_num: int = 1,
streaming: bool = False,
) -> Union[pl.LazyFrame, pl.DataFrame]:
"""
Read a VCF file into a LazyFrame.
Parameters:
path: The path to the VCF file.
info_fields: The fields to read from the INFO column.
thread_num: The number of threads to use for reading the VCF file.
streaming: Whether to read the VCF file in streaming mode.
"""
vcf_read_options = VcfReadOptions(info_fields=info_fields, thread_num=thread_num)
read_options = ReadOptions(vcf_read_options=vcf_read_options)
return file_lazy_scan(path, InputFormat.Vcf, read_options)
if streaming:
return read_file(path, InputFormat.Vcf, read_options, streaming)
else:
return file_lazy_scan(path, InputFormat.Vcf, read_options)


def read_fasta(path: str) -> pl.LazyFrame:
Expand Down Expand Up @@ -143,8 +152,11 @@ def _overlap_source(


def read_file(
path: str, input_format: InputFormat, read_options: ReadOptions
) -> pl.DataFrame:
path: str,
input_format: InputFormat,
read_options: ReadOptions,
streaming: bool = False,
) -> Union[pl.LazyFrame, pl.DataFrame]:
"""
Read a file into a DataFrame.
Expand All @@ -154,14 +166,19 @@ def read_file(
The path to the file.
input_format : InputFormat
The input format of the file.
read_options : ReadOptions, e.g. VcfReadOptions
streaming: Whether to read the file in streaming mode.
Returns
-------
pl.DataFrame
The DataFrame.
"""
table = py_register_table(ctx, path, input_format, read_options)
return py_scan_table(ctx, table.name)
if streaming:
return stream_wrapper(py_stream_scan_table(ctx, table.name))
else:
return py_scan_table(ctx, table.name)


def read_table(path: str, schema: Dict = None, **kwargs) -> pl.LazyFrame:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "maturin"

[project]
name = "polars-bio"
version = "0.6.2"
version = "0.6.3"
description = "Blazing fast genomic operations on large Python dataframes"
authors = []
requires-python = ">=3.9"
Expand Down
73 changes: 44 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,34 +160,6 @@ fn stream_range_operation_scan(
})
}

// #[pyfunction]
// #[pyo3(signature = (py_ctx, df_path1, read_options1=None, limit=None))]
// fn unary_operation_scan(
// py: Python<'_>,
// py_ctx: &PyBioSessionContext,
// df_path1: String,
// read_options1: Option<ReadOptions>,
// limit: Option<usize>
// ) -> PyResult<PyDataFrame> {
// py.allow_threads(|| {
// let rt = Runtime::new().unwrap();
// let ctx = &py_ctx.ctx;
//
// rt.block_on(register_table(
// ctx,
// &df_path1,
// LEFT_TABLE,
// get_input_format(&df_path1),
// read_options1,
// ));
// let df = rt.block_on(ctx.sql(&format!("SELECT * FROM {}", LEFT_TABLE))).unwrap();
// match limit {
// Some(l) => Ok(PyDataFrame::new(df.limit(0, Some(l))?)),
// _ => Ok(PyDataFrame::new(df)),
// }
// })
// }

#[pyfunction]
#[pyo3(signature = (py_ctx, path, input_format, read_options=None))]
fn py_register_table(
Expand All @@ -208,7 +180,8 @@ fn py_register_table(
.unwrap()
.to_string()
.replace(&format!(".{}", input_format).to_string().to_lowercase(), "")
.replace(".", "_");
.replace(".", "_")
.replace("-", "_");
rt.block_on(register_table(
ctx,
&path,
Expand Down Expand Up @@ -253,6 +226,47 @@ fn py_scan_table(
})
}

/// Open a registered table as a streaming Polars `LazyFrame`.
///
/// Looks up `table_name` in the DataFusion session held by `py_ctx`, converts
/// its Arrow schema to a Polars schema, starts executing the table as a
/// record-batch stream and wraps that stream in an anonymous Polars scan
/// named `"SCAN polars-bio"`.
///
/// # Errors
///
/// Returns a Python exception if a Tokio runtime cannot be created, the
/// table lookup fails, the stream cannot be started, or Polars rejects the
/// anonymous scan.
///
/// # Panics
///
/// Panics if the Arrow schema cannot be converted to a Polars schema.
#[pyfunction]
#[pyo3(signature = (py_ctx, table_name))]
fn py_stream_scan_table(
    py: Python<'_>,
    py_ctx: &PyBioSessionContext,
    table_name: String,
) -> PyResult<PyLazyFrame> {
    // NOTE(review): presumably silences clippy for the error/`into()`
    // conversions inside the closure below — confirm it is still required.
    #[allow(clippy::useless_conversion)]
    py.allow_threads(|| {
        // Runtime creation is an OS-level operation that can fail; surface it
        // as a Python exception instead of panicking.
        let rt = Runtime::new()?;
        let ctx = &py_ctx.ctx;

        let df = rt.block_on(ctx.session.table(&table_name))?;
        let schema = df.schema().as_arrow();
        // NOTE(review): the error type of this helper is not visible here, so
        // the original `unwrap` is kept; consider mapping it to a `PyErr`.
        let polars_schema = convert_arrow_rb_schema_to_polars_df_schema(schema).unwrap();
        debug!("Schema: {:?}", polars_schema);
        let args = ScanArgsAnonymous {
            schema: Some(Arc::new(polars_schema)),
            name: "SCAN polars-bio",
            ..ScanArgsAnonymous::default()
        };
        debug!(
            "{}",
            ctx.session
                .state()
                .config()
                .options()
                .execution
                .target_partitions
        );
        // Propagate failures to start the stream the same way the table
        // lookup above is propagated, rather than aborting with a panic.
        let stream = rt.block_on(df.execute_stream())?;
        // NOTE(review): the stream is created on `rt` but handed to the scan
        // together with a second, freshly created runtime, while `rt` is
        // dropped when this closure returns — confirm this is intentional.
        let scan = RangeOperationScan {
            df_iter: Arc::new(Mutex::new(stream)),
            rt: Runtime::new()?,
        };
        let function = Arc::new(scan);
        let lf = LazyFrame::anonymous_scan(function, args).map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    })
}

//TODO: not exposed Polars used for now
#[pyfunction]
fn py_read_table(
Expand Down Expand Up @@ -281,6 +295,7 @@ fn polars_bio(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(py_scan_table, m)?)?;
m.add_function(wrap_pyfunction!(py_register_table, m)?)?;
m.add_function(wrap_pyfunction!(py_read_table, m)?)?;
m.add_function(wrap_pyfunction!(py_stream_scan_table, m)?)?;
// m.add_function(wrap_pyfunction!(unary_operation_scan, m)?)?;
m.add_class::<PyBioSessionContext>()?;
m.add_class::<FilterOp>()?;
Expand Down

0 comments on commit defabb3

Please sign in to comment.