From d501c5eb741e507f6c915c9fc04e9d46749cd0e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Wiewi=C3=B3rka?= Date: Thu, 16 Jan 2025 17:31:32 +0100 Subject: [PATCH] fix: Workaround for issue-71 (#72) * fix: Workaround for issue-71 * fix: Missing overlap/nearest under pb schema * chore: Performance doc update --- Cargo.lock | 2 +- Cargo.toml | 2 +- docs/performance.md | 96 +++++++++++++++++++--------------- polars_bio/__init__.py | 9 +--- polars_bio/context.py | 11 +++- polars_bio/logging.py | 6 +++ polars_bio/polars_ext.py | 47 +++++++++++++++++ polars_bio/range_op_helpers.py | 9 ++++ polars_bio/range_op_io.py | 5 +- pyproject.toml | 2 +- src/context.rs | 7 ++- src/lib.rs | 4 +- src/scan.rs | 29 ++++++++-- tests/test_polars_ext.py | 70 +++++++++++++++++++++++++ 14 files changed, 236 insertions(+), 63 deletions(-) create mode 100644 polars_bio/logging.py diff --git a/Cargo.lock b/Cargo.lock index 8d670e5..415bb0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5213,7 +5213,7 @@ dependencies = [ [[package]] name = "polars_bio" -version = "0.5.0" +version = "0.5.4" dependencies = [ "arrow", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index a632a84..12e9d51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "polars_bio" -version = "0.5.3" +version = "0.5.4" edition = "2021" [lib] diff --git a/docs/performance.md b/docs/performance.md index 5b8e2ee..c001185 100644 --- a/docs/performance.md +++ b/docs/performance.md @@ -1038,11 +1038,13 @@ the `parallel` dataset was used (see [Test datasets](#test-datasets)) | Library | Min (s) | Max (s) | Mean (s) | Speedup | |-------------------------|----------|----------|----------|-----------| -| polars_bio | 0.034696 | 0.036409 | 0.035352 | 0.83x | -| polars_bio_pandas_lf | 0.038730 | 0.040108 | 0.039277 | 0.75x | -| polars_bio_pandas_pd | 0.037271 | 0.038367 | 0.037958 | 0.77x | -| polars_bio_polars_eager | 0.029277 | 0.029464 | 0.029374 | **1.00x** | -| polars_bio_polars_lazy | 0.029450 | 0.030083 | 0.029727 | **0.99x** | +| polars_bio | 0.035567 | 0.036777 | 0.035995 | 0.91x | +| polars_bio_pandas_lf | 0.040237 | 0.041256 | 0.040694 | 0.80x | +| polars_bio_pandas_pd | 0.040554 | 0.040888 | 0.040761 | 0.80x | +| polars_bio_polars_eager | 0.032051 | 0.033022 | 0.032693 | **1.00x** | +| polars_bio_polars_lazy | 0.034346 | 0.035225 | 0.034775 | **0.94x** | + + @@ -1050,22 +1052,25 @@ the `parallel` dataset was used (see [Test datasets](#test-datasets)) | Library | Min (s) | Max (s) | Mean (s) | Speedup | |-------------------------|----------|----------|----------|-----------| -| polars_bio | 0.091433 | 0.091646 | 0.091509 | 0.90x | -| polars_bio_pandas_lf | 0.104335 | 0.133236 | 0.114009 | 0.72x | -| polars_bio_pandas_pd | 0.102595 | 0.104520 | 0.103489 | 0.79x | -| polars_bio_polars_eager | 0.082429 | 0.085214 | 0.083439 | **0.98x** | -| polars_bio_polars_lazy | 0.081784 | 0.082521 | 0.082178 | **1.00x** | +| polars_bio | 0.094768 | 0.096217 | 0.095266 | **1.00x** | +| polars_bio_pandas_lf | 0.163054 | 0.164207 | 0.163713 | 0.58x | +| polars_bio_pandas_pd | 0.163245 | 0.166200 | 0.165022 | 0.58x | +| polars_bio_polars_eager | 0.142344 | 0.145895 | 0.144110 | **0.66x** | +| polars_bio_polars_lazy | 0.149738 | 0.150299 | 0.149929 | 0.64x | + + ##### 1-0 | Library | Min (s) | Max (s) | Mean (s) | Speedup | |-------------------------|----------|----------|----------|-----------| -| polars_bio | 0.137830 | 0.161670 | 0.145978 | 0.86x | -| polars_bio_pandas_lf | 0.153229 | 0.158764 | 0.155538 | 0.81x | -| polars_bio_pandas_pd | 
0.153903 | 0.161426 | 0.156792 | 0.80x | -| polars_bio_polars_eager | 0.124594 | 0.130227 | 0.127729 | **0.98x** | -| polars_bio_polars_lazy | 0.124429 | 0.126208 | 0.125316 | **1.00x** | +| polars_bio | 0.145564 | 0.151407 | 0.147679 | **1.00x** | +| polars_bio_pandas_lf | 0.238292 | 0.240374 | 0.239504 | 0.62x | +| polars_bio_pandas_pd | 0.239330 | 0.252445 | 0.244414 | 0.60x | +| polars_bio_polars_eager | 0.208421 | 0.214513 | 0.210896 | **0.70x** | +| polars_bio_polars_lazy | 0.219629 | 0.222126 | 0.220908 | 0.67x | + #### M-size @@ -1074,22 +1079,24 @@ the `parallel` dataset was used (see [Test datasets](#test-datasets)) | Library | Min (s) | Max (s) | Mean (s) | Speedup | |-------------------------|----------|----------|----------|-----------| -| polars_bio | 0.218274 | 0.241310 | 0.232544 | **1.00x** | -| polars_bio_pandas_lf | 0.279482 | 0.288486 | 0.283349 | 0.82x | -| polars_bio_pandas_pd | 0.292259 | 0.301428 | 0.295783 | 0.79x | -| polars_bio_polars_eager | 0.239037 | 0.242774 | 0.241256 | 0.96x | -| polars_bio_polars_lazy | 0.236283 | 0.243813 | 0.239054 | **0.97x** | +| polars_bio | 0.224327 | 0.227891 | 0.225606 | **1.00x** | +| polars_bio_pandas_lf | 0.377938 | 0.378380 | 0.378205 | 0.60x | +| polars_bio_pandas_pd | 0.413825 | 0.415470 | 0.414630 | 0.54x | +| polars_bio_polars_eager | 0.332434 | 0.335960 | 0.334393 | **0.67x** | +| polars_bio_polars_lazy | 0.347608 | 0.350382 | 0.349330 | 0.65x | + ##### 7-3 | Library | Min (s) | Max (s) | Mean (s) | Speedup | |-------------------------|----------|----------|----------|-----------| -| polars_bio | 0.199343 | 0.224108 | 0.208181 | **1.00x** | -| polars_bio_pandas_lf | 0.284750 | 0.296619 | 0.290834 | 0.72x | -| polars_bio_pandas_pd | 0.308035 | 0.312373 | 0.309687 | 0.67x | -| polars_bio_polars_eager | 0.248849 | 0.251454 | 0.250025 | **0.83x** | -| polars_bio_polars_lazy | 0.252292 | 0.252924 | 0.252585 | 0.82x | +| polars_bio | 0.206701 | 0.217080 | 0.210280 | **1.00x** | +| polars_bio_pandas_lf | 0.345310 | 0.355560 | 0.349561 | 0.60x | +| polars_bio_pandas_pd | 0.415459 | 0.417442 | 0.416609 | 0.50x | +| polars_bio_polars_eager | 0.311204 | 0.313540 | 0.312487 | **0.67x** | +| polars_bio_polars_lazy | 0.321170 | 0.322826 | 0.321981 | 0.65x | + #### L-size @@ -1098,32 +1105,37 @@ the `parallel` dataset was used (see [Test datasets](#test-datasets)) | Library | Min (s) | Max (s) | Mean (s) | Speedup | |-------------------------|----------|----------|----------|-----------| -| polars_bio | 2.933145 | 3.005864 | 2.958892 | **1.00x** | -| polars_bio_pandas_lf | 6.207847 | 6.345032 | 6.274848 | 0.47x | -| polars_bio_pandas_pd | 7.267690 | 7.322592 | 7.298297 | 0.41x | -| polars_bio_polars_eager | 6.114975 | 6.307444 | 6.194726 | **0.48x** | -| polars_bio_polars_lazy | 6.124255 | 6.229623 | 6.170878 | **0.48x** | +| polars_bio | 2.750666 | 2.895516 | 2.802942 | **1.00x** | +| polars_bio_pandas_lf | 3.525844 | 3.646709 | 3.592018 | 0.78x | +| polars_bio_pandas_pd | 6.455399 | 6.539737 | 6.487919 | 0.43x | +| polars_bio_polars_eager | 3.236083 | 3.428796 | 3.331644 | 0.84x | +| polars_bio_polars_lazy | 3.220374 | 3.251365 | 3.232736 | **0.87x** | + ##### 4-8 + | Library | Min (s) | Max (s) | Mean (s) | Speedup | |-------------------------|----------|----------|----------|-----------| -| polars_bio | 3.663420 | 3.810438 | 3.741928 | **1.00x** | -| polars_bio_pandas_lf | 7.970248 | 8.211973 | 8.109422 | 0.46x | -| polars_bio_pandas_pd | 9.233966 | 9.343848 | 9.288391 | 0.40x | -| polars_bio_polars_eager | 7.920726 | 8.045197 | 
7.999649 | **0.47x** | -| polars_bio_polars_lazy | 7.851801 | 8.112940 | 7.952556 | **0.47x** | +| polars_bio | 3.677363 | 3.877014 | 3.749576 | **1.00x** | +| polars_bio_pandas_lf | 4.875777 | 5.007774 | 4.953983 | 0.76x | +| polars_bio_pandas_pd | 8.595318 | 8.809947 | 8.704564 | 0.43x | +| polars_bio_polars_eager | 4.473527 | 4.608746 | 4.561838 | **0.82x** | +| polars_bio_polars_lazy | 4.728077 | 4.786690 | 4.758805 | 0.79x | + ##### 7-8 -| Library | Min (s) | Max (s) | Mean (s) | Speedup | -|-------------------------|-----------|-----------|-----------|-----------| -| polars_bio | 3.931723 | 3.957598 | 3.945474 | **1.00x** | -| polars_bio_pandas_lf | 9.887706 | 10.987020 | 10.317384 | 0.38x | -| polars_bio_pandas_pd | 11.386133 | 11.489356 | 11.428774 | 0.35x | -| polars_bio_polars_eager | 9.334948 | 9.350016 | 9.343123 | **0.42x** | -| polars_bio_polars_lazy | 9.801333 | 10.048262 | 9.912648 | 0.40x | +| Library | Min (s) | Max (s) | Mean (s) | Speedup | +|-------------------------|----------|----------|----------|-----------| +| polars_bio | 3.439489 | 3.917193 | 3.633215 | **1.00x** | +| polars_bio_pandas_lf | 3.930340 | 4.079147 | 3.985301 | 0.91x | +| polars_bio_pandas_pd | 9.646125 | 9.994008 | 9.798255 | 0.37x | +| polars_bio_polars_eager | 3.742098 | 3.995767 | 3.832054 | **0.95x** | +| polars_bio_polars_lazy | 3.767904 | 4.058453 | 3.882342 | 0.94x | + + | Source | Peak Memory (MB)) | Factor | |-------------------------|-------------------|----------| diff --git a/polars_bio/__init__.py b/polars_bio/__init__.py index 1e4e678..154a466 100644 --- a/polars_bio/__init__.py +++ b/polars_bio/__init__.py @@ -1,5 +1,3 @@ -import logging - from polars_bio.polars_bio import InputFormat from .context import ctx @@ -8,15 +6,10 @@ from .range_op import FilterOp, nearest, overlap from .range_viz import visualize_intervals -logging.basicConfig() -logging.getLogger().setLevel(logging.WARN) -logger = logging.getLogger("polars_bio") -logger.setLevel(logging.INFO) - POLARS_BIO_MAX_THREADS = "datafusion.execution.target_partitions" -__version__ = "0.5.3" +__version__ = "0.5.4" __all__ = [ "overlap", "nearest", diff --git a/polars_bio/context.py b/polars_bio/context.py index 1ec74ed..b7472e1 100644 --- a/polars_bio/context.py +++ b/polars_bio/context.py @@ -1,4 +1,9 @@ +import datetime + from polars_bio.polars_bio import BioSessionContext +from polars_bio.range_op_helpers import tmp_cleanup + +from .logging import logger def singleton(cls): @@ -16,10 +21,14 @@ def get_instance(*args, **kwargs): @singleton class Context: def __init__(self): - self.ctx = BioSessionContext() + logger.info("Creating BioSessionContext") + self.ctx = BioSessionContext(seed=str(datetime.datetime.now().timestamp())) self.ctx.set_option("datafusion.execution.target_partitions", "1") self.ctx.set_option("sequila.interval_join_algorithm", "coitrees") + def __del__(self): + tmp_cleanup(self.ctx.seed) + def set_option(self, key, value): self.ctx.set_option(key, value) diff --git a/polars_bio/logging.py b/polars_bio/logging.py new file mode 100644 index 0000000..e6ad878 --- /dev/null +++ b/polars_bio/logging.py @@ -0,0 +1,6 @@ +import logging + +logging.basicConfig() +logging.getLogger().setLevel(logging.WARN) +logger = logging.getLogger("polars_bio") +logger.setLevel(logging.INFO) diff --git a/polars_bio/polars_ext.py b/polars_bio/polars_ext.py index ed2739b..09d986e 100644 --- a/polars_bio/polars_ext.py +++ b/polars_bio/polars_ext.py @@ -2,12 +2,59 @@ import polars as pl +import polars_bio as pb +from polars_bio.polars_bio 
import FilterOp + @pl.api.register_lazyframe_namespace("pb") class PolarsRangesOperations: def __init__(self, ldf: pl.LazyFrame) -> None: self._ldf = ldf + def overlap( + self, + other_df: pl.LazyFrame, + suffixes: tuple[str, str] = ("_1", "_2"), + how="inner", + overlap_filter=FilterOp.Strict, + cols1=["chrom", "start", "end"], + cols2=["chrom", "start", "end"], + ) -> pl.LazyFrame: + """ + !!! note + Alias for [overlap](api.md#polars_bio.overlap) + """ + return pb.overlap( + self._ldf, + other_df, + how=how, + overlap_filter=overlap_filter, + suffixes=suffixes, + cols1=cols1, + cols2=cols2, + ) + + def nearest( + self, + other_df: pl.LazyFrame, + suffixes: tuple[str, str] = ("_1", "_2"), + overlap_filter=FilterOp.Strict, + cols1=["chrom", "start", "end"], + cols2=["chrom", "start", "end"], + ) -> pl.LazyFrame: + """ + !!! note + Alias for [nearest](api.md#polars_bio.nearest) + """ + return pb.nearest( + self._ldf, + other_df, + overlap_filter=overlap_filter, + suffixes=suffixes, + cols1=cols1, + cols2=cols2, + ) + def sort( self, cols: Union[tuple[str], None] = ["chrom", "start", "end"] ) -> pl.LazyFrame: diff --git a/polars_bio/range_op_helpers.py b/polars_bio/range_op_helpers.py index 4c5539b..04e7311 100644 --- a/polars_bio/range_op_helpers.py +++ b/polars_bio/range_op_helpers.py @@ -12,6 +12,7 @@ stream_range_operation_scan, ) +from .logging import logger from .range_op_io import _df_to_arrow, _get_schema, _rename_columns, range_lazy_scan @@ -117,3 +118,11 @@ def _validate_overlap_input(col1, col2, on_cols, suffixes, output_type, how): def stream_wrapper(pyldf): return pl.LazyFrame._from_pyldf(pyldf) + + +def tmp_cleanup(seed): + # remove s1, s2 temp parquet files + logger.info(f"Cleaning up temp files for seed: '{seed}'") + for f in ["s1", "s2"]: + path = Path(f"{f}-{seed}.parquet") + path.unlink(missing_ok=True) diff --git a/polars_bio/range_op_io.py b/polars_bio/range_op_io.py index 5420b0f..0c29c85 100644 --- a/polars_bio/range_op_io.py +++ b/polars_bio/range_op_io.py @@ -71,10 +71,11 @@ def _rename_columns_pl(df: pl.DataFrame, suffix: str) -> pl.DataFrame: def _rename_columns( - df: Union[pl.DataFrame, pd.DataFrame], suffix: str + df: Union[pl.DataFrame, pd.DataFrame, pl.LazyFrame], suffix: str ) -> Union[pl.DataFrame, pd.DataFrame]: if isinstance(df, pl.DataFrame) or isinstance(df, pl.LazyFrame): - df = pl.DataFrame(schema=df.schema) + schema = df.collect_schema() if isinstance(df, pl.LazyFrame) else df.schema + df = pl.DataFrame(schema=schema) return _rename_columns_pl(df, suffix) elif isinstance(df, pd.DataFrame): df = pl.from_pandas(pd.DataFrame(columns=df.columns)) diff --git a/pyproject.toml b/pyproject.toml index 54b3947..dffb854 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "polars-bio" -version = "0.5.3" +version = "0.5.4" description = "Blazing fast genomic operations on large Python dataframes" authors = [] requires-python = ">=3.9" diff --git a/src/context.rs b/src/context.rs index 8d6dc4f..19c8030 100644 --- a/src/context.rs +++ b/src/context.rs @@ -13,19 +13,22 @@ use sequila_core::session_context::SequilaConfig; pub struct PyBioSessionContext { pub ctx: ExonSession, pub session_config: HashMap, + #[pyo3(get, set)] + pub seed: String, } #[pymethods] impl PyBioSessionContext { - #[pyo3(signature = ())] + #[pyo3(signature = (seed))] #[new] - pub fn new() -> PyResult { + pub fn new(seed: String) -> PyResult { let ctx = create_context().unwrap(); let session_config: HashMap = HashMap::new(); 
Ok(PyBioSessionContext { ctx, session_config, + seed, }) } #[pyo3(signature = (key, value, temporary=Some(false)))] diff --git a/src/lib.rs b/src/lib.rs index 161dc70..9509783 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,8 +41,8 @@ fn range_operation_frame( #[allow(clippy::useless_conversion)] let rt = Runtime::new().unwrap(); let ctx = &py_ctx.ctx; - register_frame(ctx, df1, LEFT_TABLE.to_string()); - register_frame(ctx, df2, RIGHT_TABLE.to_string()); + register_frame(py_ctx, df1, LEFT_TABLE.to_string()); + register_frame(py_ctx, df2, RIGHT_TABLE.to_string()); Ok(PyDataFrame::new(do_range_operation( ctx, &rt, diff --git a/src/scan.rs b/src/scan.rs index 30143ee..0786805 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -4,14 +4,18 @@ use arrow::array::RecordBatch; use arrow::error::ArrowError; use arrow::ffi_stream::ArrowArrayStreamReader; use arrow::pyarrow::PyArrowType; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::MemTable; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; use exon::ExonSession; +use crate::context::PyBioSessionContext; use crate::option::InputFormat; +const MAX_IN_MEMORY_ROWS: usize = 1024 * 1024; + pub(crate) fn register_frame( - ctx: &ExonSession, + py_ctx: &PyBioSessionContext, df: PyArrowType, table_name: String, ) { @@ -19,11 +23,30 @@ pub(crate) fn register_frame( df.0.collect::, ArrowError>>() .unwrap(); let schema = batches[0].schema(); - let table = MemTable::try_new(schema, vec![batches]).unwrap(); + let ctx = &py_ctx.ctx; + let rt = tokio::runtime::Runtime::new().unwrap(); + let table_source = MemTable::try_new(schema, vec![batches]).unwrap(); ctx.session.deregister_table(&table_name).unwrap(); ctx.session - .register_table(&table_name, Arc::new(table)) + .register_table(&table_name, Arc::new(table_source)) + .unwrap(); + let df = rt + .block_on(ctx.sql(&format!("SELECT * FROM {}", table_name))) .unwrap(); + let table_size = rt.block_on(df.clone().count()).unwrap(); + if table_size > MAX_IN_MEMORY_ROWS { + let path = format!("{}-{}.parquet", table_name, py_ctx.seed); + ctx.session.deregister_table(&table_name).unwrap(); + rt.block_on(df.write_parquet(&path, DataFrameWriteOptions::new(), None)) + .unwrap(); + ctx.session.deregister_table(&table_name).unwrap(); + rt.block_on(register_table( + ctx, + &path, + &table_name, + InputFormat::Parquet, + )); + } } pub(crate) fn get_input_format(path: &str) -> InputFormat { diff --git a/tests/test_polars_ext.py b/tests/test_polars_ext.py index 6bdf377..11deaf2 100644 --- a/tests/test_polars_ext.py +++ b/tests/test_polars_ext.py @@ -1,4 +1,6 @@ import bioframe as bf +import pandas as pd +import polars as pl from _expected import DATA_DIR import polars_bio as pb @@ -28,3 +30,71 @@ def test_expand_scale(self): df_2 = bf.expand(df_1.to_pandas(), scale=1.5) df_3 = df_1.lazy().pb.expand(scale=1.5).collect().to_pandas() assert df_2.equals(df_3) + + def test_overlap(self): + cols = ("chrom", "start", "end") + df_1 = ( + pb.read_table(self.file, schema="bed9") + .select(cols) + .collect() + .to_pandas() + .reset_index(drop=True) + ) + df_2 = ( + pb.read_table(self.file, schema="bed9") + .select(cols) + .collect() + .to_pandas() + .reset_index(drop=True) + ) + df_3 = ( + bf.overlap(df_1, df_2, suffixes=("_1", "_2")) + .sort_values(by=["chrom_1", "start_1", "end_1"]) + .reset_index(drop=True) + ) + # + df_4 = ( + pl.DataFrame(df_1) + .lazy() + .pb.overlap(pl.DataFrame(df_2).lazy(), suffixes=("_1", "_2")) + .collect() + .to_pandas() + .sort_values(by=["chrom_1", "start_1", "end_1"]) + 
.reset_index(drop=True) + ) + assert df_3.equals(df_4) + + def test_nearest(self): + cols = ("chrom", "start", "end") + df_1 = ( + pb.read_table(self.file, schema="bed9") + .select(cols) + .collect() + .to_pandas() + .reset_index(drop=True) + ) + df_2 = ( + pb.read_table(self.file, schema="bed9") + .select(cols) + .collect() + .to_pandas() + .reset_index(drop=True) + ) + df_3 = ( + bf.closest(df_1, df_2, suffixes=("_1", "_2")) + .sort_values(by=["chrom_1", "start_1", "end_1"]) + .reset_index(drop=True) + ) + # + df_4 = ( + pl.DataFrame(df_1) + .lazy() + .pb.nearest(pl.DataFrame(df_2).lazy(), suffixes=("_1", "_2")) + .collect() + .to_pandas() + .sort_values(by=["chrom_1", "start_1", "end_1"]) + .reset_index(drop=True) + ) + print(df_3.columns) + print(df_4.columns) + pd.testing.assert_frame_equal(df_3, df_4, check_dtype=False)
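
Usage sketch (assumed, not taken verbatim from the patch): the hunk for `polars_bio/polars_ext.py` above registers a `pb` LazyFrame namespace whose `overlap` and `nearest` methods alias `pb.overlap`/`pb.nearest`. The snippet below illustrates how those methods would be called; the interval values are invented for illustration, and the column names follow the defaults `("chrom", "start", "end")` used by `cols1`/`cols2`.

```python
import polars as pl

import polars_bio as pb  # importing polars_bio registers the `pb` LazyFrame namespace

# Hypothetical interval data; any frames with chrom/start/end columns should work.
reads = pl.DataFrame(
    {
        "chrom": ["chr1", "chr1", "chr2"],
        "start": [100, 400, 100],
        "end": [190, 510, 200],
    }
).lazy()
targets = pl.DataFrame(
    {
        "chrom": ["chr1", "chr2"],
        "start": [150, 300],
        "end": [250, 500],
    }
).lazy()

# Alias for pb.overlap(reads, targets, ...), returned as a LazyFrame.
overlapping = reads.pb.overlap(
    targets,
    how="inner",
    overlap_filter=pb.FilterOp.Strict,
    suffixes=("_1", "_2"),
).collect()

# Alias for pb.nearest(reads, targets, ...): closest interval in `targets`
# for every interval in `reads`.
closest = reads.pb.nearest(targets, suffixes=("_1", "_2")).collect()

print(overlapping)
print(closest)
```

Frames passed this way go through `register_frame` (see `src/scan.rs` above): inputs larger than `MAX_IN_MEMORY_ROWS` are written to seed-suffixed Parquet files (`s1-<seed>.parquet`, `s2-<seed>.parquet`) instead of being kept as in-memory tables, and `tmp_cleanup` removes those files when the `Context` singleton is destroyed.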